From 354a3131c9b359ad65b104dd3a1fafcb467635d6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=C3=B4nio=20C=C3=A2ndido?=
Date: Thu, 20 Jun 2019 17:38:36 -0300
Subject: [PATCH] Refactoring to remove FTS.sets attribute; Refactoring to
 move Partitioner.extractor to a class method; Bugfixes in distributed fit
 and predict

---
 pyFTS/benchmarks/benchmarks.py | 58 ++++++++++++
 pyFTS/common/Util.py | 24 +++--
 pyFTS/common/fts.py | 35 +++----
 pyFTS/distributed/dispy.py | 33 ++++++-
 pyFTS/hyperparam/Evolutionary.py | 36 ++++---
 pyFTS/models/chen.py | 4 +-
 pyFTS/models/ifts.py | 12 +--
 pyFTS/models/ismailefendi.py | 6 +-
 pyFTS/models/nonstationary/cvfts.py | 1 +
 pyFTS/models/nonstationary/nsfts.py | 16 ++--
 pyFTS/models/sadaei.py | 6 +-
 pyFTS/models/seasonal/partitioner.py | 5 +-
 pyFTS/models/tsaur.py | 6 +-
 pyFTS/models/yu.py | 8 +-
 pyFTS/partitioners/partitioner.py | 6 +-
 pyFTS/tests/distributed.py | 135 ++++++--------------------
 pyFTS/tests/general.py | 65 +++++++++++--
 pyFTS/tests/hyperparam.py | 20 ++--
 18 files changed, 266 insertions(+), 210 deletions(-)

diff --git a/pyFTS/benchmarks/benchmarks.py b/pyFTS/benchmarks/benchmarks.py
index 8a76d2d..8e6459e 100644
--- a/pyFTS/benchmarks/benchmarks.py
+++ b/pyFTS/benchmarks/benchmarks.py
@@ -824,6 +824,17 @@ def run_probabilistic2(fts_method, order, partitioner_method, partitions, transf
     return ret
 
 
+def common_process_time_jobs(conn, data, job):
+    dta = deepcopy(data)
+    dta.append(job['steps'])
+    dta.append(job['method'])
+    for key in ["time"]:
+        if key in job:
+            data2 = deepcopy(dta)
+            data2.extend([key, job[key]])
+            bUtil.insert_benchmark(data2, conn)
+
+
 def common_process_point_jobs(conn, data, job):
     dta = deepcopy(data)
     dta.append(job['steps'])
@@ -1416,3 +1427,50 @@ def pftsExploreOrderAndPartitions(data,save=False, file=None):
 
     cUtil.show_and_save_image(fig, file, save)
 
+
+def train_test_time(data, windowsize, train=0.8, **kwargs):
+    import time
+
+    tag = __pop('tag', None, kwargs)
+    steps = __pop('steps', 0, kwargs)
+    dataset = __pop('dataset', None, kwargs)
+
+    partitions = __pop('partitions', 10, kwargs)
+
+    fts_methods = __pop('methods', [], kwargs)
+
+    file = kwargs.get('file', "benchmarks.db")
+
+    inc = __pop("inc", 0.1, kwargs)
+
+    conn = bUtil.open_benchmark_db(file)
+
+    for ct, train, test in cUtil.sliding_window(data, windowsize, train, inc=inc, **kwargs):
+        partitioner = Grid.GridPartitioner(data=train, npart=partitions)
+        for id, fts_method in enumerate(fts_methods):
+            print(dataset, fts_method, ct)
+            times = []
+            model = fts_method(partitioner=partitioner, **kwargs)
+            _start = time.time()
+            model.fit(train, **kwargs)
+            _end = time.time()
+            times.append(_end - _start)
+            _start = time.time()
+            model.predict(train, **kwargs)
+            _end = time.time()
+            times.append(_end - _start)
+            for tm, method in enumerate(['train','test']):
+                job = {
+                    'steps': steps, 'method': method, 'time': times[tm],
+                    'model': model.shortname, 'transformation': None,
+                    'order': model.order, 'partitioner': partitioner.name,
+                    'partitions': partitions, 'size': len(model)
+                }
+
+                job_data = bUtil.process_common_data2(dataset, tag, 'train', job)
+                common_process_time_jobs(conn, job_data, job)
+
+    conn.close()
+
+
diff --git a/pyFTS/common/Util.py b/pyFTS/common/Util.py
index 35a21d0..174c009 100644
--- a/pyFTS/common/Util.py
+++ b/pyFTS/common/Util.py
@@ -13,8 +13,6 @@ from pyFTS.probabilistic import ProbabilityDistribution
 from pyFTS.common import Transformations
 
-
-
 def plot_compared_intervals_ahead(original, models, colors, distributions, time_from, time_to, intervals = 
True, save=False, file=None, tam=[20, 5], resolution=None, cmap='Blues', linewidth=1.5): @@ -126,8 +124,9 @@ def plot_probability_distributions(pmfs, lcolors, tam=[15, 7]): handles0, labels0 = ax.get_legend_handles_labels() ax.legend(handles0, labels0) + def plot_distribution(ax, cmap, probabilitydist, fig, time_from, reference_data=None): - ''' + """ Plot forecasted ProbabilityDistribution objects on a matplotlib axis :param ax: matplotlib axis @@ -137,7 +136,7 @@ def plot_distribution(ax, cmap, probabilitydist, fig, time_from, reference_data= :param time_from: starting time (on x axis) to begin the plots :param reference_data: :return: - ''' + """ from matplotlib.patches import Rectangle from matplotlib.collections import PatchCollection patches = [] @@ -163,7 +162,7 @@ def plot_distribution(ax, cmap, probabilitydist, fig, time_from, reference_data= def plot_distribution2(probabilitydist, data, **kwargs): - ''' + """ Plot distributions over the time (x-axis) :param probabilitydist: the forecasted probability distributions to plot @@ -173,7 +172,7 @@ def plot_distribution2(probabilitydist, data, **kwargs): :keyword cmap: a matplotlib colormap name, the default value is 'Blues' :keyword quantiles: the list of quantiles intervals to plot, e. g. [.05, .25, .75, .95] :keyword median: a boolean value indicating if the median value will be plot. - ''' + """ import matplotlib.colorbar as cbar import matplotlib.cm as cm @@ -225,7 +224,7 @@ def plot_distribution2(probabilitydist, data, **kwargs): def plot_interval(axis, intervals, order, label, color='red', typeonlegend=False, ls='-', linewidth=1): - ''' + """ Plot forecasted intervals on matplotlib :param axis: matplotlib axis @@ -237,7 +236,7 @@ def plot_interval(axis, intervals, order, label, color='red', typeonlegend=False :param ls: matplotlib line style :param linewidth: matplotlib width :return: - ''' + """ lower = [kk[0] for kk in intervals] upper = [kk[1] for kk in intervals] mi = min(lower) * 0.95 @@ -252,7 +251,7 @@ def plot_interval(axis, intervals, order, label, color='red', typeonlegend=False def plot_interval2(intervals, data, **kwargs): - ''' + """ Plot forecasted intervals on matplotlib :param intervals: list of forecasted intervals @@ -263,7 +262,7 @@ def plot_interval2(intervals, data, **kwargs): :keyword typeonlegend: :keyword ls: matplotlib line style :keyword linewidth: matplotlib width - ''' + """ l = len(intervals) @@ -296,7 +295,7 @@ def plot_interval2(intervals, data, **kwargs): def plot_rules(model, size=[5, 5], axis=None, rules_by_axis=None, columns=1): - ''' + """ Plot the FLRG rules of a FTS model on a matplotlib axis :param model: FTS model @@ -305,7 +304,7 @@ def plot_rules(model, size=[5, 5], axis=None, rules_by_axis=None, columns=1): :param rules_by_axis: number of rules plotted by column :param columns: number of columns :return: - ''' + """ if axis is None and rules_by_axis is None: rows = 1 elif axis is None and rules_by_axis is not None: @@ -412,7 +411,6 @@ def uniquefilename(name): return name + str(current_milli_time()) - def show_and_save_image(fig, file, flag, lgd=None): """ Show and image and save on file diff --git a/pyFTS/common/fts.py b/pyFTS/common/fts.py index 8595f97..f9e9cc6 100644 --- a/pyFTS/common/fts.py +++ b/pyFTS/common/fts.py @@ -11,9 +11,6 @@ class FTS(object): """ Create a Fuzzy Time Series model """ - - self.sets = {} - """The list of fuzzy sets used on this model""" self.flrgs = {} """The list of Fuzzy Logical Relationship Groups - FLRG""" self.order = kwargs.get('order',1) @@ -83,8 
+80,8 @@ class FTS(object):
         """
         best = {"fuzzyset": "", "membership": 0.0}
 
-        for f in self.sets:
-            fset = self.sets[f]
+        for f in self.partitioner.sets:
+            fset = self.partitioner.sets[f]
             if best["membership"] <= fset.membership(data):
                 best["fuzzyset"] = fset.name
                 best["membership"] = fset.membership(data)
@@ -129,12 +126,12 @@ class FTS(object):
         else:
             distributed = False
 
-        if distributed is None or distributed == False:
+        if 'type' in kwargs:
+            type = kwargs.pop("type")
+        else:
+            type = 'point'
 
-            if 'type' in kwargs:
-                type = kwargs.pop("type")
-            else:
-                type = 'point'
+        if distributed is None or distributed == False:
 
             steps_ahead = kwargs.get("steps_ahead", None)
 
@@ -321,25 +318,19 @@ class FTS(object):
         self.original_min = np.nanmin(data)
         self.original_max = np.nanmax(data)
 
-        if 'sets' in kwargs:
-            self.sets = kwargs.pop('sets')
-
         if 'partitioner' in kwargs:
             self.partitioner = kwargs.pop('partitioner')
 
-        if not self.is_wrapper:
-            if (self.sets is None or len(self.sets) == 0) and not self.benchmark_only and not self.is_multivariate:
-                if self.partitioner is not None:
-                    self.sets = self.partitioner.sets
-                else:
-                    raise Exception("Fuzzy sets were not provided for the model. Use 'sets' parameter or 'partitioner'. ")
+        if not self.is_wrapper and not self.benchmark_only:
+            if self.partitioner is None:
+                raise Exception("Fuzzy sets were not provided for the model. Use the 'partitioner' parameter.")
 
         if 'order' in kwargs:
             self.order = kwargs.pop('order')
 
         dump = kwargs.get('dump', None)
 
-        num_batches = kwargs.get('num_batches', 1)
+        num_batches = kwargs.get('num_batches', 10)
 
         save = kwargs.get('save_model', False)  # save model on disk
 
@@ -419,6 +410,8 @@ class FTS(object):
         """
         self.order = model.order
+        self.partitioner = model.partitioner
+        self.lags = model.lags
         self.shortname = model.shortname
         self.name = model.name
         self.detail = model.detail
@@ -434,8 +427,6 @@ class FTS(object):
         self.transformations_param = model.transformations_param
         self.original_max = model.original_max
         self.original_min = model.original_min
-        self.partitioner = model.partitioner
-        self.sets = model.sets
         self.auto_update = model.auto_update
         self.benchmark_only = model.benchmark_only
         self.indexer = model.indexer
diff --git a/pyFTS/distributed/dispy.py b/pyFTS/distributed/dispy.py
index f9c0eab..5b03cf5 100644
--- a/pyFTS/distributed/dispy.py
+++ b/pyFTS/distributed/dispy.py
@@ -4,6 +4,13 @@ import numpy as np
 
 
 def start_dispy_cluster(method, nodes):
+    """
+    Start a new dispy cluster on 'nodes' to execute the function 'method'
+
+    :param method: function to be executed on each cluster node
+    :param nodes: list of node names or IPs.
+    :return: the dispy cluster instance and the http_server for monitoring
+    """
     cluster = dispy.JobCluster(method, nodes=nodes, loglevel=logging.DEBUG, ping_interval=1000)
 
@@ -13,6 +20,13 @@
 
 
 def stop_dispy_cluster(cluster, http_server):
+    """
+    Stop a dispy cluster and its http_server
+
+    :param cluster: the dispy cluster instance
+    :param http_server: the http monitoring server attached to the cluster
+    :return:
+    """
     cluster.wait() # wait for all jobs to finish
 
     cluster.print_status()
 
@@ -21,7 +35,23 @@
     cluster.close()
 
 
+def get_number_of_cpus(cluster):
+    cpus = 0
+    for dispy_node in cluster.status().nodes:
+        cpus += dispy_node.cpus
+
+    return cpus
+
+
 def simple_model_train(model, data, parameters):
+    """
+    Cluster function that receives an FTS instance 'model' and trains it using 'data' and 'parameters'
+
+    :param model: an FTS instance
+    :param data: training dataset
+    :param parameters: parameters for the training process
+    :return: the trained model
+    """
     model.train(data, **parameters)
     return model
 
@@ -38,7 +68,8 @@
 
     cluster, http_server = start_dispy_cluster(train_method, nodes)
 
-    print("[{0: %H:%M:%S}] Distrituted Train Started".format(datetime.datetime.now()))
+    print("[{0: %H:%M:%S}] Distributed Train Started with {1} CPUs"
+          .format(datetime.datetime.now(), get_number_of_cpus(cluster)))
 
     jobs = []
     n = len(data)
diff --git a/pyFTS/hyperparam/Evolutionary.py b/pyFTS/hyperparam/Evolutionary.py
index efb86db..7f43ce8 100644
--- a/pyFTS/hyperparam/Evolutionary.py
+++ b/pyFTS/hyperparam/Evolutionary.py
@@ -10,8 +10,8 @@ import random
 from pyFTS.common import Util
 from pyFTS.benchmarks import Measures
 from pyFTS.partitioners import Grid, Entropy # , Huarng
-from pyFTS.models import hofts
 from pyFTS.common import Membership
+from pyFTS.models import hofts, ifts, pwfts
 from pyFTS.hyperparam import Util as hUtil
 from pyFTS.distributed import dispy as dUtil
 
@@ -71,7 +71,7 @@
     return pop
 
 
-def phenotype(individual, train, fts_method=hofts.WeightedHighOrderFTS, parameters={}):
+def phenotype(individual, train, fts_method, parameters={}):
     """
     Instantiate the genotype, creating a fitted model with the genotype hyperparameters
@@ -81,6 +81,8 @@
     :param parameters: dict with model specific arguments for fit method.
     :return: a fitted FTS model
     """
+    from pyFTS.models import hofts, ifts, pwfts
+
     if individual['mf'] == 1:
         mf = Membership.trimf
     elif individual['mf'] == 2:
@@ -117,6 +119,7 @@ def evaluate(dataset, individual, **kwargs):
     :param parameters: dict with model specific arguments for fit method.
:return: a tuple (len_lags, rmse) with the parsimony fitness value and the accuracy fitness value """ + from pyFTS.models import hofts, ifts, pwfts from pyFTS.common import Util from pyFTS.benchmarks import Measures from pyFTS.hyperparam.Evolutionary import phenotype, __measures @@ -306,7 +309,7 @@ def mutation(individual, **kwargs): return individual -def elitism(population, new_population): +def elitism(population, new_population, **kwargs): """ Elitism operation, always select the best individual of the population and discard the worst @@ -418,12 +421,12 @@ def GeneticAlgorithm(dataset, **kwargs): # Selection for j in range(int(npop * psel)): - new_population.append(selection_operator(population)) + new_population.append(selection_operator(population, **kwargs)) # Crossover new = [] for j in range(int(npop * pcross)): - new.append(crossover_operator(new_population)) + new.append(crossover_operator(new_population, **kwargs)) new_population.extend(new) @@ -431,7 +434,7 @@ def GeneticAlgorithm(dataset, **kwargs): for ct, individual in enumerate(new_population): rnd = random.uniform(0, 1) if rnd < pmut: - new_population[ct] = mutation_operator(individual) + new_population[ct] = mutation_operator(individual, **kwargs) # Evaluation if collect_statistics: @@ -472,7 +475,7 @@ def GeneticAlgorithm(dataset, **kwargs): # Elitism if _elitism: - population = elitism_operator(population, new_population) + population = elitism_operator(population, new_population, **kwargs) population = population[:npop] @@ -503,22 +506,22 @@ def GeneticAlgorithm(dataset, **kwargs): return best, statistics -def process_experiment(result, datasetname, conn): - log_result(conn, datasetname, result['individual']) - persist_statistics(result['statistics']) +def process_experiment(fts_method, result, datasetname, conn): + log_result(conn, datasetname, fts_method, result['individual']) + persist_statistics(datasetname, result['statistics']) return result['individual'] -def persist_statistics(statistics): +def persist_statistics(datasetname, statistics): import json - with open('statistics{}.json'.format(time.time()), 'w') as file: + with open('statistics_{}.json'.format(datasetname), 'w') as file: file.write(json.dumps(statistics)) -def log_result(conn, datasetname, result): +def log_result(conn, datasetname, fts_method, result): metrics = ['rmse', 'size', 'time'] for metric in metrics: - record = (datasetname, 'Evolutive', 'WHOFTS', None, result['mf'], + record = (datasetname, 'Evolutive', fts_method, None, result['mf'], result['order'], result['partitioner'], result['npart'], result['alpha'], str(result['lags']), metric, result[metric]) @@ -568,6 +571,9 @@ def execute(datasetname, dataset, **kwargs): distributed = kwargs.get('distributed', False) + fts_method = kwargs.get('fts_method', hofts.WeightedHighOrderFTS) + shortname = str(fts_method.__module__).split('.')[-1] + if distributed == 'dispy': nodes = kwargs.get('nodes', ['127.0.0.1']) cluster, http_server = dUtil.start_dispy_cluster(evaluate, nodes=nodes) @@ -583,7 +589,7 @@ def execute(datasetname, dataset, **kwargs): ret['time'] = end - start experiment = {'individual': ret, 'statistics': statistics} - ret = process_experiment(experiment, datasetname, conn) + ret = process_experiment(shortname, experiment, datasetname, conn) if distributed == 'dispy': dUtil.stop_dispy_cluster(cluster, http_server) diff --git a/pyFTS/models/chen.py b/pyFTS/models/chen.py index 9f70968..045118a 100644 --- a/pyFTS/models/chen.py +++ b/pyFTS/models/chen.py @@ -64,7 +64,7 @@ class 
ConventionalFTS(fts.FTS): for k in np.arange(0, l): - actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.sets) + actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.partitioner.sets) if explain: print("Fuzzyfication:\n\n {} -> {} \n".format(ndata[k], actual.name)) @@ -78,7 +78,7 @@ class ConventionalFTS(fts.FTS): else: _flrg = self.flrgs[actual.name] - mp = _flrg.get_midpoint(self.sets) + mp = _flrg.get_midpoint(self.partitioner.sets) ret.append(mp) diff --git a/pyFTS/models/ifts.py b/pyFTS/models/ifts.py index 4772e8c..20855ed 100644 --- a/pyFTS/models/ifts.py +++ b/pyFTS/models/ifts.py @@ -33,9 +33,9 @@ class IntervalFTS(hofts.HighOrderFTS): if len(flrg.LHS) > 0: if flrg.get_key() in self.flrgs: tmp = self.flrgs[flrg.get_key()] - ret = tmp.get_upper(self.sets) + ret = tmp.get_upper(self.partitioner.sets) else: - ret = self.sets[flrg.LHS[-1]].upper + ret = self.partitioner.sets[flrg.LHS[-1]].upper return ret def get_lower(self, flrg): @@ -74,7 +74,7 @@ class IntervalFTS(hofts.HighOrderFTS): for flrg in flrgs: if len(flrg.LHS) > 0: - mv = flrg.get_membership(sample, self.sets) + mv = flrg.get_membership(sample, self.partitioner.sets) up.append(mv * self.get_upper(flrg)) lo.append(mv * self.get_lower(flrg)) affected_flrgs_memberships.append(mv) @@ -119,9 +119,9 @@ class WeightedIntervalFTS(hofts.WeightedHighOrderFTS): if len(flrg.LHS) > 0: if flrg.get_key() in self.flrgs: tmp = self.flrgs[flrg.get_key()] - ret = tmp.get_upper(self.sets) + ret = tmp.get_upper(self.partitioner.sets) else: - ret = self.sets[flrg.LHS[-1]].upper + ret = self.partitioner.sets[flrg.LHS[-1]].upper return ret def get_lower(self, flrg): @@ -159,7 +159,7 @@ class WeightedIntervalFTS(hofts.WeightedHighOrderFTS): for flrg in flrgs: if len(flrg.LHS) > 0: - mv = flrg.get_membership(sample, self.sets) + mv = flrg.get_membership(sample, self.partitioner.sets) up.append(mv * self.get_upper(flrg)) lo.append(mv * self.get_lower(flrg)) affected_flrgs_memberships.append(mv) diff --git a/pyFTS/models/ismailefendi.py b/pyFTS/models/ismailefendi.py index 237a973..89811e4 100644 --- a/pyFTS/models/ismailefendi.py +++ b/pyFTS/models/ismailefendi.py @@ -74,7 +74,7 @@ class ImprovedWeightedFTS(fts.FTS): if self.partitioner is not None: ordered_sets = self.partitioner.ordered_sets else: - ordered_sets = FuzzySet.set_ordered(self.sets) + ordered_sets = FuzzySet.set_ordered(self.partitioner.sets) ndata = np.array(ndata) @@ -84,7 +84,7 @@ class ImprovedWeightedFTS(fts.FTS): for k in np.arange(0, l): - actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.sets, ordered_sets) + actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.partitioner.sets, ordered_sets) if explain: print("Fuzzyfication:\n\n {} -> {} \n".format(ndata[k], actual.name)) @@ -97,7 +97,7 @@ class ImprovedWeightedFTS(fts.FTS): else: flrg = self.flrgs[actual.name] - mp = flrg.get_midpoints(self.sets) + mp = flrg.get_midpoints(self.partitioner.sets) final = mp.dot(flrg.weights()) diff --git a/pyFTS/models/nonstationary/cvfts.py b/pyFTS/models/nonstationary/cvfts.py index 8d42794..b9c4c92 100644 --- a/pyFTS/models/nonstationary/cvfts.py +++ b/pyFTS/models/nonstationary/cvfts.py @@ -55,6 +55,7 @@ class ConditionalVarianceFTS(hofts.HighOrderFTS): self.variance_residual = 0. self.mean_residual = 0. 
self.memory_window = kwargs.get("memory_window",5) + self.sets = {} def train(self, ndata, **kwargs): diff --git a/pyFTS/models/nonstationary/nsfts.py b/pyFTS/models/nonstationary/nsfts.py index e33b42c..e344045 100644 --- a/pyFTS/models/nonstationary/nsfts.py +++ b/pyFTS/models/nonstationary/nsfts.py @@ -69,11 +69,11 @@ class NonStationaryFTS(fts.FTS): if self.method == 'unconditional': window_size = kwargs.get('parameters', 1) - tmpdata = common.fuzzySeries(data, self.sets, + tmpdata = common.fuzzySeries(data, self.partitioner.sets, self.partitioner.ordered_sets, window_size, method='fuzzy') else: - tmpdata = common.fuzzySeries(data, self.sets, + tmpdata = common.fuzzySeries(data, self.partitioner.sets, self.partitioner.ordered_sets, method='fuzzy', const_t=0) @@ -190,9 +190,9 @@ class NonStationaryFTS(fts.FTS): ix = affected_sets[0][0] aset = self.partitioner.ordered_sets[ix] if aset in self.flrgs: - numerator.append(self.flrgs[aset].get_midpoint(self.sets, perturb[ix])) + numerator.append(self.flrgs[aset].get_midpoint(self.partitioner.sets, perturb[ix])) else: - fuzzy_set = self.sets[aset] + fuzzy_set = self.partitioner.sets[aset] numerator.append(fuzzy_set.get_midpoint(perturb[ix])) denominator.append(1) else: @@ -201,9 +201,9 @@ class NonStationaryFTS(fts.FTS): fs = self.partitioner.ordered_sets[ix] tdisp = perturb[ix] if fs in self.flrgs: - numerator.append(self.flrgs[fs].get_midpoint(self.sets, tdisp) * aset[1]) + numerator.append(self.flrgs[fs].get_midpoint(self.partitioner.sets, tdisp) * aset[1]) else: - fuzzy_set = self.sets[fs] + fuzzy_set = self.partitioner.sets[fs] numerator.append(fuzzy_set.get_midpoint(tdisp) * aset[1]) denominator.append(aset[1]) @@ -241,9 +241,9 @@ class NonStationaryFTS(fts.FTS): tdisp = common.window_index(k + time_displacement, window_size) - affected_sets = [[self.sets[key], self.sets[key].membership(ndata[k], tdisp)] + affected_sets = [[self.partitioner.sets[key], self.partitioner.sets[key].membership(ndata[k], tdisp)] for key in self.partitioner.ordered_sets - if self.sets[key].membership(ndata[k], tdisp) > 0.0] + if self.partitioner.sets[key].membership(ndata[k], tdisp) > 0.0] if len(affected_sets) == 0: affected_sets.append([common.check_bounds(ndata[k], self.partitioner, tdisp), 1.0]) diff --git a/pyFTS/models/sadaei.py b/pyFTS/models/sadaei.py index baca8e9..3e379e0 100644 --- a/pyFTS/models/sadaei.py +++ b/pyFTS/models/sadaei.py @@ -78,7 +78,7 @@ class ExponentialyWeightedFTS(fts.FTS): if self.partitioner is not None: ordered_sets = self.partitioner.ordered_sets else: - ordered_sets = FuzzySet.set_ordered(self.sets) + ordered_sets = FuzzySet.set_ordered(self.partitioner.sets) data = np.array(ndata) @@ -88,7 +88,7 @@ class ExponentialyWeightedFTS(fts.FTS): for k in np.arange(0, l): - actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.sets, ordered_sets) + actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.partitioner.sets, ordered_sets) if explain: print("Fuzzyfication:\n\n {} -> {} \n".format(ndata[k], actual.name)) @@ -101,7 +101,7 @@ class ExponentialyWeightedFTS(fts.FTS): else: flrg = self.flrgs[actual.name] - mp = flrg.get_midpoints(self.sets) + mp = flrg.get_midpoints(self.partitioner.sets) final = mp.dot(flrg.weights()) diff --git a/pyFTS/models/seasonal/partitioner.py b/pyFTS/models/seasonal/partitioner.py index 4d3e6a5..144e227 100644 --- a/pyFTS/models/seasonal/partitioner.py +++ b/pyFTS/models/seasonal/partitioner.py @@ -44,8 +44,11 @@ class TimeGridPartitioner(partitioner.Partitioner): else: 
             self.ordered_sets = FS.set_ordered(self.sets)
 
+    def extractor(self, x):
         if self.type == 'seasonal':
-            self.extractor = lambda x: strip_datepart(x, self.season, self.mask)
+            return strip_datepart(x, self.season, self.mask)
+        else:
+            return x
 
     def build(self, data):
         sets = {}
diff --git a/pyFTS/models/tsaur.py b/pyFTS/models/tsaur.py
index 7d3535f..281abe2 100644
--- a/pyFTS/models/tsaur.py
+++ b/pyFTS/models/tsaur.py
@@ -79,7 +79,7 @@ class MarkovWeightedFTS(fts.FTS):
         if self.partitioner is not None:
             ordered_sets = self.partitioner.ordered_sets
         else:
-            ordered_sets = FuzzySet.set_ordered(self.sets)
+            ordered_sets = FuzzySet.set_ordered(self.partitioner.sets)
 
         data = np.array(ndata)
 
@@ -89,7 +89,7 @@ class MarkovWeightedFTS(fts.FTS):
 
         for k in np.arange(0, l):
 
-            actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.sets, ordered_sets)
+            actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.partitioner.sets, ordered_sets)
 
             if explain:
                 print("Fuzzyfication:\n\n {} -> {} \n".format(ndata[k], actual.name))
@@ -102,7 +102,7 @@ class MarkovWeightedFTS(fts.FTS):
             else:
                 flrg = self.flrgs[actual.name]
 
-                mp = flrg.get_midpoints(self.sets)
+                mp = flrg.get_midpoints(self.partitioner.sets)
 
                 final = mp.dot(flrg.weights())
 
diff --git a/pyFTS/models/yu.py b/pyFTS/models/yu.py
index e0d4418..3559a7a 100644
--- a/pyFTS/models/yu.py
+++ b/pyFTS/models/yu.py
@@ -70,7 +70,7 @@ class WeightedFTS(fts.FTS):
         if self.partitioner is not None:
             ordered_sets = self.partitioner.ordered_sets
         else:
-            ordered_sets = FuzzySet.set_ordered(self.sets)
+            ordered_sets = FuzzySet.set_ordered(self.partitioner.sets)
 
         ndata = np.array(ndata)
 
@@ -80,7 +80,7 @@ class WeightedFTS(fts.FTS):
 
         for k in np.arange(0, l):
 
-            actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.sets, ordered_sets)
+            actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.partitioner.sets, ordered_sets)
 
             if explain:
                 print("Fuzzyfication:\n\n {} -> {} \n\n".format(ndata[k], actual.name))
@@ -93,9 +93,9 @@ class WeightedFTS(fts.FTS):
             else:
                 flrg = self.flrgs[actual.name]
 
-                mp = flrg.get_midpoints(self.sets)
+                mp = flrg.get_midpoints(self.partitioner.sets)
 
-                final = mp.dot(flrg.weights(self.sets))
+                final = mp.dot(flrg.weights(self.partitioner.sets))
 
                 ret.append(final)
 
diff --git a/pyFTS/partitioners/partitioner.py b/pyFTS/partitioners/partitioner.py
index 2493d31..aafc4f2 100644
--- a/pyFTS/partitioners/partitioner.py
+++ b/pyFTS/partitioners/partitioner.py
@@ -31,8 +31,6 @@ class Partitioner(object):
         """In a multivariate context, the variable that contains this partitioner"""
         self.type = kwargs.get('type', 'common')
         """The type of fuzzy sets that are generated by this partitioner"""
-        self.extractor = kwargs.get('extractor', lambda x: x)
-        """Anonymous function used to extract a single primitive type from an object instance"""
         self.ordered_sets = None
         """A ordered list of the fuzzy sets names, sorted by their middle point"""
         self.kdtree = None
@@ -76,6 +74,10 @@ class Partitioner(object):
 
         del(ndata)
 
+    def extractor(self, x):
+        """Extract a single primitive type from a structured instance"""
+        return x
+
     def build(self, data):
         """
         Perform the partitioning of the Universe of Discourse
diff --git a/pyFTS/tests/distributed.py b/pyFTS/tests/distributed.py
index 7401ff3..f7cb7c6 100644
--- a/pyFTS/tests/distributed.py
+++ b/pyFTS/tests/distributed.py
@@ -7,122 +7,39 @@ import numpy as np
 import os
 from pyFTS.common import Transformations
 from copy import deepcopy
-from pyFTS.nonstationary import flrg, util, honsfts, partitioners
-from 
pyFTS.models.nonstationary import nsfts +from pyFTS.models import pwfts +from pyFTS.benchmarks import benchmarks as bchmk, Measures -bc = Transformations.BoxCox(0) +import time -import dispy -import dispy.httpd +from pyFTS.data import SONDA, Malaysia -os.chdir("/home/petronio/Dropbox/Doutorado/Codigos/") +datasets = {} +sonda = SONDA.get_dataframe()[['datahora','glo_avg','ws_10m']] -def evaluate_individual_model(model, partitioner, train, test, window_size, time_displacement): - import numpy as np - from pyFTS.partitioners import Grid - from pyFTS.benchmarks import Measures +sonda = sonda.drop(sonda.index[np.where(sonda["ws_10m"] <= 0.01)]) +sonda = sonda.drop(sonda.index[np.where(sonda["glo_avg"] <= 0.01)]) +sonda = sonda.dropna() - try: - model.train(train, sets=partitioner.sets, order=model.order, parameters=window_size) - forecasts = model.forecast(test, time_displacement=time_displacement, window_size=window_size) - _rmse = Measures.rmse(test[model.order:], forecasts[:-1]) - _mape = Measures.mape(test[model.order:], forecasts[:-1]) - _u = Measures.UStatistic(test[model.order:], forecasts[:-1]) - except Exception as e: - print(e) - _rmse = np.nan - _mape = np.nan - _u = np.nan +malaysia = Malaysia.get_dataframe() - return {'model': model.shortname, 'partitions': partitioner.partitions, 'order': model.order, - 'rmse': _rmse, 'mape': _mape, 'u': _u} +datasets['SONDA.ws_10m'] = sonda["ws_10m"].values +datasets['SONDA.glo_avg'] = sonda["glo_avg"].values +datasets['Malaysia.temperature'] = malaysia["temperature"].values +datasets['Malaysia.load'] = malaysia["load"].values +windows = [600000, 600000, 10000, 10000] -data = pd.read_csv("DataSets/synthetic_nonstationary_dataset_A.csv", sep=";") -data = np.array(data["0"][:]) +cpus = 3 -cluster = dispy.JobCluster(evaluate_individual_model, nodes=['192.168.0.108', '192.168.0.110']) -http_server = dispy.httpd.DispyHTTPServer(cluster) - -jobs = [] - -models = [] - -for order in [1, 2, 3]: - if order == 1: - model = nsfts.NonStationaryFTS("") - else: - model = honsfts.HighOrderNonStationaryFTS("") - - model.order = order - - models.append(model) - -for ct, train, test in cUtil.sliding_window(data, 300): - for partition in np.arange(5, 100, 1): - tmp_partitioner = Grid.GridPartitioner(train, partition) - partitioner = partitioners.PolynomialNonStationaryPartitioner(train, tmp_partitioner, - window_size=35, degree=1) - for model in models: - # print(model.shortname, partition, model.order) - #job = evaluate_individual_model(model, train, test) - job = cluster.submit(deepcopy(model), deepcopy(partitioner), train, test, 35, 240) - job.id = ct + model.order*100 - jobs.append(job) - -results = {} - -for job in jobs: - tmp = job() - # print(tmp) - if job.status == dispy.DispyJob.Finished and tmp is not None: - _m = tmp['model'] - _o = tmp['order'] - _p = tmp['partitions'] - if _m not in results: - results[_m] = {} - if _o not in results[_m]: - results[_m][_o] = {} - if _p not in results[_m][_o]: - results[_m][_o][_p] = {} - results[_m][_o][_p]['rmse'] = [] - results[_m][_o][_p]['mape'] = [] - results[_m][_o][_p]['u'] = [] - - results[_m][_o][_p]['rmse'].append_rhs(tmp['rmse']) - results[_m][_o][_p]['mape'].append_rhs(tmp['mape']) - results[_m][_o][_p]['u'].append_rhs(tmp['u']) - -cluster.wait() # wait for all jobs to finish - -cluster.print_status() - -http_server.shutdown() # this waits until browser gets all updates -cluster.close() - -dados = [] -ncolunas = None - -for m in sorted(results.keys()): - for o in sorted(results[m].keys()): - for p in 
sorted(results[m][o].keys()): - for r in ['rmse', 'mape', 'u']: - tmp = [] - tmp.append(m) - tmp.append(o) - tmp.append(p) - tmp.append(r) - tmp.extend(results[m][o][p][r]) - - dados.append(tmp) - - if ncolunas is None: - ncolunas = len(results[m][o][p][r]) - -colunas = ["model", "order", "partitions", "metric"] -for k in np.arange(0, ncolunas): - colunas.append(str(k)) - -dat = pd.DataFrame(dados, columns=colunas) -dat.to_csv("experiments/nsfts_partitioning_A.csv", sep=";") +for ct, (dataset_name, dataset) in enumerate(datasets.items()): + bchmk.train_test_time(dataset, windowsize=windows[ct], train=0.9, inc=.5, + methods=[pwfts.ProbabilisticWeightedFTS], + order=2, + partitions=50, + steps=cpus, + num_batches=cpus, + distributed='dispy', nodes=['192.168.0.110'], #, '192.168.0.107','192.168.0.106'], + file="experiments.db", dataset=dataset_name, + tag="speedup") diff --git a/pyFTS/tests/general.py b/pyFTS/tests/general.py index 22e221d..3828f8a 100644 --- a/pyFTS/tests/general.py +++ b/pyFTS/tests/general.py @@ -46,24 +46,71 @@ Util.plot_distribution2(distributions, test[:10], start_at=model.order, ax=ax, c print("") ''' +from pyFTS.data import SONDA, Malaysia + + +def sample_by_hour(data): + return [np.nanmean(data[k:k+60]) for k in np.arange(0,len(data),60)] + + datasets = {} -datasets['TAIEX'] = TAIEX.get_data()[:5000] -datasets['NASDAQ'] = NASDAQ.get_data()[:5000] -datasets['SP500'] = SP500.get_data()[10000:15000] + +sonda = SONDA.get_dataframe()[['datahora','glo_avg','ws_10m']] + +sonda = sonda.drop(sonda.index[np.where(sonda["ws_10m"] <= 0.01)]) +sonda = sonda.drop(sonda.index[np.where(sonda["glo_avg"] <= 0.01)]) +sonda = sonda.dropna() + + +malaysia = Malaysia.get_dataframe() + +datasets['SONDA.ws_10m'] = sample_by_hour(sonda["ws_10m"].values) +datasets['SONDA.glo_avg'] = sample_by_hour(sonda["glo_avg"].values) +datasets['Malaysia.temperature'] = malaysia["temperature"].values +datasets['Malaysia.load'] = malaysia["load"].values + #''' for dataset_name, dataset in datasets.items(): - bchmk.sliding_window_benchmarks2(dataset, 1000, train=0.8, inc=0.2, - methods=[pwfts.ProbabilisticWeightedFTS], + bchmk.sliding_window_benchmarks2(dataset, 10000, train=0.9, inc=0.25, + methods=[hofts.HighOrderFTS, hofts.WeightedHighOrderFTS, pwfts.ProbabilisticWeightedFTS], benchmark_models=False, transformations=[None], - orders=[1, 2, 3], - partitions=np.arange(10, 100, 5), + orders=[2], + partitions=[50], progress=False, type='point', - distributed=True, nodes=['192.168.254.113'], + distributed=True, nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'], file="experiments.db", dataset=dataset_name, - tag="gridsearch") + tag="experiments") + +for dataset_name, dataset in datasets.items(): + bchmk.sliding_window_benchmarks2(dataset, 10000, train=0.9, inc=0.25, + methods=[ensemble.SimpleEnsembleFTS, ifts.IntervalFTS, + ifts.WeightedIntervalFTS, pwfts.ProbabilisticWeightedFTS], + methods_parameters=[{'partitions': [45, 50, 55], 'alpha':.05}, + {},{},{}], + benchmark_models=False, + transformations=[None], + orders=[2], + partitions=[50], + progress=False, type='interval', + distributed=True, nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'], + file="experiments.db", dataset=dataset_name, + tag="experiments") + +for dataset_name, dataset in datasets.items(): + bchmk.sliding_window_benchmarks2(dataset, 10000, train=0.9, inc=0.25, + methods=[ensemble.SimpleEnsembleFTS, pwfts.ProbabilisticWeightedFTS], + methods_parameters=[{'partitions':[45,50,55]}, {}], + benchmark_models=False, + 
transformations=[None],
+                                     orders=[2],
+                                     partitions=[50],
+                                     progress=False, type='distribution',
+                                     distributed=True, nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'],
+                                     file="experiments.db", dataset=dataset_name,
+                                     tag="experiments")
 
 '''
diff --git a/pyFTS/tests/hyperparam.py b/pyFTS/tests/hyperparam.py
index 3fd1202..b62e7dd 100644
--- a/pyFTS/tests/hyperparam.py
+++ b/pyFTS/tests/hyperparam.py
@@ -3,18 +3,21 @@ import pandas as pd
 from pyFTS.hyperparam import GridSearch, Evolutionary
 from pyFTS.models import pwfts
 
+
 def get_dataset():
     from pyFTS.data import SONDA
     #from pyFTS.data import Malaysia
 
-    #data = SONDA.get_data('temperature')[:3000]
-    data = pd.read_csv('https://query.data.world/s/6xfb5useuotbbgpsnm5b2l3wzhvw2i', sep=';')
-    #data = Malaysia.get_data('temperature')[:1000]
+    data = [k for k in SONDA.get_data('ws_10m') if k is not None and not np.isnan(k) and k > 0.1]
+    data = [np.nanmean(data[k:k+60]) for k in np.arange(0,len(data),60)]
+    #data = pd.read_csv('https://query.data.world/s/6xfb5useuotbbgpsnm5b2l3wzhvw2i', sep=';')
+    #data = Malaysia.get_data('temperature')
 
-    return 'SONDA.glo_avg', data['glo_avg'].values #train, test
+    return 'SONDA.ws_10m', data
     #return 'Malaysia.temperature', data #train, test
+    #return 'Malaysia.temperature', data # train, test
-
+'''
 hyperparams = {
     'order':[3],
     'partitions': np.arange(10,100,3),
     'partitioner': [1,2],
     'mf': [1,2,3,4],
     'lags': np.arange(1,35,2),
     'alpha': np.arange(.0, .5, .05)
 }
 
-'''
 hyperparams = {
     'order':[3], #[1, 2],
     'partitions': np.arange(10,100,10),
     'partitioner': [1,2],
     'mf': [1,2,3,4],
     'lags': np.arange(2,10,1),
     'alpha': [.0, .3, .5]
 }
 
 nodes = ['192.168.0.110', '192.168.0.107', '192.168.0.106']
 
 datsetname, dataset = get_dataset()
 
 ret = Evolutionary.execute(datsetname, dataset,
                            ngen=30, npop=20,psel=0.6, pcross=.5, pmut=.3,
-                           window_size=10000, train_rate=.9, increment_rate=1,
-                           experiments=1,
+                           window_size=10000, train_rate=.9, increment_rate=.3,
+                           experiments=2,
                            fts_method=pwfts.ProbabilisticWeightedFTS,
                            database_file='experiments.db',
-                           distributed=False, nodes=nodes)
+                           distributed='dispy', nodes=nodes)
 
 #res = GridSearch.cluster_method({'mf':1, 'partitioner': 1, 'npart': 10, 'lags':[1], 'alpha': 0.0, 'order': 1},
 #                                dataset, window_size = 10000, train_rate = .9, increment_rate = 1)
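
A minimal usage sketch of the API after this patch is applied. It is not part
of the patch itself: the synthetic series, the 35-partition grid, the loopback
node address, and the DictPartitioner subclass below are illustrative
assumptions, not code from this commit.

    import numpy as np

    from pyFTS.partitioners import Grid
    from pyFTS.models import chen

    # Synthetic data, for illustration only; any univariate numeric series works
    train_data = np.random.normal(0, 1, 1000)
    test_data = np.random.normal(0, 1, 200)

    # Fuzzy sets now live exclusively on the partitioner: the FTS.sets
    # attribute and the 'sets' keyword argument are removed, so every
    # non-wrapper, non-benchmark model requires a partitioner.
    part = Grid.GridPartitioner(data=train_data, npart=35)

    model = chen.ConventionalFTS(partitioner=part)
    model.fit(train_data)
    # Distributed variant (num_batches now defaults to 10 after this patch):
    # model.fit(train_data, distributed='dispy', nodes=['127.0.0.1'])
    forecasts = model.predict(test_data)

    # Partitioner.extractor is now an ordinary method instead of a lambda
    # attribute, so specialized partitioners customize it by overriding.
    # Hypothetical subclass, shown only to illustrate the override:
    class DictPartitioner(Grid.GridPartitioner):
        def extractor(self, x):
            # Pull the raw scalar out of a structured record
            return x['value'] if isinstance(x, dict) else x

Turning the extractor into a method keeps partitioner instances picklable,
which matters for the distributed fit and predict paths fixed in this commit,
since lambda attributes cannot be serialized and shipped to dispy nodes.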