- Optimizations for interval distributed benchmarks

This commit is contained in:
Petrônio Cândido de Lima e Silva 2017-05-13 21:03:49 -03:00
parent 3b21889d6c
commit 2c0e685e87
8 changed files with 219 additions and 55 deletions

View File

@ -164,6 +164,36 @@ def coverage(targets, forecasts):
return np.mean(preds)
def pinball(tau, target, forecast):
"""
Pinball loss function. Measure the distance of forecast to the tau-quantile of the target
:param tau: quantile value in the range (0,1)
:param target:
:param forecast:
:return: distance of forecast to the tau-quantile of the target
"""
if target >= forecast:
return (target - forecast) * tau
else:
return (forecast - target) * (1 - tau)
def pinball_mean(tau, targets, forecasts):
"""
Mean pinball loss value of the forecast for a given tau-quantile of the targets
:param tau: quantile value in the range (0,1)
:param targets: list of target values
:param forecasts: list of prediction intervals
:return:
"""
preds = []
if tau <= 0.5:
preds = [pinball(tau, targets[i], forecasts[i][0]) for i in np.arange(0, len(forecasts))]
else:
preds = [pinball(tau, targets[i], forecasts[i][1]) for i in np.arange(0, len(forecasts))]
return np.nanmean(preds)
def pmf_to_cdf(density):
ret = []
for row in density.index:
@ -248,6 +278,10 @@ def get_interval_statistics(original, model):
ret.append(round(sharpness(forecasts), 2))
ret.append(round(resolution(forecasts), 2))
ret.append(round(coverage(original[model.order:], forecasts[:-1]), 2))
ret.append(round(pinball_mean(0.05, original[model.order:], forecasts[:-1]), 2))
ret.append(round(pinball_mean(0.25, original[model.order:], forecasts[:-1]), 2))
ret.append(round(pinball_mean(0.75, original[model.order:], forecasts[:-1]), 2))
ret.append(round(pinball_mean(0.95, original[model.order:], forecasts[:-1]), 2))
return ret

View File

@ -240,8 +240,7 @@ def plot_dataframe_point(file_synthetic, file_analytic, experiments, tam):
plt.show()
def save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, synthetic, times):
def save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, synthetic, times, q05, q25, q75, q95):
ret = []
if synthetic:
for k in sorted(objs.keys()):
@ -265,6 +264,14 @@ def save_dataframe_interval(coverage, experiments, file, objs, resolution, save,
mod.append(round(np.nanstd(coverage[k]), 2))
mod.append(round(np.nanmean(times[k]), 2))
mod.append(round(np.nanstd(times[k]), 2))
mod.append(round(np.nanmean(q05[k]), 2))
mod.append(round(np.nanstd(q05[k]), 2))
mod.append(round(np.nanmean(q25[k]), 2))
mod.append(round(np.nanstd(q25[k]), 2))
mod.append(round(np.nanmean(q75[k]), 2))
mod.append(round(np.nanstd(q75[k]), 2))
mod.append(round(np.nanmean(q95[k]), 2))
mod.append(round(np.nanstd(q95[k]), 2))
mod.append(l)
ret.append(mod)
@ -296,6 +303,18 @@ def save_dataframe_interval(coverage, experiments, file, objs, resolution, save,
tmp = [n, o, s, p, l, 'TIME']
tmp.extend(times[k])
ret.append(deepcopy(tmp))
tmp = [n, o, s, p, l, 'Q05']
tmp.extend(q05[k])
ret.append(deepcopy(tmp))
tmp = [n, o, s, p, l, 'Q25']
tmp.extend(q25[k])
ret.append(deepcopy(tmp))
tmp = [n, o, s, p, l, 'Q75']
tmp.extend(q75[k])
ret.append(deepcopy(tmp))
tmp = [n, o, s, p, l, 'Q95']
tmp.extend(q95[k])
ret.append(deepcopy(tmp))
except Exception as ex:
print("Erro ao salvar ", k)
print("Exceção ", ex)
@ -317,7 +336,7 @@ def interval_dataframe_analytic_columns(experiments):
def interval_dataframe_synthetic_columns():
columns = ["Model", "Order", "Scheme", "Partitions", "SHARPAVG", "SHARPSTD", "RESAVG", "RESSTD", "COVAVG",
"COVSTD", "TIMEAVG", "TIMESTD", "SIZE"]
"COVSTD", "TIMEAVG", "TIMESTD", "Q05AVG", "Q05STD", "Q25AVG", "Q25STD", "Q75AVG", "Q75STD", "Q95AVG", "Q95STD", "SIZE"]
return columns

View File

@ -3,6 +3,7 @@
import numpy as np
from statsmodels.tsa.arima_model import ARIMA as stats_arima
import scipy.stats as st
from pyFTS import fts
@ -15,6 +16,8 @@ class ARIMA(fts.FTS):
self.name = "ARIMA"
self.detail = "Auto Regressive Integrated Moving Average"
self.is_high_order = True
self.has_point_forecasting = True
self.has_interval_forecasting = True
self.model = None
self.model_fit = None
self.trained_data = None
@ -23,6 +26,7 @@ class ARIMA(fts.FTS):
self.q = 0
self.benchmark_only = True
self.min_order = 1
self.alpha = (1 - kwargs.get("alpha", 0.90))/2
def train(self, data, sets, order, parameters=None):
self.p = order[0]
@ -35,6 +39,7 @@ class ARIMA(fts.FTS):
try:
self.model = stats_arima(data, order=(self.p, self.d, self.q))
self.model_fit = self.model.fit(disp=0)
print(np.sqrt(self.model_fit.sigma2))
except Exception as ex:
print(ex)
self.model_fit = None
@ -72,3 +77,64 @@ class ARIMA(fts.FTS):
ret = self.doInverseTransformations(ret, params=[data[self.order - 1:]])
return ret
def forecastInterval(self, data, **kwargs):
if self.model_fit is None:
return np.nan
sigma = np.sqrt(self.model_fit.sigma2)
ndata = np.array(self.doTransformations(data))
l = len(ndata)
ret = []
for k in np.arange(self.order, l+1):
tmp = []
sample = [ndata[i] for i in np.arange(k - self.order, k)]
mean = self.forecast(sample)[0]
tmp.append(mean + st.norm.ppf(self.alpha) * sigma)
tmp.append(mean + st.norm.ppf(1 - self.alpha) * sigma)
ret.append(tmp)
ret = self.doInverseTransformations(ret, params=[data[self.order - 1:]])
return ret
def forecastAheadInterval(self, data, steps, **kwargs):
if self.model_fit is None:
return np.nan
smoothing = kwargs.get("smoothing",0.2)
alpha = (1 - kwargs.get("alpha", 0.95))/2
sigma = np.sqrt(self.model_fit.sigma2)
ndata = np.array(self.doTransformations(data))
l = len(ndata)
means = self.forecastAhead(data,steps,kwargs)
ret = []
for k in np.arange(0, steps):
tmp = []
hsigma = (1 + k*smoothing)*sigma
tmp.append(means[k] + st.norm.ppf(alpha) * hsigma)
tmp.append(means[k] + st.norm.ppf(1 - alpha) * hsigma)
ret.append(tmp)
ret = self.doInverseTransformations(ret, params=[data[self.order - 1:]])
return ret

View File

@ -73,7 +73,7 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo
return ret
def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner],
def point_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
benchmark_models=None, benchmark_models_parameters = None,
save=False, file=None, sintetic=False,nodes=None, depends=None):
@ -82,6 +82,7 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=
:param data:
:param windowsize: size of sliding window
:param train: percentual of sliding window data used to train the models
:param inc: percentual of window is used do increment
:param models: FTS point forecasters
:param partitioners: Universe of Discourse partitioner
:param partitions: the max number of partitions on the Universe of Discourse
@ -146,7 +147,7 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=
pool.append(mfts)
experiments = 0
for ct, train, test in Util.sliding_window(data, windowsize, train):
for ct, train, test in Util.sliding_window(data, windowsize, train, inc):
experiments += 1
benchmarks_only = {}
@ -223,13 +224,18 @@ def run_interval(mfts, partitioner, train_data, test_data, window_key=None, tran
tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner]
tmp4 = [arima.ARIMA, quantreg.QuantileRegression]
tmp3 = [Measures.get_interval_statistics]
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if transformation is not None:
mfts.appendTransformation(transformation)
if mfts.benchmark_only:
_key = mfts.shortname + str(mfts.order if mfts.order is not None else "") + str(mfts.alpha)
else:
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if transformation is not None:
mfts.appendTransformation(transformation)
_start = time.time()
mfts.train(train_data, partitioner.sets, order=mfts.order)
@ -237,24 +243,26 @@ def run_interval(mfts, partitioner, train_data, test_data, window_key=None, tran
times = _end - _start
_start = time.time()
_sharp, _res, _cov = Measures.get_interval_statistics(test_data, mfts)
_sharp, _res, _cov, _q05, _q25, _q75, _q95 = Measures.get_interval_statistics(test_data, mfts)
_end = time.time()
times += _end - _start
ret = {'key': _key, 'obj': mfts, 'sharpness': _sharp, 'resolution': _res, 'coverage': _cov, 'time': times,
'window': window_key}
'Q05': _q05, 'Q25': _q25, 'Q75': _q75, 'Q95': _q95, 'window': window_key}
return ret
def interval_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
save=False, file=None, sintetic=False,nodes=None, depends=None):
def interval_sliding_window(data, windowsize, train=0.8, inc=0.1, models=None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
benchmark_models=None, benchmark_models_parameters = None,
save=False, file=None, sintetic=False,nodes=None, depends=None):
"""
Distributed sliding window benchmarks for FTS interval forecasters
:param data:
:param windowsize: size of sliding window
:param train: percentual of sliding window data used to train the models
:param inc:
:param models: FTS point forecasters
:param partitioners: Universe of Discourse partitioner
:param partitions: the max number of partitions on the Universe of Discourse
@ -262,6 +270,8 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione
:param transformation: data transformation
:param indexer: seasonal indexer
:param dump:
:param benchmark_models:
:param benchmark_models_parameters:
:param save: save results
:param file: file path to save the results
:param sintetic: if true only the average and standard deviation of the results
@ -270,6 +280,15 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione
:return: DataFrame with the results
"""
alphas = [0.5, 0.25]
if benchmark_models is None and models is None:
benchmark_models = [arima.ARIMA, arima.ARIMA, arima.ARIMA, arima.ARIMA,
quantreg.QuantileRegression, quantreg.QuantileRegression]
if benchmark_models_parameters is None:
benchmark_models_parameters = [(1, 0, 0), (1, 0, 1), (2, 0, 1), (2, 0, 2), 1, 2]
cluster = dispy.JobCluster(run_point, nodes=nodes) #, depends=dependencies)
http_server = dispy.httpd.DispyHTTPServer(cluster)
@ -284,6 +303,10 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione
sharpness = {}
resolution = {}
coverage = {}
q05 = {}
q25 = {}
q75 = {}
q95 = {}
times = {}
if models is None:
@ -299,12 +322,23 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione
mfts.order = order
pool.append(mfts)
else:
mfts.order = 1
pool.append(mfts)
if benchmark_models is not None:
for count, model in enumerate(benchmark_models, start=0):
for a in alphas:
par = benchmark_models_parameters[count]
mfts = model(str(par if par is not None else ""), alpha=a)
mfts.order = par
pool.append(mfts)
experiments = 0
for ct, train, test in Util.sliding_window(data, windowsize, train):
for ct, train, test in Util.sliding_window(data, windowsize, train, inc=inc):
experiments += 1
benchmarks_only = {}
if dump: print('\nWindow: {0}\n'.format(ct))
for partition in partitions:
@ -314,6 +348,10 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione
data_train_fs = partitioner(train, partition, transformation=transformation)
for id, m in enumerate(pool,start=0):
if m.benchmark_only and m.shortname in benchmarks_only:
continue
else:
benchmarks_only[m.shortname] = m
job = cluster.submit(m, data_train_fs, train, test, ct, transformation)
job.id = id # associate an ID to identify jobs (if needed later)
jobs.append(job)
@ -327,11 +365,19 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione
resolution[tmp['key']] = []
coverage[tmp['key']] = []
times[tmp['key']] = []
q05[tmp['key']] = []
q25[tmp['key']] = []
q75[tmp['key']] = []
q95[tmp['key']] = []
sharpness[tmp['key']].append(tmp['sharpness'])
resolution[tmp['key']].append(tmp['resolution'])
coverage[tmp['key']].append(tmp['coverage'])
times[tmp['key']].append(tmp['time'])
q05[tmp['key']].append(tmp['Q05'])
q25[tmp['key']].append(tmp['Q25'])
q75[tmp['key']].append(tmp['Q75'])
q95[tmp['key']].append(tmp['Q95'])
print(tmp['key'])
else:
print(job.exception)
@ -350,8 +396,8 @@ def interval_sliding_window(data, windowsize, train=0.8, models=None, partitione
http_server.shutdown() # this waits until browser gets all updates
cluster.close()
return benchmarks.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic,
times)
return bUtil.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic,
times, q05, q25, q75, q95)
def run_ahead(mfts, partitioner, train_data, test_data, steps, resolution, window_key=None, transformation=None, indexer=None):

View File

@ -19,7 +19,7 @@ class QuantileRegression(fts.FTS):
self.has_probability_forecasting = True
self.benchmark_only = True
self.minOrder = 1
self.alpha = 0.5
self.alpha = kwargs.get("alpha", 0.05)
self.upper_qt = None
self.mean_qt = None
self.lower_qt = None
@ -27,8 +27,6 @@ class QuantileRegression(fts.FTS):
def train(self, data, sets, order=1, parameters=None):
self.order = order
self.alpha = parameters
tmp = np.array(self.doTransformations(data))
lagdata, ndata = lagmat(tmp, maxlag=order, trim="both", original='sep')

View File

@ -72,7 +72,7 @@ class EnsembleFTS(fts.FTS):
ret = []
for k in np.arange(0, l):
for k in np.arange(0, l+1):
tmp = []
for model in self.models:
if k >= model.minOrder - 1:

8
fts.py
View File

@ -88,7 +88,13 @@ class FTS(object):
:param kwargs:
:return:
"""
pass
ret = []
for k in np.arange(0,steps):
tmp = self.forecast(data[-self.order:],kwargs)
ret.append(tmp)
data.append(tmp)
return ret
def forecastAheadInterval(self, data, steps, **kwargs):
"""

View File

@ -22,40 +22,36 @@ os.chdir("/home/petronio/dados/Dropbox/Doutorado/Codigos/")
#enrollments = pd.read_csv("DataSets/Enrollments.csv", sep=";")
#enrollments = np.array(enrollments["Enrollments"])
#from pyFTS import song
#enrollments_fs = Grid.GridPartitioner(enrollments, 10).sets
#model = song.ConventionalFTS('')
#model.train(enrollments,enrollments_fs)
#teste = model.forecast(enrollments)
#print(teste)
#print(FCM.FCMPartitionerTrimf.__module__)
"""
DATASETS
"""
#gauss = random.normal(0,1.0,5000)
#gauss_teste = random.normal(0,1.0,400)
taiexpd = pd.read_csv("DataSets/TAIEX.csv", sep=",")
taiex = np.array(taiexpd["avg"][:2000])
#taiexpd = pd.read_csv("DataSets/TAIEX.csv", sep=",")
#taiex = np.array(taiexpd["avg"][:5000])
#nasdaqpd = pd.read_csv("DataSets/NASDAQ_IXIC.csv", sep=",")
#nasdaq = np.array(nasdaqpd["avg"][0:5000])
#from statsmodels.tsa.arima_model import ARIMA as stats_arima
from statsmodels.tsa.tsatools import lagmat
#sp500pd = pd.read_csv("DataSets/S&P500.csv", sep=",")
#sp500 = np.array(sp500pd["Avg"][11000:])
#del(sp500pd)
#tmp = np.arange(10)
sondapd = pd.read_csv("DataSets/SONDA_BSB_HOURLY_AVG.csv", sep=";")
sondapd = sondapd.dropna(axis=0, how='any')
sonda = np.array(sondapd["ws_10m"])
del(sondapd)
#lag, a = lagmat(tmp, maxlag=2, trim="both", original='sep')
#bestpd = pd.read_csv("DataSets/BEST_TAVG.csv", sep=";")
#best = np.array(bestpd["Anomaly"])
#print(lag)
#print(a)
from pyFTS.benchmarks import benchmarks as bchmk
#from pyFTS.benchmarks import distributed_benchmarks as bchmk
#from pyFTS.benchmarks import benchmarks as bchmk
from pyFTS.benchmarks import distributed_benchmarks as bchmk
#from pyFTS.benchmarks import parallel_benchmarks as bchmk
from pyFTS.benchmarks import Util
from pyFTS.benchmarks import arima, quantreg
@ -66,9 +62,9 @@ from pyFTS.benchmarks import arima, quantreg
#tmp = arima.ARIMA("")
#tmp.train(taiex[:1600], None, order=(2,0,2))
#teste = tmp.forecast(taiex[1600:1605])
#teste = tmp.forecastInterval(taiex[1600:1605])
#tmp = quantreg.QuantileRegression("")
#tmp = quan#treg.QuantileRegression("")
#tmp.train(taiex[:1600], None, order=2)
#teste = tmp.forecast(taiex[1600:1605])
@ -80,21 +76,20 @@ from pyFTS.benchmarks import arima, quantreg
from pyFTS import song, chen, yu, cheng
bchmk.point_sliding_window(taiex,1000,train=0.8, models=[], #song.ConventionalFTS, chen.ConventionalFTS], #[yu.WeightedFTS, cheng.TrendWeightedFTS], # #
bchmk.point_sliding_window(sonda, 9000, train=0.8, inc=0.4,#models=[yu.WeightedFTS], # #
partitioners=[Grid.GridPartitioner], #Entropy.EntropyPartitioner], # FCM.FCMPartitioner, ],
partitions= [10], #np.arange(10,200,step=10), #transformation=diff,
dump=True, save=True, file="experiments/XXXtaiex_point_analytic.csv") #,
# nodes=['192.168.0.102', '192.168.0.109', '192.168.0.106']) #, depends=[hofts, ifts])
partitions= np.arange(10,200,step=10), #transformation=diff,
dump=True, save=True, file="experiments/sondaws_point_analytic.csv",
nodes=['192.168.0.103', '192.168.0.106', '192.168.0.108', '192.168.0.109']) #, depends=[hofts, ifts])
"""
diff = Transformations.Differential(1)
bchmk.point_sliding_window(taiex,2000,train=0.8, #models=[yu.WeightedFTS], # #
bchmk.point_sliding_window(sonda, 9000, train=0.8, inc=0.4, #models=[yu.WeightedFTS], # #
partitioners=[Grid.GridPartitioner], #Entropy.EntropyPartitioner], # FCM.FCMPartitioner, ],
partitions= np.arange(10,200,step=10), transformation=diff,
dump=True, save=True, file="experiments/taiex_point_analytic_diff.csv",
nodes=['192.168.0.102', '192.168.0.109', '192.168.0.106']) #, depends=[hofts, ifts])
"""
partitions= np.arange(3,20,step=2), #transformation=diff,
dump=True, save=True, file="experiments/sondaws_point_analytic_diff.csv",
nodes=['192.168.0.103', '192.168.0.106', '192.168.0.108', '192.168.0.109']) #, depends=[hofts, ifts])
#"""
#bchmk.testa(taiex,[10,20],partitioners=[Grid.GridPartitioner], nodes=['192.168.0.109', '192.168.0.101'])
#parallel_util.explore_partitioners(taiex,20)