From cbfbf47f5471b8ed9e45768cdf5a4b32982a7705 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=C3=B4nio=20C=C3=A2ndido=20de=20Lima=20e=20Silva?= Date: Sat, 6 May 2017 21:19:04 -0300 Subject: [PATCH] - Issue #3 - Code documentation with PEP 257 compliance --- benchmarks/Measures.py | 27 +++++ benchmarks/benchmarks.py | 66 +++++------ benchmarks/distributed_benchmarks.py | 167 ++++++++++++++++++++++++++- benchmarks/parallel_benchmarks.py | 87 ++++++++++++++ song.py | 4 +- 5 files changed, 313 insertions(+), 38 deletions(-) diff --git a/benchmarks/Measures.py b/benchmarks/Measures.py index 590460a..4ecd3af 100644 --- a/benchmarks/Measures.py +++ b/benchmarks/Measures.py @@ -4,6 +4,7 @@ pyFTS module for common benchmark metrics """ +import time import numpy as np import pandas as pd from pyFTS.common import FuzzySet,SortedCollection @@ -240,3 +241,29 @@ def get_interval_statistics(original, model): ret.append(round(resolution(forecasts), 2)) ret.append(round(coverage(original[model.order:], forecasts[:-1]), 2)) return ret + + +def get_distribution_statistics(original, model, steps, resolution): + ret = list() + try: + _s1 = time.time() + densities1 = model.forecastAheadDistribution(original, steps, parameters=3) + _e1 = time.time() + ret.append(round(crps(original, densities1), 3)) + ret.append(round(_e1 - _s1, 3)) + except Exception as e: + print('Erro: ', e) + ret.append(np.nan) + ret.append(np.nan) + + try: + _s2 = time.time() + densities2 = model.forecastAheadDistribution(original, steps, parameters=2) + _e2 = time.time() + ret.append( round(crps(original, densities2), 3)) + ret.append(round(_e2 - _s2, 3)) + except: + ret.append(np.nan) + ret.append(np.nan) + + return ret diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index eb0eb05..4433970 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -17,7 +17,7 @@ from mpl_toolkits.mplot3d import Axes3D from pyFTS.partitioners import partitioner, Grid, Huarng, Entropy, FCM from pyFTS.benchmarks import Measures, naive, arima, ResidualAnalysis, ProbabilityDistribution, Util, quantreg from pyFTS.common import Membership, FuzzySet, FLR, Transformations, Util -from pyFTS import fts, chen, yu, ismailefendi, sadaei, hofts, hwang, pwfts, ifts, cheng +from pyFTS import fts, song, chen, yu, ismailefendi, sadaei, hofts, hwang, pwfts, ifts, cheng, ensemble from copy import deepcopy colors = ['grey', 'rosybrown', 'maroon', 'red','orange', 'yellow', 'olive', 'green', @@ -29,24 +29,34 @@ styles = ['-','--','-.',':','.'] nsty = len(styles) + def get_benchmark_point_methods(): - """Return all non FTS methods for point forecast""" + """Return all non FTS methods for point forecasting""" return [naive.Naive, arima.ARIMA, quantreg.QuantileRegression] + def get_point_methods(): - """Return all FTS methods for point forecast""" - return [chen.ConventionalFTS, yu.WeightedFTS, ismailefendi.ImprovedWeightedFTS, cheng.TrendWeightedFTS, - sadaei.ExponentialyWeightedFTS, hofts.HighOrderFTS, pwfts.ProbabilisticWeightedFTS] + """Return all FTS methods for point forecasting""" + return [song.ConventionalFTS, chen.ConventionalFTS, yu.WeightedFTS, ismailefendi.ImprovedWeightedFTS, + cheng.TrendWeightedFTS, sadaei.ExponentialyWeightedFTS, hofts.HighOrderFTS, + pwfts.ProbabilisticWeightedFTS] + def get_benchmark_interval_methods(): - """Return all non FTS methods for interval forecast""" + """Return all non FTS methods for interval forecasting""" return [quantreg.QuantileRegression] + def get_interval_methods(): - """Return all FTS methods for 
interval forecast""" + """Return all FTS methods for interval forecasting""" return [ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS] +def get_probabilistic_methods(): + """Return all FTS methods for probabilistic forecasting""" + return [quantreg.QuantileRegression, ensemble.EnsembleFTS, pwfts.ProbabilisticWeightedFTS] + + def external_point_sliding_window(models, parameters, data, windowsize,train=0.8, dump=False, save=False, file=None, sintetic=True): """ @@ -628,6 +638,19 @@ def plot_probability_distributions(pmfs, lcolors, tam=[15, 7]): def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, sintetic): + """ + Save benchmark results for m-step ahead probabilistic forecasters + :param experiments: + :param file: + :param objs: + :param crps_interval: + :param crps_distr: + :param times1: + :param times2: + :param save: + :param sintetic: + :return: + """ ret = [] if sintetic: @@ -738,7 +761,7 @@ def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution _tdiff = _end - _start - _crps1, _crps2, _t1, _t2 = get_distribution_statistics(test,mfts,steps=steps,resolution=resolution) + _crps1, _crps2, _t1, _t2 = Measures.get_distribution_statistics(test,mfts,steps=steps,resolution=resolution) crps_interval[_key].append(_crps1) crps_distr[_key].append(_crps2) @@ -773,7 +796,7 @@ def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution _tdiff = _end - _start - _crps1, _crps2, _t1, _t2 = get_distribution_statistics(test, mfts, steps=steps, + _crps1, _crps2, _t1, _t2 = Measures.get_distribution_statistics(test, mfts, steps=steps, resolution=resolution) crps_interval[_key].append(_crps1) @@ -826,36 +849,13 @@ def all_ahead_forecasters(data_train, data_test, partitions, start, steps, resol interpol=False, save=save, file=file, tam=tam, resolution=resolution, option=option) -def get_distribution_statistics(original, model, steps, resolution): - ret = list() - try: - _s1 = time.time() - densities1 = model.forecastAheadDistribution(original, steps, parameters=3) - _e1 = time.time() - ret.append(round(Measures.crps(original, densities1), 3)) - ret.append(round(_e1 - _s1, 3)) - except Exception as e: - print('Erro: ', e) - ret.append(np.nan) - ret.append(np.nan) - try: - _s2 = time.time() - densities2 = model.forecastAheadDistribution(original, steps, parameters=2) - _e2 = time.time() - ret.append( round(Measures.crps(original, densities2), 3)) - ret.append(round(_e2 - _s2, 3)) - except: - ret.append(np.nan) - ret.append(np.nan) - - return ret def print_distribution_statistics(original, models, steps, resolution): ret = "Model & Order & Interval & Distribution \\\\ \n" for fts in models: - _crps1, _crps2, _t1, _t2 = get_distribution_statistics(original, fts, steps, resolution) + _crps1, _crps2, _t1, _t2 = Measures.get_distribution_statistics(original, fts, steps, resolution) ret += fts.shortname + " & " ret += str(fts.order) + " & " ret += str(_crps1) + " & " diff --git a/benchmarks/distributed_benchmarks.py b/benchmarks/distributed_benchmarks.py index 0ca3ec3..67755f4 100644 --- a/benchmarks/distributed_benchmarks.py +++ b/benchmarks/distributed_benchmarks.py @@ -172,7 +172,7 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners= return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u) -def run_interval(mfts, partitioner, train_data, test_data, transformation=None, indexer=None): +def run_interval(mfts, partitioner, train_data, test_data, 
window_key=None, transformation=None, indexer=None):
     """
     Interval forecast benchmark function to be executed on cluster nodes
     :param mfts: FTS model
@@ -211,7 +211,8 @@ def run_interval(mfts, partitioner, train_data, test_data, transformation=None,
         _end = time.time()
         times += _end - _start
 
-    ret = {'key': _key, 'obj': mfts, 'sharpness': _sharp, 'resolution': _res, 'coverage': _cov, 'time': times}
+    ret = {'key': _key, 'obj': mfts, 'sharpness': _sharp, 'resolution': _res, 'coverage': _cov, 'time': times,
+           'window': window_key}
 
     return ret
 
@@ -320,4 +321,164 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione
     cluster.close()
 
     return benchmarks.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic,
-                                   times)
\ No newline at end of file
+                                   times)
+
+
+def run_ahead(mfts, partitioner, train_data, test_data, steps, resolution, window_key=None, transformation=None, indexer=None):
+    """
+    Probabilistic m-step ahead forecast benchmark function to be executed on cluster nodes
+    :param mfts: FTS model
+    :param partitioner: Universe of Discourse partitioner
+    :param train_data: data used to train the model
+    :param test_data: data used to test the model
+    :param steps: number of steps ahead to forecast
+    :param resolution: resolution of the forecast probability distributions
+    :param window_key: id of the sliding window
+    :param transformation: data transformation
+    :param indexer: seasonal indexer
+    :return: a dictionary with the benchmark results
+    """
+    import time
+    from pyFTS import hofts, ifts, pwfts
+    from pyFTS.partitioners import Grid, Entropy, FCM
+    from pyFTS.benchmarks import Measures, arima, quantreg
+
+    tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS, arima.ARIMA, quantreg.QuantileRegression]
+
+    tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner]
+
+    tmp3 = [Measures.get_distribution_statistics]
+
+    pttr = str(partitioner.__module__).split('.')[-1]
+    _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
+    mfts.partitioner = partitioner
+    if transformation is not None:
+        mfts.appendTransformation(transformation)
+
+    try:
+        _start = time.time()
+        mfts.train(train_data, partitioner.sets, order=mfts.order)
+        _end = time.time()
+        times = _end - _start
+
+        _crps1, _crps2, _t1, _t2 = Measures.get_distribution_statistics(test_data, mfts, steps=steps,
+                                                                        resolution=resolution)
+        _t1 += times
+        _t2 += times
+    except Exception as e:
+        print(e)
+        _crps1 = np.nan
+        _crps2 = np.nan
+        _t1 = np.nan
+        _t2 = np.nan
+
+    ret = {'key': _key, 'obj': mfts, 'CRPS_Interval': _crps1, 'CRPS_Distribution': _crps2, 'TIME_Interval': _t1,
+           'TIME_Distribution': _t2, 'window': window_key}
+
+    return ret
+
+
+def ahead_sliding_window(data, windowsize, train, steps, resolution, models=None, partitioners=[Grid.GridPartitioner],
+                         partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
+                         save=False, file=None, sintetic=False, nodes=None, depends=None):
+    """
+    Distributed sliding window benchmarks for FTS probabilistic forecasters
+    :param data: data used in the benchmarks
+    :param windowsize: size of sliding window
+    :param train: percentage of the sliding window data used to train the models
+    :param steps: number of steps ahead to forecast
+    :param resolution: resolution of the forecast probability distributions
+    :param models: FTS probabilistic forecasters
+    :param partitioners: Universe of Discourse partitioners
+    :param partitions: the max number of partitions on the Universe of Discourse
+    :param max_order: the max order of the models (for high order models)
+    :param transformation: data transformation
+    :param indexer: seasonal indexer
+    :param dump: if True, print debug information
+    :param save: save results
+    :param file: file path to save the results
+    :param sintetic: if True, save only the average and standard deviation of the results
+    :param nodes: list of cluster nodes to distribute tasks
+    :param depends: list of module dependencies
+    :return: DataFrame with the results
+    """
+    cluster = dispy.JobCluster(run_ahead, nodes=nodes)  # , depends=dependencies)
+
+    http_server = dispy.httpd.DispyHTTPServer(cluster)
+
+    _process_start = time.time()
+
+    print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now()))
+
+    pool = []
+    jobs = []
+    objs = {}
+    crps_interval = {}
+    crps_distr = {}
+    times1 = {}
+    times2 = {}
+
+    if models is None:
+        models = benchmarks.get_probabilistic_methods()
+
+    for model in models:
+        mfts = model("")
+
+        if mfts.is_high_order:
+            for order in np.arange(1, max_order + 1):
+                if order >= mfts.min_order:
+                    mfts = model("")
+                    mfts.order = order
+                    pool.append(mfts)
+        else:
+            pool.append(mfts)
+
+    experiments = 0
+    for ct, train, test in Util.sliding_window(data, windowsize, train):
+        experiments += 1
+
+        if dump: print('\nWindow: {0}\n'.format(ct))
+
+        for partition in partitions:
+
+            for partitioner in partitioners:
+
+                data_train_fs = partitioner(train, partition, transformation=transformation)
+
+                for id, m in enumerate(pool, start=0):
+                    job = cluster.submit(m, data_train_fs, train, test, steps, resolution, ct, transformation)
+                    job.id = id  # associate an ID to identify jobs (if needed later)
+                    jobs.append(job)
+
+    for job in jobs:
+        tmp = job()
+        if job.status == dispy.DispyJob.Finished and tmp is not None:
+            if tmp['key'] not in objs:
+                objs[tmp['key']] = tmp['obj']
+                crps_interval[tmp['key']] = []
+                crps_distr[tmp['key']] = []
+                times1[tmp['key']] = []
+                times2[tmp['key']] = []
+            crps_interval[tmp['key']].append(tmp['CRPS_Interval'])
+            crps_distr[tmp['key']].append(tmp['CRPS_Distribution'])
+            times1[tmp['key']].append(tmp['TIME_Interval'])
+            times2[tmp['key']].append(tmp['TIME_Distribution'])
+
+        else:
+            print(job.exception)
+            print(job.stdout)
+
+    _process_end = time.time()
+
+    print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now()))
+
+    print("Process Duration: {0}".format(_process_end - _process_start))
+
+    cluster.wait()  # wait for all jobs to finish
+
+    cluster.print_status()
+
+    http_server.shutdown()  # this waits until browser gets all updates
+    cluster.close()
+
+    return benchmarks.save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, sintetic)
diff --git a/benchmarks/parallel_benchmarks.py b/benchmarks/parallel_benchmarks.py
index a88044f..b2db134 100644
--- a/benchmarks/parallel_benchmarks.py
+++ b/benchmarks/parallel_benchmarks.py
@@ -18,6 +18,17 @@ from pyFTS.benchmarks import benchmarks
 
 
 def run_point(mfts, partitioner, train_data, test_data, transformation=None, indexer=None):
+    """
+    Point forecast benchmark function to be executed on threads
+    :param mfts: FTS model
+    :param partitioner: Universe of Discourse partitioner
+    :param train_data: data used to train the model
+    :param test_data: data used to test the model
+    :param window_key: id of the sliding window
+    :param transformation: data transformation
+    :param indexer: seasonal indexer
+    :return: a dictionary with the benchmark results
+    """
     pttr = str(partitioner.__module__).split('.')[-1]
     _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
     mfts.partitioner = partitioner
@@ -51,6 +62,23 @@ def run_point(mfts, partitioner, train_data, test_data, transformation=None, ind
 def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner],
                          partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
                          save=False, file=None, sintetic=False):
+    """
+    Parallel sliding window benchmarks for FTS point forecasters
+    :param data: data used in the benchmarks
+    :param windowsize: size of sliding window
+    :param train: percentage of the sliding window data used to train the models
+    :param models: FTS point forecasters
+    :param partitioners: Universe of Discourse partitioners
+    :param partitions: the max number of partitions on the Universe of Discourse
+    :param max_order: the max order of the models (for high order models)
+    :param transformation: data transformation
+    :param indexer: seasonal indexer
+    :param dump: if True, print debug information
+    :param save: save results
+    :param file: file path to save the results
+    :param sintetic: if True, save only the average and standard deviation of the results
+    :return: DataFrame with the results
+    """
     _process_start = time.time()
 
     print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now()))
 
@@ -116,6 +144,17 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=
 
 
 def run_interval(mfts, partitioner, train_data, test_data, transformation=None, indexer=None):
+    """
+    Interval forecast benchmark function to be executed on threads
+    :param mfts: FTS model
+    :param partitioner: Universe of Discourse partitioner
+    :param train_data: data used to train the model
+    :param test_data: data used to test the model
+    :param window_key: id of the sliding window
+    :param transformation: data transformation
+    :param indexer: seasonal indexer
+    :return: a dictionary with the benchmark results
+    """
     pttr = str(partitioner.__module__).split('.')[-1]
     _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
     mfts.partitioner = partitioner
@@ -149,6 +188,23 @@ def run_interval(mfts, partitioner, train_data, test_data, transformation=None,
 
 
 def interval_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner],
                             partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
                             save=False, file=None, sintetic=False):
+    """
+    Parallel sliding window benchmarks for FTS interval forecasters
+    :param data: data used in the benchmarks
+    :param windowsize: size of sliding window
+    :param train: percentage of the sliding window data used to train the models
+    :param models: FTS interval forecasters
+    :param partitioners: Universe of Discourse partitioners
+    :param partitions: the max number of partitions on the Universe of Discourse
+    :param max_order: the max order of the models (for high order models)
+    :param transformation: data transformation
+    :param indexer: seasonal indexer
+    :param dump: if True, print debug information
+    :param save: save results
+    :param file: file path to save the results
+    :param sintetic: if True, save only the average and standard deviation of the results
+    :return: DataFrame with the results
+    """
     _process_start = time.time()
 
     print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now()))
 
@@ -215,6 +271,18 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione
 
 
 def run_ahead(mfts, partitioner, train_data, test_data, steps, resolution, transformation=None, indexer=None):
+    """
+    Probabilistic m-step ahead forecast benchmark function to be executed on threads
+    :param mfts: FTS model
+    :param partitioner: Universe of Discourse partitioner
+    :param train_data: data used to train the model
+    :param test_data: data used to test the model
+    :param steps: number of steps ahead to forecast
+    :param resolution: resolution of the forecast probability distributions
+    :param transformation: data transformation
+    :param indexer: seasonal indexer
+    :return: a dictionary with the benchmark results
+    """
     pttr = str(partitioner.__module__).split('.')[-1]
     _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
     mfts.partitioner = partitioner
@@ -248,6 +316,25 @@ def run_ahead(mfts, partitioner, train_data, test_data, steps, resolution, trans
 
 
 def ahead_sliding_window(data, windowsize, train, steps,resolution, models=None, partitioners=[Grid.GridPartitioner],
                          partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
                          save=False, file=None, sintetic=False):
+    """
+    Parallel sliding window benchmarks for FTS probabilistic forecasters
+    :param data: data used in the benchmarks
+    :param windowsize: size of sliding window
+    :param train: percentage of the sliding window data used to train the models
+    :param steps: number of steps ahead to forecast
+    :param resolution: resolution of the forecast probability distributions
+    :param models: FTS probabilistic forecasters
+    :param partitioners: Universe of Discourse partitioners
+    :param partitions: the max number of partitions on the Universe of Discourse
+    :param max_order: the max order of the models (for high order models)
+    :param transformation: data transformation
+    :param indexer: seasonal indexer
+    :param dump: if True, print debug information
+    :param save: save results
+    :param file: file path to save the results
+    :param sintetic: if True, save only the average and standard deviation of the results
+    :return: DataFrame with the results
+    """
     _process_start = time.time()
 
     print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now()))
 
diff --git a/song.py b/song.py
index 15578f4..391cab5 100644
--- a/song.py
+++ b/song.py
@@ -5,8 +5,8 @@ from pyFTS import fts
 class ConventionalFTS(fts.FTS):
     """Conventional Fuzzy Time Series"""
     def __init__(self, name, **kwargs):
-        super(ConventionalFTS, self).__init__(1, "CFTS " + name)
-        self.name = "Conventional FTS"
+        super(ConventionalFTS, self).__init__(1, "FTS " + name)
+        self.name = "Traditional FTS"
         self.detail = "Song & Chissom"
         self.R = None
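
Note for reviewers: below is a minimal usage sketch of the Measures.get_distribution_statistics helper
documented in this patch. It only mirrors calls that already appear inside run_ahead above (model("")
construction, GridPartitioner(data, npartitions), mfts.train(data, partitioner.sets, order=...)); the
synthetic random-walk series and the chosen steps/resolution/order values are illustrative assumptions,
not part of the patch.

    import numpy as np
    from pyFTS import pwfts
    from pyFTS.partitioners import Grid
    from pyFTS.benchmarks import Measures

    # Illustrative synthetic series (assumption): a simple random walk
    data = np.cumsum(np.random.normal(0, 1, 1000)) + 50
    train, test = data[:800], data[800:]

    # Partition the Universe of Discourse and train a probabilistic model,
    # mirroring the calls used by run_ahead() in this patch
    partitioner = Grid.GridPartitioner(train, 10)
    model = pwfts.ProbabilisticWeightedFTS("")
    model.train(train, partitioner.sets, order=1)

    # New helper added in benchmarks/Measures.py: returns a list with two CRPS
    # values and their respective runtimes (np.nan entries on failure)
    stats = Measures.get_distribution_statistics(test, model, steps=20, resolution=100)
    print(stats)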