From fa676f9f1d45ec377bea3bf0cd8071903f4cc06a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=C3=B4nio=20C=C3=A2ndido=20de=20Lima=20e=20Silva?= Date: Tue, 4 Apr 2017 18:39:15 -0300 Subject: [PATCH] - Parallel sliding window benchmarks --- benchmarks/benchmarks.py | 315 ++++++++++++++++++---------- benchmarks/parallel_benchmarks.py | 334 +++++++++++++++++++++--------- tests/general.py | 3 +- 3 files changed, 441 insertions(+), 211 deletions(-) diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index 9f368dd..2edeea6 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -4,6 +4,7 @@ import numpy as np import pandas as pd import time +import datetime import matplotlib as plt import matplotlib.colors as pltcolors import matplotlib.cm as cmx @@ -14,6 +15,7 @@ from pyFTS.partitioners import partitioner, Grid, Huarng, Entropy, FCM from pyFTS.benchmarks import Measures, naive, arima, ResidualAnalysis, ProbabilityDistribution from pyFTS.common import Membership, FuzzySet, FLR, Transformations, Util from pyFTS import fts, chen, yu, ismailefendi, sadaei, hofts, hwang, pwfts, ifts +from copy import deepcopy colors = ['grey', 'rosybrown', 'maroon', 'red','orange', 'yellow', 'olive', 'green', 'cyan', 'blue', 'darkblue', 'purple', 'darkviolet'] @@ -34,7 +36,66 @@ def get_point_methods(): def get_interval_methods(): return [ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS] -def external_point_sliding_window(models, parameters, data, windowsize,train=0.8, dump=False, save=False, file=None): + +def save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u): + ret = [] + + if sintetic: + + for k in sorted(objs.keys()): + try: + mod = [] + mfts = objs[k] + mod.append(mfts.shortname) + mod.append(mfts.order) + mod.append(mfts.partitioner.name) + mod.append(mfts.partitioner.partitions) + mod.append(len(mfts)) + mod.append(np.round(np.nanmean(rmse[k]), 2)) + mod.append(np.round(np.nanstd(rmse[k]), 2)) + mod.append(np.round(np.nanmean(smape[k]), 2)) + mod.append(np.round(np.nanstd(smape[k]), 2)) + mod.append(np.round(np.nanmean(u[k]), 2)) + mod.append(np.round(np.nanstd(u[k]), 2)) + mod.append(np.round(np.nanmean(times[k]), 4)) + ret.append(mod) + except Exception as ex: + print("Erro ao salvar ", k) + print("Exceção ", ex) + + columns = ["Model", "Order", "Scheme","Partitions", "Size", "RMSEAVG", "RMSESTD", "SMAPEAVG", "SMAPESTD", "UAVG", "USTD", "TIMEAVG"] + else: + for k in sorted(objs.keys()): + try: + mfts = objs[k] + tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'RMSE'] + tmp.extend(rmse[k]) + ret.append(deepcopy(tmp)) + tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'SMAPE'] + tmp.extend(smape[k]) + ret.append(deepcopy(tmp)) + tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'U'] + tmp.extend(u[k]) + ret.append(deepcopy(tmp)) + tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'TIME'] + tmp.extend(times[k]) + ret.append(deepcopy(tmp)) + except Exception as ex: + print("Erro ao salvar ", k) + print("Exceção ", ex) + columns = [str(k) for k in np.arange(0, experiments)] + columns.insert(0, "Model") + columns.insert(1, "Order") + columns.insert(2, "Scheme") + columns.insert(3, "Partitions") + columns.insert(4, "Size") + columns.insert(5, "Measure") + dat = pd.DataFrame(ret, columns=columns) + if save: dat.to_csv(Util.uniquefilename(file), sep=";") + return 
dat + + +def external_point_sliding_window(models, parameters, data, windowsize,train=0.8, dump=False, save=False, file=None, sintetic=True): objs = {} lcolors = {} rmse = {} @@ -42,7 +103,9 @@ def external_point_sliding_window(models, parameters, data, windowsize,train=0.8 u = {} times = {} + experiments = 0 for ct, train, test in Util.sliding_window(data, windowsize, train): + experiments += 1 for count, method in enumerate(models, start=0): model = method("") @@ -80,36 +143,17 @@ def external_point_sliding_window(models, parameters, data, windowsize,train=0.8 u[_key].append(np.nan) times[_key].append(np.nan) - ret = [] - for k in sorted(objs.keys()): - try: - mod = [] - mfts = objs[k] - mod.append(mfts.shortname) - mod.append(np.round(np.nanmean(rmse[k]), 2)) - mod.append(np.round(np.nanstd(rmse[k]), 2)) - mod.append(np.round(np.nanmean(smape[k]), 2)) - mod.append(np.round(np.nanstd(smape[k]), 2)) - mod.append(np.round(np.nanmean(u[k]), 2)) - mod.append(np.round(np.nanstd(u[k]), 2)) - mod.append(np.round(np.nanmean(times[k]), 4)) - ret.append(mod) - except Exception as ex: - print("Erro ao salvar ",k) - print("Exceção ", ex) - - columns = ["Model", "RMSEAVG", "RMSESTD", "SMAPEAVG", "SMAPESTD", "UAVG", "USTD", "TIMEAVG"] - - dat = pd.DataFrame(ret, columns=columns) - - if save: dat.to_csv(Util.uniquefilename(file), sep=";") - - return dat + return save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u) def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[Grid.GridPartitioner], partitions=[10], max_order=3,transformation=None,indexer=None,dump=False, - save=False, file=None): + save=False, file=None, sintetic=True): + + _process_start = time.time() + + print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now())) + if models is None: models = get_point_methods() @@ -121,8 +165,9 @@ def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[G u = {} times = {} + experiments = 0 for ct, train,test in Util.sliding_window(data, windowsize, train): - mocks = {} + experiments += 1 for partition in partitions: for partitioner in partitioners: pttr = str(partitioner.__module__).split('.')[-1] @@ -210,37 +255,14 @@ def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[G smape[_key].append(np.nan) u[_key].append(np.nan) times[_key].append(np.nan) - ret = [] - for k in sorted(objs.keys()): - try: - mod = [] - tmp = objs[k] - mod.append(tmp.shortname) - mod.append(tmp.order ) - mod.append(tmp.partitioner.name) - mod.append(tmp.partitioner.partitions) - mod.append(np.round(np.nanmean(rmse[k]),2)) - mod.append(np.round(np.nanstd(rmse[k]), 2)) - mod.append(np.round(np.nanmean(smape[k]), 2)) - mod.append(np.round(np.nanstd(smape[k]), 2)) - mod.append(np.round(np.nanmean(u[k]), 2)) - mod.append(np.round(np.nanstd(u[k]), 2)) - mod.append(np.round(np.nanmean(times[k]), 4)) - mod.append(np.round(np.nanstd(times[k]), 4)) - mod.append(len(tmp)) - ret.append(mod) - except Exception as ex: - print("Erro ao salvar ",k) - print("Exceção ", ex) - columns = ["Model","Order","Scheme","Partitions","RMSEAVG","RMSESTD","SMAPEAVG","SMAPESTD","UAVG","USTD","TIMEAVG","TIMESTD","SIZE"] + _process_end = time.time() - dat = pd.DataFrame(ret,columns=columns) + print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now())) - if save: dat.to_csv(Util.uniquefilename(file),sep=";") - - return dat + print("Process Duration: {0}".format(_process_end - _process_start)) + return save_dataframe_point(experiments, file, objs, 
rmse, save, sintetic, smape, times, u) def all_point_forecasters(data_train, data_test, partitions, max_order=3, statistics=True, residuals=True, @@ -370,10 +392,67 @@ def getProbabilityDistributionStatistics(pmfs, data): ret += " \\\\ \n" return ret +def save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic, times): + ret = [] + if sintetic: + for k in sorted(objs.keys()): + mod = [] + mfts = objs[k] + mod.append(mfts.shortname) + mod.append(mfts.order) + mod.append(mfts.partitioner.name) + mod.append(mfts.partitioner.partitions) + mod.append(round(np.nanmean(sharpness[k]), 2)) + mod.append(round(np.nanstd(sharpness[k]), 2)) + mod.append(round(np.nanmean(resolution[k]), 2)) + mod.append(round(np.nanstd(resolution[k]), 2)) + mod.append(round(np.nanmean(coverage[k]), 2)) + mod.append(round(np.nanstd(coverage[k]), 2)) + mod.append(round(np.nanmean(times[k]), 2)) + mod.append(round(np.nanstd(times[k]), 2)) + mod.append(len(mfts)) + ret.append(mod) + + columns = ["Model", "Order", "Scheme", "Partitions", "SHARPAVG", "SHARPSTD", "RESAVG", "RESSTD", "COVAVG", + "COVSTD", "TIMEAVG", "TIMESTD", "SIZE"] + else: + for k in sorted(objs.keys()): + try: + mfts = objs[k] + tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), + 'Sharpness'] + tmp.extend(sharpness[k]) + ret.append(deepcopy(tmp)) + tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), + 'Resolution'] + tmp.extend(resolution[k]) + ret.append(deepcopy(tmp)) + tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), + 'Coverage'] + tmp.extend(coverage[k]) + ret.append(deepcopy(tmp)) + tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), + 'TIME'] + tmp.extend(times[k]) + ret.append(deepcopy(tmp)) + except Exception as ex: + print("Erro ao salvar ", k) + print("Exceção ", ex) + columns = [str(k) for k in np.arange(0, experiments)] + columns.insert(0, "Model") + columns.insert(1, "Order") + columns.insert(2, "Scheme") + columns.insert(3, "Partitions") + columns.insert(4, "Size") + columns.insert(5, "Measure") + dat = pd.DataFrame(ret, columns=columns) + if save: dat.to_csv(Util.uniquefilename(file), sep=";") + return dat + def interval_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[Grid.GridPartitioner], partitions=[10], max_order=3,transformation=None,indexer=None,dump=False, - save=False, file=None): + save=False, file=None, sintetic=True): if models is None: models = get_interval_methods() @@ -384,7 +463,9 @@ def interval_sliding_window(data, windowsize, train=0.8,models=None,partitioners coverage = {} times = {} + experiments = 0 for ct, training,test in Util.sliding_window(data, windowsize, train): + experiments += 1 for partition in partitions: for partitioner in partitioners: pttr = str(partitioner.__module__).split('.')[-1] @@ -460,32 +541,7 @@ def interval_sliding_window(data, windowsize, train=0.8,models=None,partitioners coverage[_key].append(_cov) times[_key].append(_tdiff) - ret = [] - for k in sorted(objs.keys()): - mod = [] - mfts = objs[k] - mod.append(mfts.shortname) - mod.append(mfts.order ) - mod.append(mfts.partitioner.name) - mod.append(mfts.partitioner.partitions) - mod.append(round(np.nanmean(sharpness[k]),2)) - mod.append(round(np.nanstd(sharpness[k]), 2)) - mod.append(round(np.nanmean(resolution[k]), 2)) - mod.append(round(np.nanstd(resolution[k]), 2)) - 
mod.append(round(np.nanmean(coverage[k]), 2)) - mod.append(round(np.nanstd(coverage[k]), 2)) - mod.append(round(np.nanmean(times[k]), 2)) - mod.append(round(np.nanstd(times[k]), 2)) - mod.append(len(mfts)) - ret.append(mod) - - columns = ["Model","Order","Scheme","Partitions","SHARPAVG","SHARPSTD","RESAVG","RESSTD","COVAVG","COVSTD","TIMEAVG","TIMESTD","SIZE"] - - dat = pd.DataFrame(ret,columns=columns) - - if save: dat.to_csv(Util.uniquefilename(file),sep=";") - - return dat + return save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic, times) def all_interval_forecasters(data_train, data_test, partitions, max_order=3,save=False, file=None, tam=[20, 5], @@ -613,9 +669,72 @@ def plot_probability_distributions(pmfs, lcolors, tam=[15, 7]): ax.legend(handles0, labels0) +def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, sintetic): + ret = [] + + if sintetic: + + for k in sorted(objs.keys()): + try: + ret = [] + for k in sorted(objs.keys()): + try: + mod = [] + mfts = objs[k] + mod.append(mfts.shortname) + mod.append(mfts.order) + mod.append(mfts.partitioner.name) + mod.append(mfts.partitioner.partitions) + mod.append(np.round(np.nanmean(crps_interval[k]), 2)) + mod.append(np.round(np.nanstd(crps_interval[k]), 2)) + mod.append(np.round(np.nanmean(crps_distr[k]), 2)) + mod.append(np.round(np.nanstd(crps_distr[k]), 2)) + mod.append(len(mfts)) + mod.append(np.round(np.nanmean(times1[k]), 4)) + mod.append(np.round(np.nanmean(times2[k]), 4)) + ret.append(mod) + except Exception as e: + print('Erro: %s' % e) + except Exception as ex: + print("Erro ao salvar ", k) + print("Exceção ", ex) + + columns = ["Model", "Order", "Scheme", "Partitions", "CRPS1AVG", "CRPS1STD", "CRPS2AVG", "CRPS2STD", + "SIZE", "TIME1AVG", "TIME2AVG"] + else: + for k in sorted(objs.keys()): + try: + mfts = objs[k] + tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'CRPS_Interval'] + tmp.extend(crps_interval[k]) + ret.append(deepcopy(tmp)) + tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'CRPS_Distribution'] + tmp.extend(crps_distr[k]) + ret.append(deepcopy(tmp)) + tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'TIME_Interval'] + tmp.extend(times1[k]) + ret.append(deepcopy(tmp)) + tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'TIME_Distribution'] + tmp.extend(times2[k]) + ret.append(deepcopy(tmp)) + except Exception as ex: + print("Erro ao salvar ", k) + print("Exceção ", ex) + columns = [str(k) for k in np.arange(0, experiments)] + columns.insert(0, "Model") + columns.insert(1, "Order") + columns.insert(2, "Scheme") + columns.insert(3, "Partitions") + columns.insert(4, "Size") + columns.insert(5, "Measure") + dat = pd.DataFrame(ret, columns=columns) + if save: dat.to_csv(Util.uniquefilename(file), sep=";") + return dat + + def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution = None, partitioners=[Grid.GridPartitioner], partitions=[10], max_order=3,transformation=None,indexer=None,dump=False, - save=False, file=None): + save=False, file=None, sintetic=False): if models is None: models = [pwfts.ProbabilisticWeightedFTS] @@ -626,7 +745,9 @@ def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution times1 = {} times2 = {} + experiments = 0 for ct, train,test in Util.sliding_window(data, 
windowsize, train): + experiments += 1 for partition in partitions: for partitioner in partitioners: pttr = str(partitioner.__module__).split('.')[-1] @@ -704,33 +825,7 @@ def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution if dump: print(_crps1, _crps2, _tdiff, _t1, _t2) - ret = [] - for k in sorted(objs.keys()): - try: - mod = [] - mfts = objs[k] - mod.append(mfts.shortname) - mod.append(mfts.order ) - mod.append(mfts.partitioner.name) - mod.append(mfts.partitioner.partitions) - mod.append(np.round(np.nanmean(crps_interval[k]),2)) - mod.append(np.round(np.nanstd(crps_interval[k]), 2)) - mod.append(np.round(np.nanmean(crps_distr[k]), 2)) - mod.append(np.round(np.nanstd(crps_distr[k]), 2)) - mod.append(len(mfts)) - mod.append(np.round(np.nanmean(times1[k]), 4)) - mod.append(np.round(np.nanmean(times2[k]), 4)) - ret.append(mod) - except Exception as e: - print ('Erro: %s' % e) - - columns = ["Model","Order","Scheme","Partitions","CRPS1AVG","CRPS1STD","CRPS2AVG","CRPS2STD","SIZE","TIME1AVG","TIME2AVG"] - - dat = pd.DataFrame(ret,columns=columns) - - if save: dat.to_csv(Util.uniquefilename(file),sep=";") - - return dat + return save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, sintetic) def all_ahead_forecasters(data_train, data_test, partitions, start, steps, resolution = None, max_order=3,save=False, file=None, tam=[20, 5], diff --git a/benchmarks/parallel_benchmarks.py b/benchmarks/parallel_benchmarks.py index 24e8ed7..d8061d9 100644 --- a/benchmarks/parallel_benchmarks.py +++ b/benchmarks/parallel_benchmarks.py @@ -5,6 +5,7 @@ import multiprocessing import numpy as np import pandas as pd import time +import datetime import matplotlib as plt import matplotlib.colors as pltcolors import matplotlib.cm as cmx @@ -17,25 +18,17 @@ from pyFTS.common import Membership, FuzzySet, FLR, Transformations, Util from pyFTS import fts, chen, yu, ismailefendi, sadaei, hofts, hwang, pwfts, ifts from pyFTS.benchmarks import benchmarks -def get_first_order_models(): - return [chen.ConventionalFTS, yu.WeightedFTS, ismailefendi.ImprovedWeightedFTS, - sadaei.ExponentialyWeightedFTS] -def get_high_order_models(): - return [hofts.HighOrderFTS, pwfts.ProbabilisticWeightedFTS] - - -def run_first_order(method, partitioner, train_data, test_data, transformation = None, indexer=None ): - mfts = method("") +def run_point(mfts, partitioner, train_data, test_data, transformation=None, indexer=None): pttr = str(partitioner.__module__).split('.')[-1] - _key = mfts.shortname + " " + pttr + " q = " + str(partitioner.partitions) + _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) mfts.partitioner = partitioner if transformation is not None: mfts.appendTransformation(transformation) try: _start = time.time() - mfts.train(train_data, partitioner.sets) + mfts.train(train_data, partitioner.sets, order=mfts.order) _end = time.time() times = _end - _start @@ -46,73 +39,62 @@ def run_first_order(method, partitioner, train_data, test_data, transformation = except Exception as e: print(e) _rmse = np.nan - _smape= np.nan + _smape = np.nan _u = np.nan times = np.nan - ret = {'key':_key, 'obj': mfts, 'rmse': _rmse, 'smape': _smape, 'u': _u, 'time': times } + ret = {'key': _key, 'obj': mfts, 'rmse': _rmse, 'smape': _smape, 'u': _u, 'time': times} print(ret) return ret -def run_high_order(method, order, partitioner, train_data, test_data, transformation=None, indexer=None): - mfts = method("") - if order >= 
mfts.minOrder: - pttr = str(partitioner.__module__).split('.')[-1] - _key = mfts.shortname + " n = " + str(order) + " " + pttr + " q = " + str(partitioner.partitions) - mfts.partitioner = partitioner - if transformation is not None: - mfts.appendTransformation(transformation) +def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner], + partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, + save=False, file=None, sintetic=False): + _process_start = time.time() - try: - _start = time.time() - mfts.train(train_data, partitioner.sets, order=order) - _end = time.time() - times = _end - _start - - _start = time.time() - _rmse, _smape, _u = benchmarks.get_point_statistics(test_data, mfts, indexer) - _end = time.time() - times += _end - _start - except Exception as e: - print(e) - _rmse = np.nan - _smape = np.nan - _u = np.nan - times = np.nan - - ret = {'key': _key, 'obj': mfts, 'rmse': _rmse, 'smape': _smape, 'u': _u, 'time': times} - - print(ret) - - return ret - - return {'key': None, 'obj': mfts, 'rmse': np.nan, 'smape': np.nan, 'u': np.nan, 'time': np.nan} - - -def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[Grid.GridPartitioner], - partitions=[10], max_order=3,transformation=None,indexer=None,dump=False, - save=False, file=None): + print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now())) num_cores = multiprocessing.cpu_count() + pool = [] + objs = {} rmse = {} smape = {} u = {} times = {} - for ct, train,test in Util.sliding_window(data, windowsize, train): - mocks = {} + for model in benchmarks.get_point_methods(): + mfts = model("") + + if mfts.isHighOrder: + for order in np.arange(1, max_order + 1): + if order >= mfts.minOrder: + mfts = model("") + mfts.order = order + pool.append(mfts) + else: + pool.append(mfts) + + experiments = 0 + for ct, train, test in Util.sliding_window(data, windowsize, train): + experiments += 1 + + if dump: print('\nWindow: {0}\n'.format(ct)) + for partition in partitions: + for partitioner in partitioners: - pttr = str(partitioner.__module__).split('.')[-1] + data_train_fs = partitioner(train, partition, transformation=transformation) - results = Parallel(n_jobs=num_cores)(delayed(run_first_order)(m, deepcopy(data_train_fs), deepcopy(train), deepcopy(test), transformation) - for m in get_first_order_models()) + results = Parallel(n_jobs=num_cores)( + delayed(run_point)(deepcopy(m), deepcopy(data_train_fs), deepcopy(train), deepcopy(test), + transformation) + for m in pool) for tmp in results: if tmp['key'] not in objs: @@ -126,56 +108,208 @@ def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[G u[tmp['key']].append(tmp['u']) times[tmp['key']].append(tmp['time']) - for count, model in enumerate(get_high_order_models(), start=0): + _process_end = time.time() - results = Parallel(n_jobs=num_cores)( - delayed(run_high_order)(model, order, deepcopy(data_train_fs), deepcopy(train), deepcopy(test), - transformation) - for order in np.arange(1, max_order + 1)) + print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now())) - for tmp in results: - if tmp['key'] not in objs: - objs[tmp['key']] = tmp['obj'] - rmse[tmp['key']] = [] - smape[tmp['key']] = [] - u[tmp['key']] = [] - times[tmp['key']] = [] - rmse[tmp['key']].append(tmp['rmse']) - smape[tmp['key']].append(tmp['smape']) - u[tmp['key']].append(tmp['u']) - times[tmp['key']].append(tmp['time']) - ret = [] - for k in sorted(objs.keys()): - try: - mod = [] - tmp = objs[k] 
- mod.append(tmp.shortname) - mod.append(tmp.order ) - mod.append(tmp.partitioner.name) - mod.append(tmp.partitioner.partitions) - mod.append(np.round(np.nanmean(rmse[k]),2)) - mod.append(np.round(np.nanstd(rmse[k]), 2)) - mod.append(np.round(np.nanmean(smape[k]), 2)) - mod.append(np.round(np.nanstd(smape[k]), 2)) - mod.append(np.round(np.nanmean(u[k]), 2)) - mod.append(np.round(np.nanstd(u[k]), 2)) - mod.append(np.round(np.nanmean(times[k]), 4)) - mod.append(np.round(np.nanstd(times[k]), 4)) - mod.append(len(tmp)) - ret.append(mod) - except Exception as ex: - print("Erro ao salvar ",k) - print("Exceção ", ex) + print("Process Duration: {0}".format(_process_end - _process_start)) - columns = ["Model","Order","Scheme","Partitions","RMSEAVG","RMSESTD","SMAPEAVG","SMAPESTD","UAVG","USTD","TIMEAVG","TIMESTD","SIZE"] - - dat = pd.DataFrame(ret,columns=columns) - - if save: dat.to_csv(Util.uniquefilename(file),sep=";") - - return dat + return benchmarks.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u) +def run_interval(mfts, partitioner, train_data, test_data, transformation=None, indexer=None): + pttr = str(partitioner.__module__).split('.')[-1] + _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) + mfts.partitioner = partitioner + if transformation is not None: + mfts.appendTransformation(transformation) + + try: + _start = time.time() + mfts.train(train_data, partitioner.sets, order=mfts.order) + _end = time.time() + times = _end - _start + + _start = time.time() + _sharp, _res, _cov = benchmarks.get_interval_statistics(test_data, mfts) + _end = time.time() + times += _end - _start + except Exception as e: + print(e) + _sharp = np.nan + _res = np.nan + _cov = np.nan + times = np.nan + + ret = {'key': _key, 'obj': mfts, 'sharpness': _sharp, 'resolution': _res, 'coverage': _cov, 'time': times} + + print(ret) + + return ret +def interval_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner], + partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, + save=False, file=None, sintetic=False): + _process_start = time.time() + print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now())) + + num_cores = multiprocessing.cpu_count() + + pool = [] + + objs = {} + sharpness = {} + resolution = {} + coverage = {} + times = {} + + for model in benchmarks.get_interval_methods(): + mfts = model("") + + if mfts.isHighOrder: + for order in np.arange(1, max_order + 1): + if order >= mfts.minOrder: + mfts = model("") + mfts.order = order + pool.append(mfts) + else: + pool.append(mfts) + + experiments = 0 + for ct, train, test in Util.sliding_window(data, windowsize, train): + experiments += 1 + + if dump: print('\nWindow: {0}\n'.format(ct)) + + for partition in partitions: + + for partitioner in partitioners: + + data_train_fs = partitioner(train, partition, transformation=transformation) + + results = Parallel(n_jobs=num_cores)( + delayed(run_interval)(deepcopy(m), deepcopy(data_train_fs), deepcopy(train), deepcopy(test), + transformation) + for m in pool) + + for tmp in results: + if tmp['key'] not in objs: + objs[tmp['key']] = tmp['obj'] + sharpness[tmp['key']] = [] + resolution[tmp['key']] = [] + coverage[tmp['key']] = [] + times[tmp['key']] = [] + + sharpness[tmp['key']].append(tmp['sharpness']) + resolution[tmp['key']].append(tmp['resolution']) + coverage[tmp['key']].append(tmp['coverage']) + times[tmp['key']].append(tmp['time']) + + _process_end = 
time.time() + + print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now())) + + print("Process Duration: {0}".format(_process_end - _process_start)) + + return benchmarks.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic, times) + + +def run_ahead(mfts, partitioner, train_data, test_data, steps, resolution, transformation=None, indexer=None): + pttr = str(partitioner.__module__).split('.')[-1] + _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) + mfts.partitioner = partitioner + if transformation is not None: + mfts.appendTransformation(transformation) + + try: + _start = time.time() + mfts.train(train_data, partitioner.sets, order=mfts.order) + _end = time.time() + times = _end - _start + + _crps1, _crps2, _t1, _t2 = benchmarks.get_distribution_statistics(test_data, mfts, steps=steps, + resolution=resolution) + _t1 += times + _t2 += times + except Exception as e: + print(e) + _crps1 = np.nan + _crps2 = np.nan + _t1 = np.nan + _t2 = np.nan + + ret = {'key': _key, 'obj': mfts, 'CRPS_Interval': _crps1, 'CRPS_Distribution': _crps2, 'TIME_Interval': _t1, 'TIME_Distribution': _t2} + + print(ret) + + return ret + + +def ahead_sliding_window(data, windowsize, train, steps,resolution, models=None, partitioners=[Grid.GridPartitioner], + partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, + save=False, file=None, sintetic=False): + _process_start = time.time() + + print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now())) + + num_cores = multiprocessing.cpu_count() + + pool = [] + + objs = {} + crps_interval = {} + crps_distr = {} + times1 = {} + times2 = {} + + for model in benchmarks.get_interval_methods(): + mfts = model("") + + if mfts.isHighOrder: + for order in np.arange(1, max_order + 1): + if order >= mfts.minOrder: + mfts = model("") + mfts.order = order + pool.append(mfts) + else: + pool.append(mfts) + + experiments = 0 + for ct, train, test in Util.sliding_window(data, windowsize, train): + experiments += 1 + + if dump: print('\nWindow: {0}\n'.format(ct)) + + for partition in partitions: + + for partitioner in partitioners: + + data_train_fs = partitioner(train, partition, transformation=transformation) + + results = Parallel(n_jobs=num_cores)( + delayed(run_ahead)(deepcopy(m), deepcopy(data_train_fs), deepcopy(train), deepcopy(test), + steps, resolution, transformation) + for m in pool) + + for tmp in results: + if tmp['key'] not in objs: + objs[tmp['key']] = tmp['obj'] + crps_interval[tmp['key']] = [] + crps_distr[tmp['key']] = [] + times1[tmp['key']] = [] + times2[tmp['key']] = [] + + crps_interval[tmp['key']].append(tmp['CRPS_Interval']) + crps_distr[tmp['key']].append(tmp['CRPS_Distribution']) + times1[tmp['key']].append(tmp['TIME_Interval']) + times2[tmp['key']].append(tmp['TIME_Distribution']) + + _process_end = time.time() + + print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now())) + + print("Process Duration: {0}".format(_process_end - _process_start)) + + return benchmarks.save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, sintetic) diff --git a/tests/general.py b/tests/general.py index e9053ae..c4c5a88 100644 --- a/tests/general.py +++ b/tests/general.py @@ -29,11 +29,12 @@ taiexpd = pd.read_csv("DataSets/TAIEX.csv", sep=",") taiex = np.array(taiexpd["avg"][:5000]) from pyFTS.benchmarks import parallel_benchmarks as bchmk +#from pyFTS.benchmarks import benchmarks as bchmk 
bchmk.point_sliding_window(taiex,2000,train=0.8, #transformation=diff, #models=[pwfts.ProbabilisticWeightedFTS], # # partitioners=[Grid.GridPartitioner], #Entropy.EntropyPartitioner], # FCM.FCMPartitioner, ], partitions= np.arange(10,200,step=5), # - dump=True, save=True, file="experiments/nasdaq_point_paralllel.csv") + dump=False, save=True, file="experiments/nasdaq_point_parallel.csv") #parallel_util.explore_partitioners(taiex,20)
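
For reference, a minimal usage sketch of the parallel sliding-window benchmark introduced by this patch. The input series, window size, partition counts and output path below are hypothetical placeholders; the keyword arguments follow the point_sliding_window signature added in benchmarks/parallel_benchmarks.py, and the sintetic flag selects between the summary and per-experiment output layouts produced by save_dataframe_point.

import numpy as np
from pyFTS.partitioners import Grid
from pyFTS.benchmarks import parallel_benchmarks as bchmk

# hypothetical input series; any one-dimensional numeric array works here
data = np.cumsum(np.random.normal(size=3000))

# sintetic=True  -> one summary row per model (RMSE/SMAPE/U averages and deviations, mean time)
# sintetic=False -> one row per model and measure, with one column per sliding-window experiment
dat = bchmk.point_sliding_window(data, 1000, train=0.8,
                                 partitioners=[Grid.GridPartitioner],
                                 partitions=[10, 20], max_order=3,
                                 dump=False, save=True, sintetic=False,
                                 file="experiments/example_point_parallel.csv")

print(dat.head())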