- Benchmark refactoring to expose a unified and simpler interface (the method sliding_window_benchmarks)

- Improvements at FTS.fit and FTS.predict
 - PWFTS bugfixes and improvements
This commit is contained in:
Petrônio Cândido 2018-04-09 11:09:28 -03:00
parent 8ac8fec14c
commit 1d7801bdbf
9 changed files with 755 additions and 870 deletions

View File

@ -5,12 +5,14 @@ pyFTS module for common benchmark metrics
"""
import time
import numba
import numpy as np
import pandas as pd
from pyFTS.common import FuzzySet,SortedCollection
from pyFTS.probabilistic import ProbabilityDistribution
def acf(data, k):
"""
Autocorrelation function estimative
@ -28,6 +30,7 @@ def acf(data, k):
return 1/((n-k)*sigma)*s
def rmse(targets, forecasts):
"""
Root Mean Squared Error
@ -42,6 +45,7 @@ def rmse(targets, forecasts):
return np.sqrt(np.nanmean((targets - forecasts) ** 2))
def rmse_interval(targets, forecasts):
"""
Root Mean Squared Error
@ -53,6 +57,7 @@ def rmse_interval(targets, forecasts):
return np.sqrt(np.nanmean((fmean - targets) ** 2))
def mape(targets, forecasts):
"""
Mean Average Percentual Error
@ -67,6 +72,7 @@ def mape(targets, forecasts):
return np.mean(np.abs(targets - forecasts) / targets) * 100
def smape(targets, forecasts, type=2):
"""
Symmetric Mean Average Percentual Error
@ -87,11 +93,13 @@ def smape(targets, forecasts, type=2):
return sum(np.abs(forecasts - targets)) / sum(forecasts + targets)
def mape_interval(targets, forecasts):
fmean = [np.mean(i) for i in forecasts]
return np.mean(abs(fmean - targets) / fmean) * 100
def UStatistic(targets, forecasts):
"""
Theil's U Statistic
@ -113,6 +121,7 @@ def UStatistic(targets, forecasts):
return np.sqrt(sum(y) / sum(naive))
def TheilsInequality(targets, forecasts):
"""
Theils Inequality Coefficient
@ -128,6 +137,7 @@ def TheilsInequality(targets, forecasts):
return us / (ys + fs)
def BoxPierceStatistic(data, h):
"""
Q Statistic for Box-Pierce test
@ -204,12 +214,15 @@ def pinball_mean(tau, targets, forecasts):
:param forecasts: list of prediction intervals
:return:
"""
preds = []
try:
if tau <= 0.5:
preds = [pinball(tau, targets[i], forecasts[i][0]) for i in np.arange(0, len(forecasts))]
else:
preds = [pinball(tau, targets[i], forecasts[i][1]) for i in np.arange(0, len(forecasts))]
return np.nanmean(preds)
except Exception as ex:
print(ex)
def pmf_to_cdf(density):
@ -259,18 +272,17 @@ def crps(targets, densities):
return _crps / float(l * n)
def get_point_statistics(data, model, indexer=None):
def get_point_statistics(data, model, **kwargs):
"""Condensate all measures for point forecasters"""
indexer = kwargs.get('indexer', None)
if indexer is not None:
ndata = np.array(indexer.get_data(data))
else:
ndata = np.array(data[model.order:])
if model.is_multivariate or indexer is None:
forecasts = model.forecast(data)
elif not model.is_multivariate and indexer is not None:
forecasts = model.forecast(indexer.get_data(data))
forecasts = model.predict(data, **kwargs)
try:
if model.has_seasonality:
@ -281,29 +293,18 @@ def get_point_statistics(data, model, indexer=None):
print(ex)
return [np.nan,np.nan,np.nan]
ret = list()
try:
ret.append(np.round(rmse(ndata, nforecasts), 2))
except Exception as ex:
print('Error in RMSE: {}'.format(ex))
ret.append(np.nan)
try:
ret.append(np.round(smape(ndata, nforecasts), 2))
except Exception as ex:
print('Error in SMAPE: {}'.format(ex))
ret.append(np.nan)
try:
ret.append(np.round(UStatistic(ndata, nforecasts), 2))
except Exception as ex:
print('Error in U: {}'.format(ex))
ret.append(np.nan)
return ret
def get_interval_statistics(original, model):
def get_interval_statistics(original, model, **kwargs):
"""Condensate all measures for point_to_interval forecasters"""
ret = list()
forecasts = model.forecast_interval(original)
forecasts = model.predict(original, **kwargs)
ret.append(round(sharpness(forecasts), 2))
ret.append(round(resolution(forecasts), 2))
ret.append(round(coverage(original[model.order:], forecasts[:-1]), 2))
@ -314,27 +315,13 @@ def get_interval_statistics(original, model):
return ret
def get_distribution_statistics(original, model, steps, resolution):
def get_distribution_statistics(original, model, **kwargs):
ret = list()
try:
_s1 = time.time()
densities1 = model.forecast_ahead_distribution(original, steps, parameters=3)
densities1 = model.predict(original, **kwargs)
_e1 = time.time()
ret.append(round(crps(original, densities1), 3))
ret.append(round(_e1 - _s1, 3))
except Exception as e:
print('Erro: ', e)
ret.append(np.nan)
ret.append(np.nan)
try:
_s2 = time.time()
densities2 = model.forecast_ahead_distribution(original, steps, parameters=2)
_e2 = time.time()
ret.append( round(crps(original, densities2), 3))
ret.append(round(_e2 - _s2, 3))
except:
ret.append(np.nan)
ret.append(np.nan)
return ret

View File

@ -2,6 +2,7 @@
Benchmark utility functions
"""
import numba
import matplotlib as plt
import matplotlib.cm as cmx
import matplotlib.colors as pltcolors
@ -11,8 +12,6 @@ import pandas as pd
#from mpl_toolkits.mplot3d import Axes3D
import numpy as np
import pandas as pd
from copy import deepcopy
from pyFTS.common import Util
@ -124,7 +123,7 @@ def save_dataframe_point(experiments, file, objs, rmse, save, synthetic, smape,
s = '-'
p = '-'
l = '-'
print([n, o, s, p, l])
tmp = [n, o, s, p, l, 'RMSE']
tmp.extend(rmse[k])
ret.append(deepcopy(tmp))
@ -194,23 +193,31 @@ def cast_dataframe_to_synthetic_point(infile, outfile, experiments):
dat.to_csv(outfile, sep=";", index=False)
def analytical_data_columns(experiments):
data_columns = [str(k) for k in np.arange(0, experiments)]
return data_columns
def scale_params(data):
vmin = np.nanmin(data)
vlen = np.nanmax(data) - vmin
return (vmin, vlen)
def scale(data, params):
ndata = [(k-params[0])/params[1] for k in data]
return ndata
def stats(measure, data):
print(measure, np.nanmean(data), np.nanstd(data))
def unified_scaled_point(experiments, tam, save=False, file=None,
sort_columns=['UAVG', 'RMSEAVG', 'USTD', 'RMSESTD'],
sort_ascend=[1, 1, 1, 1],save_best=False,
@ -320,6 +327,7 @@ def unified_scaled_point(experiments, tam, save=False, file=None,
Util.show_and_save_image(fig, file, save)
def plot_dataframe_point(file_synthetic, file_analytic, experiments, tam, save=False, file=None,
sort_columns=['UAVG', 'RMSEAVG', 'USTD', 'RMSESTD'],
sort_ascend=[1, 1, 1, 1],save_best=False,
@ -375,6 +383,7 @@ def plot_dataframe_point(file_synthetic, file_analytic, experiments, tam, save=F
Util.show_and_save_image(fig, file, save)
def check_replace_list(m, replace):
if replace is not None:
for r in replace:
@ -383,6 +392,7 @@ def check_replace_list(m, replace):
return m
def check_ignore_list(b, ignore):
flag = False
if ignore is not None:
@ -475,6 +485,7 @@ def save_dataframe_interval(coverage, experiments, file, objs, resolution, save,
if save: dat.to_csv(Util.uniquefilename(file), sep=";")
return dat
def interval_dataframe_analytic_columns(experiments):
columns = [str(k) for k in np.arange(0, experiments)]
columns.insert(0, "Model")
@ -486,12 +497,14 @@ def interval_dataframe_analytic_columns(experiments):
return columns
def interval_dataframe_synthetic_columns():
columns = ["Model", "Order", "Scheme", "Partitions", "SHARPAVG", "SHARPSTD", "RESAVG", "RESSTD", "COVAVG",
"COVSTD", "TIMEAVG", "TIMESTD", "Q05AVG", "Q05STD", "Q25AVG", "Q25STD", "Q75AVG", "Q75STD", "Q95AVG", "Q95STD"]
return columns
def cast_dataframe_to_synthetic_interval(infile, outfile, experiments):
columns = interval_dataframe_analytic_columns(experiments)
dat = pd.read_csv(infile, sep=";", usecols=columns)
@ -545,6 +558,7 @@ def cast_dataframe_to_synthetic_interval(infile, outfile, experiments):
dat.to_csv(outfile, sep=";", index=False)
def unified_scaled_interval(experiments, tam, save=False, file=None,
sort_columns=['COVAVG', 'SHARPAVG', 'COVSTD', 'SHARPSTD'],
sort_ascend=[True, False, True, True],save_best=False,
@ -643,6 +657,7 @@ def unified_scaled_interval(experiments, tam, save=False, file=None,
Util.show_and_save_image(fig, file, save)
def plot_dataframe_interval(file_synthetic, file_analytic, experiments, tam, save=False, file=None,
sort_columns=['COVAVG', 'SHARPAVG', 'COVSTD', 'SHARPSTD'],
sort_ascend=[True, False, True, True],save_best=False,
@ -698,6 +713,7 @@ def plot_dataframe_interval(file_synthetic, file_analytic, experiments, tam, sav
Util.show_and_save_image(fig, file, save)
def unified_scaled_interval_pinball(experiments, tam, save=False, file=None,
sort_columns=['COVAVG','SHARPAVG','COVSTD','SHARPSTD'],
sort_ascend=[True, False, True, True], save_best=False,
@ -795,6 +811,8 @@ def unified_scaled_interval_pinball(experiments, tam, save=False, file=None,
Util.show_and_save_image(fig, file, save)
def plot_dataframe_interval_pinball(file_synthetic, file_analytic, experiments, tam, save=False, file=None,
sort_columns=['COVAVG','SHARPAVG','COVSTD','SHARPSTD'],
sort_ascend=[True, False, True, True], save_best=False,
@ -846,7 +864,7 @@ def plot_dataframe_interval_pinball(file_synthetic, file_analytic, experiments,
Util.show_and_save_image(fig, file, save)
def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, synthetic):
def save_dataframe_probabilistic(experiments, file, objs, crps, times, save, synthetic):
"""
Save benchmark results for m-step ahead probabilistic forecasters
:param experiments:
@ -854,7 +872,7 @@ def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, tim
:param objs:
:param crps_interval:
:param crps_distr:
:param times1:
:param times:
:param times2:
:param save:
:param synthetic:
@ -881,13 +899,11 @@ def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, tim
mod.append('-')
mod.append('-')
l = '-'
mod.append(np.round(np.nanmean(crps_interval[k]), 2))
mod.append(np.round(np.nanstd(crps_interval[k]), 2))
mod.append(np.round(np.nanmean(crps_distr[k]), 2))
mod.append(np.round(np.nanstd(crps_distr[k]), 2))
mod.append(np.round(np.nanmean(crps[k]), 2))
mod.append(np.round(np.nanstd(crps[k]), 2))
mod.append(l)
mod.append(np.round(np.nanmean(times1[k]), 4))
mod.append(np.round(np.nanmean(times2[k]), 4))
mod.append(np.round(np.nanmean(times[k]), 4))
mod.append(np.round(np.nanstd(times[k]), 4))
ret.append(mod)
except Exception as e:
print('Erro: %s' % e)
@ -895,7 +911,7 @@ def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, tim
print("Erro ao salvar ", k)
print("Exceção ", ex)
columns = ahead_dataframe_synthetic_columns()
columns = probabilistic_dataframe_synthetic_columns()
else:
for k in sorted(objs.keys()):
try:
@ -910,28 +926,23 @@ def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, tim
s = '-'
p = '-'
l = '-'
tmp = [n, o, s, p, l, 'CRPS_Interval']
tmp.extend(crps_interval[k])
tmp = [n, o, s, p, l, 'CRPS']
tmp.extend(crps[k])
ret.append(deepcopy(tmp))
tmp = [n, o, s, p, l, 'CRPS_Distribution']
tmp.extend(crps_distr[k])
ret.append(deepcopy(tmp))
tmp = [n, o, s, p, l, 'TIME_Interval']
tmp.extend(times1[k])
ret.append(deepcopy(tmp))
tmp = [n, o, s, p, l, 'TIME_Distribution']
tmp.extend(times2[k])
tmp = [n, o, s, p, l, 'TIME']
tmp.extend(times[k])
ret.append(deepcopy(tmp))
except Exception as ex:
print("Erro ao salvar ", k)
print("Exceção ", ex)
columns = ahead_dataframe_analytic_columns(experiments)
columns = probabilistic_dataframe_analytic_columns(experiments)
dat = pd.DataFrame(ret, columns=columns)
if save: dat.to_csv(Util.uniquefilename(file), sep=";")
return dat
def ahead_dataframe_analytic_columns(experiments):
def probabilistic_dataframe_analytic_columns(experiments):
columns = [str(k) for k in np.arange(0, experiments)]
columns.insert(0, "Model")
columns.insert(1, "Order")
@ -942,14 +953,14 @@ def ahead_dataframe_analytic_columns(experiments):
return columns
def ahead_dataframe_synthetic_columns():
columns = ["Model", "Order", "Scheme", "Partitions", "CRPS1AVG", "CRPS1STD", "CRPS2AVG", "CRPS2STD",
"TIME1AVG", "TIME1STD", "TIME2AVG", "TIME2STD"]
def probabilistic_dataframe_synthetic_columns():
columns = ["Model", "Order", "Scheme", "Partitions", "CRPSAVG", "CRPSSTD",
"TIMEAVG", "TIMESTD"]
return columns
def cast_dataframe_to_synthetic_ahead(infile, outfile, experiments):
columns = ahead_dataframe_analytic_columns(experiments)
def cast_dataframe_to_synthetic_probabilistic(infile, outfile, experiments):
columns = probabilistic_dataframe_analytic_columns(experiments)
dat = pd.read_csv(infile, sep=";", usecols=columns)
models = dat.Model.unique()
orders = dat.Order.unique()
@ -967,36 +978,31 @@ def cast_dataframe_to_synthetic_ahead(infile, outfile, experiments):
mod = []
df = dat[(dat.Model == m) & (dat.Order == o) & (dat.Scheme == s) & (dat.Partitions == p)]
if not df.empty:
crps1 = extract_measure(df, 'CRPS_Interval', data_columns)
crps2 = extract_measure(df, 'CRPS_Distribution', data_columns)
times1 = extract_measure(df, 'TIME_Interval', data_columns)
times2 = extract_measure(df, 'TIME_Distribution', data_columns)
crps1 = extract_measure(df, 'CRPS', data_columns)
times1 = extract_measure(df, 'TIME', data_columns)
mod.append(m)
mod.append(o)
mod.append(s)
mod.append(p)
mod.append(np.round(np.nanmean(crps1), 2))
mod.append(np.round(np.nanstd(crps1), 2))
mod.append(np.round(np.nanmean(crps2), 2))
mod.append(np.round(np.nanstd(crps2), 2))
mod.append(np.round(np.nanmean(times1), 2))
mod.append(np.round(np.nanstd(times1), 2))
mod.append(np.round(np.nanmean(times2), 4))
mod.append(np.round(np.nanstd(times2), 4))
ret.append(mod)
dat = pd.DataFrame(ret, columns=ahead_dataframe_synthetic_columns())
dat = pd.DataFrame(ret, columns=probabilistic_dataframe_synthetic_columns())
dat.to_csv(outfile, sep=";", index=False)
def unified_scaled_ahead(experiments, tam, save=False, file=None,
sort_columns=['CRPS1AVG', 'CRPS2AVG', 'CRPS1STD', 'CRPS2STD'],
sort_ascend=[True, True, True, True], save_best=False,
ignore=None, replace=None):
fig, axes = plt.subplots(nrows=2, ncols=1, figsize=tam)
axes[0].set_title('CRPS Interval Ahead')
axes[1].set_title('CRPS Distribution Ahead')
def unified_scaled_probabilistic(experiments, tam, save=False, file=None,
sort_columns=['CRPSAVG', 'CRPSSTD'],
sort_ascend=[True, True], save_best=False,
ignore=None, replace=None):
fig, axes = plt.subplots(nrows=1, ncols=1, figsize=tam)
axes.set_title('CRPS')
#axes[1].set_title('CRPS Distribution Ahead')
models = {}
@ -1006,11 +1012,11 @@ def unified_scaled_ahead(experiments, tam, save=False, file=None,
mdl = {}
dat_syn = pd.read_csv(experiment[0], sep=";", usecols=ahead_dataframe_synthetic_columns())
dat_syn = pd.read_csv(experiment[0], sep=";", usecols=probabilistic_dataframe_synthetic_columns())
bests = find_best(dat_syn, sort_columns, sort_ascend)
dat_ana = pd.read_csv(experiment[1], sep=";", usecols=ahead_dataframe_analytic_columns(experiment[2]))
dat_ana = pd.read_csv(experiment[1], sep=";", usecols=probabilistic_dataframe_analytic_columns(experiment[2]))
crps1 = []
crps2 = []
@ -1070,21 +1076,22 @@ def unified_scaled_ahead(experiments, tam, save=False, file=None,
Util.show_and_save_image(fig, file, save)
def plot_dataframe_ahead(file_synthetic, file_analytic, experiments, tam, save=False, file=None,
def plot_dataframe_probabilistic(file_synthetic, file_analytic, experiments, tam, save=False, file=None,
sort_columns=['CRPS1AVG', 'CRPS2AVG', 'CRPS1STD', 'CRPS2STD'],
sort_ascend=[True, True, True, True], save_best=False,
ignore=None, replace=None):
fig, axes = plt.subplots(nrows=2, ncols=1, figsize=tam)
axes[0].set_title('CRPS Interval Ahead')
axes[1].set_title('CRPS Distribution Ahead')
axes[0].set_title('CRPS')
axes[1].set_title('CRPS')
dat_syn = pd.read_csv(file_synthetic, sep=";", usecols=ahead_dataframe_synthetic_columns())
dat_syn = pd.read_csv(file_synthetic, sep=";", usecols=probabilistic_dataframe_synthetic_columns())
bests = find_best(dat_syn, sort_columns, sort_ascend)
dat_ana = pd.read_csv(file_analytic, sep=";", usecols=ahead_dataframe_analytic_columns(experiments))
dat_ana = pd.read_csv(file_analytic, sep=";", usecols=probabilistic_dataframe_analytic_columns(experiments))
data_columns = analytical_data_columns(experiments)

View File

@ -6,6 +6,7 @@
import datetime
import time
import numba
from copy import deepcopy
import matplotlib as plt
@ -20,15 +21,15 @@ from pyFTS.models import song, chen, yu, ismailefendi, sadaei, hofts, pwfts, ift
from pyFTS.models.ensemble import ensemble
from pyFTS.benchmarks import Measures, naive, arima, ResidualAnalysis, quantreg
from pyFTS.benchmarks import Util as bUtil
from pyFTS.common import Util
from pyFTS.common import Util as cUtil
# from sklearn.cross_validation import KFold
from pyFTS.partitioners import Grid
from matplotlib import rc
rc('font',**{'family':'sans-serif','sans-serif':['Helvetica']})
#rc('font',**{'family':'sans-serif','sans-serif':['Helvetica']})
## for Palatino and other serif fonts use:
#rc('font',**{'family':'serif','serif':['Palatino']})
rc('text', usetex=True)
#rc('text', usetex=True)
colors = ['grey', 'darkgrey', 'rosybrown', 'maroon', 'red','orange', 'gold', 'yellow', 'olive', 'green',
'darkgreen', 'cyan', 'lightblue','blue', 'darkblue', 'purple', 'darkviolet' ]
@ -40,6 +41,193 @@ styles = ['-','--','-.',':','.']
nsty = len(styles)
def __pop(key, default, kwargs):
if key in kwargs:
return kwargs.pop(key)
else:
return default
def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs):
"""
Sliding window benchmarks for FTS point forecasters
:param data:
:param windowsize: size of sliding window
:param train: percentual of sliding window data used to train the models
:param models: FTS point forecasters
:param partitioners: Universe of Discourse partitioner
:param partitions: the max number of partitions on the Universe of Discourse
:param max_order: the max order of the models (for high order models)
:param transformation: data transformation
:param indexer: seasonal indexer
:param dump:
:param benchmark_methods: Non FTS models to benchmark
:param benchmark_methods_parameters: Non FTS models parameters
:param save: save results
:param file: file path to save the results
:param sintetic: if true only the average and standard deviation of the results
:return: DataFrame with the results
"""
distributed = __pop('distributed', False, kwargs)
save = __pop('save', False, kwargs)
transformation = kwargs.get('transformation', None)
progress = kwargs.get('progress', None)
type = kwargs.get("type", 'point')
orders = __pop("orders", [1,2,3], kwargs)
partitioners_models = __pop("partitioners_models", None, kwargs)
partitioners_methods = __pop("partitioners_methods", [Grid.GridPartitioner], kwargs)
partitions = __pop("partitions", [10], kwargs)
methods = __pop('methods', None, kwargs)
models = __pop('models', None, kwargs)
pool = [] if models is None else models
if models is None and methods is None:
if type == 'point':
methods = get_point_methods()
elif type == 'interval':
methods = get_interval_methods()
elif type == 'distribution':
methods = get_probabilistic_methods()
if models is None:
for method in methods:
mfts = method("")
if mfts.is_high_order:
for order in orders:
if order >= mfts.min_order:
mfts = method("")
mfts.order = order
pool.append(mfts)
else:
mfts.order = 1
pool.append(mfts)
benchmark_models = __pop("benchmark_models", None, kwargs)
benchmark_methods = __pop("benchmark_methods", None, kwargs)
benchmark_methods_parameters = __pop("benchmark_methods_parameters", None, kwargs)
if benchmark_models != False:
if benchmark_models is None and benchmark_methods is None:
if type == 'point'or type == 'partition':
benchmark_methods = get_benchmark_point_methods()
elif type == 'interval':
benchmark_methods = get_benchmark_interval_methods()
elif type == 'distribution':
benchmark_methods = get_benchmark_probabilistic_methods()
if benchmark_models is not None:
pool.extend(benchmark_models)
elif benchmark_methods is not None:
for count, model in enumerate(benchmark_methods, start=0):
par = benchmark_methods_parameters[count]
mfts = model(str(par if par is not None else ""))
mfts.order = par
pool.append(mfts)
if type == 'point':
experiment_method = run_point
synthesis_method = process_point_jobs
elif type == 'interval':
experiment_method = run_interval
synthesis_method = process_interval_jobs
elif type == 'distribution':
experiment_method = run_probabilistic
synthesis_method = process_probabilistic_jobs
if distributed:
import dispy, dispy.httpd
nodes = kwargs.get("nodes", ['127.0.0.1'])
cluster, http_server = cUtil.start_dispy_cluster(experiment_method, nodes)
experiments = 0
jobs = []
if progress:
from tqdm import tqdm
progressbar = tqdm(total=len(data), desc="Sliding Window:")
inc = __pop("inc", 0.1, kwargs)
for ct, train, test in cUtil.sliding_window(data, windowsize, train, inc=inc, **kwargs):
experiments += 1
if progress:
progressbar.update(windowsize * inc)
partitioners_pool = []
if partitioners_models is None:
for partition in partitions:
for partitioner in partitioners_methods:
data_train_fs = partitioner(data=train, npart=partition, transformation=transformation)
partitioners_pool.append(data_train_fs)
else:
partitioners_pool = partitioners_models
rng1 = partitioners_pool
if progress:
rng1 = tqdm(partitioners_pool, desc="Partitioners")
for partitioner in rng1:
rng2 = enumerate(pool,start=0)
if progress:
rng2 = enumerate(tqdm(pool, desc="Models"),start=0)
for _id, model in rng2:
if not distributed:
job = experiment_method(deepcopy(model), deepcopy(partitioner), train, test, **kwargs)
jobs.append(job)
else:
job = cluster.submit(deepcopy(model), deepcopy(partitioner), train, test, **kwargs)
job.id = id # associate an ID to identify jobs (if needed later)
jobs.append(job)
if progress:
progressbar.close()
if distributed:
jobs2 = []
rng = jobs
cluster.wait() # wait for all jobs to finish
if progress:
rng = tqdm(jobs)
for job in rng:
if job.status == dispy.DispyJob.Finished and job is not None:
tmp = job()
jobs2.append(tmp)
jobs = deepcopy(jobs2)
cUtil.stop_dispy_cluster(cluster, http_server)
file = kwargs.get('file', None)
sintetic = kwargs.get('sintetic', False)
return synthesis_method(jobs, experiments, save, file, sintetic)
def get_benchmark_point_methods():
"""Return all non FTS methods for point forecasting"""
return [naive.Naive, arima.ARIMA, quantreg.QuantileRegression]
@ -64,12 +252,17 @@ def get_interval_methods():
def get_probabilistic_methods():
"""Return all FTS methods for probabilistic forecasting"""
return [arima.ARIMA, ensemble.AllMethodEnsembleFTS, pwfts.ProbabilisticWeightedFTS]
return [ensemble.AllMethodEnsembleFTS, pwfts.ProbabilisticWeightedFTS]
def run_point(mfts, partitioner, train_data, test_data, window_key=None, transformation=None, indexer=None):
def get_benchmark_probabilistic_methods():
"""Return all FTS methods for probabilistic forecasting"""
return [arima.ARIMA, quantreg.QuantileRegression]
def run_point(mfts, partitioner, train_data, test_data, window_key=None, **kwargs):
"""
Point forecast benchmark function to be executed on sliding window
Point forecast benchmark function to be executed on cluster nodes
:param mfts: FTS model
:param partitioner: Universe of Discourse partitioner
:param train_data: data used to train the model
@ -79,6 +272,26 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo
:param indexer: seasonal indexer
:return: a dictionary with the benchmark results
"""
import time
from pyFTS.models import yu, chen, hofts, pwfts,ismailefendi,sadaei, song, cheng, hwang
from pyFTS.partitioners import Grid, Entropy, FCM
from pyFTS.benchmarks import Measures, naive, arima, quantreg
from pyFTS.common import Transformations
tmp = [song.ConventionalFTS, chen.ConventionalFTS, yu.WeightedFTS, ismailefendi.ImprovedWeightedFTS,
cheng.TrendWeightedFTS, sadaei.ExponentialyWeightedFTS, hofts.HighOrderFTS, hwang.HighOrderFTS,
pwfts.ProbabilisticWeightedFTS]
tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner]
tmp4 = [naive.Naive, arima.ARIMA, quantreg.QuantileRegression]
tmp3 = [Measures.get_point_statistics]
tmp5 = [Transformations.Differential]
transformation = kwargs.get('transformation', None)
indexer = kwargs.get('indexer', None)
if mfts.benchmark_only:
_key = mfts.shortname + str(mfts.order if mfts.order is not None else "")
@ -86,16 +299,17 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if transformation is not None:
mfts.append_transformation(transformation)
_start = time.time()
mfts.train(train_data, sets=partitioner.sets, order=mfts.order)
mfts.fit(train_data, order=mfts.order, **kwargs)
_end = time.time()
times = _end - _start
_start = time.time()
_rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, indexer)
_rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, **kwargs)
_end = time.time()
times += _end - _start
@ -104,113 +318,120 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo
return ret
def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
benchmark_models=None, benchmark_models_parameters = None,
save=False, file=None, sintetic=False):
def run_interval(mfts, partitioner, train_data, test_data, window_key=None, **kwargs):
"""
Sliding window benchmarks for FTS point forecasters
:param data:
:param windowsize: size of sliding window
:param train: percentual of sliding window data used to train the models
:param models: FTS point forecasters
:param partitioners: Universe of Discourse partitioner
:param partitions: the max number of partitions on the Universe of Discourse
:param max_order: the max order of the models (for high order models)
Interval forecast benchmark function to be executed on cluster nodes
:param mfts: FTS model
:param partitioner: Universe of Discourse partitioner
:param train_data: data used to train the model
:param test_data: ata used to test the model
:param window_key: id of the sliding window
:param transformation: data transformation
:param indexer: seasonal indexer
:param dump:
:param benchmark_models: Non FTS models to benchmark
:param benchmark_models_parameters: Non FTS models parameters
:param save: save results
:param file: file path to save the results
:param sintetic: if true only the average and standard deviation of the results
:return: DataFrame with the results
:return: a dictionary with the benchmark results
"""
import time
from pyFTS.models import hofts,ifts,pwfts
from pyFTS.partitioners import Grid, Entropy, FCM
from pyFTS.benchmarks import Measures, arima, quantreg
if benchmark_models is None: # and models is None:
benchmark_models = [naive.Naive, arima.ARIMA, arima.ARIMA, arima.ARIMA, arima.ARIMA,
quantreg.QuantileRegression, quantreg.QuantileRegression]
tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS]
if benchmark_models_parameters is None:
benchmark_models_parameters = [1, (1, 0, 0), (1, 0, 1), (2, 0, 1), (2, 0, 2), 1, 2]
tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner]
_process_start = time.time()
tmp4 = [arima.ARIMA, quantreg.QuantileRegression]
print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now()))
tmp3 = [Measures.get_interval_statistics]
pool = []
jobs = []
objs = {}
rmse = {}
smape = {}
u = {}
times = {}
transformation = kwargs.get('transformation', None)
indexer = kwargs.get('indexer', None)
if models is None:
models = get_point_methods()
for model in models:
mfts = model("")
if mfts.is_high_order:
for order in np.arange(1, max_order + 1):
if order >= mfts.min_order:
mfts = model("")
mfts.order = order
pool.append(mfts)
if mfts.benchmark_only:
_key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + str(mfts.alpha)
else:
mfts.order = 1
pool.append(mfts)
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if benchmark_models is not None:
for count, model in enumerate(benchmark_models, start=0):
par = benchmark_models_parameters[count]
mfts = model(str(par if par is not None else ""))
mfts.order = par
pool.append(mfts)
if transformation is not None:
mfts.append_transformation(transformation)
experiments = 0
for ct, train, test in Util.sliding_window(data, windowsize, train):
experiments += 1
_start = time.time()
mfts.fit(train_data, order=mfts.order, **kwargs)
_end = time.time()
times = _end - _start
benchmarks_only = {}
_start = time.time()
_sharp, _res, _cov, _q05, _q25, _q75, _q95 = Measures.get_interval_statistics(test_data, mfts, **kwargs)
_end = time.time()
times += _end - _start
if dump: print('\nWindow: {0}\n'.format(ct))
ret = {'key': _key, 'obj': mfts, 'sharpness': _sharp, 'resolution': _res, 'coverage': _cov, 'time': times,
'Q05': _q05, 'Q25': _q25, 'Q75': _q75, 'Q95': _q95, 'window': window_key}
for partition in partitions:
return ret
for partitioner in partitioners:
data_train_fs = partitioner(data=train, npart=partition, transformation=transformation)
def run_probabilistic(mfts, partitioner, train_data, test_data, window_key=None, **kwargs):
"""
Probabilistic forecast benchmark function to be executed on cluster nodes
:param mfts: FTS model
:param partitioner: Universe of Discourse partitioner
:param train_data: data used to train the model
:param test_data: ata used to test the model
:param steps:
:param resolution:
:param window_key: id of the sliding window
:param transformation: data transformation
:param indexer: seasonal indexer
:return: a dictionary with the benchmark results
"""
import time
import numpy as np
from pyFTS.models import hofts, ifts, pwfts
from pyFTS.models.ensemble import ensemble
from pyFTS.partitioners import Grid, Entropy, FCM
from pyFTS.benchmarks import Measures, arima
from pyFTS.models.seasonal import SeasonalIndexer
for _id, m in enumerate(pool,start=0):
if m.benchmark_only and m.shortname in benchmarks_only:
continue
tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS, arima.ARIMA, ensemble.AllMethodEnsembleFTS]
tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner]
tmp3 = [Measures.get_distribution_statistics, SeasonalIndexer.SeasonalIndexer, SeasonalIndexer.LinearSeasonalIndexer]
transformation = kwargs.get('transformation', None)
indexer = kwargs.get('indexer', None)
if mfts.benchmark_only:
_key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + str(mfts.alpha)
else:
benchmarks_only[m.shortname] = m
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
tmp = run_point(deepcopy(m), data_train_fs, train, test, ct, transformation)
if transformation is not None:
mfts.append_transformation(transformation)
if tmp['key'] not in objs:
objs[tmp['key']] = tmp['obj']
rmse[tmp['key']] = []
smape[tmp['key']] = []
u[tmp['key']] = []
times[tmp['key']] = []
rmse[tmp['key']].append_rhs(tmp['rmse'])
smape[tmp['key']].append_rhs(tmp['smape'])
u[tmp['key']].append_rhs(tmp['u'])
times[tmp['key']].append_rhs(tmp['time'])
print(tmp['key'], tmp['window'])
if mfts.has_seasonality:
mfts.indexer = indexer
_process_end = time.time()
try:
_start = time.time()
mfts.fit(train_data, order=mfts.order)
_end = time.time()
times = _end - _start
print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now()))
_crps1, _t1 = Measures.get_distribution_statistics(test_data, mfts, **kwargs)
_t1 += times
except Exception as e:
print(e)
_crps1 = np.nan
_t1 = np.nan
print("Process Duration: {0}".format(_process_end - _process_start))
ret = {'key': _key, 'obj': mfts, 'CRPS': _crps1, 'time': _t1, 'window': window_key}
return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
return ret
def build_model_pool_point(models, max_order, benchmark_models, benchmark_models_parameters):
@ -239,71 +460,84 @@ def build_model_pool_point(models, max_order, benchmark_models, benchmark_models
return pool
def all_point_forecasters(data_train, data_test, partitions, max_order=3, statistics=True, residuals=True,
series=True, save=False, file=None, tam=[20, 5], models=None, transformation=None,
distributions=False, benchmark_models=None, benchmark_models_parameters=None):
"""
Fixed data benchmark for FTS point forecasters
:param data_train: data used to train the models
:param data_test: data used to test the models
:param partitions: the max number of partitions on the Universe of Discourse
:param max_order: the max order of the models (for high order models)
:param statistics: print statistics
:param residuals: print and plot residuals
:param series: plot time series
:param save: save results
:param file: file path to save the results
:param tam: figure dimensions to plot the graphs
:param models: list of models to benchmark
:param transformation: data transformation
:param distributions: plot distributions
:return:
"""
models = build_model_pool_point(models, max_order, benchmark_models, benchmark_models_parameters)
def process_point_jobs(jobs, experiments, save=False, file=None, sintetic=False):
objs = {}
rmse = {}
smape = {}
u = {}
times = {}
objs = []
for job in jobs:
_key = job['key']
if _key not in objs:
objs[_key] = job['obj']
rmse[_key] = []
smape[_key] = []
u[_key] = []
times[_key] = []
rmse[_key].append(job['rmse'])
smape[_key].append(job['smape'])
u[_key].append(job['u'])
times[_key].append(job['time'])
data_train_fs = Grid.GridPartitioner(data=data_train, npart=partitions, transformation=transformation)
return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
count = 1
lcolors = []
def process_interval_jobs(jobs, experiments, save=False, file=None, sintetic=False):
objs = {}
sharpness = {}
resolution = {}
coverage = {}
q05 = {}
q25 = {}
q75 = {}
q95 = {}
times = {}
for count, model in enumerate(models, start=0):
#print(model)
if transformation is not None:
model.append_transformation(transformation)
model.train(data_train, sets=data_train_fs.sets, order=model.order)
objs.append(model)
lcolors.append( colors[count % ncol] )
for job in jobs:
_key = job['key']
if _key not in objs:
objs[_key] = job['obj']
sharpness[_key] = []
resolution[_key] = []
coverage[_key] = []
times[_key] = []
q05[_key] = []
q25[_key] = []
q75[_key] = []
q95[_key] = []
if statistics:
print_point_statistics(data_test, objs)
sharpness[_key].append(job['sharpness'])
resolution[_key].append(job['resolution'])
coverage[_key].append(job['coverage'])
times[_key].append(job['time'])
q05[_key].append(job['Q05'])
q25[_key].append(job['Q25'])
q75[_key].append(job['Q75'])
q95[_key].append(job['Q95'])
if residuals:
print(ResidualAnalysis.compare_residuals(data_test, objs))
ResidualAnalysis.plot_residuals(data_test, objs, save=save, file=file, tam=tam)
if series:
plot_compared_series(data_test, objs, lcolors, typeonlegend=False, save=save, file=file, tam=tam,
intervals=False)
return bUtil.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic,
times, q05, q25, q75, q95)
if distributions:
lcolors.insert(0,'black')
pmfs = []
pmfs.append(
ProbabilityDistribution.ProbabilityDistribution("Original", 100, [min(data_test), max(data_test)], data=data_test) )
for m in objs:
forecasts = m.forecast(data_test)
pmfs.append(
ProbabilityDistribution.ProbabilityDistribution(m.shortname, 100, [min(data_test), max(data_test)],
data=forecasts))
print(getProbabilityDistributionStatistics(pmfs,data_test))
def process_probabilistic_jobs(jobs, experiments, save=False, file=None, sintetic=False):
objs = {}
crps = {}
times = {}
plot_probability_distributions(pmfs, lcolors, tam=tam)
for job in jobs:
_key = job['key']
if _key not in objs:
objs[_key] = job['obj']
crps[_key] = []
times[_key] = []
crps[_key].append(job['CRPS'])
times[_key].append(job['time'])
return bUtil.save_dataframe_probabilistic(experiments, file, objs, crps, times, save, sintetic)
return models
def print_point_statistics(data, models, externalmodels = None, externalforecasts = None, indexers=None):
@ -330,163 +564,6 @@ def print_point_statistics(data, models, externalmodels = None, externalforecast
def getProbabilityDistributionStatistics(pmfs, data):
ret = "Model & Entropy & Empirical Likelihood & Pseudo Likelihood \\\\ \n"
for k in pmfs:
ret += k.name + " & "
ret += str(k.entropy()) + " & "
ret += str(k.empiricalloglikelihood())+ " & "
ret += str(k.pseudologlikelihood(data))
ret += " \\\\ \n"
return ret
def interval_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
save=False, file=None, synthetic=True):
if models is None:
models = get_interval_methods()
objs = {}
lcolors = {}
sharpness = {}
resolution = {}
coverage = {}
times = {}
experiments = 0
for ct, training,test in Util.sliding_window(data, windowsize, train):
experiments += 1
for partition in partitions:
for partitioner in partitioners:
pttr = str(partitioner.__module__).split('.')[-1]
data_train_fs = partitioner(data=training, npart=partition, transformation=transformation)
for count, model in enumerate(models, start=0):
mfts = model("")
_key = mfts.shortname + " " + pttr+ " q = " +str(partition)
mfts.partitioner = data_train_fs
if not mfts.is_high_order:
if dump: print(ct,_key)
if _key not in objs:
objs[_key] = mfts
lcolors[_key] = colors[count % ncol]
sharpness[_key] = []
resolution[_key] = []
coverage[_key] = []
times[_key] = []
if transformation is not None:
mfts.append_transformation(transformation)
_start = time.time()
mfts.train(training, sets=data_train_fs.sets)
_end = time.time()
_tdiff = _end - _start
_start = time.time()
_sharp, _res, _cov = Measures.get_interval_statistics(test, mfts)
_end = time.time()
_tdiff += _end - _start
sharpness[_key].append_rhs(_sharp)
resolution[_key].append_rhs(_res)
coverage[_key].append_rhs(_cov)
times[_key].append_rhs(_tdiff)
else:
for order in np.arange(1, max_order + 1):
if order >= mfts.min_order:
mfts = model("")
_key = mfts.shortname + " n = " + str(order) + " " + pttr + " q = " + str(partition)
mfts.partitioner = data_train_fs
if dump: print(ct,_key)
if _key not in objs:
objs[_key] = mfts
lcolors[_key] = colors[count % ncol]
sharpness[_key] = []
resolution[_key] = []
coverage[_key] = []
times[_key] = []
if transformation is not None:
mfts.append_transformation(transformation)
_start = time.time()
mfts.train(training, sets=data_train_fs.sets, order=order)
_end = time.time()
_tdiff = _end - _start
_start = time.time()
_sharp, _res, _cov = Measures.get_interval_statistics(test, mfts)
_end = time.time()
_tdiff += _end - _start
sharpness[_key].append_rhs(_sharp)
resolution[_key].append_rhs(_res)
coverage[_key].append_rhs(_cov)
times[_key].append_rhs(_tdiff)
return bUtil.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, synthetic, times)
def build_model_pool_interval(models, max_order, benchmark_models, benchmark_models_parameters):
pool = []
if models is None:
models = get_interval_methods()
for model in models:
mfts = model("")
if mfts.is_high_order:
for order in np.arange(1, max_order + 1):
if order >= mfts.min_order:
mfts = model("")
mfts.order = order
pool.append(mfts)
else:
mfts.order = 1
pool.append(mfts)
alphas = [0.05, 0.25]
if benchmark_models is not None:
for count, model in enumerate(benchmark_models, start=0):
par = benchmark_models_parameters[count]
for alpha in alphas:
mfts = model(str(alpha), alpha=alpha)
mfts.order = par
pool.append(mfts)
return pool
def all_interval_forecasters(data_train, data_test, partitions, max_order=3,save=False, file=None, tam=[20, 5],
statistics=False, models=None, transformation=None,
benchmark_models=None, benchmark_models_parameters=None):
models = build_model_pool_interval(models, max_order, benchmark_models, benchmark_models_parameters)
data_train_fs = Grid.GridPartitioner(data=data_train, npart=partitions, transformation=transformation).sets
lcolors = []
objs = []
for count, model in Util.enumerate2(models, start=0, step=2):
if transformation is not None:
model.append_transformation(transformation)
model.train(data_train, sets=data_train_fs, order=model.order)
objs.append(model)
lcolors.append( colors[count % ncol] )
if statistics:
print_interval_statistics(data_test, objs)
plot_compared_series(data_test, objs, lcolors, typeonlegend=False, save=save, file=file, tam=tam,
points=False, intervals=True)
def print_interval_statistics(original, models):
ret = "Model & Order & Sharpness & Resolution & Coverage & .05 & .25 & .75 & .95 \\\\ \n"
for fts in models:
@ -503,6 +580,7 @@ def print_interval_statistics(original, models):
print(ret)
def plot_interval(axis, intervals, order, label, color='red', typeonlegend=False, ls='-', linewidth=1):
lower = [kk[0] for kk in intervals]
upper = [kk[1] for kk in intervals]
@ -517,6 +595,7 @@ def plot_interval(axis, intervals, order, label, color='red', typeonlegend=False
return [mi, ma]
def plot_compared_series(original, models, colors, typeonlegend=False, save=False, file=None, tam=[20, 5],
points=True, intervals=True, linewidth=1.5):
"""
@ -596,6 +675,7 @@ def plot_probability_distributions(pmfs, lcolors, tam=[15, 7]):
ax.legend(handles0, labels0)
def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution = None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
save=False, file=None, synthetic=False):
@ -610,7 +690,7 @@ def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution
times2 = {}
experiments = 0
for ct, train,test in Util.sliding_window(data, windowsize, train):
for ct, train,test in cUtil.sliding_window(data, windowsize, train):
experiments += 1
for partition in partitions:
for partitioner in partitioners:
@ -692,6 +772,7 @@ def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution
return bUtil.save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, synthetic)
def all_ahead_forecasters(data_train, data_test, partitions, start, steps, resolution = None, max_order=3,save=False, file=None, tam=[20, 5],
models=None, transformation=None, option=2):
if models is None:
@ -704,7 +785,7 @@ def all_ahead_forecasters(data_train, data_test, partitions, start, steps, resol
data_train_fs = Grid.GridPartitioner(data=data_train, npart=partitions, transformation=transformation).sets
lcolors = []
for count, model in Util.enumerate2(models, start=0, step=2):
for count, model in cUtil.enumerate2(models, start=0, step=2):
mfts = model("")
if not mfts.is_high_order:
if transformation is not None:
@ -746,6 +827,7 @@ def print_distribution_statistics(original, models, steps, resolution):
print(ret)
def plot_compared_intervals_ahead(original, models, colors, distributions, time_from, time_to, intervals = True,
save=False, file=None, tam=[20, 5], resolution=None,
cmap='Blues', linewidth=1.5):
@ -820,7 +902,8 @@ def plot_compared_intervals_ahead(original, models, colors, distributions, time_
ax.set_xlabel('T')
ax.set_xlim([0, len(original)])
Util.show_and_save_image(fig, file, save, lgd=lgd)
cUtil.show_and_save_image(fig, file, save, lgd=lgd)
def plot_density_rectange(ax, cmap, density, fig, resolution, time_from, time_to):
@ -845,6 +928,7 @@ def plot_density_rectange(ax, cmap, density, fig, resolution, time_from, time_to
from pyFTS.common import Transformations
def plot_probabilitydistribution_density(ax, cmap, probabilitydist, fig, time_from):
from matplotlib.patches import Rectangle
from matplotlib.collections import PatchCollection
@ -1102,86 +1186,11 @@ def simpleSearch_RMSE(train, test, model, partitions, orders, save=False, file=N
# plt.tight_layout()
Util.show_and_save_image(fig, file, save)
cUtil.show_and_save_image(fig, file, save)
return ret
def sliding_window_simple_search(data, windowsize, model, partitions, orders, save=False, file=None, tam=[10, 15],
plotforecasts=False, elev=30, azim=144, intervals=False, parameters=None):
_3d = len(orders) > 1
ret = []
errors = np.array([[0 for k in range(len(partitions))] for kk in range(len(orders))])
forecasted_best = []
fig = plt.figure(figsize=tam)
# fig.suptitle("Comparação de modelos ")
if plotforecasts:
ax0 = fig.add_axes([0, 0.4, 0.9, 0.5]) # left, bottom, width, height
ax0.set_xlim([0, len(data)])
ax0.set_ylim([min(data) * 0.9, max(data) * 1.1])
ax0.set_title('Forecasts')
ax0.set_ylabel('F(T)')
ax0.set_xlabel('T')
min_rmse = 1000000.0
best = None
for pc, p in enumerate(partitions, start=0):
sets = Grid.GridPartitioner(data=data, npart=p).sets
for oc, o in enumerate(orders, start=0):
_error = []
for ct, train, test in Util.sliding_window(data, windowsize, 0.8):
fts = model("q = " + str(p) + " n = " + str(o))
fts.train(data, sets=sets, order=o, parameters=parameters)
if not intervals:
forecasted = fts.forecast(test)
if not fts.has_seasonality:
_error.append( Measures.rmse(np.array(test[o:]), np.array(forecasted[:-1])) )
else:
_error.append( Measures.rmse(np.array(test[o:]), np.array(forecasted)) )
for kk in range(o):
forecasted.insert(0, None)
if plotforecasts: ax0.plot(forecasted, label=fts.name)
else:
forecasted = fts.forecast_interval(test)
_error.append( 1.0 - Measures.rmse_interval(np.array(test[o:]), np.array(forecasted[:-1])) )
error = np.nanmean(_error)
errors[oc, pc] = error
if error < min_rmse:
min_rmse = error
best = fts
forecasted_best = forecasted
# print(min_rmse)
if plotforecasts:
# handles0, labels0 = ax0.get_legend_handles_labels()
# ax0.legend(handles0, labels0)
ax0.plot(test, label="Original", linewidth=3.0, color="black")
if _3d: ax1 = Axes3D(fig, rect=[0, 1, 0.9, 0.9], elev=elev, azim=azim)
if not plotforecasts: ax1 = Axes3D(fig, rect=[0, 1, 0.9, 0.9], elev=elev, azim=azim)
# ax1 = fig.add_axes([0.6, 0.5, 0.45, 0.45], projection='3d')
if _3d:
ax1.set_title('Error Surface')
ax1.set_ylabel('Model order')
ax1.set_xlabel('Number of partitions')
ax1.set_zlabel('RMSE')
X, Y = np.meshgrid(partitions, orders)
surf = ax1.plot_surface(X, Y, errors, rstride=1, cstride=1, antialiased=True)
else:
ax1 = fig.add_axes([0, 1, 0.9, 0.9])
ax1.set_title('Error Curve')
ax1.set_ylabel('Number of partitions')
ax1.set_xlabel('RMSE')
ax0.plot(errors,partitions)
ret.append(best)
ret.append(forecasted_best)
# plt.tight_layout()
Util.show_and_save_image(fig, file, save)
return ret
def pftsExploreOrderAndPartitions(data,save=False, file=None):
fig, axes = plt.subplots(nrows=4, ncols=1, figsize=[6, 8])
@ -1242,5 +1251,5 @@ def pftsExploreOrderAndPartitions(data,save=False, file=None):
plt.tight_layout()
Util.show_and_save_image(fig, file, save)
cUtil.show_and_save_image(fig, file, save)

View File

@ -8,6 +8,7 @@ python3 /usr/local/bin/dispynode.py -i [local IP] -d
import datetime
import time
import numba
import dispy
import dispy.httpd
@ -18,61 +19,7 @@ from pyFTS.common import Util
from pyFTS.partitioners import Grid
def run_point(mfts, partitioner, train_data, test_data, window_key=None, transformation=None, indexer=None):
"""
Point forecast benchmark function to be executed on cluster nodes
:param mfts: FTS model
:param partitioner: Universe of Discourse partitioner
:param train_data: data used to train the model
:param test_data: ata used to test the model
:param window_key: id of the sliding window
:param transformation: data transformation
:param indexer: seasonal indexer
:return: a dictionary with the benchmark results
"""
import time
from pyFTS.models import yu, chen, hofts, pwfts,ismailefendi,sadaei, song, cheng, hwang
from pyFTS.partitioners import Grid, Entropy, FCM
from pyFTS.benchmarks import Measures, naive, arima, quantreg
from pyFTS.common import Transformations
tmp = [song.ConventionalFTS, chen.ConventionalFTS, yu.WeightedFTS, ismailefendi.ImprovedWeightedFTS,
cheng.TrendWeightedFTS, sadaei.ExponentialyWeightedFTS, hofts.HighOrderFTS, hwang.HighOrderFTS,
pwfts.ProbabilisticWeightedFTS]
tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner]
tmp4 = [naive.Naive, arima.ARIMA, quantreg.QuantileRegression]
tmp3 = [Measures.get_point_statistics]
tmp5 = [Transformations.Differential]
if mfts.benchmark_only:
_key = mfts.shortname + str(mfts.order if mfts.order is not None else "")
else:
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if transformation is not None:
mfts.append_transformation(transformation)
_start = time.time()
mfts.train(train_data, partitioner.sets, order=mfts.order)
_end = time.time()
times = _end - _start
_start = time.time()
_rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, indexer)
_end = time.time()
times += _end - _start
ret = {'key': _key, 'obj': mfts, 'rmse': _rmse, 'smape': _smape, 'u': _u, 'time': times, 'window': window_key}
return ret
@numba.jit()
def point_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
benchmark_models=None, benchmark_models_parameters = None,
@ -100,7 +47,7 @@ def point_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, part
:return: DataFrame with the results
"""
cluster = dispy.JobCluster(run_point, nodes=nodes) #, depends=dependencies)
cluster = dispy.JobCluster(benchmarks.run_point, nodes=nodes) #, depends=dependencies)
http_server = dispy.httpd.DispyHTTPServer(cluster)
@ -174,7 +121,7 @@ def point_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, part
return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
@numba.jit()
def build_model_pool_point(models, max_order, benchmark_models, benchmark_models_parameters):
pool = []
@ -209,57 +156,7 @@ def build_model_pool_point(models, max_order, benchmark_models, benchmark_models
return pool
def run_interval(mfts, partitioner, train_data, test_data, window_key=None, transformation=None, indexer=None):
"""
Interval forecast benchmark function to be executed on cluster nodes
:param mfts: FTS model
:param partitioner: Universe of Discourse partitioner
:param train_data: data used to train the model
:param test_data: ata used to test the model
:param window_key: id of the sliding window
:param transformation: data transformation
:param indexer: seasonal indexer
:return: a dictionary with the benchmark results
"""
import time
from pyFTS.models import hofts,ifts,pwfts
from pyFTS.partitioners import Grid, Entropy, FCM
from pyFTS.benchmarks import Measures, arima, quantreg
tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS]
tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner]
tmp4 = [arima.ARIMA, quantreg.QuantileRegression]
tmp3 = [Measures.get_interval_statistics]
if mfts.benchmark_only:
_key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + str(mfts.alpha)
else:
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if transformation is not None:
mfts.append_transformation(transformation)
_start = time.time()
mfts.train(train_data, partitioner.sets, order=mfts.order)
_end = time.time()
times = _end - _start
_start = time.time()
_sharp, _res, _cov, _q05, _q25, _q75, _q95 = Measures.get_interval_statistics(test_data, mfts)
_end = time.time()
times += _end - _start
ret = {'key': _key, 'obj': mfts, 'sharpness': _sharp, 'resolution': _res, 'coverage': _cov, 'time': times,
'Q05': _q05, 'Q25': _q25, 'Q75': _q75, 'Q95': _q95, 'window': window_key}
return ret
@numba.jit()
def interval_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
benchmark_models=None, benchmark_models_parameters = None,
@ -296,7 +193,7 @@ def interval_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None,
if benchmark_models_parameters is None:
benchmark_models_parameters = [(1, 0, 0), (1, 0, 1), (2, 0, 1), (2, 0, 2), 1, 2]
cluster = dispy.JobCluster(run_interval, nodes=nodes) #, depends=dependencies)
cluster = dispy.JobCluster(benchmarks.run_interval, nodes=nodes) #, depends=dependencies)
http_server = dispy.httpd.DispyHTTPServer(cluster)
@ -407,70 +304,7 @@ def interval_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None,
times, q05, q25, q75, q95)
def run_ahead(mfts, partitioner, train_data, test_data, steps, resolution, window_key=None, transformation=None, indexer=None):
"""
Probabilistic m-step ahead forecast benchmark function to be executed on cluster nodes
:param mfts: FTS model
:param partitioner: Universe of Discourse partitioner
:param train_data: data used to train the model
:param test_data: ata used to test the model
:param steps:
:param resolution:
:param window_key: id of the sliding window
:param transformation: data transformation
:param indexer: seasonal indexer
:return: a dictionary with the benchmark results
"""
import time
import numpy as np
from pyFTS.models import hofts, ifts, pwfts
from pyFTS.models.ensemble import ensemble
from pyFTS.partitioners import Grid, Entropy, FCM
from pyFTS.benchmarks import Measures, arima
from pyFTS.models.seasonal import SeasonalIndexer
tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS, arima.ARIMA, ensemble.AllMethodEnsembleFTS]
tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner]
tmp3 = [Measures.get_distribution_statistics, SeasonalIndexer.SeasonalIndexer, SeasonalIndexer.LinearSeasonalIndexer]
if mfts.benchmark_only:
_key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + str(mfts.alpha)
else:
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if transformation is not None:
mfts.append_transformation(transformation)
if mfts.has_seasonality:
mfts.indexer = indexer
try:
_start = time.time()
mfts.train(train_data, partitioner.sets, order=mfts.order)
_end = time.time()
times = _end - _start
_crps1, _crps2, _t1, _t2 = Measures.get_distribution_statistics(test_data, mfts, steps=steps,
resolution=resolution)
_t1 += times
_t2 += times
except Exception as e:
print(e)
_crps1 = np.nan
_crps2 = np.nan
_t1 = np.nan
_t2 = np.nan
ret = {'key': _key, 'obj': mfts, 'CRPS_Interval': _crps1, 'CRPS_Distribution': _crps2, 'TIME_Interval': _t1,
'TIME_Distribution': _t2, 'window': window_key}
return ret
@numba.jit()
def ahead_sliding_window(data, windowsize, steps, resolution, train=0.8, inc=0.1, models=None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
benchmark_models=None, benchmark_models_parameters = None,
@ -505,7 +339,7 @@ def ahead_sliding_window(data, windowsize, steps, resolution, train=0.8, inc=0.1
if benchmark_models_parameters is None:
benchmark_models_parameters = [(1, 0, 0), (1, 0, 1), (2, 0, 0), (2, 0, 1), (2, 0, 2)]
cluster = dispy.JobCluster(run_ahead, nodes=nodes) # , depends=dependencies)
cluster = dispy.JobCluster(benchmarks.run_ahead, nodes=nodes) # , depends=dependencies)
http_server = dispy.httpd.DispyHTTPServer(cluster)

View File

@ -1,4 +1,5 @@
import time
import numba
import matplotlib.pyplot as plt
import dill
import numpy as np
@ -15,6 +16,7 @@ def uniquefilename(name):
return name + str(current_milli_time())
def show_and_save_image(fig, file, flag, lgd=None):
"""
Show and image and save on file
@ -38,7 +40,7 @@ def enumerate2(xs, start=0, step=1):
start += step
def sliding_window(data, windowsize, train=0.8, inc=0.1):
def sliding_window(data, windowsize, train=0.8, inc=0.1, **kwargs):
"""
Sliding window method of cross validation for time series
:param data: the entire dataset
@ -50,7 +52,16 @@ def sliding_window(data, windowsize, train=0.8, inc=0.1):
l = len(data)
ttrain = int(round(windowsize * train, 0))
ic = int(round(windowsize * inc, 0))
for count in np.arange(0,l-windowsize+ic,ic):
progressbar = kwargs.get('progress', None)
rng = np.arange(0,l-windowsize+ic,ic)
if progressbar:
from tqdm import tqdm
rng = tqdm(rng)
for count in rng:
if count + windowsize > l:
_end = l
else:
@ -91,11 +102,34 @@ def load_env(file):
dill.load_session(file)
def start_dispy_cluster(method, nodes):
import dispy, dispy.httpd, logging
cluster = dispy.JobCluster(method, nodes=nodes, loglevel=logging.DEBUG, ping_interval=1000)
http_server = dispy.httpd.DispyHTTPServer(cluster)
return cluster, http_server
def stop_dispy_cluster(cluster, http_server):
cluster.wait() # wait for all jobs to finish
cluster.print_status()
http_server.shutdown() # this waits until browser gets all updates
cluster.close()
def simple_model_train(model, data, parameters):
model.train(data, **parameters)
return model
def distributed_train(model, train_method, nodes, fts_method, data, num_batches=10,
train_parameters={}, **kwargs):
import dispy, dispy.httpd, datetime
@ -106,9 +140,7 @@ def distributed_train(model, train_method, nodes, fts_method, data, num_batches=
file_path = kwargs.get('file_path', None)
cluster = dispy.JobCluster(train_method, nodes=nodes) # , depends=dependencies)
http_server = dispy.httpd.DispyHTTPServer(cluster)
cluster, http_server = start_dispy_cluster(train_method, nodes)
print("[{0: %H:%M:%S}] Distrituted Train Started".format(datetime.datetime.now()))
@ -149,26 +181,21 @@ def distributed_train(model, train_method, nodes, fts_method, data, num_batches=
print("[{0: %H:%M:%S}] Distrituted Train Finished".format(datetime.datetime.now()))
cluster.wait() # wait for all jobs to finish
cluster.print_status()
http_server.shutdown() # this waits until browser gets all updates
cluster.close()
stop_dispy_cluster(cluster, http_server)
return model
def simple_model_predict(model, data, parameters):
return model.predict(data, **parameters)
def distributed_predict(model, parameters, nodes, data, num_batches):
import dispy, dispy.httpd
cluster = dispy.JobCluster(simple_model_predict, nodes=nodes) # , depends=dependencies)
http_server = dispy.httpd.DispyHTTPServer(cluster)
cluster, http_server = start_dispy_cluster(simple_model_predict, nodes)
jobs = []
n = len(data)
@ -199,11 +226,6 @@ def distributed_predict(model, parameters, nodes, data, num_batches):
print(job.exception)
print(job.stdout)
cluster.wait() # wait for all jobs to finish
cluster.print_status()
http_server.shutdown() # this waits until browser gets all updates
cluster.close()
stop_dispy_cluster(cluster, http_server)
return ret

View File

@ -5,7 +5,7 @@ from pyFTS.common import FuzzySet, SortedCollection, tree, Util
class FTS(object):
"""
Fuzzy Time Series
Fuzzy Time Series object model
"""
def __init__(self, order, name, **kwargs):
"""
@ -60,7 +60,14 @@ class FTS(object):
Forecast using trained model
:param data: time series with minimal length to the order of the model
:param kwargs:
:return:
:keyword
type: the forecasting type, one of these values: point(default), interval or distribution.
steps_ahead: The forecasting horizon, i. e., the number of steps ahead to forecast
start: in the multi step forecasting, the index of the data where to start forecasting
distributed: boolean, indicate if the forecasting procedure will be distributed in a dispy cluster
nodes: a list with the dispy cluster nodes addresses
:return: a numpy array with the forecasted data
"""
if 'distributed' in kwargs:
@ -181,7 +188,7 @@ class FTS(object):
def fit(self, data, **kwargs):
"""
:param data:
:param data: the training time series
:param kwargs:
:keyword
@ -189,12 +196,17 @@ class FTS(object):
save_model: save final model on disk
batch_save: save the model between each batch
file_path: path to save the model
distributed: boolean, indicate if the training procedure will be distributed in a dispy cluster
nodes: a list with the dispy cluster nodes addresses
:return:
"""
import datetime
num_batches = kwargs.get('num_batches', 10)
dump = kwargs.get('dump', None)
num_batches = kwargs.get('num_batches', None)
save = kwargs.get('save_model', False) # save model on disk
@ -214,13 +226,23 @@ class FTS(object):
batch_save_interval=batch_save_interval)
else:
if dump == 'time':
print("[{0: %H:%M:%S}] Start training".format(datetime.datetime.now()))
if num_batches is not None:
n = len(data)
batch_size = int(n / num_batches)
bcount = 1
for ct in range(self.order, n, batch_size):
rng = range(self.order, n, batch_size)
if dump == 'tqdm':
from tqdm import tqdm
rng = tqdm(rng)
for ct in rng:
if dump == 'time':
print("[{0: %H:%M:%S}] Starting batch ".format(datetime.datetime.now()) + str(bcount))
if self.is_multivariate:
ndata = data.iloc[ct - self.order:ct + batch_size]
@ -232,6 +254,7 @@ class FTS(object):
if batch_save:
Util.persist_obj(self,file_path)
if dump == 'time':
print("[{0: %H:%M:%S}] Finish batch ".format(datetime.datetime.now()) + str(bcount))
bcount += 1
@ -239,6 +262,7 @@ class FTS(object):
else:
self.train(data, **kwargs)
if dump == 'time':
print("[{0: %H:%M:%S}] Finish training".format(datetime.datetime.now()))
if save:

View File

@ -201,8 +201,9 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
if flrg.get_key() in self.flrgs:
return self.flrgs[flrg.get_key()].frequency_count / self.global_frequency_count
else:
self.add_new_PWFLGR(flrg)
return self.flrg_lhs_unconditional_probability(flrg)
return 0.0
#self.add_new_PWFLGR(flrg)
#return self.flrg_lhs_unconditional_probability(flrg)
def flrg_lhs_conditional_probability(self, x, flrg):
mv = flrg.get_membership(x, self.sets)
@ -214,8 +215,11 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
tmp = self.flrgs[flrg.get_key()]
ret = tmp.get_midpoint(self.sets) #sum(np.array([tmp.rhs_unconditional_probability(s) * self.setsDict[s].centroid for s in tmp.RHS]))
else:
if len(flrg.LHS) > 0:
pi = 1 / len(flrg.LHS)
ret = sum(np.array([pi * self.sets[s].centroid for s in flrg.LHS]))
else:
ret = np.nan
return ret
def flrg_rhs_conditional_probability(self, x, flrg):
@ -241,8 +245,7 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
tmp = self.flrgs[flrg.get_key()]
ret = tmp.get_upper(self.sets)
else:
pi = 1 / len(flrg.LHS)
ret = sum(np.array([pi * self.sets[s].upper for s in flrg.LHS]))
ret = 0
return ret
def get_lower(self, flrg):
@ -250,8 +253,7 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
tmp = self.flrgs[flrg.get_key()]
ret = tmp.get_lower(self.sets)
else:
pi = 1 / len(flrg.LHS)
ret = sum(np.array([pi * self.sets[s].lower for s in flrg.LHS]))
ret = 0
return ret
def forecast(self, data, **kwargs):
@ -322,78 +324,16 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
ret.append_rhs([lo_qt, up_qt])
def interval_extremum(self, k, ndata, ret):
affected_flrgs = []
affected_flrgs_memberships = []
norms = []
sample = ndata[k - (self.order - 1): k + 1]
flrgs = self.generate_lhs_flrg(sample)
up = []
lo = []
# Find the sets which membership > 0 for each lag
count = 0
lags = {}
if self.order > 1:
subset = ndata[k - (self.order - 1): k + 1]
for instance in subset:
mb = FuzzySet.fuzzyfy_instance(instance, self.sets)
tmp = np.argwhere(mb)
idx = np.ravel(tmp) # flatten the array
if idx.size == 0: # the element is out of the bounds of the Universe of Discourse
if math.isclose(instance, self.sets[0].lower) or instance < self.sets[0].lower:
idx = [0]
elif math.isclose(instance, self.sets[-1].upper) or instance > self.sets[-1].upper:
idx = [len(self.sets) - 1]
else:
raise Exception("Data exceed the known bounds [%s, %s] of universe of discourse: %s" %
(self.sets[0].lower, self.sets[-1].upper, instance))
lags[count] = idx
count += 1
# Build the tree with all possible paths
root = tree.FLRGTreeNode(None)
self.build_tree(root, lags, 0)
# Trace the possible paths and build the PFLRG's
for p in root.paths():
path = list(reversed(list(filter(None.__ne__, p))))
flrg = hofts.HighOrderFLRG(self.order)
for kk in path: flrg.append_lhs(self.sets[kk])
assert len(flrg.LHS) == subset.size, str(subset) + " -> " + str([s.name for s in flrg.LHS])
##
affected_flrgs.append(flrg)
# Find the general membership of FLRG
affected_flrgs_memberships.append(min(self.get_sequence_membership(subset, flrg.LHS)))
else:
mv = FuzzySet.fuzzyfy_instance(ndata[k], self.sets) # get all membership values
tmp = np.argwhere(mv) # get the indices of values > 0
idx = np.ravel(tmp) # flatten the array
if idx.size == 0: # the element is out of the bounds of the Universe of Discourse
if math.isclose(ndata[k], self.sets[0].lower) or ndata[k] < self.sets[0].lower:
idx = [0]
elif math.isclose(ndata[k], self.sets[-1].upper) or ndata[k] > self.sets[-1].upper:
idx = [len(self.sets) - 1]
else:
raise Exception("Data exceed the known bounds [%s, %s] of universe of discourse: %s" %
(self.sets[0].lower, self.sets[-1].upper, ndata[k]))
for kk in idx:
flrg = hofts.HighOrderFLRG(self.order)
flrg.append_lhs(self.sets[kk])
affected_flrgs.append(flrg)
affected_flrgs_memberships.append(mv[kk])
for count, flrg in enumerate(affected_flrgs):
# achar o os bounds de cada FLRG, ponderados pela probabilidade e pertinência
norm = self.flrg_lhs_unconditional_probability(flrg) * affected_flrgs_memberships[count]
norms = []
for flrg in flrgs:
norm = self.flrg_lhs_conditional_probability(sample, flrg)
if norm == 0:
norm = self.flrg_lhs_unconditional_probability(flrg) # * 0.001
up.append(norm * self.get_upper(flrg))
@ -403,11 +343,12 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
# gerar o intervalo
norm = sum(norms)
if norm == 0:
ret.append_rhs([0, 0])
ret.append([0, 0])
else:
lo_ = sum(lo) / norm
up_ = sum(up) / norm
ret.append_rhs([lo_, up_])
ret.append([lo_, up_])
def forecast_distribution(self, data, **kwargs):
@ -415,15 +356,18 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
data = [data]
smooth = kwargs.get("smooth", "none")
nbins = kwargs.get("num_bins", 100)
ndata = np.array(self.apply_transformations(data))
l = len(ndata)
uod = self.get_UoD()
if 'bins' in kwargs:
_bins = kwargs.pop('bins')
else:
nbins = kwargs.get("num_bins", 100)
_bins = np.linspace(uod[0], uod[1], nbins)
ret = []
uod = self.get_UoD()
_bins = np.linspace(uod[0], uod[1], nbins)
for k in np.arange(self.order - 1, l):
sample = ndata[k - (self.order - 1): k + 1]
@ -487,120 +431,63 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
ret = []
method = kwargs.get('method', 2)
smooth = "KDE" if method != 4 else "none"
nbins = kwargs.get("num_bins", 100)
smooth = kwargs.get("smooth", "none")
ndata = np.array(self.apply_transformations(data))
uod = self.get_UoD()
_bins = np.linspace(uod[0], uod[1], nbins).tolist()
if method != 4:
intervals = self.forecast_ahead_interval(data, steps)
if 'bins' in kwargs:
_bins = kwargs.pop('bins')
else:
l = len(data)
for k in np.arange(l - self.order, l):
dist = ProbabilityDistribution.ProbabilityDistribution(smooth, uod=uod, bins=_bins, **kwargs)
dist.set(data[k], 1.0)
nbins = kwargs.get("num_bins", 100)
_bins = np.linspace(uod[0], uod[1], nbins)
start = kwargs.get('start', self.order)
sample = ndata[start - (self.order - 1): start + 1]
for dat in sample:
tmp = ProbabilityDistribution.ProbabilityDistribution(smooth, uod=uod, bins=_bins, **kwargs)
tmp.set(dat, 1.0)
ret.append(tmp)
dist = self.forecast_distribution(sample, bins=_bins)
ret.append(dist)
for k in np.arange(self.order, steps+self.order):
data = []
if method == 1:
dist = ProbabilityDistribution.ProbabilityDistribution(smooth, uod=uod, bins=_bins, **kwargs)
lags = {}
cc = 0
# Find all bins of past distributions with probability greater than zero
for i in intervals[k - self.order : k]:
quantiles = []
for qt in np.arange(0, 50, 2):
quantiles.append(i[0] + qt * ((i[1] - i[0]) / 100))
quantiles.append(i[1] - qt * ((i[1] - i[0]) / 100))
quantiles.append(i[0] + ((i[1] - i[0]) / 2))
quantiles = list(set(quantiles))
quantiles.sort()
lags[cc] = quantiles
cc += 1
# Build the tree with all possible paths
root = tree.FLRGTreeNode(None)
self.build_tree_without_order(root, lags, 0)
# Trace the possible paths
for p in root.paths():
path = list(reversed(list(filter(None.__ne__, p))))
qtle = np.ravel(self.forecast_interval(path))
data.extend(np.linspace(qtle[0],qtle[1],100).tolist())
elif method == 2:
for qt in np.arange(0, 50, 1):
# print(qt)
qtle_lower = self.forecast_interval(
[intervals[x][0] + qt * ((intervals[x][1] - intervals[x][0]) / 100) for x in
np.arange(k - self.order, k)])
qtle_lower = np.ravel(qtle_lower)
data.extend(np.linspace(qtle_lower[0], qtle_lower[1], 100).tolist())
qtle_upper = self.forecast_interval(
[intervals[x][1] - qt * ((intervals[x][1] - intervals[x][0]) / 100) for x in
np.arange(k - self.order, k)])
qtle_upper = np.ravel(qtle_upper)
data.extend(np.linspace(qtle_upper[0], qtle_upper[1], 100).tolist())
qtle_mid = self.forecast_interval(
[intervals[x][0] + (intervals[x][1] - intervals[x][0]) / 2 for x in np.arange(k - self.order, k)])
qtle_mid = np.ravel(qtle_mid)
data.extend(np.linspace(qtle_mid[0], qtle_mid[1], 100).tolist())
elif method == 3:
i = intervals[k]
data = np.linspace(i[0],i[1],100).tolist()
else:
dist = ProbabilityDistribution.ProbabilityDistribution(smooth, bins=_bins,
uod=uod, **kwargs)
lags = {}
cc = 0
for dd in ret[k - self.order: k]:
for ct, dd in enumerate(ret[k - self.order: k]):
vals = [float(v) for v in dd.bins if round(dd.density(v), 4) > 0]
lags[cc] = sorted(vals)
cc += 1
lags[ct] = sorted(vals)
root = tree.FLRGTreeNode(None)
self.build_tree_without_order(root, lags, 0)
# Trace the possible paths
# Trace all possible combinations between the bins of past distributions
for p in root.paths():
path = list(reversed(list(filter(None.__ne__, p))))
# get the combined probabilities for this path
pk = np.prod([ret[k - self.order + o].density(path[o])
for o in np.arange(0, self.order)])
d = self.forecast_distribution(path)[0]
for bin in _bins:
dist.set(bin, dist.density(bin) + pk * d.density(bin))
if method != 4:
dist = ProbabilityDistribution.ProbabilityDistribution(smooth, bins=_bins, data=data,
uod=uod, **kwargs)
ret.append(dist)
ret = ret[self.order:]
return ret

View File

@ -1,9 +1,12 @@
import numba
import numpy as np
import pandas as pd
import matplotlib as plt
import matplotlib.colors as pltcolors
import matplotlib.pyplot as plt
#from mpl_toolkits.mplot3d import Axes3D
from pyFTS.benchmarks import Measures
from pyFTS.common import Membership, Util
from pyFTS.partitioners import Grid,Huarng,FCM,Entropy
@ -12,6 +15,107 @@ all_methods = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitio
mfs = [Membership.trimf, Membership.gaussmf, Membership.trapmf]
@numba.jit()
def sliding_window_simple_search(data, windowsize, model, partitions, orders, **kwargs):
_3d = len(orders) > 1
ret = []
errors = np.array([[0 for k in range(len(partitions))] for kk in range(len(orders))])
forecasted_best = []
figsize = kwargs.get('figsize', [10, 15])
fig = plt.figure(figsize=figsize)
plotforecasts = kwargs.get('plotforecasts',False)
if plotforecasts:
ax0 = fig.add_axes([0, 0.4, 0.9, 0.5]) # left, bottom, width, height
ax0.set_xlim([0, len(data)])
ax0.set_ylim([min(data) * 0.9, max(data) * 1.1])
ax0.set_title('Forecasts')
ax0.set_ylabel('F(T)')
ax0.set_xlabel('T')
min_rmse = 1000000.0
best = None
intervals = kwargs.get('intervals',False)
threshold = kwargs.get('threshold',0.5)
progressbar = kwargs.get('progressbar', None)
rng1 = enumerate(partitions, start=0)
if progressbar:
from tqdm import tqdm
rng1 = enumerate(tqdm(partitions), start=0)
for pc, p in rng1:
fs = Grid.GridPartitioner(data=data, npart=p)
rng2 = enumerate(orders, start=0)
if progressbar:
rng2 = enumerate(tqdm(orders), start=0)
for oc, o in rng2:
_error = []
for ct, train, test in Util.sliding_window(data, windowsize, 0.8, **kwargs):
fts = model("q = " + str(p) + " n = " + str(o), partitioner=fs)
fts.fit(train, order=o)
if not intervals:
forecasted = fts.forecast(test)
if not fts.has_seasonality:
_error.append( Measures.rmse(np.array(test[o:]), np.array(forecasted[:-1])) )
else:
_error.append( Measures.rmse(np.array(test[o:]), np.array(forecasted)) )
for kk in range(o):
forecasted.insert(0, None)
if plotforecasts: ax0.plot(forecasted, label=fts.name)
else:
forecasted = fts.forecast_interval(test)
_error.append( 1.0 - Measures.rmse_interval(np.array(test[o:]), np.array(forecasted[:-1])) )
error = np.nanmean(_error)
errors[oc, pc] = error
if (min_rmse - error) > threshold:
min_rmse = error
best = fts
forecasted_best = forecasted
# print(min_rmse)
if plotforecasts:
# handles0, labels0 = ax0.get_legend_handles_labels()
# ax0.legend(handles0, labels0)
elev = kwargs.get('elev', 30)
azim = kwargs.get('azim', 144)
ax0.plot(test, label="Original", linewidth=3.0, color="black")
if _3d: ax1 = Axes3D(fig, rect=[0, 1, 0.9, 0.9], elev=elev, azim=azim)
if not plotforecasts: ax1 = Axes3D(fig, rect=[0, 1, 0.9, 0.9], elev=elev, azim=azim)
# ax1 = fig.add_axes([0.6, 0.5, 0.45, 0.45], projection='3d')
if _3d:
ax1.set_title('Error Surface')
ax1.set_ylabel('Model order')
ax1.set_xlabel('Number of partitions')
ax1.set_zlabel('RMSE')
X, Y = np.meshgrid(partitions, orders)
surf = ax1.plot_surface(X, Y, errors, rstride=1, cstride=1, antialiased=True)
else:
ax1 = fig.add_axes([0, 1, 0.9, 0.9])
ax1.set_title('Error Curve')
ax1.set_ylabel('Number of partitions')
ax1.set_xlabel('RMSE')
ax0.plot(errors,partitions)
ret.append(best)
ret.append(forecasted_best)
# plt.tight_layout()
file = kwargs.get('file', None)
save = kwargs.get('save', False)
Util.show_and_save_image(fig, file, save)
return ret
def plot_sets(data, sets, titles, tam=[12, 10], save=False, file=None):
num = len(sets)
#fig = plt.figure(figsize=tam)

View File

@ -9,6 +9,17 @@ import numpy as np
import pandas as pd
from pyFTS.common import Transformations
from pyFTS.data import INMET
from pyFTS.data import TAIEX
print(INMET.get_dataframe())
dataset = TAIEX.get_data()
from pyFTS.benchmarks import benchmarks as bchmk
from pyFTS.models import pwfts
bchmk.sliding_window_benchmarks(dataset, 1000, train=0.8, inc=0.2, methods=[pwfts.ProbabilisticWeightedFTS],
benchmark_models=False, orders=[1], partitions=[10], #np.arange(10,100,2),
progress=False, type='distribution',
distributed=False, nodes=['192.168.0.106', '192.168.0.105', '192.168.0.110'],
save=True, file="pwfts_taiex_interval.csv")