From 1d7801bdbf9f65a3a47f1a56230d60cdeaa6cfec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=C3=B4nio=20C=C3=A2ndido?= Date: Mon, 9 Apr 2018 11:09:28 -0300 Subject: [PATCH] - Benchmark refactoring to expose a unified and simpler interface (the method sliding_window_benchmarks) - Improvements at FTS.fit and FTS.predict - PWFTS bugfixes and improvements --- pyFTS/benchmarks/Measures.py | 89 +-- pyFTS/benchmarks/Util.py | 117 ++-- pyFTS/benchmarks/benchmarks.py | 773 +++++++++++---------- pyFTS/benchmarks/distributed_benchmarks.py | 182 +---- pyFTS/common/Util.py | 62 +- pyFTS/common/fts.py | 42 +- pyFTS/models/pwfts.py | 239 ++----- pyFTS/partitioners/Util.py | 106 ++- pyFTS/tests/general.py | 15 +- 9 files changed, 755 insertions(+), 870 deletions(-) diff --git a/pyFTS/benchmarks/Measures.py b/pyFTS/benchmarks/Measures.py index b55da80..381ecdd 100644 --- a/pyFTS/benchmarks/Measures.py +++ b/pyFTS/benchmarks/Measures.py @@ -5,12 +5,14 @@ pyFTS module for common benchmark metrics """ import time +import numba import numpy as np import pandas as pd from pyFTS.common import FuzzySet,SortedCollection from pyFTS.probabilistic import ProbabilityDistribution + def acf(data, k): """ Autocorrelation function estimative @@ -28,6 +30,7 @@ def acf(data, k): return 1/((n-k)*sigma)*s + def rmse(targets, forecasts): """ Root Mean Squared Error @@ -42,6 +45,7 @@ def rmse(targets, forecasts): return np.sqrt(np.nanmean((targets - forecasts) ** 2)) + def rmse_interval(targets, forecasts): """ Root Mean Squared Error @@ -53,6 +57,7 @@ def rmse_interval(targets, forecasts): return np.sqrt(np.nanmean((fmean - targets) ** 2)) + def mape(targets, forecasts): """ Mean Average Percentual Error @@ -67,6 +72,7 @@ def mape(targets, forecasts): return np.mean(np.abs(targets - forecasts) / targets) * 100 + def smape(targets, forecasts, type=2): """ Symmetric Mean Average Percentual Error @@ -87,11 +93,13 @@ def smape(targets, forecasts, type=2): return sum(np.abs(forecasts - targets)) / sum(forecasts + targets) + def mape_interval(targets, forecasts): fmean = [np.mean(i) for i in forecasts] return np.mean(abs(fmean - targets) / fmean) * 100 + def UStatistic(targets, forecasts): """ Theil's U Statistic @@ -108,11 +116,12 @@ def UStatistic(targets, forecasts): naive = [] y = [] for k in np.arange(0,l-1): - y.append((forecasts[k ] - targets[k ]) ** 2) + y.append((forecasts[k ] - targets[k]) ** 2) naive.append((targets[k + 1] - targets[k]) ** 2) return np.sqrt(sum(y) / sum(naive)) + def TheilsInequality(targets, forecasts): """ Theil’s Inequality Coefficient @@ -128,6 +137,7 @@ def TheilsInequality(targets, forecasts): return us / (ys + fs) + def BoxPierceStatistic(data, h): """ Q Statistic for Box-Pierce test @@ -204,12 +214,15 @@ def pinball_mean(tau, targets, forecasts): :param forecasts: list of prediction intervals :return: """ - preds = [] - if tau <= 0.5: - preds = [pinball(tau, targets[i], forecasts[i][0]) for i in np.arange(0, len(forecasts))] - else: - preds = [pinball(tau, targets[i], forecasts[i][1]) for i in np.arange(0, len(forecasts))] - return np.nanmean(preds) + try: + if tau <= 0.5: + preds = [pinball(tau, targets[i], forecasts[i][0]) for i in np.arange(0, len(forecasts))] + else: + preds = [pinball(tau, targets[i], forecasts[i][1]) for i in np.arange(0, len(forecasts))] + return np.nanmean(preds) + except Exception as ex: + print(ex) + def pmf_to_cdf(density): @@ -259,18 +272,17 @@ def crps(targets, densities): return _crps / float(l * n) -def get_point_statistics(data, model, indexer=None): +def get_point_statistics(data, model, **kwargs): """Condensate all measures for point forecasters""" + indexer = kwargs.get('indexer', None) + if indexer is not None: ndata = np.array(indexer.get_data(data)) else: ndata = np.array(data[model.order:]) - if model.is_multivariate or indexer is None: - forecasts = model.forecast(data) - elif not model.is_multivariate and indexer is not None: - forecasts = model.forecast(indexer.get_data(data)) + forecasts = model.predict(data, **kwargs) try: if model.has_seasonality: @@ -281,29 +293,18 @@ def get_point_statistics(data, model, indexer=None): print(ex) return [np.nan,np.nan,np.nan] ret = list() - try: - ret.append(np.round(rmse(ndata, nforecasts), 2)) - except Exception as ex: - print('Error in RMSE: {}'.format(ex)) - ret.append(np.nan) - try: - ret.append(np.round(smape(ndata, nforecasts), 2)) - except Exception as ex: - print('Error in SMAPE: {}'.format(ex)) - ret.append(np.nan) - try: - ret.append(np.round(UStatistic(ndata, nforecasts), 2)) - except Exception as ex: - print('Error in U: {}'.format(ex)) - ret.append(np.nan) + + ret.append(np.round(rmse(ndata, nforecasts), 2)) + ret.append(np.round(smape(ndata, nforecasts), 2)) + ret.append(np.round(UStatistic(ndata, nforecasts), 2)) return ret -def get_interval_statistics(original, model): +def get_interval_statistics(original, model, **kwargs): """Condensate all measures for point_to_interval forecasters""" ret = list() - forecasts = model.forecast_interval(original) + forecasts = model.predict(original, **kwargs) ret.append(round(sharpness(forecasts), 2)) ret.append(round(resolution(forecasts), 2)) ret.append(round(coverage(original[model.order:], forecasts[:-1]), 2)) @@ -314,27 +315,13 @@ def get_interval_statistics(original, model): return ret -def get_distribution_statistics(original, model, steps, resolution): +def get_distribution_statistics(original, model, **kwargs): ret = list() - try: - _s1 = time.time() - densities1 = model.forecast_ahead_distribution(original, steps, parameters=3) - _e1 = time.time() - ret.append(round(crps(original, densities1), 3)) - ret.append(round(_e1 - _s1, 3)) - except Exception as e: - print('Erro: ', e) - ret.append(np.nan) - ret.append(np.nan) - - try: - _s2 = time.time() - densities2 = model.forecast_ahead_distribution(original, steps, parameters=2) - _e2 = time.time() - ret.append( round(crps(original, densities2), 3)) - ret.append(round(_e2 - _s2, 3)) - except: - ret.append(np.nan) - ret.append(np.nan) - + _s1 = time.time() + densities1 = model.predict(original, **kwargs) + _e1 = time.time() + ret.append(round(crps(original, densities1), 3)) + ret.append(round(_e1 - _s1, 3)) return ret + + diff --git a/pyFTS/benchmarks/Util.py b/pyFTS/benchmarks/Util.py index 42bfbad..1c96479 100644 --- a/pyFTS/benchmarks/Util.py +++ b/pyFTS/benchmarks/Util.py @@ -2,6 +2,7 @@ Benchmark utility functions """ +import numba import matplotlib as plt import matplotlib.cm as cmx import matplotlib.colors as pltcolors @@ -11,8 +12,6 @@ import pandas as pd #from mpl_toolkits.mplot3d import Axes3D -import numpy as np -import pandas as pd from copy import deepcopy from pyFTS.common import Util @@ -124,7 +123,7 @@ def save_dataframe_point(experiments, file, objs, rmse, save, synthetic, smape, s = '-' p = '-' l = '-' - + print([n, o, s, p, l]) tmp = [n, o, s, p, l, 'RMSE'] tmp.extend(rmse[k]) ret.append(deepcopy(tmp)) @@ -194,23 +193,31 @@ def cast_dataframe_to_synthetic_point(infile, outfile, experiments): dat.to_csv(outfile, sep=";", index=False) + def analytical_data_columns(experiments): data_columns = [str(k) for k in np.arange(0, experiments)] return data_columns + def scale_params(data): vmin = np.nanmin(data) vlen = np.nanmax(data) - vmin return (vmin, vlen) + + def scale(data, params): ndata = [(k-params[0])/params[1] for k in data] return ndata + + def stats(measure, data): print(measure, np.nanmean(data), np.nanstd(data)) + + def unified_scaled_point(experiments, tam, save=False, file=None, sort_columns=['UAVG', 'RMSEAVG', 'USTD', 'RMSESTD'], sort_ascend=[1, 1, 1, 1],save_best=False, @@ -320,6 +327,7 @@ def unified_scaled_point(experiments, tam, save=False, file=None, Util.show_and_save_image(fig, file, save) + def plot_dataframe_point(file_synthetic, file_analytic, experiments, tam, save=False, file=None, sort_columns=['UAVG', 'RMSEAVG', 'USTD', 'RMSESTD'], sort_ascend=[1, 1, 1, 1],save_best=False, @@ -375,6 +383,7 @@ def plot_dataframe_point(file_synthetic, file_analytic, experiments, tam, save=F Util.show_and_save_image(fig, file, save) + def check_replace_list(m, replace): if replace is not None: for r in replace: @@ -383,6 +392,7 @@ def check_replace_list(m, replace): return m + def check_ignore_list(b, ignore): flag = False if ignore is not None: @@ -475,6 +485,7 @@ def save_dataframe_interval(coverage, experiments, file, objs, resolution, save, if save: dat.to_csv(Util.uniquefilename(file), sep=";") return dat + def interval_dataframe_analytic_columns(experiments): columns = [str(k) for k in np.arange(0, experiments)] columns.insert(0, "Model") @@ -486,12 +497,14 @@ def interval_dataframe_analytic_columns(experiments): return columns + def interval_dataframe_synthetic_columns(): columns = ["Model", "Order", "Scheme", "Partitions", "SHARPAVG", "SHARPSTD", "RESAVG", "RESSTD", "COVAVG", "COVSTD", "TIMEAVG", "TIMESTD", "Q05AVG", "Q05STD", "Q25AVG", "Q25STD", "Q75AVG", "Q75STD", "Q95AVG", "Q95STD"] return columns + def cast_dataframe_to_synthetic_interval(infile, outfile, experiments): columns = interval_dataframe_analytic_columns(experiments) dat = pd.read_csv(infile, sep=";", usecols=columns) @@ -545,6 +558,7 @@ def cast_dataframe_to_synthetic_interval(infile, outfile, experiments): dat.to_csv(outfile, sep=";", index=False) + def unified_scaled_interval(experiments, tam, save=False, file=None, sort_columns=['COVAVG', 'SHARPAVG', 'COVSTD', 'SHARPSTD'], sort_ascend=[True, False, True, True],save_best=False, @@ -643,6 +657,7 @@ def unified_scaled_interval(experiments, tam, save=False, file=None, Util.show_and_save_image(fig, file, save) + def plot_dataframe_interval(file_synthetic, file_analytic, experiments, tam, save=False, file=None, sort_columns=['COVAVG', 'SHARPAVG', 'COVSTD', 'SHARPSTD'], sort_ascend=[True, False, True, True],save_best=False, @@ -698,6 +713,7 @@ def plot_dataframe_interval(file_synthetic, file_analytic, experiments, tam, sav Util.show_and_save_image(fig, file, save) + def unified_scaled_interval_pinball(experiments, tam, save=False, file=None, sort_columns=['COVAVG','SHARPAVG','COVSTD','SHARPSTD'], sort_ascend=[True, False, True, True], save_best=False, @@ -795,6 +811,8 @@ def unified_scaled_interval_pinball(experiments, tam, save=False, file=None, Util.show_and_save_image(fig, file, save) + + def plot_dataframe_interval_pinball(file_synthetic, file_analytic, experiments, tam, save=False, file=None, sort_columns=['COVAVG','SHARPAVG','COVSTD','SHARPSTD'], sort_ascend=[True, False, True, True], save_best=False, @@ -846,7 +864,7 @@ def plot_dataframe_interval_pinball(file_synthetic, file_analytic, experiments, Util.show_and_save_image(fig, file, save) -def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, synthetic): +def save_dataframe_probabilistic(experiments, file, objs, crps, times, save, synthetic): """ Save benchmark results for m-step ahead probabilistic forecasters :param experiments: @@ -854,7 +872,7 @@ def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, tim :param objs: :param crps_interval: :param crps_distr: - :param times1: + :param times: :param times2: :param save: :param synthetic: @@ -881,13 +899,11 @@ def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, tim mod.append('-') mod.append('-') l = '-' - mod.append(np.round(np.nanmean(crps_interval[k]), 2)) - mod.append(np.round(np.nanstd(crps_interval[k]), 2)) - mod.append(np.round(np.nanmean(crps_distr[k]), 2)) - mod.append(np.round(np.nanstd(crps_distr[k]), 2)) + mod.append(np.round(np.nanmean(crps[k]), 2)) + mod.append(np.round(np.nanstd(crps[k]), 2)) mod.append(l) - mod.append(np.round(np.nanmean(times1[k]), 4)) - mod.append(np.round(np.nanmean(times2[k]), 4)) + mod.append(np.round(np.nanmean(times[k]), 4)) + mod.append(np.round(np.nanstd(times[k]), 4)) ret.append(mod) except Exception as e: print('Erro: %s' % e) @@ -895,7 +911,7 @@ def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, tim print("Erro ao salvar ", k) print("Exceção ", ex) - columns = ahead_dataframe_synthetic_columns() + columns = probabilistic_dataframe_synthetic_columns() else: for k in sorted(objs.keys()): try: @@ -910,28 +926,23 @@ def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, tim s = '-' p = '-' l = '-' - tmp = [n, o, s, p, l, 'CRPS_Interval'] - tmp.extend(crps_interval[k]) + tmp = [n, o, s, p, l, 'CRPS'] + tmp.extend(crps[k]) ret.append(deepcopy(tmp)) - tmp = [n, o, s, p, l, 'CRPS_Distribution'] - tmp.extend(crps_distr[k]) - ret.append(deepcopy(tmp)) - tmp = [n, o, s, p, l, 'TIME_Interval'] - tmp.extend(times1[k]) - ret.append(deepcopy(tmp)) - tmp = [n, o, s, p, l, 'TIME_Distribution'] - tmp.extend(times2[k]) + tmp = [n, o, s, p, l, 'TIME'] + tmp.extend(times[k]) ret.append(deepcopy(tmp)) except Exception as ex: print("Erro ao salvar ", k) print("Exceção ", ex) - columns = ahead_dataframe_analytic_columns(experiments) + columns = probabilistic_dataframe_analytic_columns(experiments) dat = pd.DataFrame(ret, columns=columns) if save: dat.to_csv(Util.uniquefilename(file), sep=";") return dat -def ahead_dataframe_analytic_columns(experiments): + +def probabilistic_dataframe_analytic_columns(experiments): columns = [str(k) for k in np.arange(0, experiments)] columns.insert(0, "Model") columns.insert(1, "Order") @@ -942,14 +953,14 @@ def ahead_dataframe_analytic_columns(experiments): return columns -def ahead_dataframe_synthetic_columns(): - columns = ["Model", "Order", "Scheme", "Partitions", "CRPS1AVG", "CRPS1STD", "CRPS2AVG", "CRPS2STD", - "TIME1AVG", "TIME1STD", "TIME2AVG", "TIME2STD"] +def probabilistic_dataframe_synthetic_columns(): + columns = ["Model", "Order", "Scheme", "Partitions", "CRPSAVG", "CRPSSTD", + "TIMEAVG", "TIMESTD"] return columns -def cast_dataframe_to_synthetic_ahead(infile, outfile, experiments): - columns = ahead_dataframe_analytic_columns(experiments) +def cast_dataframe_to_synthetic_probabilistic(infile, outfile, experiments): + columns = probabilistic_dataframe_analytic_columns(experiments) dat = pd.read_csv(infile, sep=";", usecols=columns) models = dat.Model.unique() orders = dat.Order.unique() @@ -967,36 +978,31 @@ def cast_dataframe_to_synthetic_ahead(infile, outfile, experiments): mod = [] df = dat[(dat.Model == m) & (dat.Order == o) & (dat.Scheme == s) & (dat.Partitions == p)] if not df.empty: - crps1 = extract_measure(df, 'CRPS_Interval', data_columns) - crps2 = extract_measure(df, 'CRPS_Distribution', data_columns) - times1 = extract_measure(df, 'TIME_Interval', data_columns) - times2 = extract_measure(df, 'TIME_Distribution', data_columns) + crps1 = extract_measure(df, 'CRPS', data_columns) + times1 = extract_measure(df, 'TIME', data_columns) mod.append(m) mod.append(o) mod.append(s) mod.append(p) mod.append(np.round(np.nanmean(crps1), 2)) mod.append(np.round(np.nanstd(crps1), 2)) - mod.append(np.round(np.nanmean(crps2), 2)) - mod.append(np.round(np.nanstd(crps2), 2)) mod.append(np.round(np.nanmean(times1), 2)) mod.append(np.round(np.nanstd(times1), 2)) - mod.append(np.round(np.nanmean(times2), 4)) - mod.append(np.round(np.nanstd(times2), 4)) ret.append(mod) - dat = pd.DataFrame(ret, columns=ahead_dataframe_synthetic_columns()) + dat = pd.DataFrame(ret, columns=probabilistic_dataframe_synthetic_columns()) dat.to_csv(outfile, sep=";", index=False) -def unified_scaled_ahead(experiments, tam, save=False, file=None, - sort_columns=['CRPS1AVG', 'CRPS2AVG', 'CRPS1STD', 'CRPS2STD'], - sort_ascend=[True, True, True, True], save_best=False, - ignore=None, replace=None): - fig, axes = plt.subplots(nrows=2, ncols=1, figsize=tam) - axes[0].set_title('CRPS Interval Ahead') - axes[1].set_title('CRPS Distribution Ahead') +def unified_scaled_probabilistic(experiments, tam, save=False, file=None, + sort_columns=['CRPSAVG', 'CRPSSTD'], + sort_ascend=[True, True], save_best=False, + ignore=None, replace=None): + fig, axes = plt.subplots(nrows=1, ncols=1, figsize=tam) + + axes.set_title('CRPS') + #axes[1].set_title('CRPS Distribution Ahead') models = {} @@ -1006,11 +1012,11 @@ def unified_scaled_ahead(experiments, tam, save=False, file=None, mdl = {} - dat_syn = pd.read_csv(experiment[0], sep=";", usecols=ahead_dataframe_synthetic_columns()) + dat_syn = pd.read_csv(experiment[0], sep=";", usecols=probabilistic_dataframe_synthetic_columns()) bests = find_best(dat_syn, sort_columns, sort_ascend) - dat_ana = pd.read_csv(experiment[1], sep=";", usecols=ahead_dataframe_analytic_columns(experiment[2])) + dat_ana = pd.read_csv(experiment[1], sep=";", usecols=probabilistic_dataframe_analytic_columns(experiment[2])) crps1 = [] crps2 = [] @@ -1070,21 +1076,22 @@ def unified_scaled_ahead(experiments, tam, save=False, file=None, Util.show_and_save_image(fig, file, save) -def plot_dataframe_ahead(file_synthetic, file_analytic, experiments, tam, save=False, file=None, - sort_columns=['CRPS1AVG', 'CRPS2AVG', 'CRPS1STD', 'CRPS2STD'], - sort_ascend=[True, True, True, True],save_best=False, - ignore=None, replace=None): + +def plot_dataframe_probabilistic(file_synthetic, file_analytic, experiments, tam, save=False, file=None, + sort_columns=['CRPS1AVG', 'CRPS2AVG', 'CRPS1STD', 'CRPS2STD'], + sort_ascend=[True, True, True, True], save_best=False, + ignore=None, replace=None): fig, axes = plt.subplots(nrows=2, ncols=1, figsize=tam) - axes[0].set_title('CRPS Interval Ahead') - axes[1].set_title('CRPS Distribution Ahead') + axes[0].set_title('CRPS') + axes[1].set_title('CRPS') - dat_syn = pd.read_csv(file_synthetic, sep=";", usecols=ahead_dataframe_synthetic_columns()) + dat_syn = pd.read_csv(file_synthetic, sep=";", usecols=probabilistic_dataframe_synthetic_columns()) bests = find_best(dat_syn, sort_columns, sort_ascend) - dat_ana = pd.read_csv(file_analytic, sep=";", usecols=ahead_dataframe_analytic_columns(experiments)) + dat_ana = pd.read_csv(file_analytic, sep=";", usecols=probabilistic_dataframe_analytic_columns(experiments)) data_columns = analytical_data_columns(experiments) diff --git a/pyFTS/benchmarks/benchmarks.py b/pyFTS/benchmarks/benchmarks.py index 0be8ebb..5973d63 100644 --- a/pyFTS/benchmarks/benchmarks.py +++ b/pyFTS/benchmarks/benchmarks.py @@ -6,6 +6,7 @@ import datetime import time +import numba from copy import deepcopy import matplotlib as plt @@ -20,15 +21,15 @@ from pyFTS.models import song, chen, yu, ismailefendi, sadaei, hofts, pwfts, ift from pyFTS.models.ensemble import ensemble from pyFTS.benchmarks import Measures, naive, arima, ResidualAnalysis, quantreg from pyFTS.benchmarks import Util as bUtil -from pyFTS.common import Util +from pyFTS.common import Util as cUtil # from sklearn.cross_validation import KFold from pyFTS.partitioners import Grid from matplotlib import rc -rc('font',**{'family':'sans-serif','sans-serif':['Helvetica']}) +#rc('font',**{'family':'sans-serif','sans-serif':['Helvetica']}) ## for Palatino and other serif fonts use: #rc('font',**{'family':'serif','serif':['Palatino']}) -rc('text', usetex=True) +#rc('text', usetex=True) colors = ['grey', 'darkgrey', 'rosybrown', 'maroon', 'red','orange', 'gold', 'yellow', 'olive', 'green', 'darkgreen', 'cyan', 'lightblue','blue', 'darkblue', 'purple', 'darkviolet' ] @@ -40,6 +41,193 @@ styles = ['-','--','-.',':','.'] nsty = len(styles) +def __pop(key, default, kwargs): + if key in kwargs: + return kwargs.pop(key) + else: + return default + + +def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs): + """ + Sliding window benchmarks for FTS point forecasters + :param data: + :param windowsize: size of sliding window + :param train: percentual of sliding window data used to train the models + :param models: FTS point forecasters + :param partitioners: Universe of Discourse partitioner + :param partitions: the max number of partitions on the Universe of Discourse + :param max_order: the max order of the models (for high order models) + :param transformation: data transformation + :param indexer: seasonal indexer + :param dump: + :param benchmark_methods: Non FTS models to benchmark + :param benchmark_methods_parameters: Non FTS models parameters + :param save: save results + :param file: file path to save the results + :param sintetic: if true only the average and standard deviation of the results + :return: DataFrame with the results + """ + distributed = __pop('distributed', False, kwargs) + save = __pop('save', False, kwargs) + + transformation = kwargs.get('transformation', None) + progress = kwargs.get('progress', None) + type = kwargs.get("type", 'point') + + orders = __pop("orders", [1,2,3], kwargs) + + partitioners_models = __pop("partitioners_models", None, kwargs) + partitioners_methods = __pop("partitioners_methods", [Grid.GridPartitioner], kwargs) + partitions = __pop("partitions", [10], kwargs) + + methods = __pop('methods', None, kwargs) + + models = __pop('models', None, kwargs) + + pool = [] if models is None else models + + if models is None and methods is None: + if type == 'point': + methods = get_point_methods() + elif type == 'interval': + methods = get_interval_methods() + elif type == 'distribution': + methods = get_probabilistic_methods() + + if models is None: + for method in methods: + mfts = method("") + + if mfts.is_high_order: + for order in orders: + if order >= mfts.min_order: + mfts = method("") + mfts.order = order + pool.append(mfts) + else: + mfts.order = 1 + pool.append(mfts) + + benchmark_models = __pop("benchmark_models", None, kwargs) + + benchmark_methods = __pop("benchmark_methods", None, kwargs) + benchmark_methods_parameters = __pop("benchmark_methods_parameters", None, kwargs) + + if benchmark_models != False: + + if benchmark_models is None and benchmark_methods is None: + if type == 'point'or type == 'partition': + benchmark_methods = get_benchmark_point_methods() + elif type == 'interval': + benchmark_methods = get_benchmark_interval_methods() + elif type == 'distribution': + benchmark_methods = get_benchmark_probabilistic_methods() + + if benchmark_models is not None: + pool.extend(benchmark_models) + elif benchmark_methods is not None: + for count, model in enumerate(benchmark_methods, start=0): + par = benchmark_methods_parameters[count] + mfts = model(str(par if par is not None else "")) + mfts.order = par + pool.append(mfts) + + if type == 'point': + experiment_method = run_point + synthesis_method = process_point_jobs + elif type == 'interval': + experiment_method = run_interval + synthesis_method = process_interval_jobs + elif type == 'distribution': + experiment_method = run_probabilistic + synthesis_method = process_probabilistic_jobs + + if distributed: + import dispy, dispy.httpd + + nodes = kwargs.get("nodes", ['127.0.0.1']) + cluster, http_server = cUtil.start_dispy_cluster(experiment_method, nodes) + + experiments = 0 + jobs = [] + + if progress: + from tqdm import tqdm + progressbar = tqdm(total=len(data), desc="Sliding Window:") + + inc = __pop("inc", 0.1, kwargs) + + for ct, train, test in cUtil.sliding_window(data, windowsize, train, inc=inc, **kwargs): + experiments += 1 + + if progress: + progressbar.update(windowsize * inc) + + partitioners_pool = [] + + if partitioners_models is None: + + for partition in partitions: + + for partitioner in partitioners_methods: + + data_train_fs = partitioner(data=train, npart=partition, transformation=transformation) + + partitioners_pool.append(data_train_fs) + else: + partitioners_pool = partitioners_models + + rng1 = partitioners_pool + + if progress: + rng1 = tqdm(partitioners_pool, desc="Partitioners") + + for partitioner in rng1: + + rng2 = enumerate(pool,start=0) + + if progress: + rng2 = enumerate(tqdm(pool, desc="Models"),start=0) + + for _id, model in rng2: + + if not distributed: + job = experiment_method(deepcopy(model), deepcopy(partitioner), train, test, **kwargs) + jobs.append(job) + else: + job = cluster.submit(deepcopy(model), deepcopy(partitioner), train, test, **kwargs) + job.id = id # associate an ID to identify jobs (if needed later) + jobs.append(job) + + if progress: + progressbar.close() + + if distributed: + jobs2 = [] + + rng = jobs + + cluster.wait() # wait for all jobs to finish + + if progress: + rng = tqdm(jobs) + + for job in rng: + if job.status == dispy.DispyJob.Finished and job is not None: + tmp = job() + jobs2.append(tmp) + + jobs = deepcopy(jobs2) + + cUtil.stop_dispy_cluster(cluster, http_server) + + file = kwargs.get('file', None) + sintetic = kwargs.get('sintetic', False) + + return synthesis_method(jobs, experiments, save, file, sintetic) + + def get_benchmark_point_methods(): """Return all non FTS methods for point forecasting""" return [naive.Naive, arima.ARIMA, quantreg.QuantileRegression] @@ -64,12 +252,17 @@ def get_interval_methods(): def get_probabilistic_methods(): """Return all FTS methods for probabilistic forecasting""" - return [arima.ARIMA, ensemble.AllMethodEnsembleFTS, pwfts.ProbabilisticWeightedFTS] + return [ensemble.AllMethodEnsembleFTS, pwfts.ProbabilisticWeightedFTS] -def run_point(mfts, partitioner, train_data, test_data, window_key=None, transformation=None, indexer=None): +def get_benchmark_probabilistic_methods(): + """Return all FTS methods for probabilistic forecasting""" + return [arima.ARIMA, quantreg.QuantileRegression] + + +def run_point(mfts, partitioner, train_data, test_data, window_key=None, **kwargs): """ - Point forecast benchmark function to be executed on sliding window + Point forecast benchmark function to be executed on cluster nodes :param mfts: FTS model :param partitioner: Universe of Discourse partitioner :param train_data: data used to train the model @@ -77,8 +270,28 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo :param window_key: id of the sliding window :param transformation: data transformation :param indexer: seasonal indexer - :return: a dictionary with the benchmark results + :return: a dictionary with the benchmark results """ + import time + from pyFTS.models import yu, chen, hofts, pwfts,ismailefendi,sadaei, song, cheng, hwang + from pyFTS.partitioners import Grid, Entropy, FCM + from pyFTS.benchmarks import Measures, naive, arima, quantreg + from pyFTS.common import Transformations + + tmp = [song.ConventionalFTS, chen.ConventionalFTS, yu.WeightedFTS, ismailefendi.ImprovedWeightedFTS, + cheng.TrendWeightedFTS, sadaei.ExponentialyWeightedFTS, hofts.HighOrderFTS, hwang.HighOrderFTS, + pwfts.ProbabilisticWeightedFTS] + + tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner] + + tmp4 = [naive.Naive, arima.ARIMA, quantreg.QuantileRegression] + + tmp3 = [Measures.get_point_statistics] + + tmp5 = [Transformations.Differential] + + transformation = kwargs.get('transformation', None) + indexer = kwargs.get('indexer', None) if mfts.benchmark_only: _key = mfts.shortname + str(mfts.order if mfts.order is not None else "") @@ -86,16 +299,17 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo pttr = str(partitioner.__module__).split('.')[-1] _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) mfts.partitioner = partitioner - if transformation is not None: - mfts.append_transformation(transformation) + + if transformation is not None: + mfts.append_transformation(transformation) _start = time.time() - mfts.train(train_data, sets=partitioner.sets, order=mfts.order) + mfts.fit(train_data, order=mfts.order, **kwargs) _end = time.time() times = _end - _start _start = time.time() - _rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, indexer) + _rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, **kwargs) _end = time.time() times += _end - _start @@ -104,113 +318,120 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo return ret -def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner], - partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, - benchmark_models=None, benchmark_models_parameters = None, - save=False, file=None, sintetic=False): +def run_interval(mfts, partitioner, train_data, test_data, window_key=None, **kwargs): """ - Sliding window benchmarks for FTS point forecasters - :param data: - :param windowsize: size of sliding window - :param train: percentual of sliding window data used to train the models - :param models: FTS point forecasters - :param partitioners: Universe of Discourse partitioner - :param partitions: the max number of partitions on the Universe of Discourse - :param max_order: the max order of the models (for high order models) + Interval forecast benchmark function to be executed on cluster nodes + :param mfts: FTS model + :param partitioner: Universe of Discourse partitioner + :param train_data: data used to train the model + :param test_data: ata used to test the model + :param window_key: id of the sliding window :param transformation: data transformation :param indexer: seasonal indexer - :param dump: - :param benchmark_models: Non FTS models to benchmark - :param benchmark_models_parameters: Non FTS models parameters - :param save: save results - :param file: file path to save the results - :param sintetic: if true only the average and standard deviation of the results - :return: DataFrame with the results + :return: a dictionary with the benchmark results """ + import time + from pyFTS.models import hofts,ifts,pwfts + from pyFTS.partitioners import Grid, Entropy, FCM + from pyFTS.benchmarks import Measures, arima, quantreg - if benchmark_models is None: # and models is None: - benchmark_models = [naive.Naive, arima.ARIMA, arima.ARIMA, arima.ARIMA, arima.ARIMA, - quantreg.QuantileRegression, quantreg.QuantileRegression] + tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS] - if benchmark_models_parameters is None: - benchmark_models_parameters = [1, (1, 0, 0), (1, 0, 1), (2, 0, 1), (2, 0, 2), 1, 2] + tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner] - _process_start = time.time() + tmp4 = [arima.ARIMA, quantreg.QuantileRegression] - print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now())) + tmp3 = [Measures.get_interval_statistics] - pool = [] - jobs = [] - objs = {} - rmse = {} - smape = {} - u = {} - times = {} + transformation = kwargs.get('transformation', None) + indexer = kwargs.get('indexer', None) - if models is None: - models = get_point_methods() + if mfts.benchmark_only: + _key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + str(mfts.alpha) + else: + pttr = str(partitioner.__module__).split('.')[-1] + _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) + mfts.partitioner = partitioner - for model in models: - mfts = model("") + if transformation is not None: + mfts.append_transformation(transformation) - if mfts.is_high_order: - for order in np.arange(1, max_order + 1): - if order >= mfts.min_order: - mfts = model("") - mfts.order = order - pool.append(mfts) - else: - mfts.order = 1 - pool.append(mfts) + _start = time.time() + mfts.fit(train_data, order=mfts.order, **kwargs) + _end = time.time() + times = _end - _start - if benchmark_models is not None: - for count, model in enumerate(benchmark_models, start=0): - par = benchmark_models_parameters[count] - mfts = model(str(par if par is not None else "")) - mfts.order = par - pool.append(mfts) + _start = time.time() + _sharp, _res, _cov, _q05, _q25, _q75, _q95 = Measures.get_interval_statistics(test_data, mfts, **kwargs) + _end = time.time() + times += _end - _start - experiments = 0 - for ct, train, test in Util.sliding_window(data, windowsize, train): - experiments += 1 + ret = {'key': _key, 'obj': mfts, 'sharpness': _sharp, 'resolution': _res, 'coverage': _cov, 'time': times, + 'Q05': _q05, 'Q25': _q25, 'Q75': _q75, 'Q95': _q95, 'window': window_key} - benchmarks_only = {} + return ret - if dump: print('\nWindow: {0}\n'.format(ct)) - for partition in partitions: +def run_probabilistic(mfts, partitioner, train_data, test_data, window_key=None, **kwargs): + """ + Probabilistic forecast benchmark function to be executed on cluster nodes + :param mfts: FTS model + :param partitioner: Universe of Discourse partitioner + :param train_data: data used to train the model + :param test_data: ata used to test the model + :param steps: + :param resolution: + :param window_key: id of the sliding window + :param transformation: data transformation + :param indexer: seasonal indexer + :return: a dictionary with the benchmark results + """ + import time + import numpy as np + from pyFTS.models import hofts, ifts, pwfts + from pyFTS.models.ensemble import ensemble + from pyFTS.partitioners import Grid, Entropy, FCM + from pyFTS.benchmarks import Measures, arima + from pyFTS.models.seasonal import SeasonalIndexer - for partitioner in partitioners: + tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS, arima.ARIMA, ensemble.AllMethodEnsembleFTS] - data_train_fs = partitioner(data=train, npart=partition, transformation=transformation) + tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner] - for _id, m in enumerate(pool,start=0): - if m.benchmark_only and m.shortname in benchmarks_only: - continue - else: - benchmarks_only[m.shortname] = m + tmp3 = [Measures.get_distribution_statistics, SeasonalIndexer.SeasonalIndexer, SeasonalIndexer.LinearSeasonalIndexer] - tmp = run_point(deepcopy(m), data_train_fs, train, test, ct, transformation) + transformation = kwargs.get('transformation', None) + indexer = kwargs.get('indexer', None) - if tmp['key'] not in objs: - objs[tmp['key']] = tmp['obj'] - rmse[tmp['key']] = [] - smape[tmp['key']] = [] - u[tmp['key']] = [] - times[tmp['key']] = [] - rmse[tmp['key']].append_rhs(tmp['rmse']) - smape[tmp['key']].append_rhs(tmp['smape']) - u[tmp['key']].append_rhs(tmp['u']) - times[tmp['key']].append_rhs(tmp['time']) - print(tmp['key'], tmp['window']) + if mfts.benchmark_only: + _key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + str(mfts.alpha) + else: + pttr = str(partitioner.__module__).split('.')[-1] + _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) + mfts.partitioner = partitioner - _process_end = time.time() + if transformation is not None: + mfts.append_transformation(transformation) - print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now())) + if mfts.has_seasonality: + mfts.indexer = indexer - print("Process Duration: {0}".format(_process_end - _process_start)) + try: + _start = time.time() + mfts.fit(train_data, order=mfts.order) + _end = time.time() + times = _end - _start - return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u) + _crps1, _t1 = Measures.get_distribution_statistics(test_data, mfts, **kwargs) + _t1 += times + except Exception as e: + print(e) + _crps1 = np.nan + _t1 = np.nan + + ret = {'key': _key, 'obj': mfts, 'CRPS': _crps1, 'time': _t1, 'window': window_key} + + return ret def build_model_pool_point(models, max_order, benchmark_models, benchmark_models_parameters): @@ -239,71 +460,84 @@ def build_model_pool_point(models, max_order, benchmark_models, benchmark_models return pool -def all_point_forecasters(data_train, data_test, partitions, max_order=3, statistics=True, residuals=True, - series=True, save=False, file=None, tam=[20, 5], models=None, transformation=None, - distributions=False, benchmark_models=None, benchmark_models_parameters=None): - """ - Fixed data benchmark for FTS point forecasters - :param data_train: data used to train the models - :param data_test: data used to test the models - :param partitions: the max number of partitions on the Universe of Discourse - :param max_order: the max order of the models (for high order models) - :param statistics: print statistics - :param residuals: print and plot residuals - :param series: plot time series - :param save: save results - :param file: file path to save the results - :param tam: figure dimensions to plot the graphs - :param models: list of models to benchmark - :param transformation: data transformation - :param distributions: plot distributions - :return: - """ - models = build_model_pool_point(models, max_order, benchmark_models, benchmark_models_parameters) +def process_point_jobs(jobs, experiments, save=False, file=None, sintetic=False): + objs = {} + rmse = {} + smape = {} + u = {} + times = {} - objs = [] + for job in jobs: + _key = job['key'] + if _key not in objs: + objs[_key] = job['obj'] + rmse[_key] = [] + smape[_key] = [] + u[_key] = [] + times[_key] = [] + rmse[_key].append(job['rmse']) + smape[_key].append(job['smape']) + u[_key].append(job['u']) + times[_key].append(job['time']) - data_train_fs = Grid.GridPartitioner(data=data_train, npart=partitions, transformation=transformation) + return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u) - count = 1 - lcolors = [] +def process_interval_jobs(jobs, experiments, save=False, file=None, sintetic=False): + objs = {} + sharpness = {} + resolution = {} + coverage = {} + q05 = {} + q25 = {} + q75 = {} + q95 = {} + times = {} - for count, model in enumerate(models, start=0): - #print(model) - if transformation is not None: - model.append_transformation(transformation) - model.train(data_train, sets=data_train_fs.sets, order=model.order) - objs.append(model) - lcolors.append( colors[count % ncol] ) + for job in jobs: + _key = job['key'] + if _key not in objs: + objs[_key] = job['obj'] + sharpness[_key] = [] + resolution[_key] = [] + coverage[_key] = [] + times[_key] = [] + q05[_key] = [] + q25[_key] = [] + q75[_key] = [] + q95[_key] = [] - if statistics: - print_point_statistics(data_test, objs) + sharpness[_key].append(job['sharpness']) + resolution[_key].append(job['resolution']) + coverage[_key].append(job['coverage']) + times[_key].append(job['time']) + q05[_key].append(job['Q05']) + q25[_key].append(job['Q25']) + q75[_key].append(job['Q75']) + q95[_key].append(job['Q95']) - if residuals: - print(ResidualAnalysis.compare_residuals(data_test, objs)) - ResidualAnalysis.plot_residuals(data_test, objs, save=save, file=file, tam=tam) - if series: - plot_compared_series(data_test, objs, lcolors, typeonlegend=False, save=save, file=file, tam=tam, - intervals=False) + return bUtil.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic, + times, q05, q25, q75, q95) - if distributions: - lcolors.insert(0,'black') - pmfs = [] - pmfs.append( - ProbabilityDistribution.ProbabilityDistribution("Original", 100, [min(data_test), max(data_test)], data=data_test) ) - for m in objs: - forecasts = m.forecast(data_test) - pmfs.append( - ProbabilityDistribution.ProbabilityDistribution(m.shortname, 100, [min(data_test), max(data_test)], - data=forecasts)) - print(getProbabilityDistributionStatistics(pmfs,data_test)) +def process_probabilistic_jobs(jobs, experiments, save=False, file=None, sintetic=False): + objs = {} + crps = {} + times = {} - plot_probability_distributions(pmfs, lcolors, tam=tam) + for job in jobs: + _key = job['key'] + if _key not in objs: + objs[_key] = job['obj'] + crps[_key] = [] + times[_key] = [] + + crps[_key].append(job['CRPS']) + times[_key].append(job['time']) + + return bUtil.save_dataframe_probabilistic(experiments, file, objs, crps, times, save, sintetic) - return models def print_point_statistics(data, models, externalmodels = None, externalforecasts = None, indexers=None): @@ -330,163 +564,6 @@ def print_point_statistics(data, models, externalmodels = None, externalforecast -def getProbabilityDistributionStatistics(pmfs, data): - ret = "Model & Entropy & Empirical Likelihood & Pseudo Likelihood \\\\ \n" - for k in pmfs: - ret += k.name + " & " - ret += str(k.entropy()) + " & " - ret += str(k.empiricalloglikelihood())+ " & " - ret += str(k.pseudologlikelihood(data)) - ret += " \\\\ \n" - return ret - - - -def interval_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner], - partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, - save=False, file=None, synthetic=True): - if models is None: - models = get_interval_methods() - - objs = {} - lcolors = {} - sharpness = {} - resolution = {} - coverage = {} - times = {} - - experiments = 0 - for ct, training,test in Util.sliding_window(data, windowsize, train): - experiments += 1 - for partition in partitions: - for partitioner in partitioners: - pttr = str(partitioner.__module__).split('.')[-1] - data_train_fs = partitioner(data=training, npart=partition, transformation=transformation) - - for count, model in enumerate(models, start=0): - - mfts = model("") - _key = mfts.shortname + " " + pttr+ " q = " +str(partition) - - mfts.partitioner = data_train_fs - if not mfts.is_high_order: - - if dump: print(ct,_key) - - if _key not in objs: - objs[_key] = mfts - lcolors[_key] = colors[count % ncol] - sharpness[_key] = [] - resolution[_key] = [] - coverage[_key] = [] - times[_key] = [] - - if transformation is not None: - mfts.append_transformation(transformation) - - _start = time.time() - mfts.train(training, sets=data_train_fs.sets) - _end = time.time() - _tdiff = _end - _start - - _start = time.time() - _sharp, _res, _cov = Measures.get_interval_statistics(test, mfts) - _end = time.time() - _tdiff += _end - _start - sharpness[_key].append_rhs(_sharp) - resolution[_key].append_rhs(_res) - coverage[_key].append_rhs(_cov) - times[_key].append_rhs(_tdiff) - - else: - for order in np.arange(1, max_order + 1): - if order >= mfts.min_order: - mfts = model("") - _key = mfts.shortname + " n = " + str(order) + " " + pttr + " q = " + str(partition) - mfts.partitioner = data_train_fs - - if dump: print(ct,_key) - - if _key not in objs: - objs[_key] = mfts - lcolors[_key] = colors[count % ncol] - sharpness[_key] = [] - resolution[_key] = [] - coverage[_key] = [] - times[_key] = [] - - if transformation is not None: - mfts.append_transformation(transformation) - - _start = time.time() - mfts.train(training, sets=data_train_fs.sets, order=order) - _end = time.time() - - _tdiff = _end - _start - - _start = time.time() - _sharp, _res, _cov = Measures.get_interval_statistics(test, mfts) - _end = time.time() - _tdiff += _end - _start - sharpness[_key].append_rhs(_sharp) - resolution[_key].append_rhs(_res) - coverage[_key].append_rhs(_cov) - times[_key].append_rhs(_tdiff) - - return bUtil.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, synthetic, times) - - -def build_model_pool_interval(models, max_order, benchmark_models, benchmark_models_parameters): - pool = [] - if models is None: - models = get_interval_methods() - for model in models: - mfts = model("") - - if mfts.is_high_order: - for order in np.arange(1, max_order + 1): - if order >= mfts.min_order: - mfts = model("") - mfts.order = order - pool.append(mfts) - else: - mfts.order = 1 - pool.append(mfts) - alphas = [0.05, 0.25] - if benchmark_models is not None: - for count, model in enumerate(benchmark_models, start=0): - par = benchmark_models_parameters[count] - for alpha in alphas: - mfts = model(str(alpha), alpha=alpha) - mfts.order = par - pool.append(mfts) - return pool - - -def all_interval_forecasters(data_train, data_test, partitions, max_order=3,save=False, file=None, tam=[20, 5], - statistics=False, models=None, transformation=None, - benchmark_models=None, benchmark_models_parameters=None): - models = build_model_pool_interval(models, max_order, benchmark_models, benchmark_models_parameters) - - data_train_fs = Grid.GridPartitioner(data=data_train, npart=partitions, transformation=transformation).sets - - lcolors = [] - objs = [] - - for count, model in Util.enumerate2(models, start=0, step=2): - if transformation is not None: - model.append_transformation(transformation) - model.train(data_train, sets=data_train_fs, order=model.order) - objs.append(model) - lcolors.append( colors[count % ncol] ) - - if statistics: - print_interval_statistics(data_test, objs) - - plot_compared_series(data_test, objs, lcolors, typeonlegend=False, save=save, file=file, tam=tam, - points=False, intervals=True) - - def print_interval_statistics(original, models): ret = "Model & Order & Sharpness & Resolution & Coverage & .05 & .25 & .75 & .95 \\\\ \n" for fts in models: @@ -503,6 +580,7 @@ def print_interval_statistics(original, models): print(ret) + def plot_interval(axis, intervals, order, label, color='red', typeonlegend=False, ls='-', linewidth=1): lower = [kk[0] for kk in intervals] upper = [kk[1] for kk in intervals] @@ -517,6 +595,7 @@ def plot_interval(axis, intervals, order, label, color='red', typeonlegend=False return [mi, ma] + def plot_compared_series(original, models, colors, typeonlegend=False, save=False, file=None, tam=[20, 5], points=True, intervals=True, linewidth=1.5): """ @@ -596,6 +675,7 @@ def plot_probability_distributions(pmfs, lcolors, tam=[15, 7]): ax.legend(handles0, labels0) + def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution = None, partitioners=[Grid.GridPartitioner], partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, save=False, file=None, synthetic=False): @@ -610,7 +690,7 @@ def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution times2 = {} experiments = 0 - for ct, train,test in Util.sliding_window(data, windowsize, train): + for ct, train,test in cUtil.sliding_window(data, windowsize, train): experiments += 1 for partition in partitions: for partitioner in partitioners: @@ -692,6 +772,7 @@ def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution return bUtil.save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, synthetic) + def all_ahead_forecasters(data_train, data_test, partitions, start, steps, resolution = None, max_order=3,save=False, file=None, tam=[20, 5], models=None, transformation=None, option=2): if models is None: @@ -704,7 +785,7 @@ def all_ahead_forecasters(data_train, data_test, partitions, start, steps, resol data_train_fs = Grid.GridPartitioner(data=data_train, npart=partitions, transformation=transformation).sets lcolors = [] - for count, model in Util.enumerate2(models, start=0, step=2): + for count, model in cUtil.enumerate2(models, start=0, step=2): mfts = model("") if not mfts.is_high_order: if transformation is not None: @@ -746,6 +827,7 @@ def print_distribution_statistics(original, models, steps, resolution): print(ret) + def plot_compared_intervals_ahead(original, models, colors, distributions, time_from, time_to, intervals = True, save=False, file=None, tam=[20, 5], resolution=None, cmap='Blues', linewidth=1.5): @@ -820,7 +902,8 @@ def plot_compared_intervals_ahead(original, models, colors, distributions, time_ ax.set_xlabel('T') ax.set_xlim([0, len(original)]) - Util.show_and_save_image(fig, file, save, lgd=lgd) + cUtil.show_and_save_image(fig, file, save, lgd=lgd) + def plot_density_rectange(ax, cmap, density, fig, resolution, time_from, time_to): @@ -845,6 +928,7 @@ def plot_density_rectange(ax, cmap, density, fig, resolution, time_from, time_to from pyFTS.common import Transformations + def plot_probabilitydistribution_density(ax, cmap, probabilitydist, fig, time_from): from matplotlib.patches import Rectangle from matplotlib.collections import PatchCollection @@ -1102,86 +1186,11 @@ def simpleSearch_RMSE(train, test, model, partitions, orders, save=False, file=N # plt.tight_layout() - Util.show_and_save_image(fig, file, save) + cUtil.show_and_save_image(fig, file, save) return ret -def sliding_window_simple_search(data, windowsize, model, partitions, orders, save=False, file=None, tam=[10, 15], - plotforecasts=False, elev=30, azim=144, intervals=False, parameters=None): - _3d = len(orders) > 1 - ret = [] - errors = np.array([[0 for k in range(len(partitions))] for kk in range(len(orders))]) - forecasted_best = [] - fig = plt.figure(figsize=tam) - # fig.suptitle("Comparação de modelos ") - if plotforecasts: - ax0 = fig.add_axes([0, 0.4, 0.9, 0.5]) # left, bottom, width, height - ax0.set_xlim([0, len(data)]) - ax0.set_ylim([min(data) * 0.9, max(data) * 1.1]) - ax0.set_title('Forecasts') - ax0.set_ylabel('F(T)') - ax0.set_xlabel('T') - min_rmse = 1000000.0 - best = None - - for pc, p in enumerate(partitions, start=0): - - sets = Grid.GridPartitioner(data=data, npart=p).sets - for oc, o in enumerate(orders, start=0): - _error = [] - for ct, train, test in Util.sliding_window(data, windowsize, 0.8): - fts = model("q = " + str(p) + " n = " + str(o)) - fts.train(data, sets=sets, order=o, parameters=parameters) - if not intervals: - forecasted = fts.forecast(test) - if not fts.has_seasonality: - _error.append( Measures.rmse(np.array(test[o:]), np.array(forecasted[:-1])) ) - else: - _error.append( Measures.rmse(np.array(test[o:]), np.array(forecasted)) ) - for kk in range(o): - forecasted.insert(0, None) - if plotforecasts: ax0.plot(forecasted, label=fts.name) - else: - forecasted = fts.forecast_interval(test) - _error.append( 1.0 - Measures.rmse_interval(np.array(test[o:]), np.array(forecasted[:-1])) ) - error = np.nanmean(_error) - errors[oc, pc] = error - if error < min_rmse: - min_rmse = error - best = fts - forecasted_best = forecasted - - # print(min_rmse) - if plotforecasts: - # handles0, labels0 = ax0.get_legend_handles_labels() - # ax0.legend(handles0, labels0) - ax0.plot(test, label="Original", linewidth=3.0, color="black") - if _3d: ax1 = Axes3D(fig, rect=[0, 1, 0.9, 0.9], elev=elev, azim=azim) - if not plotforecasts: ax1 = Axes3D(fig, rect=[0, 1, 0.9, 0.9], elev=elev, azim=azim) - # ax1 = fig.add_axes([0.6, 0.5, 0.45, 0.45], projection='3d') - if _3d: - ax1.set_title('Error Surface') - ax1.set_ylabel('Model order') - ax1.set_xlabel('Number of partitions') - ax1.set_zlabel('RMSE') - X, Y = np.meshgrid(partitions, orders) - surf = ax1.plot_surface(X, Y, errors, rstride=1, cstride=1, antialiased=True) - else: - ax1 = fig.add_axes([0, 1, 0.9, 0.9]) - ax1.set_title('Error Curve') - ax1.set_ylabel('Number of partitions') - ax1.set_xlabel('RMSE') - ax0.plot(errors,partitions) - ret.append(best) - ret.append(forecasted_best) - - # plt.tight_layout() - - Util.show_and_save_image(fig, file, save) - - return ret - def pftsExploreOrderAndPartitions(data,save=False, file=None): fig, axes = plt.subplots(nrows=4, ncols=1, figsize=[6, 8]) @@ -1242,5 +1251,5 @@ def pftsExploreOrderAndPartitions(data,save=False, file=None): plt.tight_layout() - Util.show_and_save_image(fig, file, save) + cUtil.show_and_save_image(fig, file, save) diff --git a/pyFTS/benchmarks/distributed_benchmarks.py b/pyFTS/benchmarks/distributed_benchmarks.py index a590e26..134967d 100644 --- a/pyFTS/benchmarks/distributed_benchmarks.py +++ b/pyFTS/benchmarks/distributed_benchmarks.py @@ -8,6 +8,7 @@ python3 /usr/local/bin/dispynode.py -i [local IP] -d import datetime import time +import numba import dispy import dispy.httpd @@ -18,61 +19,7 @@ from pyFTS.common import Util from pyFTS.partitioners import Grid -def run_point(mfts, partitioner, train_data, test_data, window_key=None, transformation=None, indexer=None): - """ - Point forecast benchmark function to be executed on cluster nodes - :param mfts: FTS model - :param partitioner: Universe of Discourse partitioner - :param train_data: data used to train the model - :param test_data: ata used to test the model - :param window_key: id of the sliding window - :param transformation: data transformation - :param indexer: seasonal indexer - :return: a dictionary with the benchmark results - """ - import time - from pyFTS.models import yu, chen, hofts, pwfts,ismailefendi,sadaei, song, cheng, hwang - from pyFTS.partitioners import Grid, Entropy, FCM - from pyFTS.benchmarks import Measures, naive, arima, quantreg - from pyFTS.common import Transformations - - tmp = [song.ConventionalFTS, chen.ConventionalFTS, yu.WeightedFTS, ismailefendi.ImprovedWeightedFTS, - cheng.TrendWeightedFTS, sadaei.ExponentialyWeightedFTS, hofts.HighOrderFTS, hwang.HighOrderFTS, - pwfts.ProbabilisticWeightedFTS] - - tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner] - - tmp4 = [naive.Naive, arima.ARIMA, quantreg.QuantileRegression] - - tmp3 = [Measures.get_point_statistics] - - tmp5 = [Transformations.Differential] - - if mfts.benchmark_only: - _key = mfts.shortname + str(mfts.order if mfts.order is not None else "") - else: - pttr = str(partitioner.__module__).split('.')[-1] - _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) - mfts.partitioner = partitioner - - if transformation is not None: - mfts.append_transformation(transformation) - - _start = time.time() - mfts.train(train_data, partitioner.sets, order=mfts.order) - _end = time.time() - times = _end - _start - - _start = time.time() - _rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, indexer) - _end = time.time() - times += _end - _start - - ret = {'key': _key, 'obj': mfts, 'rmse': _rmse, 'smape': _smape, 'u': _u, 'time': times, 'window': window_key} - - return ret - - +@numba.jit() def point_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, partitioners=[Grid.GridPartitioner], partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, benchmark_models=None, benchmark_models_parameters = None, @@ -100,7 +47,7 @@ def point_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, part :return: DataFrame with the results """ - cluster = dispy.JobCluster(run_point, nodes=nodes) #, depends=dependencies) + cluster = dispy.JobCluster(benchmarks.run_point, nodes=nodes) #, depends=dependencies) http_server = dispy.httpd.DispyHTTPServer(cluster) @@ -174,7 +121,7 @@ def point_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, part return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u) - +@numba.jit() def build_model_pool_point(models, max_order, benchmark_models, benchmark_models_parameters): pool = [] @@ -209,57 +156,7 @@ def build_model_pool_point(models, max_order, benchmark_models, benchmark_models return pool -def run_interval(mfts, partitioner, train_data, test_data, window_key=None, transformation=None, indexer=None): - """ - Interval forecast benchmark function to be executed on cluster nodes - :param mfts: FTS model - :param partitioner: Universe of Discourse partitioner - :param train_data: data used to train the model - :param test_data: ata used to test the model - :param window_key: id of the sliding window - :param transformation: data transformation - :param indexer: seasonal indexer - :return: a dictionary with the benchmark results - """ - import time - from pyFTS.models import hofts,ifts,pwfts - from pyFTS.partitioners import Grid, Entropy, FCM - from pyFTS.benchmarks import Measures, arima, quantreg - - tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS] - - tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner] - - tmp4 = [arima.ARIMA, quantreg.QuantileRegression] - - tmp3 = [Measures.get_interval_statistics] - - if mfts.benchmark_only: - _key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + str(mfts.alpha) - else: - pttr = str(partitioner.__module__).split('.')[-1] - _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) - mfts.partitioner = partitioner - - if transformation is not None: - mfts.append_transformation(transformation) - - _start = time.time() - mfts.train(train_data, partitioner.sets, order=mfts.order) - _end = time.time() - times = _end - _start - - _start = time.time() - _sharp, _res, _cov, _q05, _q25, _q75, _q95 = Measures.get_interval_statistics(test_data, mfts) - _end = time.time() - times += _end - _start - - ret = {'key': _key, 'obj': mfts, 'sharpness': _sharp, 'resolution': _res, 'coverage': _cov, 'time': times, - 'Q05': _q05, 'Q25': _q25, 'Q75': _q75, 'Q95': _q95, 'window': window_key} - - return ret - - +@numba.jit() def interval_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, partitioners=[Grid.GridPartitioner], partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, benchmark_models=None, benchmark_models_parameters = None, @@ -296,7 +193,7 @@ def interval_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, if benchmark_models_parameters is None: benchmark_models_parameters = [(1, 0, 0), (1, 0, 1), (2, 0, 1), (2, 0, 2), 1, 2] - cluster = dispy.JobCluster(run_interval, nodes=nodes) #, depends=dependencies) + cluster = dispy.JobCluster(benchmarks.run_interval, nodes=nodes) #, depends=dependencies) http_server = dispy.httpd.DispyHTTPServer(cluster) @@ -407,70 +304,7 @@ def interval_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, times, q05, q25, q75, q95) -def run_ahead(mfts, partitioner, train_data, test_data, steps, resolution, window_key=None, transformation=None, indexer=None): - """ - Probabilistic m-step ahead forecast benchmark function to be executed on cluster nodes - :param mfts: FTS model - :param partitioner: Universe of Discourse partitioner - :param train_data: data used to train the model - :param test_data: ata used to test the model - :param steps: - :param resolution: - :param window_key: id of the sliding window - :param transformation: data transformation - :param indexer: seasonal indexer - :return: a dictionary with the benchmark results - """ - import time - import numpy as np - from pyFTS.models import hofts, ifts, pwfts - from pyFTS.models.ensemble import ensemble - from pyFTS.partitioners import Grid, Entropy, FCM - from pyFTS.benchmarks import Measures, arima - from pyFTS.models.seasonal import SeasonalIndexer - - tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS, arima.ARIMA, ensemble.AllMethodEnsembleFTS] - - tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner] - - tmp3 = [Measures.get_distribution_statistics, SeasonalIndexer.SeasonalIndexer, SeasonalIndexer.LinearSeasonalIndexer] - - if mfts.benchmark_only: - _key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + str(mfts.alpha) - else: - pttr = str(partitioner.__module__).split('.')[-1] - _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions) - mfts.partitioner = partitioner - - if transformation is not None: - mfts.append_transformation(transformation) - - if mfts.has_seasonality: - mfts.indexer = indexer - - try: - _start = time.time() - mfts.train(train_data, partitioner.sets, order=mfts.order) - _end = time.time() - times = _end - _start - - _crps1, _crps2, _t1, _t2 = Measures.get_distribution_statistics(test_data, mfts, steps=steps, - resolution=resolution) - _t1 += times - _t2 += times - except Exception as e: - print(e) - _crps1 = np.nan - _crps2 = np.nan - _t1 = np.nan - _t2 = np.nan - - ret = {'key': _key, 'obj': mfts, 'CRPS_Interval': _crps1, 'CRPS_Distribution': _crps2, 'TIME_Interval': _t1, - 'TIME_Distribution': _t2, 'window': window_key} - - return ret - - +@numba.jit() def ahead_sliding_window(data, windowsize, steps, resolution, train=0.8, inc=0.1, models=None, partitioners=[Grid.GridPartitioner], partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, benchmark_models=None, benchmark_models_parameters = None, @@ -505,7 +339,7 @@ def ahead_sliding_window(data, windowsize, steps, resolution, train=0.8, inc=0.1 if benchmark_models_parameters is None: benchmark_models_parameters = [(1, 0, 0), (1, 0, 1), (2, 0, 0), (2, 0, 1), (2, 0, 2)] - cluster = dispy.JobCluster(run_ahead, nodes=nodes) # , depends=dependencies) + cluster = dispy.JobCluster(benchmarks.run_ahead, nodes=nodes) # , depends=dependencies) http_server = dispy.httpd.DispyHTTPServer(cluster) diff --git a/pyFTS/common/Util.py b/pyFTS/common/Util.py index 6866f8d..dfd7bf1 100644 --- a/pyFTS/common/Util.py +++ b/pyFTS/common/Util.py @@ -1,4 +1,5 @@ import time +import numba import matplotlib.pyplot as plt import dill import numpy as np @@ -15,6 +16,7 @@ def uniquefilename(name): return name + str(current_milli_time()) + def show_and_save_image(fig, file, flag, lgd=None): """ Show and image and save on file @@ -38,7 +40,7 @@ def enumerate2(xs, start=0, step=1): start += step -def sliding_window(data, windowsize, train=0.8, inc=0.1): +def sliding_window(data, windowsize, train=0.8, inc=0.1, **kwargs): """ Sliding window method of cross validation for time series :param data: the entire dataset @@ -50,7 +52,16 @@ def sliding_window(data, windowsize, train=0.8, inc=0.1): l = len(data) ttrain = int(round(windowsize * train, 0)) ic = int(round(windowsize * inc, 0)) - for count in np.arange(0,l-windowsize+ic,ic): + + progressbar = kwargs.get('progress', None) + + rng = np.arange(0,l-windowsize+ic,ic) + + if progressbar: + from tqdm import tqdm + rng = tqdm(rng) + + for count in rng: if count + windowsize > l: _end = l else: @@ -91,11 +102,34 @@ def load_env(file): dill.load_session(file) + +def start_dispy_cluster(method, nodes): + import dispy, dispy.httpd, logging + + cluster = dispy.JobCluster(method, nodes=nodes, loglevel=logging.DEBUG, ping_interval=1000) + + http_server = dispy.httpd.DispyHTTPServer(cluster) + + return cluster, http_server + + + +def stop_dispy_cluster(cluster, http_server): + cluster.wait() # wait for all jobs to finish + + cluster.print_status() + + http_server.shutdown() # this waits until browser gets all updates + cluster.close() + + + def simple_model_train(model, data, parameters): model.train(data, **parameters) return model + def distributed_train(model, train_method, nodes, fts_method, data, num_batches=10, train_parameters={}, **kwargs): import dispy, dispy.httpd, datetime @@ -106,9 +140,7 @@ def distributed_train(model, train_method, nodes, fts_method, data, num_batches= file_path = kwargs.get('file_path', None) - cluster = dispy.JobCluster(train_method, nodes=nodes) # , depends=dependencies) - - http_server = dispy.httpd.DispyHTTPServer(cluster) + cluster, http_server = start_dispy_cluster(train_method, nodes) print("[{0: %H:%M:%S}] Distrituted Train Started".format(datetime.datetime.now())) @@ -149,26 +181,21 @@ def distributed_train(model, train_method, nodes, fts_method, data, num_batches= print("[{0: %H:%M:%S}] Distrituted Train Finished".format(datetime.datetime.now())) - cluster.wait() # wait for all jobs to finish - - cluster.print_status() - - http_server.shutdown() # this waits until browser gets all updates - cluster.close() + stop_dispy_cluster(cluster, http_server) return model + def simple_model_predict(model, data, parameters): return model.predict(data, **parameters) + def distributed_predict(model, parameters, nodes, data, num_batches): import dispy, dispy.httpd - cluster = dispy.JobCluster(simple_model_predict, nodes=nodes) # , depends=dependencies) - - http_server = dispy.httpd.DispyHTTPServer(cluster) + cluster, http_server = start_dispy_cluster(simple_model_predict, nodes) jobs = [] n = len(data) @@ -199,11 +226,6 @@ def distributed_predict(model, parameters, nodes, data, num_batches): print(job.exception) print(job.stdout) - cluster.wait() # wait for all jobs to finish - - cluster.print_status() - - http_server.shutdown() # this waits until browser gets all updates - cluster.close() + stop_dispy_cluster(cluster, http_server) return ret diff --git a/pyFTS/common/fts.py b/pyFTS/common/fts.py index 126e5ee..acf404a 100644 --- a/pyFTS/common/fts.py +++ b/pyFTS/common/fts.py @@ -5,7 +5,7 @@ from pyFTS.common import FuzzySet, SortedCollection, tree, Util class FTS(object): """ - Fuzzy Time Series + Fuzzy Time Series object model """ def __init__(self, order, name, **kwargs): """ @@ -60,7 +60,14 @@ class FTS(object): Forecast using trained model :param data: time series with minimal length to the order of the model :param kwargs: - :return: + + :keyword + type: the forecasting type, one of these values: point(default), interval or distribution. + steps_ahead: The forecasting horizon, i. e., the number of steps ahead to forecast + start: in the multi step forecasting, the index of the data where to start forecasting + distributed: boolean, indicate if the forecasting procedure will be distributed in a dispy cluster + nodes: a list with the dispy cluster nodes addresses + :return: a numpy array with the forecasted data """ if 'distributed' in kwargs: @@ -181,7 +188,7 @@ class FTS(object): def fit(self, data, **kwargs): """ - :param data: + :param data: the training time series :param kwargs: :keyword @@ -189,12 +196,17 @@ class FTS(object): save_model: save final model on disk batch_save: save the model between each batch file_path: path to save the model + distributed: boolean, indicate if the training procedure will be distributed in a dispy cluster + nodes: a list with the dispy cluster nodes addresses + :return: """ import datetime - num_batches = kwargs.get('num_batches', 10) + dump = kwargs.get('dump', None) + + num_batches = kwargs.get('num_batches', None) save = kwargs.get('save_model', False) # save model on disk @@ -214,14 +226,24 @@ class FTS(object): batch_save_interval=batch_save_interval) else: - print("[{0: %H:%M:%S}] Start training".format(datetime.datetime.now())) + if dump == 'time': + print("[{0: %H:%M:%S}] Start training".format(datetime.datetime.now())) if num_batches is not None: n = len(data) batch_size = int(n / num_batches) bcount = 1 - for ct in range(self.order, n, batch_size): - print("[{0: %H:%M:%S}] Starting batch ".format(datetime.datetime.now()) + str(bcount)) + + rng = range(self.order, n, batch_size) + + if dump == 'tqdm': + from tqdm import tqdm + + rng = tqdm(rng) + + for ct in rng: + if dump == 'time': + print("[{0: %H:%M:%S}] Starting batch ".format(datetime.datetime.now()) + str(bcount)) if self.is_multivariate: ndata = data.iloc[ct - self.order:ct + batch_size] else: @@ -232,14 +254,16 @@ class FTS(object): if batch_save: Util.persist_obj(self,file_path) - print("[{0: %H:%M:%S}] Finish batch ".format(datetime.datetime.now()) + str(bcount)) + if dump == 'time': + print("[{0: %H:%M:%S}] Finish batch ".format(datetime.datetime.now()) + str(bcount)) bcount += 1 else: self.train(data, **kwargs) - print("[{0: %H:%M:%S}] Finish training".format(datetime.datetime.now())) + if dump == 'time': + print("[{0: %H:%M:%S}] Finish training".format(datetime.datetime.now())) if save: Util.persist_obj(self, file_path) diff --git a/pyFTS/models/pwfts.py b/pyFTS/models/pwfts.py index 52f5ae2..b3c761b 100644 --- a/pyFTS/models/pwfts.py +++ b/pyFTS/models/pwfts.py @@ -201,8 +201,9 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS): if flrg.get_key() in self.flrgs: return self.flrgs[flrg.get_key()].frequency_count / self.global_frequency_count else: - self.add_new_PWFLGR(flrg) - return self.flrg_lhs_unconditional_probability(flrg) + return 0.0 + #self.add_new_PWFLGR(flrg) + #return self.flrg_lhs_unconditional_probability(flrg) def flrg_lhs_conditional_probability(self, x, flrg): mv = flrg.get_membership(x, self.sets) @@ -214,8 +215,11 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS): tmp = self.flrgs[flrg.get_key()] ret = tmp.get_midpoint(self.sets) #sum(np.array([tmp.rhs_unconditional_probability(s) * self.setsDict[s].centroid for s in tmp.RHS])) else: - pi = 1 / len(flrg.LHS) - ret = sum(np.array([pi * self.sets[s].centroid for s in flrg.LHS])) + if len(flrg.LHS) > 0: + pi = 1 / len(flrg.LHS) + ret = sum(np.array([pi * self.sets[s].centroid for s in flrg.LHS])) + else: + ret = np.nan return ret def flrg_rhs_conditional_probability(self, x, flrg): @@ -241,8 +245,7 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS): tmp = self.flrgs[flrg.get_key()] ret = tmp.get_upper(self.sets) else: - pi = 1 / len(flrg.LHS) - ret = sum(np.array([pi * self.sets[s].upper for s in flrg.LHS])) + ret = 0 return ret def get_lower(self, flrg): @@ -250,8 +253,7 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS): tmp = self.flrgs[flrg.get_key()] ret = tmp.get_lower(self.sets) else: - pi = 1 / len(flrg.LHS) - ret = sum(np.array([pi * self.sets[s].lower for s in flrg.LHS])) + ret = 0 return ret def forecast(self, data, **kwargs): @@ -322,92 +324,31 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS): ret.append_rhs([lo_qt, up_qt]) def interval_extremum(self, k, ndata, ret): - affected_flrgs = [] - affected_flrgs_memberships = [] - norms = [] + + sample = ndata[k - (self.order - 1): k + 1] + + flrgs = self.generate_lhs_flrg(sample) + up = [] lo = [] - # Find the sets which membership > 0 for each lag - count = 0 - lags = {} - if self.order > 1: - subset = ndata[k - (self.order - 1): k + 1] - - for instance in subset: - mb = FuzzySet.fuzzyfy_instance(instance, self.sets) - tmp = np.argwhere(mb) - idx = np.ravel(tmp) # flatten the array - - if idx.size == 0: # the element is out of the bounds of the Universe of Discourse - if math.isclose(instance, self.sets[0].lower) or instance < self.sets[0].lower: - idx = [0] - elif math.isclose(instance, self.sets[-1].upper) or instance > self.sets[-1].upper: - idx = [len(self.sets) - 1] - else: - raise Exception("Data exceed the known bounds [%s, %s] of universe of discourse: %s" % - (self.sets[0].lower, self.sets[-1].upper, instance)) - - lags[count] = idx - count += 1 - - # Build the tree with all possible paths - - root = tree.FLRGTreeNode(None) - - self.build_tree(root, lags, 0) - - # Trace the possible paths and build the PFLRG's - - for p in root.paths(): - path = list(reversed(list(filter(None.__ne__, p)))) - flrg = hofts.HighOrderFLRG(self.order) - for kk in path: flrg.append_lhs(self.sets[kk]) - - assert len(flrg.LHS) == subset.size, str(subset) + " -> " + str([s.name for s in flrg.LHS]) - - ## - affected_flrgs.append(flrg) - - # Find the general membership of FLRG - affected_flrgs_memberships.append(min(self.get_sequence_membership(subset, flrg.LHS))) - - else: - - mv = FuzzySet.fuzzyfy_instance(ndata[k], self.sets) # get all membership values - tmp = np.argwhere(mv) # get the indices of values > 0 - idx = np.ravel(tmp) # flatten the array - - if idx.size == 0: # the element is out of the bounds of the Universe of Discourse - if math.isclose(ndata[k], self.sets[0].lower) or ndata[k] < self.sets[0].lower: - idx = [0] - elif math.isclose(ndata[k], self.sets[-1].upper) or ndata[k] > self.sets[-1].upper: - idx = [len(self.sets) - 1] - else: - raise Exception("Data exceed the known bounds [%s, %s] of universe of discourse: %s" % - (self.sets[0].lower, self.sets[-1].upper, ndata[k])) - - for kk in idx: - flrg = hofts.HighOrderFLRG(self.order) - flrg.append_lhs(self.sets[kk]) - affected_flrgs.append(flrg) - affected_flrgs_memberships.append(mv[kk]) - for count, flrg in enumerate(affected_flrgs): - # achar o os bounds de cada FLRG, ponderados pela probabilidade e pertinência - norm = self.flrg_lhs_unconditional_probability(flrg) * affected_flrgs_memberships[count] + norms = [] + for flrg in flrgs: + norm = self.flrg_lhs_conditional_probability(sample, flrg) if norm == 0: norm = self.flrg_lhs_unconditional_probability(flrg) # * 0.001 up.append(norm * self.get_upper(flrg)) lo.append(norm * self.get_lower(flrg)) norms.append(norm) - # gerar o intervalo + # gerar o intervalo norm = sum(norms) if norm == 0: - ret.append_rhs([0, 0]) + ret.append([0, 0]) else: lo_ = sum(lo) / norm up_ = sum(up) / norm - ret.append_rhs([lo_, up_]) + ret.append([lo_, up_]) + def forecast_distribution(self, data, **kwargs): @@ -415,15 +356,18 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS): data = [data] smooth = kwargs.get("smooth", "none") - nbins = kwargs.get("num_bins", 100) ndata = np.array(self.apply_transformations(data)) - l = len(ndata) + uod = self.get_UoD() + + if 'bins' in kwargs: + _bins = kwargs.pop('bins') + else: + nbins = kwargs.get("num_bins", 100) + _bins = np.linspace(uod[0], uod[1], nbins) ret = [] - uod = self.get_UoD() - _bins = np.linspace(uod[0], uod[1], nbins) for k in np.arange(self.order - 1, l): sample = ndata[k - (self.order - 1): k + 1] @@ -487,120 +431,63 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS): ret = [] - method = kwargs.get('method', 2) - smooth = "KDE" if method != 4 else "none" - nbins = kwargs.get("num_bins", 100) + smooth = kwargs.get("smooth", "none") + + ndata = np.array(self.apply_transformations(data)) uod = self.get_UoD() - _bins = np.linspace(uod[0], uod[1], nbins).tolist() - if method != 4: - intervals = self.forecast_ahead_interval(data, steps) + if 'bins' in kwargs: + _bins = kwargs.pop('bins') else: - l = len(data) - for k in np.arange(l - self.order, l): - dist = ProbabilityDistribution.ProbabilityDistribution(smooth, uod=uod, bins=_bins, **kwargs) - dist.set(data[k], 1.0) - ret.append(dist) + nbins = kwargs.get("num_bins", 100) + _bins = np.linspace(uod[0], uod[1], nbins) - for k in np.arange(self.order, steps + self.order): + start = kwargs.get('start', self.order) - data = [] + sample = ndata[start - (self.order - 1): start + 1] - if method == 1: + for dat in sample: + tmp = ProbabilityDistribution.ProbabilityDistribution(smooth, uod=uod, bins=_bins, **kwargs) + tmp.set(dat, 1.0) + ret.append(tmp) - lags = {} + dist = self.forecast_distribution(sample, bins=_bins) - cc = 0 + ret.append(dist) - for i in intervals[k - self.order : k]: + for k in np.arange(self.order, steps+self.order): + dist = ProbabilityDistribution.ProbabilityDistribution(smooth, uod=uod, bins=_bins, **kwargs) - quantiles = [] + lags = {} - for qt in np.arange(0, 50, 2): - quantiles.append(i[0] + qt * ((i[1] - i[0]) / 100)) - quantiles.append(i[1] - qt * ((i[1] - i[0]) / 100)) - quantiles.append(i[0] + ((i[1] - i[0]) / 2)) + # Find all bins of past distributions with probability greater than zero - quantiles = list(set(quantiles)) + for ct, dd in enumerate(ret[k - self.order: k]): + vals = [float(v) for v in dd.bins if round(dd.density(v), 4) > 0] + lags[ct] = sorted(vals) - quantiles.sort() + root = tree.FLRGTreeNode(None) - lags[cc] = quantiles + self.build_tree_without_order(root, lags, 0) - cc += 1 + # Trace all possible combinations between the bins of past distributions - # Build the tree with all possible paths + for p in root.paths(): + path = list(reversed(list(filter(None.__ne__, p)))) - root = tree.FLRGTreeNode(None) + # get the combined probabilities for this path - self.build_tree_without_order(root, lags, 0) + pk = np.prod([ret[k - self.order + o].density(path[o]) + for o in np.arange(0, self.order)]) - # Trace the possible paths - for p in root.paths(): - path = list(reversed(list(filter(None.__ne__, p)))) - qtle = np.ravel(self.forecast_interval(path)) + d = self.forecast_distribution(path)[0] - data.extend(np.linspace(qtle[0],qtle[1],100).tolist()) + for bin in _bins: + dist.set(bin, dist.density(bin) + pk * d.density(bin)) - elif method == 2: - - for qt in np.arange(0, 50, 1): - # print(qt) - qtle_lower = self.forecast_interval( - [intervals[x][0] + qt * ((intervals[x][1] - intervals[x][0]) / 100) for x in - np.arange(k - self.order, k)]) - qtle_lower = np.ravel(qtle_lower) - data.extend(np.linspace(qtle_lower[0], qtle_lower[1], 100).tolist()) - qtle_upper = self.forecast_interval( - [intervals[x][1] - qt * ((intervals[x][1] - intervals[x][0]) / 100) for x in - np.arange(k - self.order, k)]) - qtle_upper = np.ravel(qtle_upper) - data.extend(np.linspace(qtle_upper[0], qtle_upper[1], 100).tolist()) - qtle_mid = self.forecast_interval( - [intervals[x][0] + (intervals[x][1] - intervals[x][0]) / 2 for x in np.arange(k - self.order, k)]) - qtle_mid = np.ravel(qtle_mid) - data.extend(np.linspace(qtle_mid[0], qtle_mid[1], 100).tolist()) - - elif method == 3: - i = intervals[k] - - data = np.linspace(i[0],i[1],100).tolist() - - else: - dist = ProbabilityDistribution.ProbabilityDistribution(smooth, bins=_bins, - uod=uod, **kwargs) - lags = {} - - cc = 0 - - for dd in ret[k - self.order: k]: - vals = [float(v) for v in dd.bins if round(dd.density(v),4) > 0] - lags[cc] = sorted(vals) - cc += 1 - - root = tree.FLRGTreeNode(None) - - self.build_tree_without_order(root, lags, 0) - - # Trace the possible paths - for p in root.paths(): - path = list(reversed(list(filter(None.__ne__, p)))) - - pk = np.prod([ret[k - self.order + o].density(path[o]) - for o in np.arange(0,self.order)]) - - d = self.forecast_distribution(path)[0] - - for bin in _bins: - dist.set(bin, dist.density(bin) + pk * d.density(bin)) - - if method != 4: - dist = ProbabilityDistribution.ProbabilityDistribution(smooth, bins=_bins, data=data, - uod=uod, **kwargs) - - ret.append(dist) + ret = ret[self.order:] return ret diff --git a/pyFTS/partitioners/Util.py b/pyFTS/partitioners/Util.py index 9bdf51e..09f1e67 100644 --- a/pyFTS/partitioners/Util.py +++ b/pyFTS/partitioners/Util.py @@ -1,9 +1,12 @@ +import numba import numpy as np import pandas as pd import matplotlib as plt import matplotlib.colors as pltcolors import matplotlib.pyplot as plt #from mpl_toolkits.mplot3d import Axes3D + +from pyFTS.benchmarks import Measures from pyFTS.common import Membership, Util from pyFTS.partitioners import Grid,Huarng,FCM,Entropy @@ -12,6 +15,107 @@ all_methods = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitio mfs = [Membership.trimf, Membership.gaussmf, Membership.trapmf] +@numba.jit() +def sliding_window_simple_search(data, windowsize, model, partitions, orders, **kwargs): + + _3d = len(orders) > 1 + ret = [] + errors = np.array([[0 for k in range(len(partitions))] for kk in range(len(orders))]) + forecasted_best = [] + + figsize = kwargs.get('figsize', [10, 15]) + fig = plt.figure(figsize=figsize) + + plotforecasts = kwargs.get('plotforecasts',False) + if plotforecasts: + ax0 = fig.add_axes([0, 0.4, 0.9, 0.5]) # left, bottom, width, height + ax0.set_xlim([0, len(data)]) + ax0.set_ylim([min(data) * 0.9, max(data) * 1.1]) + ax0.set_title('Forecasts') + ax0.set_ylabel('F(T)') + ax0.set_xlabel('T') + min_rmse = 1000000.0 + best = None + + intervals = kwargs.get('intervals',False) + threshold = kwargs.get('threshold',0.5) + + progressbar = kwargs.get('progressbar', None) + + rng1 = enumerate(partitions, start=0) + + if progressbar: + from tqdm import tqdm + rng1 = enumerate(tqdm(partitions), start=0) + + for pc, p in rng1: + fs = Grid.GridPartitioner(data=data, npart=p) + + rng2 = enumerate(orders, start=0) + + if progressbar: + rng2 = enumerate(tqdm(orders), start=0) + + for oc, o in rng2: + _error = [] + for ct, train, test in Util.sliding_window(data, windowsize, 0.8, **kwargs): + fts = model("q = " + str(p) + " n = " + str(o), partitioner=fs) + fts.fit(train, order=o) + if not intervals: + forecasted = fts.forecast(test) + if not fts.has_seasonality: + _error.append( Measures.rmse(np.array(test[o:]), np.array(forecasted[:-1])) ) + else: + _error.append( Measures.rmse(np.array(test[o:]), np.array(forecasted)) ) + for kk in range(o): + forecasted.insert(0, None) + if plotforecasts: ax0.plot(forecasted, label=fts.name) + else: + forecasted = fts.forecast_interval(test) + _error.append( 1.0 - Measures.rmse_interval(np.array(test[o:]), np.array(forecasted[:-1])) ) + error = np.nanmean(_error) + errors[oc, pc] = error + if (min_rmse - error) > threshold: + min_rmse = error + best = fts + forecasted_best = forecasted + + # print(min_rmse) + if plotforecasts: + # handles0, labels0 = ax0.get_legend_handles_labels() + # ax0.legend(handles0, labels0) + elev = kwargs.get('elev', 30) + azim = kwargs.get('azim', 144) + ax0.plot(test, label="Original", linewidth=3.0, color="black") + if _3d: ax1 = Axes3D(fig, rect=[0, 1, 0.9, 0.9], elev=elev, azim=azim) + if not plotforecasts: ax1 = Axes3D(fig, rect=[0, 1, 0.9, 0.9], elev=elev, azim=azim) + # ax1 = fig.add_axes([0.6, 0.5, 0.45, 0.45], projection='3d') + if _3d: + ax1.set_title('Error Surface') + ax1.set_ylabel('Model order') + ax1.set_xlabel('Number of partitions') + ax1.set_zlabel('RMSE') + X, Y = np.meshgrid(partitions, orders) + surf = ax1.plot_surface(X, Y, errors, rstride=1, cstride=1, antialiased=True) + else: + ax1 = fig.add_axes([0, 1, 0.9, 0.9]) + ax1.set_title('Error Curve') + ax1.set_ylabel('Number of partitions') + ax1.set_xlabel('RMSE') + ax0.plot(errors,partitions) + ret.append(best) + ret.append(forecasted_best) + + # plt.tight_layout() + + file = kwargs.get('file', None) + save = kwargs.get('save', False) + + Util.show_and_save_image(fig, file, save) + + return ret + + def plot_sets(data, sets, titles, tam=[12, 10], save=False, file=None): num = len(sets) #fig = plt.figure(figsize=tam) @@ -69,4 +173,4 @@ def explore_partitioners(data, npart, methods=None, mf=None, tam=[12, 10], save= plot_partitioners(data, objs, tam, save, file) - return objs \ No newline at end of file + return objs diff --git a/pyFTS/tests/general.py b/pyFTS/tests/general.py index dc62f47..15844fa 100644 --- a/pyFTS/tests/general.py +++ b/pyFTS/tests/general.py @@ -9,6 +9,17 @@ import numpy as np import pandas as pd from pyFTS.common import Transformations -from pyFTS.data import INMET +from pyFTS.data import TAIEX -print(INMET.get_dataframe()) +dataset = TAIEX.get_data() + +from pyFTS.benchmarks import benchmarks as bchmk + +from pyFTS.models import pwfts + + +bchmk.sliding_window_benchmarks(dataset, 1000, train=0.8, inc=0.2, methods=[pwfts.ProbabilisticWeightedFTS], + benchmark_models=False, orders=[1], partitions=[10], #np.arange(10,100,2), + progress=False, type='distribution', + distributed=False, nodes=['192.168.0.106', '192.168.0.105', '192.168.0.110'], + save=True, file="pwfts_taiex_interval.csv") \ No newline at end of file