From 2c0e685e878f576fb5bf7d3146e4a384ab4fd7ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=C3=B4nio=20C=C3=A2ndido=20de=20Lima=20e=20Silva?= Date: Sat, 13 May 2017 21:03:49 -0300 Subject: [PATCH] - Optimizations for interval distributed benchmarks --- benchmarks/Measures.py | 34 +++++++++++++ benchmarks/Util.py | 25 +++++++-- benchmarks/arima.py | 66 ++++++++++++++++++++++++ benchmarks/distributed_benchmarks.py | 76 ++++++++++++++++++++++------ benchmarks/quantreg.py | 4 +- ensemble.py | 2 +- fts.py | 8 ++- tests/general.py | 59 ++++++++++----------- 8 files changed, 219 insertions(+), 55 deletions(-) diff --git a/benchmarks/Measures.py b/benchmarks/Measures.py index 1ce1bd1..abdc9af 100644 --- a/benchmarks/Measures.py +++ b/benchmarks/Measures.py @@ -164,6 +164,36 @@ def coverage(targets, forecasts): return np.mean(preds) +def pinball(tau, target, forecast): + """ + Pinball loss function. Measure the distance of forecast to the tau-quantile of the target + :param tau: quantile value in the range (0,1) + :param target: + :param forecast: + :return: distance of forecast to the tau-quantile of the target + """ + if target >= forecast: + return (target - forecast) * tau + else: + return (forecast - target) * (1 - tau) + + +def pinball_mean(tau, targets, forecasts): + """ + Mean pinball loss value of the forecast for a given tau-quantile of the targets + :param tau: quantile value in the range (0,1) + :param targets: list of target values + :param forecasts: list of prediction intervals + :return: + """ + preds = [] + if tau <= 0.5: + preds = [pinball(tau, targets[i], forecasts[i][0]) for i in np.arange(0, len(forecasts))] + else: + preds = [pinball(tau, targets[i], forecasts[i][1]) for i in np.arange(0, len(forecasts))] + return np.nanmean(preds) + + def pmf_to_cdf(density): ret = [] for row in density.index: @@ -248,6 +278,10 @@ def get_interval_statistics(original, model): ret.append(round(sharpness(forecasts), 2)) ret.append(round(resolution(forecasts), 2)) ret.append(round(coverage(original[model.order:], forecasts[:-1]), 2)) + ret.append(round(pinball_mean(0.05, original[model.order:], forecasts[:-1]), 2)) + ret.append(round(pinball_mean(0.25, original[model.order:], forecasts[:-1]), 2)) + ret.append(round(pinball_mean(0.75, original[model.order:], forecasts[:-1]), 2)) + ret.append(round(pinball_mean(0.95, original[model.order:], forecasts[:-1]), 2)) return ret diff --git a/benchmarks/Util.py b/benchmarks/Util.py index 7289bb0..a92a114 100644 --- a/benchmarks/Util.py +++ b/benchmarks/Util.py @@ -240,8 +240,7 @@ def plot_dataframe_point(file_synthetic, file_analytic, experiments, tam): plt.show() - -def save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, synthetic, times): +def save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, synthetic, times, q05, q25, q75, q95): ret = [] if synthetic: for k in sorted(objs.keys()): @@ -265,6 +264,14 @@ def save_dataframe_interval(coverage, experiments, file, objs, resolution, save, mod.append(round(np.nanstd(coverage[k]), 2)) mod.append(round(np.nanmean(times[k]), 2)) mod.append(round(np.nanstd(times[k]), 2)) + mod.append(round(np.nanmean(q05[k]), 2)) + mod.append(round(np.nanstd(q05[k]), 2)) + mod.append(round(np.nanmean(q25[k]), 2)) + mod.append(round(np.nanstd(q25[k]), 2)) + mod.append(round(np.nanmean(q75[k]), 2)) + mod.append(round(np.nanstd(q75[k]), 2)) + mod.append(round(np.nanmean(q95[k]), 2)) + mod.append(round(np.nanstd(q95[k]), 2)) mod.append(l) ret.append(mod) @@ 
-296,6 +303,18 @@ def save_dataframe_interval(coverage, experiments, file, objs, resolution, save, tmp = [n, o, s, p, l, 'TIME'] tmp.extend(times[k]) ret.append(deepcopy(tmp)) + tmp = [n, o, s, p, l, 'Q05'] + tmp.extend(q05[k]) + ret.append(deepcopy(tmp)) + tmp = [n, o, s, p, l, 'Q25'] + tmp.extend(q25[k]) + ret.append(deepcopy(tmp)) + tmp = [n, o, s, p, l, 'Q75'] + tmp.extend(q75[k]) + ret.append(deepcopy(tmp)) + tmp = [n, o, s, p, l, 'Q95'] + tmp.extend(q95[k]) + ret.append(deepcopy(tmp)) except Exception as ex: print("Erro ao salvar ", k) print("Exceção ", ex) @@ -317,7 +336,7 @@ def interval_dataframe_analytic_columns(experiments): def interval_dataframe_synthetic_columns(): columns = ["Model", "Order", "Scheme", "Partitions", "SHARPAVG", "SHARPSTD", "RESAVG", "RESSTD", "COVAVG", - "COVSTD", "TIMEAVG", "TIMESTD", "SIZE"] + "COVSTD", "TIMEAVG", "TIMESTD", "Q05AVG", "Q05STD", "Q25AVG", "Q25STD", "Q75AVG", "Q75STD", "Q95AVG", "Q95STD", "SIZE"] return columns diff --git a/benchmarks/arima.py b/benchmarks/arima.py index fce1705..0ccfea2 100644 --- a/benchmarks/arima.py +++ b/benchmarks/arima.py @@ -3,6 +3,7 @@ import numpy as np from statsmodels.tsa.arima_model import ARIMA as stats_arima +import scipy.stats as st from pyFTS import fts @@ -15,6 +16,8 @@ class ARIMA(fts.FTS): self.name = "ARIMA" self.detail = "Auto Regressive Integrated Moving Average" self.is_high_order = True + self.has_point_forecasting = True + self.has_interval_forecasting = True self.model = None self.model_fit = None self.trained_data = None @@ -23,6 +26,7 @@ class ARIMA(fts.FTS): self.q = 0 self.benchmark_only = True self.min_order = 1 + self.alpha = (1 - kwargs.get("alpha", 0.90))/2 def train(self, data, sets, order, parameters=None): self.p = order[0] @@ -35,6 +39,7 @@ class ARIMA(fts.FTS): try: self.model = stats_arima(data, order=(self.p, self.d, self.q)) self.model_fit = self.model.fit(disp=0) + print(np.sqrt(self.model_fit.sigma2)) except Exception as ex: print(ex) self.model_fit = None @@ -71,4 +76,65 @@ class ARIMA(fts.FTS): ret = self.doInverseTransformations(ret, params=[data[self.order - 1:]]) + return ret + + def forecastInterval(self, data, **kwargs): + + if self.model_fit is None: + return np.nan + + sigma = np.sqrt(self.model_fit.sigma2) + + ndata = np.array(self.doTransformations(data)) + + l = len(ndata) + + ret = [] + + for k in np.arange(self.order, l+1): + tmp = [] + + sample = [ndata[i] for i in np.arange(k - self.order, k)] + + mean = self.forecast(sample)[0] + + tmp.append(mean + st.norm.ppf(self.alpha) * sigma) + tmp.append(mean + st.norm.ppf(1 - self.alpha) * sigma) + + ret.append(tmp) + + ret = self.doInverseTransformations(ret, params=[data[self.order - 1:]]) + + return ret + + def forecastAheadInterval(self, data, steps, **kwargs): + if self.model_fit is None: + return np.nan + + smoothing = kwargs.get("smoothing",0.2) + + alpha = (1 - kwargs.get("alpha", 0.95))/2 + + sigma = np.sqrt(self.model_fit.sigma2) + + ndata = np.array(self.doTransformations(data)) + + l = len(ndata) + + means = self.forecastAhead(data, steps, **kwargs) + + ret = [] + + for k in np.arange(0, steps): + tmp = [] + + hsigma = (1 + k*smoothing)*sigma + + tmp.append(means[k] + st.norm.ppf(alpha) * hsigma) + tmp.append(means[k] + st.norm.ppf(1 - alpha) * hsigma) + + ret.append(tmp) + + ret = self.doInverseTransformations(ret, params=[data[self.order - 1:]]) + return ret \ No newline at end of file diff --git a/benchmarks/distributed_benchmarks.py b/benchmarks/distributed_benchmarks.py index dee83fd..5f22754 100644 --- 
a/benchmarks/distributed_benchmarks.py +++ b/benchmarks/distributed_benchmarks.py @@ -73,7 +73,7 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo return ret -def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner], +def point_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, partitioners=[Grid.GridPartitioner], partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, benchmark_models=None, benchmark_models_parameters = None, save=False, file=None, sintetic=False,nodes=None, depends=None): @@ -82,6 +82,7 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners= :param data: :param windowsize: size of sliding window :param train: percentual of sliding window data used to train the models + :param inc: percentage of the window used as the increment between steps :param models: FTS point forecasters :param partitioners: Universe of Discourse partitioner :param partitions: the max number of partitions on the Universe of Discourse @@ -146,7 +147,7 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners= pool.append(mfts) experiments = 0 - for ct, train, test in Util.sliding_window(data, windowsize, train): + for ct, train, test in Util.sliding_window(data, windowsize, train, inc): experiments += 1 benchmarks_only = {} @@ -223,13 +224,18 @@ def run_interval(mfts, partitioner, train_data, test_data, window_key=None, tran tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner] + tmp4 = [arima.ARIMA, quantreg.QuantileRegression] + tmp3 = [Measures.get_interval_statistics] - pttr = str(partitioner.__module__).split('.')[-1] - _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) - mfts.partitioner = partitioner - if transformation is not None: - mfts.appendTransformation(transformation) + if mfts.benchmark_only: + _key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + str(mfts.alpha) + else: + pttr = str(partitioner.__module__).split('.')[-1] + _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) + mfts.partitioner = partitioner + if transformation is not None: + mfts.appendTransformation(transformation) _start = time.time() mfts.train(train_data, partitioner.sets, order=mfts.order) @@ -237,24 +243,26 @@ def run_interval(mfts, partitioner, train_data, test_data, window_key=None, tran times = _end - _start _start = time.time() - _sharp, _res, _cov = Measures.get_interval_statistics(test_data, mfts) + _sharp, _res, _cov, _q05, _q25, _q75, _q95 = Measures.get_interval_statistics(test_data, mfts) _end = time.time() times += _end - _start ret = {'key': _key, 'obj': mfts, 'sharpness': _sharp, 'resolution': _res, 'coverage': _cov, 'time': times, - 'window': window_key} + 'Q05': _q05, 'Q25': _q25, 'Q75': _q75, 'Q95': _q95, 'window': window_key} return ret -def interval_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner], - partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, - save=False, file=None, sintetic=False,nodes=None, depends=None): +def interval_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, partitioners=[Grid.GridPartitioner], + partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, + benchmark_models=None, benchmark_models_parameters = None, + save=False, file=None, sintetic=False,nodes=None, 
depends=None): """ Distributed sliding window benchmarks for FTS interval forecasters :param data: :param windowsize: size of sliding window :param train: percentual of sliding window data used to train the models + :param inc: :param models: FTS point forecasters :param partitioners: Universe of Discourse partitioner :param partitions: the max number of partitions on the Universe of Discourse @@ -262,6 +270,8 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione :param transformation: data transformation :param indexer: seasonal indexer :param dump: + :param benchmark_models: + :param benchmark_models_parameters: :param save: save results :param file: file path to save the results :param sintetic: if true only the average and standard deviation of the results @@ -270,6 +280,15 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione :return: DataFrame with the results """ + alphas = [0.5, 0.25] + + if benchmark_models is None and models is None: + benchmark_models = [arima.ARIMA, arima.ARIMA, arima.ARIMA, arima.ARIMA, + quantreg.QuantileRegression, quantreg.QuantileRegression] + + if benchmark_models_parameters is None: + benchmark_models_parameters = [(1, 0, 0), (1, 0, 1), (2, 0, 1), (2, 0, 2), 1, 2] + cluster = dispy.JobCluster(run_point, nodes=nodes) #, depends=dependencies) http_server = dispy.httpd.DispyHTTPServer(cluster) @@ -284,6 +303,10 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione sharpness = {} resolution = {} coverage = {} + q05 = {} + q25 = {} + q75 = {} + q95 = {} times = {} if models is None: @@ -299,12 +322,23 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione mfts.order = order pool.append(mfts) else: + mfts.order = 1 pool.append(mfts) + if benchmark_models is not None: + for count, model in enumerate(benchmark_models, start=0): + for a in alphas: + par = benchmark_models_parameters[count] + mfts = model(str(par if par is not None else ""), alpha=a) + mfts.order = par + pool.append(mfts) + experiments = 0 - for ct, train, test in Util.sliding_window(data, windowsize, train): + for ct, train, test in Util.sliding_window(data, windowsize, train, inc=inc): experiments += 1 + benchmarks_only = {} + if dump: print('\nWindow: {0}\n'.format(ct)) for partition in partitions: @@ -314,6 +348,10 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione data_train_fs = partitioner(train, partition, transformation=transformation) for id, m in enumerate(pool,start=0): + if m.benchmark_only and m.shortname in benchmarks_only: + continue + else: + benchmarks_only[m.shortname] = m job = cluster.submit(m, data_train_fs, train, test, ct, transformation) job.id = id # associate an ID to identify jobs (if needed later) jobs.append(job) @@ -327,11 +365,19 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione resolution[tmp['key']] = [] coverage[tmp['key']] = [] times[tmp['key']] = [] + q05[tmp['key']] = [] + q25[tmp['key']] = [] + q75[tmp['key']] = [] + q95[tmp['key']] = [] sharpness[tmp['key']].append(tmp['sharpness']) resolution[tmp['key']].append(tmp['resolution']) coverage[tmp['key']].append(tmp['coverage']) times[tmp['key']].append(tmp['time']) + q05[tmp['key']].append(tmp['Q05']) + q25[tmp['key']].append(tmp['Q25']) + q75[tmp['key']].append(tmp['Q75']) + q95[tmp['key']].append(tmp['Q95']) print(tmp['key']) else: print(job.exception) @@ -350,8 +396,8 @@ def interval_sliding_window(data, windowsize, 
train=0.8, models=None, partitione http_server.shutdown() # this waits until browser gets all updates cluster.close() - return benchmarks.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic, - times) + return bUtil.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic, + times, q05, q25, q75, q95) def run_ahead(mfts, partitioner, train_data, test_data, steps, resolution, window_key=None, transformation=None, indexer=None): diff --git a/benchmarks/quantreg.py b/benchmarks/quantreg.py index ef73220..e031803 100644 --- a/benchmarks/quantreg.py +++ b/benchmarks/quantreg.py @@ -19,7 +19,7 @@ class QuantileRegression(fts.FTS): self.has_probability_forecasting = True self.benchmark_only = True self.minOrder = 1 - self.alpha = 0.5 + self.alpha = kwargs.get("alpha", 0.05) self.upper_qt = None self.mean_qt = None self.lower_qt = None @@ -27,8 +27,6 @@ class QuantileRegression(fts.FTS): def train(self, data, sets, order=1, parameters=None): self.order = order - self.alpha = parameters - tmp = np.array(self.doTransformations(data)) lagdata, ndata = lagmat(tmp, maxlag=order, trim="both", original='sep') diff --git a/ensemble.py b/ensemble.py index 9d0dc32..17eb578 100644 --- a/ensemble.py +++ b/ensemble.py @@ -72,7 +72,7 @@ class EnsembleFTS(fts.FTS): ret = [] - for k in np.arange(0, l): + for k in np.arange(0, l+1): tmp = [] for model in self.models: if k >= model.minOrder - 1: diff --git a/fts.py b/fts.py index 73d6352..ab9a832 100644 --- a/fts.py +++ b/fts.py @@ -88,7 +88,13 @@ class FTS(object): :param kwargs: :return: """ - pass + ret = [] + for k in np.arange(0,steps): + tmp = self.forecast(data[-self.order:], **kwargs) + ret.append(tmp[-1]) + data.append(tmp[-1]) + + return ret def forecastAheadInterval(self, data, steps, **kwargs): """ diff --git a/tests/general.py b/tests/general.py index c8e9f34..643301f 100644 --- a/tests/general.py +++ b/tests/general.py @@ -22,40 +22,36 @@ os.chdir("/home/petronio/dados/Dropbox/Doutorado/Codigos/") #enrollments = pd.read_csv("DataSets/Enrollments.csv", sep=";") #enrollments = np.array(enrollments["Enrollments"]) -#from pyFTS import song - -#enrollments_fs = Grid.GridPartitioner(enrollments, 10).sets - -#model = song.ConventionalFTS('') -#model.train(enrollments,enrollments_fs) -#teste = model.forecast(enrollments) - -#print(teste) - - -#print(FCM.FCMPartitionerTrimf.__module__) +""" +DATASETS +""" #gauss = random.normal(0,1.0,5000) #gauss_teste = random.normal(0,1.0,400) -taiexpd = pd.read_csv("DataSets/TAIEX.csv", sep=",") -taiex = np.array(taiexpd["avg"][:2000]) +#taiexpd = pd.read_csv("DataSets/TAIEX.csv", sep=",") +#taiex = np.array(taiexpd["avg"][:5000]) #nasdaqpd = pd.read_csv("DataSets/NASDAQ_IXIC.csv", sep=",") #nasdaq = np.array(nasdaqpd["avg"][0:5000]) -#from statsmodels.tsa.arima_model import ARIMA as stats_arima from statsmodels.tsa.tsatools import lagmat +#sp500pd = pd.read_csv("DataSets/S&P500.csv", sep=",") +#sp500 = np.array(sp500pd["Avg"][11000:]) +#del(sp500pd) -#tmp = np.arange(10) +sondapd = pd.read_csv("DataSets/SONDA_BSB_HOURLY_AVG.csv", sep=";") +sondapd = sondapd.dropna(axis=0, how='any') +sonda = np.array(sondapd["ws_10m"]) +del(sondapd) -#lag, a = lagmat(tmp, maxlag=2, trim="both", original='sep') +#bestpd = pd.read_csv("DataSets/BEST_TAVG.csv", sep=";") +#best = np.array(bestpd["Anomaly"]) #print(lag) #print(a) -from pyFTS.benchmarks import benchmarks as bchmk +#from pyFTS.benchmarks import 
benchmarks as bchmk +from pyFTS.benchmarks import distributed_benchmarks as bchmk #from pyFTS.benchmarks import parallel_benchmarks as bchmk from pyFTS.benchmarks import Util from pyFTS.benchmarks import arima, quantreg @@ -66,9 +62,9 @@ from pyFTS.benchmarks import arima, quantreg #tmp = arima.ARIMA("") #tmp.train(taiex[:1600], None, order=(2,0,2)) -#teste = tmp.forecast(taiex[1600:1605]) +#teste = tmp.forecastInterval(taiex[1600:1605]) -#tmp = quantreg.QuantileRegression("") +#tmp = quan#treg.QuantileRegression("") #tmp.train(taiex[:1600], None, order=2) #teste = tmp.forecast(taiex[1600:1605]) @@ -80,21 +76,20 @@ from pyFTS.benchmarks import arima, quantreg from pyFTS import song, chen, yu, cheng -bchmk.point_sliding_window(taiex,1000,train=0.8, models=[], #song.ConventionalFTS, chen.ConventionalFTS], #[yu.WeightedFTS, cheng.TrendWeightedFTS], # # +bchmk.point_sliding_window(sonda, 9000, train=0.8, inc=0.4,#models=[yu.WeightedFTS], # # partitioners=[Grid.GridPartitioner], #Entropy.EntropyPartitioner], # FCM.FCMPartitioner, ], - partitions= [10], #np.arange(10,200,step=10), #transformation=diff, - dump=True, save=True, file="experiments/XXXtaiex_point_analytic.csv") #, -# nodes=['192.168.0.102', '192.168.0.109', '192.168.0.106']) #, depends=[hofts, ifts]) + partitions= np.arange(10,200,step=10), #transformation=diff, + dump=True, save=True, file="experiments/sondaws_point_analytic.csv", + nodes=['192.168.0.103', '192.168.0.106', '192.168.0.108', '192.168.0.109']) #, depends=[hofts, ifts]) -""" diff = Transformations.Differential(1) -bchmk.point_sliding_window(taiex,2000,train=0.8, #models=[yu.WeightedFTS], # # +bchmk.point_sliding_window(sonda, 9000, train=0.8, inc=0.4, #models=[yu.WeightedFTS], # # partitioners=[Grid.GridPartitioner], #Entropy.EntropyPartitioner], # FCM.FCMPartitioner, ], - partitions= np.arange(10,200,step=10), transformation=diff, - dump=True, save=True, file="experiments/taiex_point_analytic_diff.csv", - nodes=['192.168.0.102', '192.168.0.109', '192.168.0.106']) #, depends=[hofts, ifts]) -""" + partitions= np.arange(3,20,step=2), #transformation=diff, + dump=True, save=True, file="experiments/sondaws_point_analytic_diff.csv", + nodes=['192.168.0.103', '192.168.0.106', '192.168.0.108', '192.168.0.109']) #, depends=[hofts, ifts]) +#""" #bchmk.testa(taiex,[10,20],partitioners=[Grid.GridPartitioner], nodes=['192.168.0.109', '192.168.0.101']) #parallel_util.explore_partitioners(taiex,20)
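
Reviewer note, not part of the patch: the pinball function added in benchmarks/Measures.py scores one bound of a prediction interval against a tau-quantile, and benchmarks/arima.py builds its intervals as the point forecast plus or minus a Gaussian quantile of the residual sigma. A minimal standalone sketch with made-up numbers (mean, sigma and target are hypothetical; the pinball definition mirrors the one in the patch):

import scipy.stats as st

def pinball(tau, target, forecast):
    # Same rule as benchmarks/Measures.py: under-forecasts are weighted by tau,
    # over-forecasts by (1 - tau), so the loss is minimized at the tau-quantile.
    if target >= forecast:
        return (target - forecast) * tau
    else:
        return (forecast - target) * (1 - tau)

# Hypothetical 90% Gaussian interval, analogous to ARIMA.forecastInterval:
# point forecast +/- st.norm.ppf(alpha) * sigma, with alpha = (1 - 0.90) / 2.
mean, sigma, alpha = 10.0, 2.0, 0.05
interval = [mean + st.norm.ppf(alpha) * sigma, mean + st.norm.ppf(1 - alpha) * sigma]

# Score the bounds the way pinball_mean does: the lower bound against a low
# quantile (tau = 0.05) and the upper bound against a high quantile (tau = 0.95).
target = 11.0
print(interval, pinball(0.05, target, interval[0]), pinball(0.95, target, interval[1]))

Low taus penalize lower bounds that sit above the target while high taus penalize upper bounds that fall below it, which is why pinball_mean scores interval lower bounds with tau <= 0.5 and upper bounds with tau > 0.5.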