From 692503704e1a1091c9e7d3f65fd9e535347a8ed6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=C3=B4nio=20C=C3=A2ndido=20de=20Lima=20e=20Silva?= Date: Mon, 8 May 2017 14:49:45 -0300 Subject: [PATCH] - Several bugfixes in benchmarks methods and optimizations --- benchmarks/arima.py | 51 +++++++++++--------- benchmarks/benchmarks.py | 71 ++-------------------------- benchmarks/distributed_benchmarks.py | 53 ++++++++++++++++----- benchmarks/naive.py | 2 +- benchmarks/quantreg.py | 20 ++++---- tests/general.py | 17 ++++--- 6 files changed, 93 insertions(+), 121 deletions(-) diff --git a/benchmarks/arima.py b/benchmarks/arima.py index d1d0147..20ba53c 100644 --- a/benchmarks/arima.py +++ b/benchmarks/arima.py @@ -10,7 +10,7 @@ class ARIMA(fts.FTS): """ Façade for statsmodels.tsa.arima_model """ - def __init__(self, order, **kwargs): + def __init__(self, name, **kwargs): super(ARIMA, self).__init__(1, "ARIMA") self.name = "ARIMA" self.detail = "Auto Regressive Integrated Moving Average" @@ -24,34 +24,41 @@ class ARIMA(fts.FTS): self.benchmark_only = True self.min_order = 1 - def train(self, data, sets, order=1, parameters=None): - if parameters is not None: - self.p = parameters[0] - self.d = parameters[1] - self.q = parameters[2] - self.order = max([self.p, self.d, self.q]) - self.shortname = "ARIMA(" + str(self.p) + "," + str(self.d) + "," + str(self.q) + ")" + def train(self, data, sets, order=(2,1,1), parameters=None): + self.p = order[0] + self.d = order[1] + self.q = order[2] + self.order = max([self.p, self.d, self.q]) + self.shortname = "ARIMA(" + str(self.p) + "," + str(self.d) + "," + str(self.q) + ")" old_fit = self.model_fit self.model = stats_arima(data, order=(self.p, self.d, self.q)) - try: - self.model_fit = self.model.fit(disp=0) - except: - try: - self.model = stats_arima(data, order=(self.p, self.d, self.q)) - self.model_fit = self.model.fit(disp=1) - except: - self.model_fit = old_fit + self.model_fit = self.model.fit(disp=0) - self.trained_data = data #.tolist() + def ar(self, data): + return data.dot(self.model_fit.arparams) + + def ma(self, data): + return data.dot(self.model_fit.maparams) def forecast(self, data, **kwargs): if self.model_fit is None: return np.nan + + ndata = np.array(self.doTransformations(data)) + + l = len(ndata) + ret = [] - for t in data: - output = self.model_fit.forecast() - ret.append( output[0] ) - self.trained_data = np.append(self.trained_data, t) #.append(t) - self.train(self.trained_data,None,order=self.order, parameters=(self.p, self.d, self.q)) + + ar = np.array([self.ar(ndata[k - self.p: k]) for k in np.arange(self.p, l)]) + + residuals = np.array([ar[k - self.p] - ndata[k] for k in np.arange(self.p, l)]) + + ma = np.array([self.ma(residuals[k - self.q: k]) for k in np.arange(self.q, len(ar) + 1)]) + + ret = ar + ma + + ret = self.doInverseTransformations(ret, params=[data[self.order - 1:]]) + return ret \ No newline at end of file diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index 4bd2d12..aa82950 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -60,74 +60,10 @@ def get_probabilistic_methods(): return [quantreg.QuantileRegression, ensemble.EnsembleFTS, pwfts.ProbabilisticWeightedFTS] -def external_point_sliding_window(models, parameters, data, windowsize,train=0.8, dump=False, - save=False, file=None, sintetic=True): - """ - Sliding window benchmarks for non FTS point forecasters - :param models: non FTS point forecasters - :param parameters: parameters for each model - :param data: data set - :param windowsize: size of sliding window - :param train: percentual of sliding window data used to train the models - :param dump: - :param save: save results - :param file: file path to save the results - :param sintetic: if true only the average and standard deviation of the results - :return: DataFrame with the results - """ - objs = {} - lcolors = {} - rmse = {} - smape = {} - u = {} - times = {} - - experiments = 0 - for ct, train, test in Util.sliding_window(data, windowsize, train): - experiments += 1 - for count, method in enumerate(models, start=0): - model = method("") - - _start = time.time() - model.train(train, None, parameters=parameters[count]) - _end = time.time() - - _key = model.shortname - - if dump: print(ct, _key) - - if _key not in objs: - objs[_key] = model - lcolors[_key] = colors[count % ncol] - rmse[_key] = [] - smape[_key] = [] - u[_key] = [] - times[_key] = [] - - _tdiff = _end - _start - - try: - _start = time.time() - _rmse, _smape, _u = Measures.get_point_statistics(test, model, None) - _end = time.time() - rmse[_key].append(_rmse) - smape[_key].append(_smape) - u[_key].append(_u) - _tdiff += _end - _start - times[_key].append(_tdiff) - if dump: print(_rmse, _smape, _u, _tdiff) - except: - rmse[_key].append(np.nan) - smape[_key].append(np.nan) - u[_key].append(np.nan) - times[_key].append(np.nan) - - return Util.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u) - - def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[Grid.GridPartitioner], - partitions=[10], max_order=3,transformation=None,indexer=None,dump=False, - save=False, file=None, sintetic=True): + partitions=[10], max_order=3,transformation=None,indexer=None,dump=False, + benchmark_models=None, benchmark_models_parameters = None, + save=False, file=None, sintetic=True): """ Sliding window benchmarks for FTS point forecasters :param data: @@ -153,7 +89,6 @@ def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[G if models is None: models = get_point_methods() - objs = {} lcolors = {} rmse = {} diff --git a/benchmarks/distributed_benchmarks.py b/benchmarks/distributed_benchmarks.py index e1f20f7..97bc130 100644 --- a/benchmarks/distributed_benchmarks.py +++ b/benchmarks/distributed_benchmarks.py @@ -13,7 +13,7 @@ import dispy import dispy.httpd import numpy as np -from pyFTS.benchmarks import benchmarks, Util as bUtil +from pyFTS.benchmarks import benchmarks, Util as bUtil, naive, quantreg, arima from pyFTS.common import Util from pyFTS.partitioners import Grid @@ -31,22 +31,28 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo :return: a dictionary with the benchmark results """ import time - from pyFTS import yu,chen,hofts,ifts,pwfts,ismailefendi,sadaei + from pyFTS import yu,chen,hofts,ifts,pwfts,ismailefendi,sadaei, song, cheng, hwang from pyFTS.partitioners import Grid, Entropy, FCM - from pyFTS.benchmarks import Measures + from pyFTS.benchmarks import Measures, naive, arima, quantreg - tmp = [yu.WeightedFTS, chen.ConventionalFTS, hofts.HighOrderFTS, ifts.IntervalFTS, - pwfts.ProbabilisticWeightedFTS, ismailefendi.ImprovedWeightedFTS, sadaei.ExponentialyWeightedFTS] + tmp = [song.ConventionalFTS, chen.ConventionalFTS, yu.WeightedFTS, ismailefendi.ImprovedWeightedFTS, + cheng.TrendWeightedFTS, sadaei.ExponentialyWeightedFTS, hofts.HighOrderFTS, hwang.HighOrderFTS, + pwfts.ProbabilisticWeightedFTS] tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner] + tmp4 = [naive.Naive, arima.ARIMA, quantreg.QuantileRegression] + tmp3 = [Measures.get_point_statistics] - pttr = str(partitioner.__module__).split('.')[-1] - _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) - mfts.partitioner = partitioner - if transformation is not None: - mfts.appendTransformation(transformation) + if mfts.benchmark_only: + _key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + else: + pttr = str(partitioner.__module__).split('.')[-1] + _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) + mfts.partitioner = partitioner + if transformation is not None: + mfts.appendTransformation(transformation) _start = time.time() mfts.train(train_data, partitioner.sets, order=mfts.order) @@ -65,6 +71,7 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner], partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, + benchmark_models=None, benchmark_models_parameters = None, save=False, file=None, sintetic=False,nodes=None, depends=None): """ Distributed sliding window benchmarks for FTS point forecasters @@ -78,6 +85,8 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners= :param transformation: data transformation :param indexer: seasonal indexer :param dump: + :param benchmark_models: Non FTS models to benchmark + :param benchmark_models_parameters: Non FTS models parameters :param save: save results :param file: file path to save the results :param sintetic: if true only the average and standard deviation of the results @@ -86,6 +95,13 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners= :return: DataFrame with the results """ + if benchmark_models is None: + benchmark_models = [naive.Naive, arima.ARIMA, arima.ARIMA, arima.ARIMA, arima.ARIMA, + quantreg.QuantileRegression, quantreg.QuantileRegression] + + if benchmark_models_parameters is None: + benchmark_models_parameters = [None, (1, 0, 1), (1, 1, 1), (2, 1, 1), (2, 1, 2), 1, 2] + cluster = dispy.JobCluster(run_point, nodes=nodes) #, depends=dependencies) http_server = dispy.httpd.DispyHTTPServer(cluster) @@ -116,11 +132,20 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners= pool.append(mfts) else: pool.append(mfts) + mfts.order = 1 + pool.append(mfts) + + for count, model in enumerate(benchmark_models, start=0): + mfts = model("") + mfts.order = benchmark_models_parameters[count] + pool.append(mfts) experiments = 0 for ct, train, test in Util.sliding_window(data, windowsize, train): experiments += 1 + benchmarks_only = {} + if dump: print('\nWindow: {0}\n'.format(ct)) for partition in partitions: @@ -129,9 +154,13 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners= data_train_fs = partitioner(train, partition, transformation=transformation) - for id, m in enumerate(pool,start=0): + for _id, m in enumerate(pool,start=0): + if m.benchmark_only and m.shortname in benchmarks_only: + continue + else: + benchmarks_only[m.shortname] = m job = cluster.submit(m, data_train_fs, train, test, ct, transformation) - job.id = id # associate an ID to identify jobs (if needed later) + job.id = _id # associate an ID to identify jobs (if needed later) jobs.append(job) for job in jobs: diff --git a/benchmarks/naive.py b/benchmarks/naive.py index 1441246..0163942 100644 --- a/benchmarks/naive.py +++ b/benchmarks/naive.py @@ -6,7 +6,7 @@ from pyFTS import fts class Naive(fts.FTS): """Naïve Forecasting method""" - def __init__(self, order, name, **kwargs): + def __init__(self, name, **kwargs): super(Naive, self).__init__(1, "Naive " + name) self.name = "Naïve Model" self.detail = "Naïve Model" diff --git a/benchmarks/quantreg.py b/benchmarks/quantreg.py index c8d6a31..6403b56 100644 --- a/benchmarks/quantreg.py +++ b/benchmarks/quantreg.py @@ -9,7 +9,7 @@ from pyFTS import fts class QuantileRegression(fts.FTS): """Façade for statsmodels.regression.quantile_regression""" - def __init__(self, order, **kwargs): + def __init__(self, name, **kwargs): super(QuantileRegression, self).__init__(1, "QR") self.name = "QR" self.detail = "Quantile Regression" @@ -27,19 +27,21 @@ class QuantileRegression(fts.FTS): def train(self, data, sets, order=1, parameters=None): self.order = order - if parameters is not None: - self.alpha = parameters + self.alpha = parameters tmp = np.array(self.doTransformations(data)) lagdata, ndata = lagmat(tmp, maxlag=order, trim="both", original='sep') - uqt = QuantReg(ndata, lagdata).fit(1 - self.alpha) - mqt = QuantReg(ndata, lagdata).fit(0.5) - lqt = QuantReg(ndata, lagdata).fit(self.alpha) - self.upper_qt = [uqt.params[k] for k in uqt.params.keys()] - self.mean_qt = [mqt.params[k] for k in mqt.params.keys()] - self.lower_qt = [lqt.params[k] for k in lqt.params.keys()] + mqt = QuantReg(ndata, lagdata).fit(0.5) + if self.alpha is not None: + uqt = QuantReg(ndata, lagdata).fit(1 - self.alpha) + lqt = QuantReg(ndata, lagdata).fit(self.alpha) + + self.mean_qt = [k for k in mqt.params] + if self.alpha is not None: + self.upper_qt = [uqt.params[k] for k in uqt.params.keys()] + self.lower_qt = [lqt.params[k] for k in lqt.params.keys()] def linearmodel(self,data,params): return params[0] + sum([ data[k] * params[k+1] for k in np.arange(0, self.order) ]) diff --git a/tests/general.py b/tests/general.py index ec794ab..a5c868f 100644 --- a/tests/general.py +++ b/tests/general.py @@ -38,9 +38,8 @@ os.chdir("/home/petronio/dados/Dropbox/Doutorado/Codigos/") #gauss = random.normal(0,1.0,5000) #gauss_teste = random.normal(0,1.0,400) - -#taiexpd = pd.read_csv("DataSets/TAIEX.csv", sep=",") -#taiex = np.array(taiexpd["avg"][:5000]) +taiexpd = pd.read_csv("DataSets/TAIEX.csv", sep=",") +taiex = np.array(taiexpd["avg"][:5000]) #nasdaqpd = pd.read_csv("DataSets/NASDAQ_IXIC.csv", sep=",") #nasdaq = np.array(nasdaqpd["avg"][0:5000]) @@ -62,7 +61,7 @@ from pyFTS.benchmarks import Util #Util.cast_dataframe_to_sintetic_point("experiments/taiex_point_analitic.csv","experiments/taiex_point_sintetic.csv",11) -Util.plot_dataframe_point("experiments/taiex_point_sintetic.csv","experiments/taiex_point_analitic.csv",11) +#Util.plot_dataframe_point("experiments/taiex_point_sintetic.csv","experiments/taiex_point_analitic.csv",11) #tmp = arima.ARIMA("") #tmp.train(taiex[:1600],None,parameters=(2,0,1)) @@ -72,11 +71,11 @@ Util.plot_dataframe_point("experiments/taiex_point_sintetic.csv","experiments/ta #bchmk.teste(taiex,['192.168.0.109', '192.168.0.101']) -#bchmk.point_sliding_window(gauss,2000,train=0.8, #models=[yu.WeightedFTS], # # -# partitioners=[Grid.GridPartitioner], #Entropy.EntropyPartitioner], # FCM.FCMPartitioner, ], -# partitions= np.arange(3,10,step=1), #transformation=diff, -# dump=True, save=True, file="experiments/gauss_point_distributed.csv", -# nodes=['192.168.0.102', '192.168.0.109']) #, depends=[hofts, ifts]) +bchmk.point_sliding_window(taiex,2000,train=0.8, #models=[yu.WeightedFTS], # # + partitioners=[Grid.GridPartitioner], #Entropy.EntropyPartitioner], # FCM.FCMPartitioner, ], + partitions= np.arange(10,200,step=10), #transformation=diff, + dump=True, save=True, file="experiments/taiex_point_analytic.csv", + nodes=['192.168.0.102', '192.168.0.109']) #, depends=[hofts, ifts]) #bchmk.testa(taiex,[10,20],partitioners=[Grid.GridPartitioner], nodes=['192.168.0.109', '192.168.0.101'])