Improvements on forecast_ahead benchmarks

This commit is contained in:
Petrônio Cândido 2019-06-10 13:33:53 -03:00
parent 471e096208
commit 48fcf8daca
7 changed files with 148 additions and 88 deletions

View File

@ -72,6 +72,9 @@ class ARIMA(fts.FTS):
def forecast(self, ndata, **kwargs):
raise NotImplementedError()
def forecast_ahead(self, data, steps, **kwargs):
return self.model.predict(steps, intervals=False).values.flatten().tolist()
def forecast_interval(self, data, **kwargs):
raise NotImplementedError()
@ -92,7 +95,16 @@ class ARIMA(fts.FTS):
return ret
def forecast_distribution(self, data, **kwargs):
raise NotImplementedError()
sim_vector = self.inference(steps)
ret = []
for ct, sample in enumerate(sim_vector):
pd = ProbabilityDistribution.ProbabilityDistribution(type='histogram', data=sample, nbins=500)
ret.append(pd)
return ret
def forecast_ahead_distribution(self, data, steps, **kwargs):

View File

@ -105,12 +105,20 @@ def UStatistic(targets, forecasts):
:param forecasts:
:return:
"""
l = len(targets)
if isinstance(targets, list):
targets = np.array(targets)
if isinstance(forecasts, list):
if not isinstance(forecasts, (list, np.ndarray)):
forecasts = np.array([forecasts])
else:
forecasts = np.array(forecasts)
if not isinstance(targets, (list, np.ndarray)):
targets = np.array([targets])
else:
targets = np.array(targets)
l = forecasts.size
l = 2 if l == 1 else l
naive = []
y = []
for k in np.arange(0, l - 1):
@ -359,6 +367,38 @@ def get_point_statistics(data, model, **kwargs):
return ret
def get_point_ahead_statistics(data, forecasts, **kwargs):
"""
Condensate all measures for point forecasters
:param data: test data
:param model: FTS model with point forecasting capability
:param kwargs:
:return: a list with the RMSE, SMAPE and U Statistic
"""
l = len(forecasts)
if len(data) != l:
raise Exception("Data and intervals have different lenghts!")
lags = {}
for lag in range(l):
ret = {}
datum = data[lag]
forecast = forecasts[lag]
ret['steps'] = lag
ret['method'] = ''
ret['rmse'] = rmse(datum, forecast)
ret['mape'] = mape(datum, forecast)
sample = data[lag-1:lag+1] if lag > 0 else [datum, datum]
ret['u'] = UStatistic(sample, forecast)
lags[lag] = ret
return lags
def get_interval_statistics(data, model, **kwargs):
"""
Condensate all measures for point interval forecasters
@ -411,7 +451,7 @@ def get_interval_ahead_statistics(data, intervals, **kwargs):
Condensate all measures for point interval forecasters
:param data: test data
:param model: FTS model with interval forecasting capability
:param intervals: predicted intervals for each datapoint
:param kwargs:
:return: a list with the sharpness, resolution, coverage, .05 pinball mean,
.25 pinball mean, .75 pinball mean and .95 pinball mean.

View File

@ -102,17 +102,10 @@ def sliding_window_benchmarks2(data, windowsize, train=0.8, **kwargs):
steps_ahead = [k for k in steps_ahead]
fts_methods = __pop('methods', None, kwargs)
fts_methods = __pop('methods', [], kwargs)
methods_parameters = __pop('methods_parameters', None, kwargs)
if fts_methods is None:
if type == 'point':
fts_methods = get_point_methods()
elif type == 'interval':
fts_methods = get_interval_methods()
elif type == 'distribution':
fts_methods = get_probabilistic_methods()
if fts_methods is not None:
methods_parameters = __pop('methods_parameters', None, kwargs)
ix_methods = [k for k in np.arange(len(fts_methods))]
@ -162,7 +155,8 @@ def sliding_window_benchmarks2(data, windowsize, train=0.8, **kwargs):
else:
job = cluster.submit(method, None, None, None, None, train, test, ct, **kwargs)
jobs.append(job)
else:
if fts_methods is not None:
params = [ix_methods, orders, partitioners_methods, partitions, transformations]
for id, instance in enumerate(product(*params)):
fts_method = fts_methods[instance[0]]
@ -670,17 +664,36 @@ def run_point2(fts_method, order, partitioner_method, partitions, transformation
_end = time.time()
times = _end - _start
if steps_ahead == 1:
_start = time.time()
_rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, **kwargs)
_end = time.time()
times += _end - _start
_start = time.time()
_rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, **kwargs)
_end = time.time()
times += _end - _start
ret = {'model': mfts.shortname, 'partitioner': pttr, 'order': order, 'partitions': partitions,
'transformation': '' if transformation is None else transformation.name,
'size': len(mfts), 'time': times,
'rmse': _rmse, 'smape': _smape, 'u': _u, 'window': window_key,
'steps': steps_ahead, 'method': method}
ret = {'model': mfts.shortname, 'partitioner': pttr, 'order': order, 'partitions': partitions,
'transformation': '' if transformation is None else transformation.name,
'size': len(mfts), 'time': times,
'rmse': _rmse, 'smape': _smape, 'u': _u, 'window': window_key,
'steps': steps_ahead, 'method': method}
else:
_start = time.time()
forecasts = mfts.predict(test_data, **kwargs)
_end = time.time()
times += _end - _start
eval = Measures.get_point_ahead_statistics(test_data[mfts.order:mfts.order+steps_ahead], forecasts)
for key in eval.keys():
eval[key]["time"] = times
eval[key]["method"] = method
ret = {'model': mfts.shortname, 'partitioner': pttr, 'order': order, 'partitions': partitions,
'transformation': '' if transformation is None else transformation.name,
'size': len(mfts), 'time': times,
'window': window_key, 'steps': steps_ahead, 'method': method,
'ahead_results': eval
}
return ret
@ -812,20 +825,14 @@ def run_probabilistic2(fts_method, order, partitioner_method, partitions, transf
def common_process_point_jobs(conn, data, job):
data.append(job['steps'])
data.append(job['method'])
rmse = deepcopy(data)
rmse.extend(["rmse", job["rmse"]])
bUtil.insert_benchmark(rmse, conn)
smape = deepcopy(data)
smape.extend(["smape", job["smape"]])
bUtil.insert_benchmark(smape, conn)
u = deepcopy(data)
u.extend(["u", job["u"]])
bUtil.insert_benchmark(u, conn)
time = deepcopy(data)
time.extend(["time", job["time"]])
bUtil.insert_benchmark(time, conn)
dta = deepcopy(data)
dta.append(job['steps'])
dta.append(job['method'])
for key in ["rmse", "mape", "u", "time"]:
if key in job:
data2 = deepcopy(dta)
data2.extend([key, job[key]])
bUtil.insert_benchmark(data2, conn)
def process_point_jobs(dataset, tag, job, conn):

View File

@ -26,6 +26,7 @@ class KNearestNeighbors(fts.FTS):
self.benchmark_only = True
self.min_order = 1
self.alpha = kwargs.get("alpha", 0.05)
self.max_lag = self.order
self.lag = None
self.k = kwargs.get("k", 30)
self.uod = None
@ -70,8 +71,9 @@ class KNearestNeighbors(fts.FTS):
return [self.values[k] for k in ix.flatten() ]
def forecast(self, data, **kwargs):
l = len(data)
ret = []
for k in np.arange(self.order, len(data)):
for k in np.arange(self.order, l+(1 if self.order == l else 0)):
sample = data[k-self.order : k]
@ -81,17 +83,6 @@ class KNearestNeighbors(fts.FTS):
return ret
def forecast_ahead(self, data, steps, **kwargs):
start = kwargs.get('start', self.order)
sample = [k for k in data[start - self.order: start]]
for k in np.arange(self.order, steps + self.order):
tmp = self.forecast(sample[k-self.order:k])
sample.append(tmp)
return sample[-steps]
def forecast_interval(self, data, **kwargs):
alpha = kwargs.get('alpha',self.alpha)

View File

@ -241,7 +241,7 @@ class FTS(object):
start = kwargs.get('start_at',0)
ret = []
for k in np.arange(start+self.max_lag, steps):
for k in np.arange(start+self.max_lag, steps+start+self.max_lag):
tmp = self.forecast(data[k-self.max_lag:k], **kwargs)
if isinstance(tmp,(list, np.ndarray)):

View File

@ -422,9 +422,9 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
l = len(data)
start = kwargs.get('start', self.max_lag)
start = kwargs.get('start_at', 0)
ret = data[start - self.max_lag: start].tolist()
ret = data[start: start+self.max_lag].tolist()
for k in np.arange(self.max_lag, steps+self.max_lag):
@ -434,7 +434,7 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
mp = self.forecast(ret[k - self.max_lag: k], **kwargs)
ret.append(mp[0])
return ret[self.max_lag:]
return ret[-steps:]
def __check_interval_bounds(self, interval):
if len(self.transformations) > 0:
@ -446,11 +446,9 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
def forecast_ahead_interval(self, data, steps, **kwargs):
l = len(data)
start = kwargs.get('start_at', 0)
start = kwargs.get('start', self.max_lag)
sample = data[start - self.max_lag: start]
sample = data[start: start + self.max_lag]
ret = [[k, k] for k in sample]
@ -466,7 +464,7 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
ret.append([np.min(lower), np.max(upper)])
return ret[self.order:]
return ret[-steps:]
def forecast_ahead_distribution(self, ndata, steps, **kwargs):
@ -483,9 +481,9 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
nbins = kwargs.get("num_bins", 100)
_bins = np.linspace(uod[0], uod[1], nbins)
start = kwargs.get('start', self.max_lag)
start = kwargs.get('start_at', 0)
sample = ndata[start - self.max_lag: start]
sample = ndata[start: start + self.max_lag]
for dat in sample:
if 'type' in kwargs:
@ -527,7 +525,7 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
ret.append(dist)
return ret[self.order:]
return ret[-steps:]
def __str__(self):
tmp = self.name + ":\n"

View File

@ -52,34 +52,46 @@ datasets['TAIEX'] = TAIEX.get_data()[:5000]
datasets['NASDAQ'] = NASDAQ.get_data()[:5000]
datasets['SP500'] = SP500.get_data()[10000:15000]
methods = [
arima.ARIMA,arima.ARIMA,
quantreg.QuantileRegression,
BSTS.ARIMA,BSTS.ARIMA,
knn.KNearestNeighbors
]
competitor_methods = []
competitor_methods.extend([arima.ARIMA]*3)
competitor_methods.extend([quantreg.QuantileRegression]*2)
competitor_methods.extend([BSTS.ARIMA]*3)
competitor_methods.extend([knn.KNearestNeighbors]*2)
methods_parameters = [
{'order':(1,0,0), 'alpha':.05},
{'order':(1,0,1), 'alpha':.05},
{'order':1, 'dist': True},
{'order': (1, 0, 0), 'alpha': .05},
{'order': (1, 0, 1), 'alpha': .05},
{'order': 1}
competitor_methods_parameters = [
{'order': (1, 0, 0)},
{'order': (1, 0, 1)},
{'order': (2, 0, 0)},
{'order': 1, 'alpha': .5},
{'order': 2, 'alpha': .5},
{'order': (1, 0, 0)},
{'order': (1, 0, 1)},
{'order': (2, 0, 0)},
{'order': 1},
{'order': 2}
]
proposed_methods = [
hofts.HighOrderFTS, hofts.WeightedHighOrderFTS, pwfts.ProbabilisticWeightedFTS
]
proposed_methods_parameters=[
{},{},{}
]
for dataset_name, dataset in datasets.items():
bchmk.sliding_window_benchmarks2(dataset, 1000, train=0.8, inc=0.2,
benchmark_models=True,
benchmark_methods=methods,
benchmark_methods_parameters=methods_parameters,
methods=[],
methods_parameters=[{},{}],
transformations=[None],
orders=[],
steps_ahead=[10],
partitions=[],
type='distribution',
distributed=True, nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'],
file="experiments.db", dataset=dataset_name, tag="experiments")
benchmark_methods=competitor_methods,
benchmark_methods_parameters=competitor_methods_parameters,
methods=proposed_methods,
methods_parameters=proposed_methods_parameters,
orders=[1],
partitions=[35],
steps_ahead=[10],
progress=False, type='point',
distributed=False, nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'],
file="tmp.db", dataset=dataset_name,
tag="experiments")
#'''