diff --git a/benchmarks/Measures.py b/benchmarks/Measures.py index 4ecd3af..1ce1bd1 100644 --- a/benchmarks/Measures.py +++ b/benchmarks/Measures.py @@ -202,6 +202,7 @@ def crps(targets, densities): def get_point_statistics(data, model, indexer=None): """Condensate all measures for point forecasters""" + if indexer is not None: ndata = np.array(indexer.get_data(data[model.order:])) else: @@ -212,22 +213,29 @@ def get_point_statistics(data, model, indexer=None): elif not model.is_multivariate and indexer is not None: forecasts = model.forecast(indexer.get_data(data)) - if model.has_seasonality: - nforecasts = np.array(forecasts) - else: - nforecasts = np.array(forecasts[:-1]) + try: + if model.has_seasonality: + nforecasts = np.array(forecasts) + else: + nforecasts = np.array(forecasts[:-1]) + except Exception as ex: + print(ex) + return [np.nan,np.nan,np.nan] ret = list() try: ret.append(np.round(rmse(ndata, nforecasts), 2)) - except: + except Exception as ex: + print(ex) ret.append(np.nan) try: ret.append(np.round(smape(ndata, nforecasts), 2)) - except: + except Exception as ex: + print(ex) ret.append(np.nan) try: ret.append(np.round(UStatistic(ndata, nforecasts), 2)) - except: + except Exception as ex: + print(ex) ret.append(np.nan) return ret diff --git a/benchmarks/Util.py b/benchmarks/Util.py index 5e2f1fe..7289bb0 100644 --- a/benchmarks/Util.py +++ b/benchmarks/Util.py @@ -48,7 +48,7 @@ def find_best(dataframe, criteria, ascending): return ret -def point_dataframe_sintetic_columns(): +def point_dataframe_synthetic_columns(): return ["Model", "Order", "Scheme", "Partitions", "Size", "RMSEAVG", "RMSESTD", "SMAPEAVG", "SMAPESTD", "UAVG", "USTD", "TIMEAVG", "TIMESTD"] @@ -64,7 +64,7 @@ def point_dataframe_analytic_columns(experiments): return columns -def save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u): +def save_dataframe_point(experiments, file, objs, rmse, save, synthetic, smape, times, u): """ Create a dataframe to store the benchmark results :param experiments: dictionary with the execution results @@ -72,7 +72,7 @@ def save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, t :param objs: :param rmse: :param save: - :param sintetic: + :param synthetic: :param smape: :param times: :param u: @@ -80,7 +80,7 @@ def save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, t """ ret = [] - if sintetic: + if synthetic: for k in sorted(objs.keys()): try: @@ -109,7 +109,7 @@ def save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, t print("Erro ao salvar ", k) print("Exceção ", ex) - columns = point_dataframe_sintetic_columns() + columns = point_dataframe_synthetic_columns() else: for k in sorted(objs.keys()): try: @@ -152,7 +152,7 @@ def save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, t print(ret) -def cast_dataframe_to_sintetic_point(infile, outfile, experiments): +def cast_dataframe_to_synthetic_point(infile, outfile, experiments): columns = point_dataframe_analytic_columns(experiments) dat = pd.read_csv(infile, sep=";", usecols=columns) models = dat.Model.unique() @@ -190,7 +190,7 @@ def cast_dataframe_to_sintetic_point(infile, outfile, experiments): mod.append(np.round(np.nanstd(times), 4)) ret.append(mod) - dat = pd.DataFrame(ret, columns=point_dataframe_sintetic_columns()) + dat = pd.DataFrame(ret, columns=point_dataframe_synthetic_columns()) dat.to_csv(Util.uniquefilename(outfile), sep=";", index=False) @@ -208,7 +208,7 @@ def plot_dataframe_point(file_synthetic, file_analytic, experiments, tam): axes[2].set_title('U Statistic') axes[3].set_title('Execution Time') - dat_syn = pd.read_csv(file_synthetic, sep=";", usecols=point_dataframe_sintetic_columns()) + dat_syn = pd.read_csv(file_synthetic, sep=";", usecols=point_dataframe_synthetic_columns()) bests = find_best(dat_syn, ['UAVG','RMSEAVG','USTD','RMSESTD'], [1,1,1,1]) @@ -241,9 +241,9 @@ def plot_dataframe_point(file_synthetic, file_analytic, experiments, tam): -def save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic, times): +def save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, synthetic, times): ret = [] - if sintetic: + if synthetic: for k in sorted(objs.keys()): mod = [] mfts = objs[k] @@ -268,7 +268,7 @@ def save_dataframe_interval(coverage, experiments, file, objs, resolution, save, mod.append(l) ret.append(mod) - columns = interval_dataframe_sintetic_columns() + columns = interval_dataframe_synthetic_columns() else: for k in sorted(objs.keys()): try: @@ -315,13 +315,13 @@ def interval_dataframe_analytic_columns(experiments): return columns -def interval_dataframe_sintetic_columns(): +def interval_dataframe_synthetic_columns(): columns = ["Model", "Order", "Scheme", "Partitions", "SHARPAVG", "SHARPSTD", "RESAVG", "RESSTD", "COVAVG", "COVSTD", "TIMEAVG", "TIMESTD", "SIZE"] return columns -def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, sintetic): +def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, synthetic): """ Save benchmark results for m-step ahead probabilistic forecasters :param experiments: @@ -332,12 +332,12 @@ def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, tim :param times1: :param times2: :param save: - :param sintetic: + :param synthetic: :return: """ ret = [] - if sintetic: + if synthetic: for k in sorted(objs.keys()): try: @@ -370,7 +370,7 @@ def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, tim print("Erro ao salvar ", k) print("Exceção ", ex) - columns = ahead_dataframe_sintetic_columns() + columns = ahead_dataframe_synthetic_columns() else: for k in sorted(objs.keys()): try: @@ -417,7 +417,7 @@ def ahead_dataframe_analytic_columns(experiments): return columns -def ahead_dataframe_sintetic_columns(): +def ahead_dataframe_synthetic_columns(): columns = ["Model", "Order", "Scheme", "Partitions", "CRPS1AVG", "CRPS1STD", "CRPS2AVG", "CRPS2STD", "SIZE", "TIME1AVG", "TIME2AVG"] return columns diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index aa82950..4aab96f 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -18,7 +18,8 @@ from mpl_toolkits.mplot3d import Axes3D from probabilistic import ProbabilityDistribution from pyFTS import song, chen, yu, ismailefendi, sadaei, hofts, pwfts, ifts, cheng, ensemble, hwang -from pyFTS.benchmarks import Measures, naive, arima, ResidualAnalysis, Util, quantreg +from pyFTS.benchmarks import Measures, naive, arima, ResidualAnalysis, quantreg +from pyFTS.benchmarks import Util as bUtil from pyFTS.common import Transformations, Util # from sklearn.cross_validation import KFold from pyFTS.partitioners import Grid @@ -60,10 +61,47 @@ def get_probabilistic_methods(): return [quantreg.QuantileRegression, ensemble.EnsembleFTS, pwfts.ProbabilisticWeightedFTS] -def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[Grid.GridPartitioner], - partitions=[10], max_order=3,transformation=None,indexer=None,dump=False, +def run_point(mfts, partitioner, train_data, test_data, window_key=None, transformation=None, indexer=None): + """ + Point forecast benchmark function to be executed on sliding window + :param mfts: FTS model + :param partitioner: Universe of Discourse partitioner + :param train_data: data used to train the model + :param test_data: ata used to test the model + :param window_key: id of the sliding window + :param transformation: data transformation + :param indexer: seasonal indexer + :return: a dictionary with the benchmark results + """ + + if mfts.benchmark_only: + _key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + else: + pttr = str(partitioner.__module__).split('.')[-1] + _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) + mfts.partitioner = partitioner + if transformation is not None: + mfts.appendTransformation(transformation) + + _start = time.time() + mfts.train(train_data, partitioner.sets, order=mfts.order) + _end = time.time() + times = _end - _start + + _start = time.time() + _rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, indexer) + _end = time.time() + times += _end - _start + + ret = {'key': _key, 'obj': mfts, 'rmse': _rmse, 'smape': _smape, 'u': _u, 'time': times, 'window': window_key} + + return ret + + +def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner], + partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, benchmark_models=None, benchmark_models_parameters = None, - save=False, file=None, sintetic=True): + save=False, file=None, sintetic=False): """ Sliding window benchmarks for FTS point forecasters :param data: @@ -76,116 +114,89 @@ def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[G :param transformation: data transformation :param indexer: seasonal indexer :param dump: + :param benchmark_models: Non FTS models to benchmark + :param benchmark_models_parameters: Non FTS models parameters :param save: save results :param file: file path to save the results :param sintetic: if true only the average and standard deviation of the results :return: DataFrame with the results """ + if benchmark_models is None: # and models is None: + benchmark_models = [naive.Naive, arima.ARIMA, arima.ARIMA, arima.ARIMA, arima.ARIMA, + quantreg.QuantileRegression, quantreg.QuantileRegression] + + if benchmark_models_parameters is None: + benchmark_models_parameters = [1, (1, 0, 0), (1, 0, 1), (2, 0, 1), (2, 0, 2), 1, 2] + _process_start = time.time() print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now())) - if models is None: - models = get_point_methods() - + pool = [] + jobs = [] objs = {} - lcolors = {} rmse = {} smape = {} u = {} times = {} + if models is None: + models = get_point_methods() + + for model in models: + mfts = model("") + + if mfts.is_high_order: + for order in np.arange(1, max_order + 1): + if order >= mfts.min_order: + mfts = model("") + mfts.order = order + pool.append(mfts) + else: + mfts.order = 1 + pool.append(mfts) + + if benchmark_models is not None: + for count, model in enumerate(benchmark_models, start=0): + par = benchmark_models_parameters[count] + mfts = model(str(par if par is not None else "")) + mfts.order = par + pool.append(mfts) + experiments = 0 - for ct, train,test in Util.sliding_window(data, windowsize, train): + for ct, train, test in Util.sliding_window(data, windowsize, train): experiments += 1 + + benchmarks_only = {} + + if dump: print('\nWindow: {0}\n'.format(ct)) + for partition in partitions: + for partitioner in partitioners: - pttr = str(partitioner.__module__).split('.')[-1] + data_train_fs = partitioner(train, partition, transformation=transformation) - for count, model in enumerate(models, start=0): - - mfts = model("") - - _key = mfts.shortname + " " + pttr + " q = " + str(partition) - - mfts.partitioner = data_train_fs - if not mfts.is_high_order: - - if dump: print(ct,_key) - - if _key not in objs: - objs[_key] = mfts - lcolors[_key] = colors[count % ncol] - rmse[_key] = [] - smape[_key] = [] - u[_key] = [] - times[_key] = [] - - if transformation is not None: - mfts.appendTransformation(transformation) - - - _start = time.time() - mfts.train(train, data_train_fs.sets) - _end = time.time() - times[_key].append(_end - _start) - - _start = time.time() - _rmse, _smape, _u = Measures.get_point_statistics(test, mfts, indexer) - _end = time.time() - rmse[_key].append(_rmse) - smape[_key].append(_smape) - u[_key].append(_u) - times[_key].append(_end - _start) - - if dump: print(_rmse, _smape, _u) - + for _id, m in enumerate(pool,start=0): + if m.benchmark_only and m.shortname in benchmarks_only: + continue else: - for order in np.arange(1, max_order + 1): - if order >= mfts.min_order: - mfts = model("") + benchmarks_only[m.shortname] = m - _key = mfts.shortname + " n = " + str(order) + " " + pttr + " q = " + str(partition) + tmp = run_point(deepcopy(m), data_train_fs, train, test, ct, transformation) - mfts.partitioner = data_train_fs - - if dump: print(ct,_key) - - if _key not in objs: - objs[_key] = mfts - lcolors[_key] = colors[count % ncol] - rmse[_key] = [] - smape[_key] = [] - u[_key] = [] - times[_key] = [] - - if transformation is not None: - mfts.appendTransformation(transformation) - - try: - _start = time.time() - mfts.train(train, data_train_fs.sets, order=order) - _end = time.time() - times[_key].append(_end - _start) - - _start = time.time() - _rmse, _smape, _u = Measures.get_point_statistics(test, mfts, indexer) - _end = time.time() - rmse[_key].append(_rmse) - smape[_key].append(_smape) - u[_key].append(_u) - times[_key].append(_end - _start) - - if dump: print(_rmse, _smape, _u) - - except Exception as e: - print(e) - rmse[_key].append(np.nan) - smape[_key].append(np.nan) - u[_key].append(np.nan) - times[_key].append(np.nan) + if tmp['key'] not in objs: + objs[tmp['key']] = tmp['obj'] + rmse[tmp['key']] = [] + smape[tmp['key']] = [] + u[tmp['key']] = [] + times[tmp['key']] = [] + rmse[tmp['key']].append(tmp['rmse']) + smape[tmp['key']].append(tmp['smape']) + u[tmp['key']].append(tmp['u']) + times[tmp['key']].append(tmp['time']) + print(tmp['key'], tmp['window']) _process_end = time.time() @@ -193,7 +204,8 @@ def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[G print("Process Duration: {0}".format(_process_end - _process_start)) - return Util.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u) + return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u) + def all_point_forecasters(data_train, data_test, partitions, max_order=3, statistics=True, residuals=True, @@ -309,9 +321,9 @@ def getProbabilityDistributionStatistics(pmfs, data): -def interval_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[Grid.GridPartitioner], - partitions=[10], max_order=3,transformation=None,indexer=None,dump=False, - save=False, file=None, sintetic=True): +def interval_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner], + partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, + save=False, file=None, synthetic=True): if models is None: models = get_interval_methods() @@ -400,7 +412,7 @@ def interval_sliding_window(data, windowsize, train=0.8,models=None,partitioners coverage[_key].append(_cov) times[_key].append(_tdiff) - return Util.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic, times) + return bUtil.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, synthetic, times) def all_interval_forecasters(data_train, data_test, partitions, max_order=3,save=False, file=None, tam=[20, 5], @@ -522,8 +534,8 @@ def plot_probability_distributions(pmfs, lcolors, tam=[15, 7]): def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution = None, partitioners=[Grid.GridPartitioner], - partitions=[10], max_order=3,transformation=None,indexer=None,dump=False, - save=False, file=None, sintetic=False): + partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, + save=False, file=None, synthetic=False): if models is None: models = [pwfts.ProbabilisticWeightedFTS] @@ -614,7 +626,7 @@ def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution if dump: print(_crps1, _crps2, _tdiff, _t1, _t2) - return Util.save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, sintetic) + return bUtil.save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, synthetic) def all_ahead_forecasters(data_train, data_test, partitions, start, steps, resolution = None, max_order=3,save=False, file=None, tam=[20, 5], diff --git a/benchmarks/distributed_benchmarks.py b/benchmarks/distributed_benchmarks.py index 8e05036..dee83fd 100644 --- a/benchmarks/distributed_benchmarks.py +++ b/benchmarks/distributed_benchmarks.py @@ -99,12 +99,12 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners= :return: DataFrame with the results """ - if benchmark_models is None: + if benchmark_models is None and models is None: benchmark_models = [naive.Naive, arima.ARIMA, arima.ARIMA, arima.ARIMA, arima.ARIMA, quantreg.QuantileRegression, quantreg.QuantileRegression] if benchmark_models_parameters is None: - benchmark_models_parameters = [None, (1, 0, 0), (1, 0, 1), (2, 0, 1), (2, 0, 2), 1, 2] + benchmark_models_parameters = [1, (1, 0, 0), (1, 0, 1), (2, 0, 1), (2, 0, 2), 1, 2] cluster = dispy.JobCluster(run_point, nodes=nodes) #, depends=dependencies) @@ -135,15 +135,15 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners= mfts.order = order pool.append(mfts) else: - pool.append(mfts) mfts.order = 1 pool.append(mfts) - for count, model in enumerate(benchmark_models, start=0): - par = benchmark_models_parameters[count] - mfts = model(str(par if par is not None else "")) - mfts.order = par - pool.append(mfts) + if benchmark_models is not None: + for count, model in enumerate(benchmark_models, start=0): + par = benchmark_models_parameters[count] + mfts = model(str(par if par is not None else "")) + mfts.order = par + pool.append(mfts) experiments = 0 for ct, train, test in Util.sliding_window(data, windowsize, train): diff --git a/benchmarks/naive.py b/benchmarks/naive.py index e9ea5ad..217436c 100644 --- a/benchmarks/naive.py +++ b/benchmarks/naive.py @@ -14,5 +14,5 @@ class Naive(fts.FTS): self.is_high_order = False def forecast(self, data, **kwargs): - return [k for k in data] + return data diff --git a/benchmarks/quantreg.py b/benchmarks/quantreg.py index 85bf679..ef73220 100644 --- a/benchmarks/quantreg.py +++ b/benchmarks/quantreg.py @@ -55,7 +55,7 @@ class QuantileRegression(fts.FTS): ret = [] - for k in np.arange(self.order, l): + for k in np.arange(self.order, l+1): #+1 to forecast one step ahead given all available lags sample = ndata[k - self.order : k] ret.append(self.linearmodel(sample, self.mean_qt)) @@ -72,7 +72,7 @@ class QuantileRegression(fts.FTS): ret = [] - for k in np.arange(self.order - 1, l): + for k in np.arange(self.order , l): sample = ndata[k - self.order: k] up = self.linearmodel(sample, self.upper_qt) down = self.linearmodel(sample, self.down_qt) diff --git a/tests/general.py b/tests/general.py index 6e79d5d..c8e9f34 100644 --- a/tests/general.py +++ b/tests/general.py @@ -39,7 +39,7 @@ os.chdir("/home/petronio/dados/Dropbox/Doutorado/Codigos/") #gauss_teste = random.normal(0,1.0,400) taiexpd = pd.read_csv("DataSets/TAIEX.csv", sep=",") -taiex = np.array(taiexpd["avg"][:5000]) +taiex = np.array(taiexpd["avg"][:2000]) #nasdaqpd = pd.read_csv("DataSets/NASDAQ_IXIC.csv", sep=",") #nasdaq = np.array(nasdaqpd["avg"][0:5000]) @@ -54,34 +54,39 @@ from statsmodels.tsa.tsatools import lagmat #print(lag) #print(a) -from pyFTS.benchmarks import distributed_benchmarks as bchmk +from pyFTS.benchmarks import benchmarks as bchmk +#from pyFTS.benchmarks import distributed_benchmarks as bchmk #from pyFTS.benchmarks import parallel_benchmarks as bchmk from pyFTS.benchmarks import Util from pyFTS.benchmarks import arima, quantreg -#Util.cast_dataframe_to_sintetic_point("experiments/taiex_point_analitic.csv","experiments/taiex_point_sintetic.csv",11) +#Util.cast_dataframe_to_synthetic_point("experiments/taiex_point_analitic.csv","experiments/taiex_point_sintetic.csv",11) #Util.plot_dataframe_point("experiments/taiex_point_sintetic.csv","experiments/taiex_point_analitic.csv",11) #tmp = arima.ARIMA("") -#tmp.train(taiex[:1600], None, order=(1,0,1)) -#teste = tmp.forecast(taiex[1600:1610]) +#tmp.train(taiex[:1600], None, order=(2,0,2)) +#teste = tmp.forecast(taiex[1600:1605]) #tmp = quantreg.QuantileRegression("") -#tmp.train(taiex[:1600], None, order=1) -#teste = tmp.forecast(taiex[1600:1610]) +#tmp.train(taiex[:1600], None, order=2) +#teste = tmp.forecast(taiex[1600:1605]) -#print(taiex[1600:1610]) +#print(taiex[1600:1605]) #print(teste) #bchmk.teste(taiex,['192.168.0.109', '192.168.0.101']) -bchmk.point_sliding_window(taiex,2000,train=0.8, #models=[yu.WeightedFTS], # # - partitioners=[Grid.GridPartitioner], #Entropy.EntropyPartitioner], # FCM.FCMPartitioner, ], - partitions= np.arange(10,200,step=10), #transformation=diff, - dump=True, save=True, file="experiments/taiex_point_analytic.csv", - nodes=['192.168.0.102', '192.168.0.109', '192.168.0.106']) #, depends=[hofts, ifts]) +from pyFTS import song, chen, yu, cheng + +bchmk.point_sliding_window(taiex,1000,train=0.8, models=[], #song.ConventionalFTS, chen.ConventionalFTS], #[yu.WeightedFTS, cheng.TrendWeightedFTS], # # + partitioners=[Grid.GridPartitioner], #Entropy.EntropyPartitioner], # FCM.FCMPartitioner, ], + partitions= [10], #np.arange(10,200,step=10), #transformation=diff, + dump=True, save=True, file="experiments/XXXtaiex_point_analytic.csv") #, +# nodes=['192.168.0.102', '192.168.0.109', '192.168.0.106']) #, depends=[hofts, ifts]) + +""" diff = Transformations.Differential(1) bchmk.point_sliding_window(taiex,2000,train=0.8, #models=[yu.WeightedFTS], # # @@ -89,7 +94,7 @@ bchmk.point_sliding_window(taiex,2000,train=0.8, #models=[yu.WeightedFTS], # # partitions= np.arange(10,200,step=10), transformation=diff, dump=True, save=True, file="experiments/taiex_point_analytic_diff.csv", nodes=['192.168.0.102', '192.168.0.109', '192.168.0.106']) #, depends=[hofts, ifts]) - +""" #bchmk.testa(taiex,[10,20],partitioners=[Grid.GridPartitioner], nodes=['192.168.0.109', '192.168.0.101']) #parallel_util.explore_partitioners(taiex,20)