- Parallel sliding window benchmarks

Petrônio Cândido de Lima e Silva 2017-04-04 18:39:15 -03:00
parent 16af475646
commit fa676f9f1d
3 changed files with 441 additions and 211 deletions

View File

@@ -4,6 +4,7 @@
import numpy as np
import pandas as pd
import time
import datetime
import matplotlib as plt
import matplotlib.colors as pltcolors
import matplotlib.cm as cmx
@@ -14,6 +15,7 @@ from pyFTS.partitioners import partitioner, Grid, Huarng, Entropy, FCM
from pyFTS.benchmarks import Measures, naive, arima, ResidualAnalysis, ProbabilityDistribution
from pyFTS.common import Membership, FuzzySet, FLR, Transformations, Util
from pyFTS import fts, chen, yu, ismailefendi, sadaei, hofts, hwang, pwfts, ifts
from copy import deepcopy
colors = ['grey', 'rosybrown', 'maroon', 'red','orange', 'yellow', 'olive', 'green',
'cyan', 'blue', 'darkblue', 'purple', 'darkviolet']
@@ -34,7 +36,66 @@ def get_point_methods():
def get_interval_methods():
return [ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS]
def external_point_sliding_window(models, parameters, data, windowsize,train=0.8, dump=False, save=False, file=None):
def save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u):
ret = []
if sintetic:
for k in sorted(objs.keys()):
try:
mod = []
mfts = objs[k]
mod.append(mfts.shortname)
mod.append(mfts.order)
mod.append(mfts.partitioner.name)
mod.append(mfts.partitioner.partitions)
mod.append(len(mfts))
mod.append(np.round(np.nanmean(rmse[k]), 2))
mod.append(np.round(np.nanstd(rmse[k]), 2))
mod.append(np.round(np.nanmean(smape[k]), 2))
mod.append(np.round(np.nanstd(smape[k]), 2))
mod.append(np.round(np.nanmean(u[k]), 2))
mod.append(np.round(np.nanstd(u[k]), 2))
mod.append(np.round(np.nanmean(times[k]), 4))
ret.append(mod)
except Exception as ex:
print("Erro ao salvar ", k)
print("Exceção ", ex)
columns = ["Model", "Order", "Scheme","Partitions", "Size", "RMSEAVG", "RMSESTD", "SMAPEAVG", "SMAPESTD", "UAVG", "USTD", "TIMEAVG"]
else:
for k in sorted(objs.keys()):
try:
mfts = objs[k]
tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'RMSE']
tmp.extend(rmse[k])
ret.append(deepcopy(tmp))
tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'SMAPE']
tmp.extend(smape[k])
ret.append(deepcopy(tmp))
tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'U']
tmp.extend(u[k])
ret.append(deepcopy(tmp))
tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'TIME']
tmp.extend(times[k])
ret.append(deepcopy(tmp))
except Exception as ex:
print("Erro ao salvar ", k)
print("Exceção ", ex)
columns = [str(k) for k in np.arange(0, experiments)]
columns.insert(0, "Model")
columns.insert(1, "Order")
columns.insert(2, "Scheme")
columns.insert(3, "Partitions")
columns.insert(4, "Size")
columns.insert(5, "Measure")
dat = pd.DataFrame(ret, columns=columns)
if save: dat.to_csv(Util.uniquefilename(file), sep=";")
return dat
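For reference, a minimal sketch of the two layouts this helper emits (illustrative values, not taken from the commit), assuming two sliding-window experiments and a single model:
import pandas as pd
# sintetic=True: one aggregated row per model, statistics taken across all windows
synthetic_layout = pd.DataFrame(
    [["PWFTS", 1, "Grid", 10, 120, 95.2, 3.1, 4.7, 0.3, 0.98, 0.02, 0.51]],
    columns=["Model", "Order", "Scheme", "Partitions", "Size", "RMSEAVG", "RMSESTD",
             "SMAPEAVG", "SMAPESTD", "UAVG", "USTD", "TIMEAVG"])
# sintetic=False: one row per (model, measure), with the raw value of each
# experiment in the numbered columns "0", "1", ...
raw_layout = pd.DataFrame(
    [["PWFTS", 1, "Grid", 10, 120, "RMSE", 94.1, 96.3],
     ["PWFTS", 1, "Grid", 10, 120, "SMAPE", 4.5, 4.9]],
    columns=["Model", "Order", "Scheme", "Partitions", "Size", "Measure", "0", "1"])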
def external_point_sliding_window(models, parameters, data, windowsize,train=0.8, dump=False, save=False, file=None, sintetic=True):
objs = {}
lcolors = {}
rmse = {}
@@ -42,7 +103,9 @@ def external_point_sliding_window(models, parameters, data, windowsize,train=0.8
u = {}
times = {}
experiments = 0
for ct, train, test in Util.sliding_window(data, windowsize, train):
experiments += 1
for count, method in enumerate(models, start=0):
model = method("")
@@ -80,36 +143,17 @@ def external_point_sliding_window(models, parameters, data, windowsize,train=0.8
u[_key].append(np.nan)
times[_key].append(np.nan)
ret = []
for k in sorted(objs.keys()):
try:
mod = []
mfts = objs[k]
mod.append(mfts.shortname)
mod.append(np.round(np.nanmean(rmse[k]), 2))
mod.append(np.round(np.nanstd(rmse[k]), 2))
mod.append(np.round(np.nanmean(smape[k]), 2))
mod.append(np.round(np.nanstd(smape[k]), 2))
mod.append(np.round(np.nanmean(u[k]), 2))
mod.append(np.round(np.nanstd(u[k]), 2))
mod.append(np.round(np.nanmean(times[k]), 4))
ret.append(mod)
except Exception as ex:
print("Erro ao salvar ",k)
print("Exceção ", ex)
columns = ["Model", "RMSEAVG", "RMSESTD", "SMAPEAVG", "SMAPESTD", "UAVG", "USTD", "TIMEAVG"]
dat = pd.DataFrame(ret, columns=columns)
if save: dat.to_csv(Util.uniquefilename(file), sep=";")
return dat
return save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
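All of these benchmarks iterate over Util.sliding_window, a generator of (index, training slice, testing slice) splits. A plausible minimal sketch of such a generator, assuming non-overlapping windows (the actual pyFTS implementation may differ):
import numpy as np
def sliding_window(data, windowsize, train=0.8):
    # split each consecutive window into a training and a testing segment
    ttrain = int(round(windowsize * train, 0))
    for count in np.arange(0, len(data) - windowsize, windowsize):
        yield count, data[count: count + ttrain], data[count + ttrain: count + windowsize]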
def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3,transformation=None,indexer=None,dump=False,
save=False, file=None):
save=False, file=None, sintetic=True):
_process_start = time.time()
print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now()))
if models is None:
models = get_point_methods()
@@ -121,8 +165,9 @@ def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[G
u = {}
times = {}
experiments = 0
for ct, train,test in Util.sliding_window(data, windowsize, train):
mocks = {}
experiments += 1
for partition in partitions:
for partitioner in partitioners:
pttr = str(partitioner.__module__).split('.')[-1]
@@ -210,37 +255,14 @@ def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[G
smape[_key].append(np.nan)
u[_key].append(np.nan)
times[_key].append(np.nan)
ret = []
for k in sorted(objs.keys()):
try:
mod = []
tmp = objs[k]
mod.append(tmp.shortname)
mod.append(tmp.order )
mod.append(tmp.partitioner.name)
mod.append(tmp.partitioner.partitions)
mod.append(np.round(np.nanmean(rmse[k]),2))
mod.append(np.round(np.nanstd(rmse[k]), 2))
mod.append(np.round(np.nanmean(smape[k]), 2))
mod.append(np.round(np.nanstd(smape[k]), 2))
mod.append(np.round(np.nanmean(u[k]), 2))
mod.append(np.round(np.nanstd(u[k]), 2))
mod.append(np.round(np.nanmean(times[k]), 4))
mod.append(np.round(np.nanstd(times[k]), 4))
mod.append(len(tmp))
ret.append(mod)
except Exception as ex:
print("Erro ao salvar ",k)
print("Exceção ", ex)
columns = ["Model","Order","Scheme","Partitions","RMSEAVG","RMSESTD","SMAPEAVG","SMAPESTD","UAVG","USTD","TIMEAVG","TIMESTD","SIZE"]
_process_end = time.time()
dat = pd.DataFrame(ret,columns=columns)
print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now()))
if save: dat.to_csv(Util.uniquefilename(file),sep=";")
return dat
print("Process Duration: {0}".format(_process_end - _process_start))
return save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
def all_point_forecasters(data_train, data_test, partitions, max_order=3, statistics=True, residuals=True,
@@ -370,10 +392,67 @@ def getProbabilityDistributionStatistics(pmfs, data):
ret += " \\\\ \n"
return ret
def save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic, times):
ret = []
if sintetic:
for k in sorted(objs.keys()):
mod = []
mfts = objs[k]
mod.append(mfts.shortname)
mod.append(mfts.order)
mod.append(mfts.partitioner.name)
mod.append(mfts.partitioner.partitions)
mod.append(round(np.nanmean(sharpness[k]), 2))
mod.append(round(np.nanstd(sharpness[k]), 2))
mod.append(round(np.nanmean(resolution[k]), 2))
mod.append(round(np.nanstd(resolution[k]), 2))
mod.append(round(np.nanmean(coverage[k]), 2))
mod.append(round(np.nanstd(coverage[k]), 2))
mod.append(round(np.nanmean(times[k]), 2))
mod.append(round(np.nanstd(times[k]), 2))
mod.append(len(mfts))
ret.append(mod)
columns = ["Model", "Order", "Scheme", "Partitions", "SHARPAVG", "SHARPSTD", "RESAVG", "RESSTD", "COVAVG",
"COVSTD", "TIMEAVG", "TIMESTD", "SIZE"]
else:
for k in sorted(objs.keys()):
try:
mfts = objs[k]
tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts),
'Sharpness']
tmp.extend(sharpness[k])
ret.append(deepcopy(tmp))
tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts),
'Resolution']
tmp.extend(resolution[k])
ret.append(deepcopy(tmp))
tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts),
'Coverage']
tmp.extend(coverage[k])
ret.append(deepcopy(tmp))
tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts),
'TIME']
tmp.extend(times[k])
ret.append(deepcopy(tmp))
except Exception as ex:
print("Erro ao salvar ", k)
print("Exceção ", ex)
columns = [str(k) for k in np.arange(0, experiments)]
columns.insert(0, "Model")
columns.insert(1, "Order")
columns.insert(2, "Scheme")
columns.insert(3, "Partitions")
columns.insert(4, "Size")
columns.insert(5, "Measure")
dat = pd.DataFrame(ret, columns=columns)
if save: dat.to_csv(Util.uniquefilename(file), sep=";")
return dat
def interval_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3,transformation=None,indexer=None,dump=False,
save=False, file=None):
save=False, file=None, sintetic=True):
if models is None:
models = get_interval_methods()
@@ -384,7 +463,9 @@ def interval_sliding_window(data, windowsize, train=0.8,models=None,partitioners
coverage = {}
times = {}
experiments = 0
for ct, training,test in Util.sliding_window(data, windowsize, train):
experiments += 1
for partition in partitions:
for partitioner in partitioners:
pttr = str(partitioner.__module__).split('.')[-1]
@@ -460,32 +541,7 @@ def interval_sliding_window(data, windowsize, train=0.8,models=None,partitioners
coverage[_key].append(_cov)
times[_key].append(_tdiff)
ret = []
for k in sorted(objs.keys()):
mod = []
mfts = objs[k]
mod.append(mfts.shortname)
mod.append(mfts.order )
mod.append(mfts.partitioner.name)
mod.append(mfts.partitioner.partitions)
mod.append(round(np.nanmean(sharpness[k]),2))
mod.append(round(np.nanstd(sharpness[k]), 2))
mod.append(round(np.nanmean(resolution[k]), 2))
mod.append(round(np.nanstd(resolution[k]), 2))
mod.append(round(np.nanmean(coverage[k]), 2))
mod.append(round(np.nanstd(coverage[k]), 2))
mod.append(round(np.nanmean(times[k]), 2))
mod.append(round(np.nanstd(times[k]), 2))
mod.append(len(mfts))
ret.append(mod)
columns = ["Model","Order","Scheme","Partitions","SHARPAVG","SHARPSTD","RESAVG","RESSTD","COVAVG","COVSTD","TIMEAVG","TIMESTD","SIZE"]
dat = pd.DataFrame(ret,columns=columns)
if save: dat.to_csv(Util.uniquefilename(file),sep=";")
return dat
return save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic, times)
def all_interval_forecasters(data_train, data_test, partitions, max_order=3,save=False, file=None, tam=[20, 5],
@@ -613,9 +669,72 @@ def plot_probability_distributions(pmfs, lcolors, tam=[15, 7]):
ax.legend(handles0, labels0)
def save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, sintetic):
ret = []
if sintetic:
for k in sorted(objs.keys()):
try:
ret = []
for k in sorted(objs.keys()):
try:
mod = []
mfts = objs[k]
mod.append(mfts.shortname)
mod.append(mfts.order)
mod.append(mfts.partitioner.name)
mod.append(mfts.partitioner.partitions)
mod.append(np.round(np.nanmean(crps_interval[k]), 2))
mod.append(np.round(np.nanstd(crps_interval[k]), 2))
mod.append(np.round(np.nanmean(crps_distr[k]), 2))
mod.append(np.round(np.nanstd(crps_distr[k]), 2))
mod.append(len(mfts))
mod.append(np.round(np.nanmean(times1[k]), 4))
mod.append(np.round(np.nanmean(times2[k]), 4))
ret.append(mod)
except Exception as e:
print('Error: %s' % e)
except Exception as ex:
print("Erro ao salvar ", k)
print("Exceção ", ex)
columns = ["Model", "Order", "Scheme", "Partitions", "CRPS1AVG", "CRPS1STD", "CRPS2AVG", "CRPS2STD",
"SIZE", "TIME1AVG", "TIME2AVG"]
else:
for k in sorted(objs.keys()):
try:
mfts = objs[k]
tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'CRPS_Interval']
tmp.extend(crps_interval[k])
ret.append(deepcopy(tmp))
tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'CRPS_Distribution']
tmp.extend(crps_distr[k])
ret.append(deepcopy(tmp))
tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'TIME_Interval']
tmp.extend(times1[k])
ret.append(deepcopy(tmp))
tmp = [mfts.shortname, mfts.order, mfts.partitioner.name, mfts.partitioner.partitions, len(mfts), 'TIME_Distribution']
tmp.extend(times2[k])
ret.append(deepcopy(tmp))
except Exception as ex:
print("Erro ao salvar ", k)
print("Exceção ", ex)
columns = [str(k) for k in np.arange(0, experiments)]
columns.insert(0, "Model")
columns.insert(1, "Order")
columns.insert(2, "Scheme")
columns.insert(3, "Partitions")
columns.insert(4, "Size")
columns.insert(5, "Measure")
dat = pd.DataFrame(ret, columns=columns)
if save: dat.to_csv(Util.uniquefilename(file), sep=";")
return dat
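Both CRPS columns originate in benchmarks.get_distribution_statistics. As a reminder of the metric being aggregated, here is a generic discretized Continuous Ranked Probability Score for one forecast CDF and one observation (a textbook sketch, not pyFTS's exact routine):
import numpy as np
def crps(bins, cdf, y):
    # bins: uniform grid over the variable's domain; cdf: forecast CDF on that grid
    step = np.diff(bins).mean()
    heaviside = (bins >= y).astype(float)   # empirical CDF of the observation
    return np.sum((cdf - heaviside) ** 2) * step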
def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution = None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3,transformation=None,indexer=None,dump=False,
save=False, file=None):
save=False, file=None, sintetic=False):
if models is None:
models = [pwfts.ProbabilisticWeightedFTS]
@@ -626,7 +745,9 @@ def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution
times1 = {}
times2 = {}
experiments = 0
for ct, train,test in Util.sliding_window(data, windowsize, train):
experiments += 1
for partition in partitions:
for partitioner in partitioners:
pttr = str(partitioner.__module__).split('.')[-1]
@@ -704,33 +825,7 @@ def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution
if dump: print(_crps1, _crps2, _tdiff, _t1, _t2)
ret = []
for k in sorted(objs.keys()):
try:
mod = []
mfts = objs[k]
mod.append(mfts.shortname)
mod.append(mfts.order )
mod.append(mfts.partitioner.name)
mod.append(mfts.partitioner.partitions)
mod.append(np.round(np.nanmean(crps_interval[k]),2))
mod.append(np.round(np.nanstd(crps_interval[k]), 2))
mod.append(np.round(np.nanmean(crps_distr[k]), 2))
mod.append(np.round(np.nanstd(crps_distr[k]), 2))
mod.append(len(mfts))
mod.append(np.round(np.nanmean(times1[k]), 4))
mod.append(np.round(np.nanmean(times2[k]), 4))
ret.append(mod)
except Exception as e:
print('Error: %s' % e)
columns = ["Model","Order","Scheme","Partitions","CRPS1AVG","CRPS1STD","CRPS2AVG","CRPS2STD","SIZE","TIME1AVG","TIME2AVG"]
dat = pd.DataFrame(ret,columns=columns)
if save: dat.to_csv(Util.uniquefilename(file),sep=";")
return dat
return save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, sintetic)
def all_ahead_forecasters(data_train, data_test, partitions, start, steps, resolution = None, max_order=3,save=False, file=None, tam=[20, 5],

View File

@@ -5,6 +5,7 @@ import multiprocessing
import numpy as np
import pandas as pd
import time
import datetime
import matplotlib as plt
import matplotlib.colors as pltcolors
import matplotlib.cm as cmx
@@ -17,25 +18,17 @@ from pyFTS.common import Membership, FuzzySet, FLR, Transformations, Util
from pyFTS import fts, chen, yu, ismailefendi, sadaei, hofts, hwang, pwfts, ifts
from pyFTS.benchmarks import benchmarks
def get_first_order_models():
return [chen.ConventionalFTS, yu.WeightedFTS, ismailefendi.ImprovedWeightedFTS,
sadaei.ExponentialyWeightedFTS]
def get_high_order_models():
return [hofts.HighOrderFTS, pwfts.ProbabilisticWeightedFTS]
def run_first_order(method, partitioner, train_data, test_data, transformation = None, indexer=None ):
mfts = method("")
def run_point(mfts, partitioner, train_data, test_data, transformation=None, indexer=None):
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " " + pttr + " q = " + str(partitioner.partitions)
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if transformation is not None:
mfts.appendTransformation(transformation)
try:
_start = time.time()
mfts.train(train_data, partitioner.sets)
mfts.train(train_data, partitioner.sets, order=mfts.order)
_end = time.time()
times = _end - _start
@@ -57,62 +50,51 @@ def run_first_order(method, partitioner, train_data, test_data, transformation =
return ret
def run_high_order(method, order, partitioner, train_data, test_data, transformation=None, indexer=None):
mfts = method("")
if order >= mfts.minOrder:
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if transformation is not None:
mfts.appendTransformation(transformation)
try:
_start = time.time()
mfts.train(train_data, partitioner.sets, order=order)
_end = time.time()
times = _end - _start
_start = time.time()
_rmse, _smape, _u = benchmarks.get_point_statistics(test_data, mfts, indexer)
_end = time.time()
times += _end - _start
except Exception as e:
print(e)
_rmse = np.nan
_smape = np.nan
_u = np.nan
times = np.nan
ret = {'key': _key, 'obj': mfts, 'rmse': _rmse, 'smape': _smape, 'u': _u, 'time': times}
print(ret)
return ret
return {'key': None, 'obj': mfts, 'rmse': np.nan, 'smape': np.nan, 'u': np.nan, 'time': np.nan}
def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
save=False, file=None):
save=False, file=None, sintetic=False):
_process_start = time.time()
print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now()))
num_cores = multiprocessing.cpu_count()
pool = []
objs = {}
rmse = {}
smape = {}
u = {}
times = {}
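# build the task pool up front: one model instance per admissible order,
# so each (method, order) pair becomes an independent parallel job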
for model in benchmarks.get_point_methods():
mfts = model("")
if mfts.isHighOrder:
for order in np.arange(1, max_order + 1):
if order >= mfts.minOrder:
mfts = model("")
mfts.order = order
pool.append(mfts)
else:
pool.append(mfts)
experiments = 0
for ct, train, test in Util.sliding_window(data, windowsize, train):
mocks = {}
experiments += 1
if dump: print('\nWindow: {0}\n'.format(ct))
for partition in partitions:
for partitioner in partitioners:
pttr = str(partitioner.__module__).split('.')[-1]
data_train_fs = partitioner(train, partition, transformation=transformation)
results = Parallel(n_jobs=num_cores)(delayed(run_first_order)(m, deepcopy(data_train_fs), deepcopy(train), deepcopy(test), transformation)
for m in get_first_order_models())
results = Parallel(n_jobs=num_cores)(
delayed(run_point)(deepcopy(m), deepcopy(data_train_fs), deepcopy(train), deepcopy(test),
transformation)
for m in pool)
for tmp in results:
if tmp['key'] not in objs:
@@ -126,56 +108,208 @@ def point_sliding_window(data, windowsize, train=0.8,models=None,partitioners=[G
u[tmp['key']].append(tmp['u'])
times[tmp['key']].append(tmp['time'])
for count, model in enumerate(get_high_order_models(), start=0):
_process_end = time.time()
print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now()))
print("Process Duration: {0}".format(_process_end - _process_start))
return benchmarks.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
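The fan-out above follows joblib's Parallel/delayed idiom, with deepcopy isolating each worker from shared mutable state. A self-contained sketch of the pattern, where score is a hypothetical stand-in for run_point:
import multiprocessing
from copy import deepcopy
from joblib import Parallel, delayed
def score(model, train_data, test_data):
    # stand-in task: fit on train_data, evaluate on test_data
    return {'key': model['name'], 'rmse': float(sum(test_data)) / len(test_data)}
pool = [{'name': 'A'}, {'name': 'B'}]
train_data, test_data = [1.0, 2.0, 3.0], [4.0, 5.0, 6.0]
num_cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=num_cores)(
    delayed(score)(deepcopy(m), deepcopy(train_data), deepcopy(test_data))
    for m in pool)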
def run_interval(mfts, partitioner, train_data, test_data, transformation=None, indexer=None):
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if transformation is not None:
mfts.appendTransformation(transformation)
try:
_start = time.time()
mfts.train(train_data, partitioner.sets, order=mfts.order)
_end = time.time()
times = _end - _start
_start = time.time()
_sharp, _res, _cov = benchmarks.get_interval_statistics(test_data, mfts)
_end = time.time()
times += _end - _start
except Exception as e:
print(e)
_sharp = np.nan
_res = np.nan
_cov = np.nan
times = np.nan
ret = {'key': _key, 'obj': mfts, 'sharpness': _sharp, 'resolution': _res, 'coverage': _cov, 'time': times}
print(ret)
return ret
def interval_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
save=False, file=None, sintetic=False):
_process_start = time.time()
print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now()))
num_cores = multiprocessing.cpu_count()
pool = []
objs = {}
sharpness = {}
resolution = {}
coverage = {}
times = {}
for model in benchmarks.get_interval_methods():
mfts = model("")
if mfts.isHighOrder:
for order in np.arange(1, max_order + 1):
if order >= mfts.minOrder:
mfts = model("")
mfts.order = order
pool.append(mfts)
else:
pool.append(mfts)
experiments = 0
for ct, train, test in Util.sliding_window(data, windowsize, train):
experiments += 1
if dump: print('\nWindow: {0}\n'.format(ct))
for partition in partitions:
for partitioner in partitioners:
data_train_fs = partitioner(train, partition, transformation=transformation)
results = Parallel(n_jobs=num_cores)(
delayed(run_high_order)(model, order, deepcopy(data_train_fs), deepcopy(train), deepcopy(test),
delayed(run_interval)(deepcopy(m), deepcopy(data_train_fs), deepcopy(train), deepcopy(test),
transformation)
for order in np.arange(1, max_order + 1))
for m in pool)
for tmp in results:
if tmp['key'] not in objs:
objs[tmp['key']] = tmp['obj']
rmse[tmp['key']] = []
smape[tmp['key']] = []
u[tmp['key']] = []
sharpness[tmp['key']] = []
resolution[tmp['key']] = []
coverage[tmp['key']] = []
times[tmp['key']] = []
rmse[tmp['key']].append(tmp['rmse'])
smape[tmp['key']].append(tmp['smape'])
u[tmp['key']].append(tmp['u'])
sharpness[tmp['key']].append(tmp['sharpness'])
resolution[tmp['key']].append(tmp['resolution'])
coverage[tmp['key']].append(tmp['coverage'])
times[tmp['key']].append(tmp['time'])
ret = []
for k in sorted(objs.keys()):
_process_end = time.time()
print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now()))
print("Process Duration: {0}".format(_process_end - _process_start))
return benchmarks.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic, times)
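The three interval metrics aggregated here come from benchmarks.get_interval_statistics; under their usual definitions (mean interval width, dispersion of widths, and hit rate; the exact pyFTS formulas may differ slightly), a minimal sketch is:
import numpy as np
def interval_statistics(targets, intervals):
    # intervals: list of (lower, upper) forecasts aligned with targets
    widths = np.array([u - l for l, u in intervals])
    sharpness = np.mean(widths)    # average interval width
    resolution = np.std(widths)    # dispersion of the widths
    coverage = np.mean([l <= t <= u for t, (l, u) in zip(targets, intervals)])
    return sharpness, resolution, coverage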
def run_ahead(mfts, partitioner, train_data, test_data, steps, resolution, transformation=None, indexer=None):
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if transformation is not None:
mfts.appendTransformation(transformation)
try:
mod = []
tmp = objs[k]
mod.append(tmp.shortname)
mod.append(tmp.order )
mod.append(tmp.partitioner.name)
mod.append(tmp.partitioner.partitions)
mod.append(np.round(np.nanmean(rmse[k]),2))
mod.append(np.round(np.nanstd(rmse[k]), 2))
mod.append(np.round(np.nanmean(smape[k]), 2))
mod.append(np.round(np.nanstd(smape[k]), 2))
mod.append(np.round(np.nanmean(u[k]), 2))
mod.append(np.round(np.nanstd(u[k]), 2))
mod.append(np.round(np.nanmean(times[k]), 4))
mod.append(np.round(np.nanstd(times[k]), 4))
mod.append(len(tmp))
ret.append(mod)
except Exception as ex:
print("Erro ao salvar ",k)
print("Exceção ", ex)
_start = time.time()
mfts.train(train_data, partitioner.sets, order=mfts.order)
_end = time.time()
times = _end - _start
columns = ["Model","Order","Scheme","Partitions","RMSEAVG","RMSESTD","SMAPEAVG","SMAPESTD","UAVG","USTD","TIMEAVG","TIMESTD","SIZE"]
_crps1, _crps2, _t1, _t2 = benchmarks.get_distribution_statistics(test_data, mfts, steps=steps,
resolution=resolution)
_t1 += times
_t2 += times
except Exception as e:
print(e)
_crps1 = np.nan
_crps2 = np.nan
_t1 = np.nan
_t2 = np.nan
dat = pd.DataFrame(ret,columns=columns)
ret = {'key': _key, 'obj': mfts, 'CRPS_Interval': _crps1, 'CRPS_Distribution': _crps2, 'TIME_Interval': _t1, 'TIME_Distribution': _t2}
if save: dat.to_csv(Util.uniquefilename(file),sep=";")
print(ret)
return dat
return ret
def ahead_sliding_window(data, windowsize, train, steps,resolution, models=None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
save=False, file=None, sintetic=False):
_process_start = time.time()
print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now()))
num_cores = multiprocessing.cpu_count()
pool = []
objs = {}
crps_interval = {}
crps_distr = {}
times1 = {}
times2 = {}
for model in benchmarks.get_interval_methods():
mfts = model("")
if mfts.isHighOrder:
for order in np.arange(1, max_order + 1):
if order >= mfts.minOrder:
mfts = model("")
mfts.order = order
pool.append(mfts)
else:
pool.append(mfts)
experiments = 0
for ct, train, test in Util.sliding_window(data, windowsize, train):
experiments += 1
if dump: print('\nWindow: {0}\n'.format(ct))
for partition in partitions:
for partitioner in partitioners:
data_train_fs = partitioner(train, partition, transformation=transformation)
results = Parallel(n_jobs=num_cores)(
delayed(run_ahead)(deepcopy(m), deepcopy(data_train_fs), deepcopy(train), deepcopy(test),
steps, resolution, transformation)
for m in pool)
for tmp in results:
if tmp['key'] not in objs:
objs[tmp['key']] = tmp['obj']
crps_interval[tmp['key']] = []
crps_distr[tmp['key']] = []
times1[tmp['key']] = []
times2[tmp['key']] = []
crps_interval[tmp['key']].append(tmp['CRPS_Interval'])
crps_distr[tmp['key']].append(tmp['CRPS_Distribution'])
times1[tmp['key']].append(tmp['TIME_Interval'])
times2[tmp['key']].append(tmp['TIME_Distribution'])
_process_end = time.time()
print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now()))
print("Process Duration: {0}".format(_process_end - _process_start))
return benchmarks.save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, sintetic)

View File

@@ -29,11 +29,12 @@ taiexpd = pd.read_csv("DataSets/TAIEX.csv", sep=",")
taiex = np.array(taiexpd["avg"][:5000])
from pyFTS.benchmarks import parallel_benchmarks as bchmk
#from pyFTS.benchmarks import benchmarks as bchmk
bchmk.point_sliding_window(taiex,2000,train=0.8, #transformation=diff, #models=[pwfts.ProbabilisticWeightedFTS], # #
partitioners=[Grid.GridPartitioner], #Entropy.EntropyPartitioner], # FCM.FCMPartitioner, ],
partitions= np.arange(10,200,step=5), #
dump=True, save=True, file="experiments/nasdaq_point_paralllel.csv")
dump=False, save=True, file="experiments/nasdaq_point_parallel.csv")
#parallel_util.explore_partitioners(taiex,20)
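One defensive note, an assumption on my part rather than something this commit addresses: on platforms that spawn rather than fork worker processes, the parallel benchmark call should sit under a main guard:
if __name__ == '__main__':
    # same call as above, guarded so child processes can re-import this module safely
    bchmk.point_sliding_window(taiex, 2000, train=0.8,
                               partitioners=[Grid.GridPartitioner],
                               partitions=np.arange(10, 200, step=5),
                               dump=False, save=True,
                               file="experiments/nasdaq_point_parallel.csv")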