From d1b18ef5c4d5120b6469176c04542e2929b81db7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=C3=B4nio=20C=C3=A2ndido=20de=20Lima=20e=20Silva?=
Date: Sat, 20 May 2017 13:43:39 -0300
Subject: [PATCH] Bugfixes and improvements on Ensemble FTS and distributed_benchmarks

---
 benchmarks/benchmarks.py             |  2 +-
 benchmarks/distributed_benchmarks.py | 53 +++++++++++++++++++++-------
 ensemble.py                          | 38 +++++++++++++-------
 tests/ensemble.py                    | 24 +++++++-------
 tests/general.py                     | 26 ++++++++++----
 5 files changed, 101 insertions(+), 42 deletions(-)

diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py
index ec98c62..aedb6fa 100644
--- a/benchmarks/benchmarks.py
+++ b/benchmarks/benchmarks.py
@@ -64,7 +64,7 @@ def get_interval_methods():
 
 def get_probabilistic_methods():
-    """Return all FTS methods for probabilistic forecasting"""
-    return [quantreg.QuantileRegression, ensemble.EnsembleFTS, pwfts.ProbabilisticWeightedFTS]
+    """Return all methods for probabilistic forecasting, including non-FTS benchmark models"""
+    return [arima.ARIMA, ensemble.AllMethodEnsembleFTS, pwfts.ProbabilisticWeightedFTS]
 
 
 def run_point(mfts, partitioner, train_data, test_data, window_key=None, transformation=None, indexer=None):
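NOTE: get_probabilistic_methods() now mixes a benchmark-only statistical model (ARIMA) with FTS models. A minimal sketch of how the sliding-window drivers below tell the two kinds apart; the empty-string name argument follows the convention used in the tests and is an assumption for all three constructors:

    from pyFTS.benchmarks import benchmarks as bchmk

    for method in bchmk.get_probabilistic_methods():
        model = method("")  # positional name argument, as in the tests (assumption)
        kind = "statistical benchmark" if model.benchmark_only else "FTS model"
        print(model.shortname, "->", kind)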
diff --git a/benchmarks/distributed_benchmarks.py b/benchmarks/distributed_benchmarks.py
index 0fd3240..bbf9e82 100644
--- a/benchmarks/distributed_benchmarks.py
+++ b/benchmarks/distributed_benchmarks.py
@@ -417,19 +417,24 @@ def run_ahead(mfts, partitioner, train_data, test_data, steps, resolution, windo
     :return: a dictionary with the benchmark results
     """
     import time
-    from pyFTS import hofts, ifts, pwfts
+    import numpy as np
+    from pyFTS import hofts, ifts, pwfts, ensemble
     from pyFTS.partitioners import Grid, Entropy, FCM
     from pyFTS.benchmarks import Measures, arima, quantreg
 
-    tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS, arima.ARIMA, quantreg.QuantileRegression]
+    tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS, arima.ARIMA, ensemble.AllMethodEnsembleFTS]
 
     tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner]
 
     tmp3 = [Measures.get_distribution_statistics]
 
-    pttr = str(partitioner.__module__).split('.')[-1]
-    _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
-    mfts.partitioner = partitioner
+    if mfts.benchmark_only:
+        _key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + str(mfts.alpha)
+    else:
+        pttr = str(partitioner.__module__).split('.')[-1]
+        _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
+        mfts.partitioner = partitioner
+
     if transformation is not None:
         mfts.appendTransformation(transformation)
@@ -456,31 +461,43 @@ def run_ahead(mfts, partitioner, train_data, test_data, steps, resolution, windo
     return ret
 
 
-def ahead_sliding_window(data, windowsize, train, steps,resolution, models=None, partitioners=[Grid.GridPartitioner],
+def ahead_sliding_window(data, windowsize, steps, resolution, train=0.8, inc=0.1, models=None, partitioners=[Grid.GridPartitioner],
                          partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
-                         save=False, file=None, sintetic=False,nodes=None, depends=None):
+                         benchmark_models=None, benchmark_models_parameters=None,
+                         save=False, file=None, synthetic=False, nodes=None):
     """
     Distributed sliding window benchmarks for FTS probabilistic forecasters
     :param data:
     :param windowsize: size of sliding window
     :param train: percentage of sliding window data used to train the models
    :param steps:
    :param resolution:
     :param models: FTS point forecasters
     :param partitioners: Universe of Discourse partitioner
     :param partitions: the max number of intervals on the partitioners
     :param max_order: the max order of the models (for high order models)
     :param transformation: data transformation
     :param indexer: seasonal indexer
     :param dump:
     :param save: save results
     :param file: file path to save the results
-    :param sintetic: if true only the average and standard deviation of the results
+    :param inc: increment of the sliding window, as a proportion of windowsize
+    :param benchmark_models: a list of non-FTS models to benchmark against the FTS models
+    :param benchmark_models_parameters: the parameters of each benchmark model
+    :param synthetic: if true only the average and standard deviation of the results
     :param nodes: list of cluster nodes to distribute tasks
-    :param depends: list of module dependencies
     :return: DataFrame with the results
     """
 
-    cluster = dispy.JobCluster(run_point, nodes=nodes) # , depends=dependencies)
+
+    alphas = [0.05, 0.25]
+
+    if benchmark_models is None and models is None:
+        benchmark_models = [arima.ARIMA, arima.ARIMA, arima.ARIMA, arima.ARIMA, arima.ARIMA]
+
+    if benchmark_models_parameters is None:
+        benchmark_models_parameters = [(1, 0, 0), (1, 0, 1), (2, 0, 0), (2, 0, 1), (2, 0, 2)]
+
+    cluster = dispy.JobCluster(run_ahead, nodes=nodes) # , depends=dependencies)
 
     http_server = dispy.httpd.DispyHTTPServer(cluster)
@@ -511,10 +528,20 @@ def ahead_sliding_window(data, windowsize, train, steps,resolution, models=None,
         else:
             pool.append(mfts)
 
+    if benchmark_models is not None:
+        for count, model in enumerate(benchmark_models, start=0):
+            for a in alphas:
+                par = benchmark_models_parameters[count]
+                mfts = model(str(par if par is not None else ""), alpha=a, dist=True)
+                mfts.order = par
+                pool.append(mfts)
+
     experiments = 0
-    for ct, train, test in Util.sliding_window(data, windowsize, train):
+    for ct, train, test in Util.sliding_window(data, windowsize, train, inc=inc):
         experiments += 1
 
+        benchmarks_only = {}
+
         if dump:
             print('\nWindow: {0}\n'.format(ct))
 
         for partition in partitions:
@@ -524,7 +551,11 @@ def ahead_sliding_window(data, windowsize, train, steps,resolution, models=None,
 
                 data_train_fs = partitioner(train, partition, transformation=transformation)
 
                 for id, m in enumerate(pool,start=0):
-                    job = cluster.submit(m, data_train_fs, train, test, ct, transformation)
+                    if m.benchmark_only:
+                        if m.shortname in benchmarks_only:
+                            continue
+                        benchmarks_only[m.shortname] = m
+                    job = cluster.submit(m, data_train_fs, train, test, steps, resolution, ct, transformation)
                     job.id = id  # associate an ID to identify jobs (if needed later)
                     jobs.append(job)
@@ -559,4 +590,4 @@ def ahead_sliding_window(data, windowsize, train, steps,resolution, models=None,
     http_server.shutdown() # this waits until browser gets all updates
     cluster.close()
 
-    return benchmarks.save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, sintetic)
+    return bUtil.save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, synthetic)
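NOTE: with the reworked ahead_sliding_window() signature, external statistical models can be mixed into the experiment pool. A minimal sketch of a call under the new defaults; the series, the output path and the node address are placeholders, and a dispynode worker must be running on each listed node:

    import numpy as np
    from pyFTS.partitioners import Grid
    from pyFTS.benchmarks import arima
    from pyFTS.benchmarks import distributed_benchmarks as bchmk

    data = np.random.normal(0, 1.0, 5000)  # placeholder series

    # Two ARIMA orders are benchmarked against the FTS pool; benchmark-only
    # models are submitted once per window, independent of the partition grid.
    bchmk.ahead_sliding_window(data, 2000, steps=10, resolution=100, train=0.8, inc=0.1,
                               partitioners=[Grid.GridPartitioner],
                               partitions=np.arange(10, 200, step=10),
                               benchmark_models=[arima.ARIMA, arima.ARIMA],
                               benchmark_models_parameters=[(1, 0, 0), (2, 0, 1)],
                               dump=True, save=True, file="experiments/example_ahead.csv",
                               nodes=['127.0.0.1'])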
diff --git a/ensemble.py b/ensemble.py
index c8ddfbf..2895cb6 100644
--- a/ensemble.py
+++ b/ensemble.py
@@ -6,7 +6,7 @@ import pandas as pd
 import math
 from operator import itemgetter
 from pyFTS.common import FLR, FuzzySet, SortedCollection
-from pyFTS import fts, chen, cheng, hofts, hwang, ismailefendi, sadaei, song, yu
+from pyFTS import fts, chen, cheng, hofts, hwang, ismailefendi, sadaei, song, yu, sfts
 from pyFTS.benchmarks import arima, quantreg
 from pyFTS.common import Transformations
 import scipy.stats as st
@@ -127,24 +127,37 @@ class EnsembleFTS(fts.FTS):
         if 'method' in kwargs:
             self.interval_method = kwargs.get('method','quantile')
 
+        if 'alpha' in kwargs:
+            self.alpha = kwargs.get('alpha', self.alpha)
+
         ret = []
 
-        samples = [[k,k] for k in data[-self.order:]]
+        samples = [[k] for k in data[-self.order:]]
 
-        for k in np.arange(self.order, steps+self.order):
+        for k in np.arange(self.order, steps + self.order):
             forecasts = []
-            sample = samples[k - self.order : k]
-            lo_sample = [i[0] for i in sample]
-            up_sample = [i[1] for i in sample]
-            forecasts.extend(self.get_models_forecasts(lo_sample) )
-            forecasts.extend(self.get_models_forecasts(up_sample))
+
+            lags = {}
+            for i in np.arange(0, self.order):
+                lags[i] = samples[k - self.order + i]
+
+            # Build the tree with all possible paths
+            root = tree.FLRGTreeNode(None)
+            tree.buildTreeWithoutOrder(root, lags, 0)
+
+            for p in root.paths():
+                path = list(reversed(list(filter(None.__ne__, p))))
+                forecasts.extend(self.get_models_forecasts(path))
+
+            samples.append(sampler(forecasts, np.arange(0.1, 1, 0.2)))
 
             interval = self.get_interval(forecasts)
 
             if len(interval) == 1:
                 interval = interval[0]
 
             ret.append(interval)
-            samples.append(interval)
 
         return ret
@@ -183,7 +196,7 @@ class EnsembleFTS(fts.FTS):
 
                 forecasts.extend(self.get_models_forecasts(path))
 
-            samples.append(sampler(forecasts, [0.05, 0.25, 0.5, 0.75, 0.95 ]))
+            samples.append(sampler(forecasts, np.arange(0.1, 1, 0.1)))
 
             grid = self.gridCountPoint(grid, resolution, index, forecasts)
@@ -197,7 +210,7 @@ class EnsembleFTS(fts.FTS):
 
 class AllMethodEnsembleFTS(EnsembleFTS):
-    def __init__(self, **kwargs):
+    def __init__(self, name=None, **kwargs):
         super(AllMethodEnsembleFTS, self).__init__(name="Ensemble FTS", **kwargs)
 
         self.min_order = 3
@@ -210,7 +223,7 @@ class AllMethodEnsembleFTS(EnsembleFTS):
         self.original_min = min(data)
 
         fo_methods = [song.ConventionalFTS, chen.ConventionalFTS, yu.WeightedFTS, cheng.TrendWeightedFTS,
-                      sadaei.ExponentialyWeightedFTS, ismailefendi.ImprovedWeightedFTS]
+                      sadaei.ExponentialyWeightedFTS, ismailefendi.ImprovedWeightedFTS, sfts.SeasonalFTS]
 
         ho_methods = [hofts.HighOrderFTS, hwang.HighOrderFTS]
@@ -227,3 +240,5 @@ class AllMethodEnsembleFTS(EnsembleFTS):
                 self.set_transformations(model)
                 model.train(data, sets, order=o)
                 self.appendModel(model)
+
+
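NOTE: the multi-step interval forecast above no longer propagates a single [lo, up] pair per step; each lag keeps the list of values sampled at the previous step, and every combination of one value per lag is fed to the component models. The tree walk (tree.buildTreeWithoutOrder plus root.paths()) enumerates exactly those combinations; a standalone sketch of the same expansion, with itertools.product standing in for the tree:

    import itertools

    # Each entry of `samples` holds the values sampled for one lag (order = 3
    # here, toy values); every path is one scenario passed to the models.
    samples = [[10.0], [11.0, 12.0], [12.5, 13.0, 13.5]]

    for path in itertools.product(*samples[-3:]):
        print(list(path))  # [10.0, 11.0, 12.5], [10.0, 11.0, 13.0], ...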
diff --git a/tests/ensemble.py b/tests/ensemble.py
index a397454..0787dd7 100644
--- a/tests/ensemble.py
+++ b/tests/ensemble.py
@@ -25,7 +25,7 @@ passengers = pd.read_csv("DataSets/AirPassengers.csv", sep=",")
 
 passengers = np.array(passengers["Passengers"])
 
-e = ensemble.AllMethodEnsembleFTS()
+e = ensemble.AllMethodEnsembleFTS(alpha=0.25, point_method="median", interval_method='quantile')
 
 fo_methods = [song.ConventionalFTS, chen.ConventionalFTS, yu.WeightedFTS, cheng.TrendWeightedFTS,
               sadaei.ExponentialyWeightedFTS, ismailefendi.ImprovedWeightedFTS]
@@ -99,24 +99,28 @@ print(_normal)
 """
 
 #"""
-#_extremum = e.forecastAheadInterval(passengers, 10, method="extremum")
-#print(_extremum)
+_extremum = e.forecastAheadInterval(passengers, 10, method="extremum")
+print(_extremum)
 
-#_quantile = e.forecastAheadInterval(passengers[:50], 40, method="quantile", alpha=0.25)
-#print(_quantile)
+_quantile = e.forecastAheadInterval(passengers[:50], 10, method="quantile", alpha=0.05)
+print(_quantile)
 
+_quantile = e.forecastAheadInterval(passengers[:50], 10, method="quantile", alpha=0.25)
+print(_quantile)
 
-#_normal = e.forecastAheadInterval(passengers, 10, method="normal", alpha=0.25)
-#print(_normal)
+_normal = e.forecastAheadInterval(passengers[:50], 10, method="normal", alpha=0.05)
+print(_normal)
 
+_normal = e.forecastAheadInterval(passengers[:50], 10, method="normal", alpha=0.25)
+print(_normal)
 #"""
 
 #dist = e.forecastAheadDistribution(passengers, 20)
 #print(dist)
 
-bchmk.plot_compared_intervals_ahead(passengers[:120],[e], ['blue','red'],
-                                    distributions=[True,False], save=True, file="pictures/distribution_ahead_arma",
-                                    time_from=60, time_to=10, tam=[12,5])
+#bchmk.plot_compared_intervals_ahead(passengers[:120],[e], ['blue','red'],
+#                                    distributions=[True,False], save=True, file="pictures/distribution_ahead_arma",
+#                                    time_from=60, time_to=10, tam=[12,5])
diff --git a/tests/general.py b/tests/general.py
index 70af200..8e40825 100644
--- a/tests/general.py
+++ b/tests/general.py
@@ -28,8 +28,8 @@ diff = Transformations.Differential(1)
 DATASETS
 """
 
-passengers = pd.read_csv("DataSets/AirPassengers.csv", sep=",")
-passengers = np.array(passengers["Passengers"])
+#passengers = pd.read_csv("DataSets/AirPassengers.csv", sep=",")
+#passengers = np.array(passengers["Passengers"])
 
 #sunspots = pd.read_csv("DataSets/sunspots.csv", sep=",")
 #sunspots = np.array(sunspots["SUNACTIVITY"])
@@ -37,8 +37,8 @@ passengers = np.array(passengers["Passengers"])
 #gauss = random.normal(0,1.0,5000)
 #gauss_teste = random.normal(0,1.0,400)
 
-#taiexpd = pd.read_csv("DataSets/TAIEX.csv", sep=",")
-#taiex = np.array(taiexpd["avg"][:5000])
+taiexpd = pd.read_csv("DataSets/TAIEX.csv", sep=",")
+taiex = np.array(taiexpd["avg"][:5000])
 
 #nasdaqpd = pd.read_csv("DataSets/NASDAQ_IXIC.csv", sep=",")
 #nasdaq = np.array(nasdaqpd["avg"][0:5000])
@@ -59,8 +59,8 @@ passengers = np.array(passengers["Passengers"])
 #print(lag)
 #print(a)
 
-from pyFTS.benchmarks import benchmarks as bchmk
-#from pyFTS.benchmarks import distributed_benchmarks as bchmk
+#from pyFTS.benchmarks import benchmarks as bchmk
+from pyFTS.benchmarks import distributed_benchmarks as bchmk
 #from pyFTS.benchmarks import parallel_benchmarks as bchmk
 from pyFTS.benchmarks import Util
 from pyFTS.benchmarks import arima, quantreg, Measures
@@ -68,7 +68,7 @@ from pyFTS.benchmarks import arima, quantreg, Measures
 #Util.cast_dataframe_to_synthetic_point("experiments/taiex_point_analitic.csv","experiments/taiex_point_sintetic.csv",11)
 #Util.plot_dataframe_point("experiments/taiex_point_sintetic.csv","experiments/taiex_point_analitic.csv",11)
 
-#"""
+"""
 arima100 = arima.ARIMA("", alpha=0.25)
 #tmp.appendTransformation(diff)
 arima100.train(passengers, None, order=(1,0,0))
@@ -137,6 +137,18 @@ bchmk.interval_sliding_window(sp500, 2000, train=0.8, inc=0.2, #models=[yu.Weigh
 
 #"""
 
+bchmk.ahead_sliding_window(taiex, 2000, steps=10, resolution=100, train=0.8, inc=0.1,
+                           partitioners=[Grid.GridPartitioner],
+                           partitions=np.arange(10, 200, step=10),
+                           dump=True, save=True, file="experiments/taiex_ahead_analytic.csv",
+                           nodes=['192.168.0.105', '192.168.0.106', '192.168.0.108', '192.168.0.109']) #, depends=[hofts, ifts])
+
+bchmk.ahead_sliding_window(taiex, 2000, steps=10, resolution=100, train=0.8, inc=0.1,
+                           partitioners=[Grid.GridPartitioner],
+                           partitions=np.arange(3, 20, step=2), transformation=diff,
+                           dump=True, save=True, file="experiments/taiex_ahead_analytic_diff.csv",
+                           nodes=['192.168.0.105', '192.168.0.106', '192.168.0.108', '192.168.0.109']) #, depends=[hofts, ifts])
+
 """
 from pyFTS.partitioners import Grid
 from pyFTS import pwfts
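NOTE: both calls above depend on the inc parameter now threaded through to Util.sliding_window(). A small sketch of the window arithmetic, assuming inc is the step between consecutive windows expressed as a fraction of windowsize, in the same way train is the training fraction:

    import numpy as np
    from pyFTS.benchmarks import Util

    data = np.random.normal(0, 1.0, 5000)  # placeholder series

    # With windowsize=2000, train=0.8 and inc=0.1, each window should yield
    # 1600 training and 400 test points, and consecutive windows should start
    # 200 points apart.
    for ct, train, test in Util.sliding_window(data, 2000, train=0.8, inc=0.1):
        print(ct, len(train), len(test))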