From 87686e5ff0164d158a02fab50c81af61baa8cbc3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=C3=B4nio=20C=C3=A2ndido?=
Date: Thu, 27 Dec 2018 18:46:55 -0200
Subject: [PATCH] Bugfixes in pwfts, distributed and composite

---
 pyFTS/common/FuzzySet.py    | 21 ++++++++--------
 pyFTS/common/fts.py         |  9 ++++---
 pyFTS/distributed/dispy.py  |  3 +--
 pyFTS/distributed/spark.py  | 45 ++++++++++++++++++-----------------
 pyFTS/models/pwfts.py       | 19 +++++++-------
 pyFTS/tests/general.py      | 18 +++++++------
 pyFTS/tests/multivariate.py | 50 ++++++++++++++++++++++++++++++++++++-
 pyFTS/tests/spark.py        | 15 +++++++----
 8 files changed, 119 insertions(+), 61 deletions(-)

diff --git a/pyFTS/common/FuzzySet.py b/pyFTS/common/FuzzySet.py
index 92df9de..932fc72 100644
--- a/pyFTS/common/FuzzySet.py
+++ b/pyFTS/common/FuzzySet.py
@@ -28,16 +28,17 @@ class FuzzySet(object):
         self.Z = None
         """Partition function in respect to the membership function"""
 
-        if self.mf == Membership.gaussmf:
-            self.lower = parameters[0] - parameters[1]*3
-            self.upper = parameters[0] + parameters[1]*3
-        elif self.mf == Membership.sigmf:
-            k = (parameters[1] / (2 * parameters[0]))
-            self.lower = parameters[1] - k
-            self.upper = parameters[1] + k
-        else:
-            self.lower = min(parameters)
-            self.upper = max(parameters)
+        if parameters is not None:
+            if self.mf == Membership.gaussmf:
+                self.lower = parameters[0] - parameters[1]*3
+                self.upper = parameters[0] + parameters[1]*3
+            elif self.mf == Membership.sigmf:
+                k = (parameters[1] / (2 * parameters[0]))
+                self.lower = parameters[1] - k
+                self.upper = parameters[1] + k
+            else:
+                self.lower = min(parameters)
+                self.upper = max(parameters)
 
         self.metadata = {}
diff --git a/pyFTS/common/fts.py b/pyFTS/common/fts.py
index d527a4b..ef0ceb3 100644
--- a/pyFTS/common/fts.py
+++ b/pyFTS/common/fts.py
@@ -322,7 +322,7 @@
 
         dump = kwargs.get('dump', None)
 
-        num_batches = kwargs.get('num_batches', None)
+        num_batches = kwargs.get('num_batches', 1)
 
         save = kwargs.get('save_model', False)  # save model on disk
@@ -334,7 +334,7 @@
 
         batch_save_interval = kwargs.get('batch_save_interval', 10)
 
-        if distributed is not None:
+        if distributed is not None and distributed:
 
             if distributed == 'dispy':
                 from pyFTS.distributed import dispy
@@ -345,9 +345,10 @@
                                         batch_save_interval=batch_save_interval)
             elif distributed == 'spark':
                 from pyFTS.distributed import spark
+                url = kwargs.get('url', 'spark://192.168.0.110:7077')
+                app = kwargs.get('app', 'pyFTS')
 
-                spark.distributed_train(self, data, self.partitioner,
-                                        url='spark://192.168.0.110:7077', app='pyFTS')
+                spark.distributed_train(self, data, url=url, app=app)
         else:
 
             if dump == 'time':
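After this change the Spark master URL and application name flow into
spark.distributed_train() through fit() kwargs instead of being hard-coded in
fts.py. A minimal sketch of the resulting call, assuming an already-built
model; the URL is the illustrative default used throughout this patch:

    model.fit(train_data, distributed='spark',
              url='spark://192.168.0.110:7077', app='pyFTS')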
diff --git a/pyFTS/distributed/dispy.py b/pyFTS/distributed/dispy.py
index 91366b2..24dbc26 100644
--- a/pyFTS/distributed/dispy.py
+++ b/pyFTS/distributed/dispy.py
@@ -20,7 +20,6 @@ def stop_dispy_cluster(cluster, http_server):
 
     cluster.close()
 
-
 def simple_model_train(model, data, parameters):
     model.train(data, **parameters)
     return model
@@ -50,7 +49,7 @@ def distributed_train(model, train_method, nodes, fts_method, data, num_batches=
         else:
             ndata = data[ct - model.order: ct + batch_size]
 
-        tmp_model = fts_method(str(bcount))
+        tmp_model = fts_method()
 
         tmp_model.clone_parameters(model)
diff --git a/pyFTS/distributed/spark.py b/pyFTS/distributed/spark.py
index b7c0865..1e60a64 100644
--- a/pyFTS/distributed/spark.py
+++ b/pyFTS/distributed/spark.py
@@ -13,23 +13,6 @@ import os
 os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'
 
-conf = None
-
-
-def get_conf(url, app):
-    """
-
-
-    :param url:
-    :param app:
-    :return:
-    """
-    if conf is None:
-        conf = SparkConf()
-        conf.setMaster(url)
-        conf.setAppName(app)
-
-    return conf
 
 
 def get_partitioner(shared_partitioner):
@@ -47,7 +30,7 @@ def get_partitioner(shared_partitioner):
     return fs_tmp
 
 
-def slave_train(data):
+def slave_train(data, shared_method, shared_partitioner, shared_order):
     """
 
     :param data:
@@ -64,13 +47,29 @@ def slave_train(data):
     return [(k, model.flrgs[k]) for k in model.flrgs]
 
 
-def distributed_train(model, data, partitioner, url='spark://192.168.0.110:7077', app='pyFTS'):
-    with SparkContext(conf=get_conf(url=url, app=app)) as context:
-        shared_partitioner = context.broadcast(partitioner.sets)
+def distributed_train(model, data, url='spark://192.168.0.110:7077', app='pyFTS'):
+    """
 
-        flrgs = context.parallelize(data).mapPartitions(slave_train)
-        model = hofts.WeightedHighOrderFTS(partitioner=partitioner, order=shared_order.value)
+    :param model:
+    :param data:
+    :param url:
+    :param app:
+    :return:
+    """
+
+    conf = SparkConf()
+    conf.setMaster(url)
+    conf.setAppName(app)
+
+    with SparkContext(conf=conf) as context:
+        shared_partitioner = context.broadcast(model.partitioner.sets)
+        shared_order = context.broadcast(model.order)
+        shared_method = context.broadcast(type(model))
+
+        func = lambda x: slave_train(x, shared_method, shared_partitioner, shared_order)
+
+        flrgs = context.parallelize(data).mapPartitions(func)
 
         for k in flrgs.collect():
             model.append_rule(k[1])
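slave_train now receives the Spark broadcast handles explicitly; on the worker
side a broadcast is unwrapped with .value. A minimal, self-contained sketch of
that driver/worker pattern with toy data and a local master (the name
shared_order mirrors the patch; the lambda body is illustrative, not the real
slave_train):

    from pyspark import SparkConf, SparkContext

    conf = SparkConf()
    conf.setMaster('local[2]')   # illustrative; the patch defaults to spark://192.168.0.110:7077
    conf.setAppName('broadcast-sketch')

    with SparkContext(conf=conf) as context:
        shared_order = context.broadcast(2)  # driver side: wrap the shared value once
        func = lambda part: [(shared_order.value, sum(1 for _ in part))]  # worker side: .value
        print(context.parallelize(range(100)).mapPartitions(func).collect())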
diff --git a/pyFTS/models/pwfts.py b/pyFTS/models/pwfts.py
index 105dda3..147ca85 100644
--- a/pyFTS/models/pwfts.py
+++ b/pyFTS/models/pwfts.py
@@ -16,7 +16,6 @@ class ProbabilisticWeightedFLRG(hofts.HighOrderFLRG):
     def __init__(self, order):
         super(ProbabilisticWeightedFLRG, self).__init__(order)
         self.RHS = {}
-        self.rhs_count = {}
         self.frequency_count = 0.0
         self.Z = None
@@ -43,11 +42,11 @@ class ProbabilisticWeightedFLRG(hofts.HighOrderFLRG):
         return tmp
 
     def rhs_unconditional_probability(self, c):
-        return self.rhs_count[c] / self.frequency_count
+        return self.RHS[c] / self.frequency_count
 
     def rhs_conditional_probability(self, x, sets, uod, nbins):
         total = 0.0
-        for rhs in self.RHS:
+        for rhs in self.RHS.keys():
             set = sets[rhs]
             wi = self.rhs_unconditional_probability(rhs)
             mv = set.membership(x) / set.partition_function(uod, nbins=nbins)
@@ -68,28 +67,30 @@
         '''Return the expectation of the PWFLRG, the weighted sum'''
         if self.midpoint is None:
             self.midpoint = np.sum(np.array([self.rhs_unconditional_probability(s) * sets[s].centroid
-                                             for s in self.RHS]))
+                                             for s in self.RHS.keys()]))
 
         return self.midpoint
 
     def get_upper(self, sets):
         if self.upper is None:
-            self.upper = np.sum(np.array([self.rhs_unconditional_probability(s) * sets[s].upper for s in self.RHS]))
+            self.upper = np.sum(np.array([self.rhs_unconditional_probability(s) * sets[s].upper
+                                          for s in self.RHS.keys()]))
 
         return self.upper
 
     def get_lower(self, sets):
         if self.lower is None:
-            self.lower = np.sum(np.array([self.rhs_unconditional_probability(s) * sets[s].lower for s in self.RHS]))
+            self.lower = np.sum(np.array([self.rhs_unconditional_probability(s) * sets[s].lower
+                                          for s in self.RHS.keys()]))
 
         return self.lower
 
     def __str__(self):
         tmp2 = ""
-        for c in sorted(self.RHS):
+        for c in sorted(self.RHS.keys()):
             if len(tmp2) > 0:
                 tmp2 = tmp2 + ", "
-            tmp2 = tmp2 + "(" + str(round(self.rhs_count[c] / self.frequency_count, 3)) + ")" + c
+            tmp2 = tmp2 + "(" + str(round(self.RHS[c] / self.frequency_count, 3)) + ")" + c
 
         return self.get_key() + " -> " + tmp2
@@ -530,7 +531,7 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
 
     def __str__(self):
         tmp = self.name + ":\n"
-        for r in sorted(self.flrgs):
+        for r in sorted(self.flrgs.keys()):
             p = round(self.flrgs[r].frequency_count / self.global_frequency_count, 3)
             tmp = tmp + "(" + str(p) + ") " + str(self.flrgs[r]) + "\n"
         return tmp
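With rhs_count removed, RHS itself maps each right-hand-side fuzzy set to its
frequency count, so every probability above reduces to RHS[c] /
frequency_count. A worked toy version of the weighted midpoint computed by
get_midpoint(), with made-up counts and centroids:

    RHS = {'A1': 3.0, 'A2': 1.0}          # per-set counts, as RHS now stores them
    frequency_count = 4.0                 # sum of the counts
    centroids = {'A1': 10.0, 'A2': 20.0}  # stand-ins for sets[s].centroid
    midpoint = sum((RHS[s] / frequency_count) * centroids[s] for s in RHS)
    print(midpoint)                       # 0.75 * 10 + 0.25 * 20 = 12.5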
diff --git a/pyFTS/tests/general.py b/pyFTS/tests/general.py
index 9f28d5f..868a6cd 100644
--- a/pyFTS/tests/general.py
+++ b/pyFTS/tests/general.py
@@ -25,20 +25,24 @@ from pyFTS.data import TAIEX, SP500, NASDAQ, Malaysia, Enrollments
 from pyFTS.partitioners import Grid
 from pyFTS.models import pwfts, tsaur
 
-train = TAIEX.get_data()[:1000]
-test = TAIEX.get_data()[1000:1200]
+dataset = pd.read_csv('/home/petronio/Downloads/kalang.csv', sep=',')
 
-partitioner = Grid.GridPartitioner(data=train, npart=35)
+dataset['date'] = pd.to_datetime(dataset["date"], format='%Y-%m-%d %H:%M:%S')
 
-#model = pwfts.ProbabilisticWeightedFTS(partitioner=partitioner) #, order=2, lags=[3,4])
-model = tsaur.MarkovWeightedFTS(partitioner=partitioner)
-model.fit(train)
+train_uv = dataset['value'].values[:24505]
+test_uv = dataset['value'].values[24505:]
+
+partitioner = Grid.GridPartitioner(data=train_uv, npart=35)
+
+model = pwfts.ProbabilisticWeightedFTS(partitioner=partitioner) #, order=2, lags=[3,4])
+#model = tsaur.MarkovWeightedFTS(partitioner=partitioner)
+model.fit(train_uv)
 
 from pyFTS.benchmarks import benchmarks as bchmk
 
 print(model)
 
-print(model.forecast(test))
+print(model.forecast(test_uv))
 
 #distributions = model.predict(y[800:820])
diff --git a/pyFTS/tests/multivariate.py b/pyFTS/tests/multivariate.py
index f0f09e4..5522454 100644
--- a/pyFTS/tests/multivariate.py
+++ b/pyFTS/tests/multivariate.py
@@ -18,6 +18,53 @@ from pyFTS.models.multivariate import common, variable, mvfts, cmvfts
 from pyFTS.models.seasonal import partitioner as seasonal
 from pyFTS.models.seasonal.common import DateTime
 
+dataset = pd.read_csv('/home/petronio/Downloads/kalang.csv', sep=',')
+
+dataset['date'] = pd.to_datetime(dataset["date"], format='%Y-%m-%d %H:%M:%S')
+
+train_uv = dataset['value'].values[:24505]
+test_uv = dataset['value'].values[24505:]
+
+train_mv = dataset.iloc[:24505]
+test_mv = dataset.iloc[24505:]
+
+print(train_mv)
+
+sp = {'seasonality': DateTime.minute_of_day, 'names': [str(k)+'hs' for k in range(0,24)]}
+
+vhour = variable.Variable("Hour", data_label="date", partitioner=seasonal.TimeGridPartitioner, npart=24,
+                          data=train_mv, partitioner_specific=sp)
+
+vvalue = variable.Variable("Pollution", data_label="value", alias='value',
+                           partitioner=Grid.GridPartitioner, npart=35,
+                           data=train_mv)
+
+parameters = [
+    {},{},
+    {'order':2, 'knn': 1},
+    {'order':2, 'knn': 2},
+    {'order':2, 'knn': 3},
+]
+
+for ct, method in enumerate([mvfts.MVFTS, wmvfts.WeightedMVFTS,
+                             cmvfts.ClusteredMVFTS,cmvfts.ClusteredMVFTS,cmvfts.ClusteredMVFTS]):
+    print(method)
+    model = method(**parameters[ct])
+    model.shortname += str(ct)
+    model.append_variable(vhour)
+    model.append_variable(vvalue)
+    model.target_variable = vvalue
+    model.fit(train_mv)
+
+    Util.persist_obj(model, model.shortname)
+
+    forecasts = model.predict(test_mv.iloc[:100])
+
+    print(model)
+
+
+
+'''
 from pyFTS.data import henon
 
 df = henon.get_dataframe(iterations=1000)
@@ -47,4 +94,5 @@ ax[1][0].scatter(forecasts['y'].values,forecasts['x'].values)
 ax[1][1].plot(test['y'].values)
 ax[1][1].plot(forecasts['y'].values)
 
-print(forecasts)
\ No newline at end of file
+print(forecasts)
+'''
\ No newline at end of file
diff --git a/pyFTS/tests/spark.py b/pyFTS/tests/spark.py
index 3029d38..ff7e36f 100644
--- a/pyFTS/tests/spark.py
+++ b/pyFTS/tests/spark.py
@@ -13,15 +13,20 @@ import os
 os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'
 
-conf = SparkConf()
-conf.setMaster('spark://192.168.0.110:7077')
-conf.setAppName('pyFTS')
-
 data = TAIEX.get_data()
 
 fs = Grid.GridPartitioner(data=data, npart=50)
 
+model = hofts.WeightedHighOrderFTS(partitioner=fs, order=2)
+model.fit(data, distributed='spark', url='spark://192.168.0.110:7077')
+#model.fit(data, distributed='dispy', nodes=['192.168.0.110'])
+
+print(model)
+
+
+
+'''
 def fun(x):
     return (x, x % 2)
@@ -76,7 +81,7 @@ with SparkContext(conf=conf) as sc:
 
     print(model)
 
-
+'''
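Two notes on the test scripts: the multivariate loop references
wmvfts.WeightedMVFTS, which the visible import line does not bring in, so the
script assumes wmvfts is imported alongside mvfts and cmvfts; and each fitted
model is saved with Util.persist_obj, whose counterpart for reading it back is
Util.load_obj. A minimal sketch of reloading one of the persisted models (the
file name is hypothetical, derived from shortname plus the loop index):

    from pyFTS.common import Util

    model = Util.load_obj('MVFTS0')                # hypothetical shortname-based file
    forecasts = model.predict(test_mv.iloc[:100])  # same call as in the loop above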