diff --git a/benchmarks/Measures.py b/benchmarks/Measures.py
index e248c0a..526915f 100644
--- a/benchmarks/Measures.py
+++ b/benchmarks/Measures.py
@@ -145,3 +145,35 @@ def crps(targets, densities):
         _crps += sum([ (Ff[col][k]-Fa[col][k])**2 for col in densities.columns])
 
     return _crps / float(l * n)
+
+
+def get_point_statistics(data, model, indexer=None):
+    if indexer is not None:
+        ndata = np.array(indexer.get_data(data[model.order:]))
+    else:
+        ndata = np.array(data[model.order:])
+
+    if model.isMultivariate or indexer is None:
+        forecasts = model.forecast(data)
+    elif not model.isMultivariate and indexer is not None:
+        forecasts = model.forecast(indexer.get_data(data))
+
+    if model.hasSeasonality:
+        nforecasts = np.array(forecasts)
+    else:
+        nforecasts = np.array(forecasts[:-1])
+    ret = list()
+    try:
+        ret.append(np.round(rmse(ndata, nforecasts), 2))
+    except:
+        ret.append(np.nan)
+    try:
+        ret.append(np.round(smape(ndata, nforecasts), 2))
+    except:
+        ret.append(np.nan)
+    try:
+        ret.append(np.round(UStatistic(ndata, nforecasts), 2))
+    except:
+        ret.append(np.nan)
+
+    return ret
\ No newline at end of file
diff --git a/benchmarks/Util.py b/benchmarks/Util.py
new file mode 100644
index 0000000..445dc63
--- /dev/null
+++ b/benchmarks/Util.py
@@ -0,0 +1,62 @@
+import numpy as np
+import pandas as pd
+from copy import deepcopy
+from pyFTS.common import Util
+
+
+def save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u):
+    ret = []
+
+    if sintetic:
+
+        for k in sorted(objs.keys()):
+            try:
+                mod = []
+                mfts = objs[k]
+                mod.append(mfts.shortname)
+                mod.append(mfts.order)
+                mod.append(mfts.partitioner.name)
+                mod.append(mfts.partitioner.partitions)
+                mod.append(len(mfts))
+                mod.append(np.round(np.nanmean(rmse[k]), 2))
+                mod.append(np.round(np.nanstd(rmse[k]), 2))
+                mod.append(np.round(np.nanmean(smape[k]), 2))
+                mod.append(np.round(np.nanstd(smape[k]), 2))
+                mod.append(np.round(np.nanmean(u[k]), 2))
+                mod.append(np.round(np.nanstd(u[k]), 2))
+                mod.append(np.round(np.nanmean(times[k]), 4))
+                ret.append(mod)
+            except Exception as ex:
+                print("Error saving ", k)
+                print("Exception ", ex)
+
+        columns = ["Model", "Order", "Scheme", "Partitions", "Size", "RMSEAVG", "RMSESTD", "SMAPEAVG", "SMAPESTD", "UAVG", "USTD", "TIMEAVG"]
+    else:
+        for k in sorted(objs.keys()):
+            try:
+                mfts = objs[k]
+                tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'RMSE']
+                tmp.extend(rmse[k])
+                ret.append(deepcopy(tmp))
+                tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'SMAPE']
+                tmp.extend(smape[k])
+                ret.append(deepcopy(tmp))
+                tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'U']
+                tmp.extend(u[k])
+                ret.append(deepcopy(tmp))
+                tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'TIME']
+                tmp.extend(times[k])
+                ret.append(deepcopy(tmp))
+            except Exception as ex:
+                print("Error saving ", k)
+                print("Exception ", ex)
+        columns = [str(k) for k in np.arange(0, experiments)]
+        columns.insert(0, "Model")
+        columns.insert(1, "Order")
+        columns.insert(2, "Scheme")
+        columns.insert(3, "Partitions")
+        columns.insert(4, "Size")
+        columns.insert(5, "Measure")
+    dat = pd.DataFrame(ret, columns=columns)
+    if save: dat.to_csv(Util.uniquefilename(file), sep=";")
+    return dat
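Taken together, `Measures.get_point_statistics` scores a fitted model on a test window, returning the `[RMSE, SMAPE, U]` triple with each value rounded to two decimals or reported as `np.nan` on failure, and `Util.save_dataframe_point` collects those per-window lists into a DataFrame. A minimal sketch of calling the measure directly, assuming a 1-D NumPy series `y`; the split point and the choice of a first-order Chen model are illustrative, not part of this changeset:

```python
import numpy as np
from pyFTS import chen
from pyFTS.partitioners import Grid
from pyFTS.benchmarks import Measures

# y: 1-D numpy array of observations (e.g. a stock index series)
train, test = y[:1600], y[1600:2000]

fs = Grid.GridPartitioner(train, 10)   # 10 fuzzy sets over the training window
model = chen.ConventionalFTS("")
model.train(train, fs.sets, order=1)

# [RMSE, SMAPE, Theil's U]; any measure that raises is reported as np.nan
rmse, smape, u = Measures.get_point_statistics(test, model)
```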
diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py
index 2edeea6..0d71d09 100644
--- a/benchmarks/benchmarks.py
+++ b/benchmarks/benchmarks.py
@@ -12,7 +12,7 @@ import matplotlib.pyplot as plt
 from mpl_toolkits.mplot3d import Axes3D
 # from sklearn.cross_validation import KFold
 from pyFTS.partitioners import partitioner, Grid, Huarng, Entropy, FCM
-from pyFTS.benchmarks import Measures, naive, arima, ResidualAnalysis, ProbabilityDistribution
+from pyFTS.benchmarks import Measures, naive, arima, ResidualAnalysis, ProbabilityDistribution, Util
 from pyFTS.common import Membership, FuzzySet, FLR, Transformations, Util
 from pyFTS import fts, chen, yu, ismailefendi, sadaei, hofts, hwang, pwfts, ifts
 from copy import deepcopy
@@ -37,64 +37,6 @@ def get_interval_methods():
     return [ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS]
 
 
-def save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u):
-    ret = []
-
-    if sintetic:
-
-        for k in sorted(objs.keys()):
-            try:
-                mod = []
-                mfts = objs[k]
-                mod.append(mfts.shortname)
-                mod.append(mfts.order)
-                mod.append(mfts.partitioner.name)
-                mod.append(mfts.partitioner.partitions)
-                mod.append(len(mfts))
-                mod.append(np.round(np.nanmean(rmse[k]), 2))
-                mod.append(np.round(np.nanstd(rmse[k]), 2))
-                mod.append(np.round(np.nanmean(smape[k]), 2))
-                mod.append(np.round(np.nanstd(smape[k]), 2))
-                mod.append(np.round(np.nanmean(u[k]), 2))
-                mod.append(np.round(np.nanstd(u[k]), 2))
-                mod.append(np.round(np.nanmean(times[k]), 4))
-                ret.append(mod)
-            except Exception as ex:
-                print("Erro ao salvar ", k)
-                print("Exceção ", ex)
-
-        columns = ["Model", "Order", "Scheme","Partitions", "Size", "RMSEAVG", "RMSESTD", "SMAPEAVG", "SMAPESTD", "UAVG", "USTD", "TIMEAVG"]
-    else:
-        for k in sorted(objs.keys()):
-            try:
-                mfts = objs[k]
-                tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'RMSE']
-                tmp.extend(rmse[k])
-                ret.append(deepcopy(tmp))
-                tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'SMAPE']
-                tmp.extend(smape[k])
-                ret.append(deepcopy(tmp))
-                tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'U']
-                tmp.extend(u[k])
-                ret.append(deepcopy(tmp))
-                tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'TIME']
-                tmp.extend(times[k])
-                ret.append(deepcopy(tmp))
-            except Exception as ex:
-                print("Erro ao salvar ", k)
-                print("Exceção ", ex)
-        columns = [str(k) for k in np.arange(0, experiments)]
-        columns.insert(0, "Model")
-        columns.insert(1, "Order")
-        columns.insert(2, "Scheme")
-        columns.insert(3, "Partitions")
-        columns.insert(4, "Size")
-        columns.insert(5, "Measure")
-    dat = pd.DataFrame(ret, columns=columns)
-    if save: dat.to_csv(Util.uniquefilename(file), sep=";")
-    return dat
-
-
 def external_point_sliding_window(models, parameters, data, windowsize,train=0.8, dump=False, save=False, file=None, sintetic=True):
     objs = {}
     lcolors = {}
@@ -129,7 +71,7 @@
         try:
             _start = time.time()
-            _rmse, _smape, _u = get_point_statistics(test, model, None)
+            _rmse, _smape, _u = Measures.get_point_statistics(test, model, None)
             _end = time.time()
             rmse[_key].append(_rmse)
             smape[_key].append(_smape)
@@ -143,7 +85,7 @@
             u[_key].append(np.nan)
             times[_key].append(np.nan)
 
-    return save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
+    return Util.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
 
 
 def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[Grid.GridPartitioner],
@@ -240,7 +182,7 @@
                     times[_key].append(_end - _start)
 
                     _start = time.time()
-                    _rmse, _smape, _u = get_point_statistics(test, mfts, indexer)
+                    _rmse, _smape, _u = Measures.get_point_statistics(test, mfts, indexer)
                     _end = time.time()
                     rmse[_key].append(_rmse)
                     smape[_key].append(_smape)
@@ -262,7 +204,7 @@
 
     print("Process Duration: {0}".format(_process_end - _process_start))
 
-    return save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
+    return Util.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
 
 
 def all_point_forecasters(data_train, data_test, partitions, max_order=3, statistics=True, residuals=True,
@@ -326,38 +268,6 @@
         plot_probability_distributions(pmfs, lcolors, tam=tam)
 
 
-def get_point_statistics(data, model, indexer=None):
-    if indexer is not None:
-        ndata = np.array(indexer.get_data(data[model.order:]))
-    else:
-        ndata = np.array(data[model.order:])
-
-    if model.isMultivariate or indexer is None:
-        forecasts = model.forecast(data)
-    elif not model.isMultivariate and indexer is not None:
-        forecasts = model.forecast(indexer.get_data(data))
-
-    if model.hasSeasonality:
-        nforecasts = np.array(forecasts)
-    else:
-        nforecasts = np.array(forecasts[:-1])
-    ret = list()
-    try:
-        ret.append(np.round(Measures.rmse(ndata, nforecasts), 2))
-    except:
-        ret.append(np.nan)
-    try:
-        ret.append(np.round(Measures.smape(ndata, nforecasts), 2))
-    except:
-        ret.append(np.nan)
-    try:
-        ret.append(np.round(Measures.UStatistic(ndata, nforecasts), 2))
-    except:
-        ret.append(np.nan)
-
-    return ret
-
-
 def print_point_statistics(data, models, externalmodels = None, externalforecasts = None, indexers=None):
     ret = "Model & Order & RMSE & SMAPE & Theil's U \\\\ \n"
     for count,model in enumerate(models,start=0):
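`point_sliding_window` walks the series with `Util.sliding_window`, fits every model/partitioner/partition combination on each training window, and hands the accumulated per-window statistics to `Util.save_dataframe_point`. A hedged sketch of a small sequential run; the window size, partition counts, and output path here are placeholders:

```python
import numpy as np
from pyFTS.benchmarks import benchmarks as bchmk
from pyFTS.partitioners import Grid

# y: 1-D numpy array of observations
dat = bchmk.point_sliding_window(y, 2000, train=0.8,
                                 partitioners=[Grid.GridPartitioner],
                                 partitions=[10, 20, 30],
                                 dump=False, save=True,
                                 file="experiments/example_point.csv")
print(dat.head())
```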
diff --git a/benchmarks/distributed_benchmarks.py b/benchmarks/distributed_benchmarks.py
new file mode 100644
index 0000000..6859d5e
--- /dev/null
+++ b/benchmarks/distributed_benchmarks.py
@@ -0,0 +1,179 @@
+import random
+import dispy
+import dispy.httpd
+from copy import deepcopy
+import numpy as np
+import pandas as pd
+import time
+import datetime
+import pyFTS
+from pyFTS.partitioners import partitioner, Grid, Huarng, Entropy, FCM
+from pyFTS.benchmarks import Measures, naive, arima, ResidualAnalysis, ProbabilityDistribution
+from pyFTS.common import Membership, FuzzySet, FLR, Transformations, Util
+from pyFTS import fts, chen, yu, ismailefendi, sadaei, hofts, hwang, pwfts, ifts
+from pyFTS.benchmarks import benchmarks, parallel_benchmarks, Util as bUtil
+
+
+def run_point(mfts, partitioner, train_data, test_data, window_key=None, transformation=None, indexer=None):
+    import time
+    from pyFTS import yu,chen,hofts,ifts,pwfts,ismailefendi,sadaei
+    from pyFTS.partitioners import Grid, Entropy, FCM
+    from pyFTS.benchmarks import Measures
+
+    tmp = [yu.WeightedFTS, chen.ConventionalFTS, hofts.HighOrderFTS, ifts.IntervalFTS,
+           pwfts.ProbabilisticWeightedFTS, ismailefendi.ImprovedWeightedFTS, sadaei.ExponentialyWeightedFTS]
+
+    tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner]
+
+    tmp3 = [Measures.get_point_statistics]
+
+    pttr = str(partitioner.__module__).split('.')[-1]
+    _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
+    mfts.partitioner = partitioner
+    if transformation is not None:
+        mfts.appendTransformation(transformation)
+
+#    try:
+    _start = time.time()
+    mfts.train(train_data, partitioner.sets, order=mfts.order)
+    _end = time.time()
+    times = _end - _start
+
+    _start = time.time()
+    _rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, indexer)
+    _end = time.time()
+    times += _end - _start
+#    except Exception as e:
+#        print(e)
+#        _rmse = np.nan
+#        _smape = np.nan
+#        _u = np.nan
+#        times = np.nan
+
+    ret = {'key': _key, 'obj': mfts, 'rmse': _rmse, 'smape': _smape, 'u': _u, 'time': times, 'window': window_key}
+
+    # print(ret)
+
+    return ret
+
+
+def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner],
+                         partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
+                         save=False, file=None, sintetic=False,nodes=None, depends=None):
+
+#    dependencies = [fts, Membership, benchmarks]
+#    if depends is not None: dependencies.extend(depends)
+
+#    if models is not None:
+#        dependencies.extend(models)
+#    else:
+#        dependencies.extend(benchmarks.get_point_methods())
+
+#    dependencies.extend(partitioners)
+#    if transformation is not None: dependencies.extend(transformation)
+#    if indexer is not None: dependencies.extend(indexer)
+
+    cluster = dispy.JobCluster(run_point, nodes=nodes)  #, depends=dependencies)
+
+    # import dispy's httpd module, create http server for this cluster
+
+    http_server = dispy.httpd.DispyHTTPServer(cluster)
+
+    _process_start = time.time()
+
+    print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now()))
+
+    pool = []
+    jobs = []
+    objs = {}
+    rmse = {}
+    smape = {}
+    u = {}
+    times = {}
+
+    if models is None:
+        models = benchmarks.get_point_methods()
+
+    for model in models:
+        mfts = model("")
+
+        if mfts.isHighOrder:
+            for order in np.arange(1, max_order + 1):
+                if order >= mfts.minOrder:
+                    mfts = model("")
+                    mfts.order = order
+                    pool.append(mfts)
+        else:
+            pool.append(mfts)
+
+    experiments = 0
+    for ct, train, test in Util.sliding_window(data, windowsize, train):
+        experiments += 1
+
+        if dump: print('\nWindow: {0}\n'.format(ct))
+
+        for partition in partitions:
+
+            for partitioner in partitioners:
+
+                data_train_fs = partitioner(train, partition, transformation=transformation)
+
+                for id, m in enumerate(pool,start=0):
+                    job = cluster.submit(m, data_train_fs, train, test, ct, transformation)
+                    job.id = id  # associate an ID to identify jobs (if needed later)
+                    jobs.append(job)
+
+    for job in jobs:
+        tmp = job()
+        if job.status == dispy.DispyJob.Finished and tmp is not None:
+            if tmp['key'] not in objs:
+                objs[tmp['key']] = tmp['obj']
+                rmse[tmp['key']] = []
+                smape[tmp['key']] = []
+                u[tmp['key']] = []
+                times[tmp['key']] = []
+            rmse[tmp['key']].append(tmp['rmse'])
+            smape[tmp['key']].append(tmp['smape'])
+            u[tmp['key']].append(tmp['u'])
+            times[tmp['key']].append(tmp['time'])
+            print(tmp['key'], tmp['window'])
+        else:
+            print(job.exception)
+            print(job.stdout)
+
+    _process_end = time.time()
+
+    print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now()))
+
+    print("Process Duration: {0}".format(_process_end - _process_start))
+
+    cluster.wait()  # wait for all jobs to finish
+
+    cluster.print_status()
+
+    http_server.shutdown()  # this waits until browser gets all updates
+    cluster.close()
+
+    return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
+
+
+def compute(data):
+    import socket
+    return (socket.gethostname(), data)
+
+
+def teste(data,nodes):
+    cluster = dispy.JobCluster(compute, nodes=nodes)
+    jobs = []
+    for ct, train, test in Util.sliding_window(data, 2000, 0.8):
+        job = cluster.submit(ct)
+        jobs.append(job)
+
+    for job in jobs:
+        x = job()
+        print(x)
+
+    cluster.wait()
+    cluster.close()
+
+
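`run_point` re-imports `time`, the model modules, and `Measures` inside its own body because dispy serializes the function and executes it on remote dispynode workers, where this module's top-level imports are not available. Stripped of the FTS specifics, the submit/collect pattern it relies on reduces to a few lines (a self-contained sketch; the node address is a placeholder):

```python
import dispy

def square(x):
    # Executed on a remote node; any imports the job needs belong inside the function
    return x * x

cluster = dispy.JobCluster(square, nodes=['192.168.0.109'])  # placeholder node
jobs = [cluster.submit(n) for n in range(4)]

for job in jobs:
    result = job()  # blocks until this job finishes
    if job.status == dispy.DispyJob.Finished:
        print(result)
    else:
        print(job.exception)

cluster.wait()
cluster.close()
```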
diff --git a/benchmarks/parallel_benchmarks.py b/benchmarks/parallel_benchmarks.py
index d8061d9..185b59a 100644
--- a/benchmarks/parallel_benchmarks.py
+++ b/benchmarks/parallel_benchmarks.py
@@ -6,12 +6,6 @@ import numpy as np
 import pandas as pd
 import time
 import datetime
-import matplotlib as plt
-import matplotlib.colors as pltcolors
-import matplotlib.cm as cmx
-import matplotlib.pyplot as plt
-from mpl_toolkits.mplot3d import Axes3D
-# from sklearn.cross_validation import KFold
 from pyFTS.partitioners import partitioner, Grid, Huarng, Entropy, FCM
 from pyFTS.benchmarks import Measures, naive, arima, ResidualAnalysis, ProbabilityDistribution
 from pyFTS.common import Membership, FuzzySet, FLR, Transformations, Util
diff --git a/fts.py b/fts.py
index 9925a41..e1a8306 100644
--- a/fts.py
+++ b/fts.py
@@ -2,8 +2,6 @@ import numpy as np
 import pandas as pd
 from pyFTS import tree
 from pyFTS.common import FuzzySet, SortedCollection
-from pyFTS.benchmarks import Measures
-
 
 class FTS(object):
     def __init__(self, order, name):
diff --git a/partitioners/partitioner.py b/partitioners/partitioner.py
index abdfa0e..a466aca 100644
--- a/partitioners/partitioner.py
+++ b/partitioners/partitioner.py
@@ -43,4 +43,7 @@
             elif s.mf == Membership.gaussmf:
                 tmpx = [kk for kk in np.arange(s.lower, s.upper)]
                 tmpy = [s.membership(kk) for kk in np.arange(s.lower, s.upper)]
-            ax.plot(tmpx, tmpy)
\ No newline at end of file
+            ax.plot(tmpx, tmpy)
+
+    def __str__(self):
+        return self.name
\ No newline at end of file
diff --git a/tests/general.py b/tests/general.py
index c4c5a88..32bf43f 100644
--- a/tests/general.py
+++ b/tests/general.py
@@ -28,13 +28,20 @@
 os.chdir("/home/petronio/dados/Dropbox/Doutorado/Codigos/")
 taiexpd = pd.read_csv("DataSets/TAIEX.csv", sep=",")
 taiex = np.array(taiexpd["avg"][:5000])
 
-from pyFTS.benchmarks import parallel_benchmarks as bchmk
+from pyFTS.benchmarks import distributed_benchmarks as bchmk
+#from pyFTS.benchmarks import parallel_benchmarks as bchmk
 #from pyFTS.benchmarks import benchmarks as bchmk
+from pyFTS import yu
 
-bchmk.point_sliding_window(taiex,2000,train=0.8, #transformation=diff, #models=[pwfts.ProbabilisticWeightedFTS], # #
+#bchmk.teste(taiex,['192.168.0.109', '192.168.0.101'])
+
+bchmk.point_sliding_window(taiex,2000,train=0.8, #models=[yu.WeightedFTS], # #
                            partitioners=[Grid.GridPartitioner], #Entropy.EntropyPartitioner], # FCM.FCMPartitioner, ],
-                           partitions= np.arange(10,200,step=5), #
-                           dump=False, save=True, file="experiments/nasdaq_point_parallel.csv")
+                           partitions= np.arange(10,200,step=5), #transformation=diff,
+                           dump=False, save=True, file="experiments/nasdaq_point_distributed.csv",
+                           nodes=['192.168.0.109', '192.168.0.101']) #, depends=[hofts, ifts])
+
+#bchmk.testa(taiex,[10,20],partitioners=[Grid.GridPartitioner], nodes=['192.168.0.109', '192.168.0.101'])
 
 #parallel_util.explore_partitioners(taiex,20)
@@ -51,7 +58,7 @@
 
 
 #, ,
 
-diff = Transformations.Differential(1)
+#diff = Transformations.Differential(1)
 
 #bchmk.external_point_sliding_window([naive.Naive, arima.ARIMA, arima.ARIMA, arima.ARIMA, arima.ARIMA, arima.ARIMA, arima.ARIMA],
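When `save=True`, the result frame is written through `Util.uniquefilename` with `;` as the column separator, so an experiment can be loaded back for analysis along these lines (the filename is illustrative, and the `Measure` column only exists when `sintetic=False`):

```python
import pandas as pd

dat = pd.read_csv("experiments/nasdaq_point_distributed.csv", sep=";", index_col=0)
print(dat[dat.Measure == "RMSE"])  # per-window RMSE rows, one per model/partitioning
```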