Improvements to benchmarks.sliding_window_benchmarks2

This commit is contained in:
Petrônio Cândido 2019-06-01 17:05:29 -03:00
parent 1957c26105
commit e10aa1e872
4 changed files with 163 additions and 159 deletions

View File

@ -446,6 +446,8 @@ def get_interval_ahead_statistics(data, intervals, **kwargs):
ret = {}
datum = data[lag]
interval = intervals[lag]
ret['steps'] = lag
ret['method'] = ''
ret['sharpness'] = round(interval[1] - interval[0], 2)
ret['coverage'] = 1 if interval[0] <= datum <= interval[1] else 0
ret['pinball05'] = round(pinball(0.05, datum, interval[0]), 2)
@ -518,6 +520,8 @@ def get_distribution_ahead_statistics(data, distributions):
ret = {}
datum = data[lag]
dist = distributions[lag]
ret['steps'] = lag
ret['method'] = ''
ret['crps'] = round(crps(datum, dist), 3)
ret['brier'] = round(brier_score(datum, dist), 3)
ret['log'] = round(logarithm_score(datum, dist), 3)

View File

@ -98,12 +98,12 @@ def process_common_data(dataset, tag, type, job):
data = [dataset, tag, type, model.shortname,
str(model.transformations[0]) if len(model.transformations) > 0 else None,
model.order, None, None,
None, job['steps'], job['method']]
None]
else:
data = [dataset, tag, type, model.shortname,
str(model.partitioner.transformation) if model.partitioner.transformation is not None else None,
model.order, model.partitioner.name, str(model.partitioner.partitions),
len(model), job['steps'], job['method']]
len(model)]
return data
@ -124,9 +124,7 @@ def process_common_data2(dataset, tag, type, job):
job['order'],
job['partitioner'],
job['partitions'],
job['size'],
job['steps'],
job['method']
job['size']
]
return data

View File

@ -149,26 +149,24 @@ def sliding_window_benchmarks2(data, windowsize, train=0.8, **kwargs):
if benchmark_models:
for bm, method in enumerate(benchmark_methods):
for step in steps_ahead:
kwargs['steps_ahead'] = step
kwargs['steps_ahead'] = max(steps_ahead)
kwargs['parameters'] = benchmark_methods_parameters[bm]
if not distributed:
try:
job = experiment_method(method, None, None, None, train, test, ct, **kwargs)
job = experiment_method(method, None, None, None, None, train, test, ct, **kwargs)
synthesis_method(dataset, tag, job, conn)
except Exception as ex:
print('EXCEPTION! ', method, benchmark_methods_parameters[bm])
traceback.print_exc()
else:
job = cluster.submit(method, None, None, None, train, test, ct, **kwargs)
job = cluster.submit(method, None, None, None, None, train, test, ct, **kwargs)
jobs.append(job)
else:
params = [ix_methods, orders, partitioners_methods, partitions, transformations, steps_ahead]
params = [ix_methods, orders, partitioners_methods, partitions, transformations]
for id, instance in enumerate(product(*params)):
fts_method = fts_methods[instance[0]]
kwargs['steps_ahead'] = instance[5]
kwargs['steps_ahead'] = max(steps_ahead)
if methods_parameters is not None:
kwargs['parameters'] = methods_parameters[instance[0]]
if not distributed:
@ -615,7 +613,7 @@ def run_probabilistic(mfts, partitioner, train_data, test_data, window_key=None,
_crps1, _t1, _brier = Measures.get_distribution_statistics(test_data, mfts, **kwargs)
_t1 += times
ret = {'key': _key, 'obj': mfts, 'CRPS': _crps1, 'time': _t1, 'brier': _brier, 'window': window_key,
ret = {'key': _key, 'obj': mfts, 'crps': _crps1, 'time': _t1, 'brier': _brier, 'window': window_key,
'steps': steps_ahead, 'method': method}
return ret
@ -712,8 +710,9 @@ def run_interval2(fts_method, order, partitioner_method, partitions, transformat
_end = time.time()
times = _end - _start
if steps_ahead == 1:
_start = time.time()
#_sharp, _res, _cov, _q05, _q25, _q75, _q95, _w05, _w25
metrics = Measures.get_interval_statistics(test_data, mfts, **kwargs)
_end = time.time()
times += _end - _start
@ -722,9 +721,28 @@ def run_interval2(fts_method, order, partitioner_method, partitions, transformat
'transformation': '' if transformation is None else transformation.name,
'size': len(mfts), 'time': times,
'sharpness': metrics[0], 'resolution': metrics[1], 'coverage': metrics[2],
'time': times,'Q05': metrics[3], 'Q25': metrics[4], 'Q75': metrics[5], 'Q95': metrics[6],
'time': times,'pinball05': metrics[3], 'pinball25': metrics[4], 'pinball75': metrics[5], 'pinball95': metrics[6],
'winkler05': metrics[7], 'winkler25': metrics[8],
'window': window_key,'steps': steps_ahead, 'method': method}
else:
_start = time.time()
intervals = mfts.predict(test_data, **kwargs)
_end = time.time()
times += _end - _start
eval = Measures.get_interval_ahead_statistics(test_data[mfts.order:mfts.order+steps_ahead], intervals)
for key in eval.keys():
eval[key]["time"] = times
eval[key]["method"] = method
ret = {'model': mfts.shortname, 'partitioner': pttr, 'order': order, 'partitions': partitions,
'transformation': '' if transformation is None else transformation.name,
'size': len(mfts), 'time': times,
'window': window_key, 'steps': steps_ahead, 'method': method,
'ahead_results': eval
}
return ret
@ -761,18 +779,55 @@ def run_probabilistic2(fts_method, order, partitioner_method, partitions, transf
_end = time.time()
times = _end - _start
if steps_ahead == 1:
_crps1, _t1, _brier = Measures.get_distribution_statistics(test_data, mfts, **kwargs)
_t1 += times
times += _t1
ret = {'model': mfts.shortname, 'partitioner': pttr, 'order': order, 'partitions': partitions,
'transformation': '' if transformation is None else transformation.name,
'size': len(mfts), 'time': times,
'CRPS': _crps1, 'brier': _brier, 'window': window_key,
'crps': _crps1, 'brier': _brier, 'window': window_key,
'steps': steps_ahead, 'method': method}
else:
_start = time.time()
distributions = mfts.predict(test_data, **kwargs)
_end = time.time()
times += _end - _start
eval = Measures.get_distribution_ahead_statistics(test_data[mfts.order:mfts.order+steps_ahead], distributions)
for key in eval.keys():
eval[key]["time"] = times
eval[key]["method"] = method
ret = {'model': mfts.shortname, 'partitioner': pttr, 'order': order, 'partitions': partitions,
'transformation': '' if transformation is None else transformation.name,
'size': len(mfts), 'time': times,
'window': window_key, 'steps': steps_ahead, 'method': method,
'ahead_results': eval
}
return ret
def common_process_point_jobs(conn, data, job):
    """
    Persist the point-forecast metrics of one job into the benchmarks database.

    One row per metric (rmse, smape, u, time) is inserted, each carrying the
    common descriptor columns from *data* plus the job's step horizon and
    method name.

    :param conn: open database connection passed to bUtil.insert_benchmark
    :param data: list with the common descriptor columns (not modified)
    :param job: dict with 'steps', 'method' and the metric values
    """
    # Work on a copy: the original appended directly to `data`, so callers
    # that invoke this helper once per forecast horizon (process_point_jobs2)
    # accumulated duplicate steps/method columns in every subsequent row.
    # The interval/probabilistic siblings already deepcopy first.
    base = deepcopy(data)
    base.append(job['steps'])
    base.append(job['method'])

    for metric in ("rmse", "smape", "u", "time"):
        row = deepcopy(base)
        row.extend([metric, job[metric]])
        bUtil.insert_benchmark(row, conn)
def process_point_jobs(dataset, tag, job, conn):
"""
Extract information from a dictionary with point benchmark results and save it on a database
@ -786,18 +841,8 @@ def process_point_jobs(dataset, tag, job, conn):
data = bUtil.process_common_data(dataset, tag, 'point',job)
rmse = deepcopy(data)
rmse.extend(["rmse", job["rmse"]])
bUtil.insert_benchmark(rmse, conn)
smape = deepcopy(data)
smape.extend(["smape", job["smape"]])
bUtil.insert_benchmark(smape, conn)
u = deepcopy(data)
u.extend(["u", job["u"]])
bUtil.insert_benchmark(u, conn)
time = deepcopy(data)
time.extend(["time", job["time"]])
bUtil.insert_benchmark(time, conn)
common_process_point_jobs(conn, data, job)
def process_point_jobs2(dataset, tag, job, conn):
"""
@ -812,18 +857,24 @@ def process_point_jobs2(dataset, tag, job, conn):
data = bUtil.process_common_data2(dataset, tag, 'point',job)
rmse = deepcopy(data)
rmse.extend(["rmse", job["rmse"]])
bUtil.insert_benchmark(rmse, conn)
smape = deepcopy(data)
smape.extend(["smape", job["smape"]])
bUtil.insert_benchmark(smape, conn)
u = deepcopy(data)
u.extend(["u", job["u"]])
bUtil.insert_benchmark(u, conn)
time = deepcopy(data)
time.extend(["time", job["time"]])
bUtil.insert_benchmark(time, conn)
if job['steps'] == 1:
common_process_point_jobs(conn, data, job)
else:
for k in range(job['steps']):
j2 = job['ahead_results'][k]
common_process_point_jobs(conn, data, j2)
def common_process_interval_jobs(conn, data, job):
    """
    Persist the interval-forecast metrics of one job into the benchmarks database.

    Each metric present in *job* produces one row combining the common
    descriptor columns, the step horizon, the method name, and the metric
    name/value pair.

    :param conn: open database connection passed to bUtil.insert_benchmark
    :param data: list with the common descriptor columns (not modified)
    :param job: dict with 'steps', 'method' and any subset of the metrics
    """
    base = deepcopy(data)
    base.append(job['steps'])
    base.append(job['method'])

    metrics = ["sharpness", "resolution", "coverage", "time", "pinball05",
               "pinball25", "pinball75", "pinball95", "winkler05", "winkler25"]

    for metric in metrics:
        # Multi-step jobs may carry only a subset of the metrics.
        if metric not in job:
            continue
        row = deepcopy(base)
        row.extend([metric, job[metric]])
        bUtil.insert_benchmark(row, conn)
def process_interval_jobs(dataset, tag, job, conn):
@ -839,72 +890,30 @@ def process_interval_jobs(dataset, tag, job, conn):
data = bUtil.process_common_data(dataset, tag, 'interval', job)
sharpness = deepcopy(data)
sharpness.extend(["sharpness", job["sharpness"]])
bUtil.insert_benchmark(sharpness, conn)
resolution = deepcopy(data)
resolution.extend(["resolution", job["resolution"]])
bUtil.insert_benchmark(resolution, conn)
coverage = deepcopy(data)
coverage.extend(["coverage", job["coverage"]])
bUtil.insert_benchmark(coverage, conn)
time = deepcopy(data)
time.extend(["time", job["time"]])
bUtil.insert_benchmark(time, conn)
Q05 = deepcopy(data)
Q05.extend(["Q05", job["Q05"]])
bUtil.insert_benchmark(Q05, conn)
Q25 = deepcopy(data)
Q25.extend(["Q25", job["Q25"]])
bUtil.insert_benchmark(Q25, conn)
Q75 = deepcopy(data)
Q75.extend(["Q75", job["Q75"]])
bUtil.insert_benchmark(Q75, conn)
Q95 = deepcopy(data)
Q95.extend(["Q95", job["Q95"]])
bUtil.insert_benchmark(Q95, conn)
W05 = deepcopy(data)
W05.extend(["winkler05", job["winkler05"]])
bUtil.insert_benchmark(W05, conn)
W25 = deepcopy(data)
W25.extend(["winkler25", job["winkler25"]])
bUtil.insert_benchmark(W25, conn)
common_process_interval_jobs(conn, data, job)
def process_interval_jobs2(dataset, tag, job, conn):
data = bUtil.process_common_data2(dataset, tag, 'interval', job)
sharpness = deepcopy(data)
sharpness.extend(["sharpness", job["sharpness"]])
bUtil.insert_benchmark(sharpness, conn)
resolution = deepcopy(data)
resolution.extend(["resolution", job["resolution"]])
bUtil.insert_benchmark(resolution, conn)
coverage = deepcopy(data)
coverage.extend(["coverage", job["coverage"]])
bUtil.insert_benchmark(coverage, conn)
time = deepcopy(data)
time.extend(["time", job["time"]])
bUtil.insert_benchmark(time, conn)
Q05 = deepcopy(data)
Q05.extend(["Q05", job["Q05"]])
bUtil.insert_benchmark(Q05, conn)
Q25 = deepcopy(data)
Q25.extend(["Q25", job["Q25"]])
bUtil.insert_benchmark(Q25, conn)
Q75 = deepcopy(data)
Q75.extend(["Q75", job["Q75"]])
bUtil.insert_benchmark(Q75, conn)
Q95 = deepcopy(data)
Q95.extend(["Q95", job["Q95"]])
bUtil.insert_benchmark(Q95, conn)
W05 = deepcopy(data)
W05.extend(["winkler05", job["winkler05"]])
bUtil.insert_benchmark(W05, conn)
W25 = deepcopy(data)
W25.extend(["winkler25", job["winkler25"]])
bUtil.insert_benchmark(W25, conn)
if job['steps'] == 1:
common_process_interval_jobs(conn, data, job)
else:
for k in range(job['steps']):
j2 = job['ahead_results'][k]
common_process_interval_jobs(conn, data, j2)
def common_process_probabilistic_jobs(conn, data, job):
    """
    Persist the probabilistic-forecast metrics of one job into the benchmarks database.

    Each metric present in *job* produces one row combining the common
    descriptor columns, the step horizon, the method name, and the metric
    name/value pair.

    :param conn: open database connection passed to bUtil.insert_benchmark
    :param data: list with the common descriptor columns (not modified)
    :param job: dict with 'steps', 'method' and any subset of the metrics
    """
    base = deepcopy(data)
    base.append(job['steps'])
    base.append(job['method'])

    for metric in ("crps", "time", "brier"):
        # Multi-step jobs may carry only a subset of the metrics.
        if metric not in job:
            continue
        row = deepcopy(base)
        row.extend([metric, job[metric]])
        bUtil.insert_benchmark(row, conn)
def process_probabilistic_jobs(dataset, tag, job, conn):
@ -920,15 +929,7 @@ def process_probabilistic_jobs(dataset, tag, job, conn):
data = bUtil.process_common_data(dataset, tag, 'density', job)
crps = deepcopy(data)
crps.extend(["crps",job["CRPS"]])
bUtil.insert_benchmark(crps, conn)
time = deepcopy(data)
time.extend(["time", job["time"]])
bUtil.insert_benchmark(time, conn)
brier = deepcopy(data)
brier.extend(["brier", job["brier"]])
bUtil.insert_benchmark(brier, conn)
common_process_probabilistic_jobs(conn, data, job)
def process_probabilistic_jobs2(dataset, tag, job, conn):
@ -944,15 +945,12 @@ def process_probabilistic_jobs2(dataset, tag, job, conn):
data = bUtil.process_common_data2(dataset, tag, 'density', job)
crps = deepcopy(data)
crps.extend(["crps",job["CRPS"]])
bUtil.insert_benchmark(crps, conn)
time = deepcopy(data)
time.extend(["time", job["time"]])
bUtil.insert_benchmark(time, conn)
brier = deepcopy(data)
brier.extend(["brier", job["brier"]])
bUtil.insert_benchmark(brier, conn)
if job['steps'] == 1:
common_process_probabilistic_jobs(conn,data,job)
else:
for k in range(job['steps']):
j2 = job['ahead_results'][k]
common_process_probabilistic_jobs(conn, data, j2)
def print_point_statistics(data, models, externalmodels = None, externalforecasts = None, indexers=None):

View File

@ -19,6 +19,7 @@ from pyFTS.fcm import fts, common, GA
from pyFTS.data import TAIEX, NASDAQ, SP500
'''
train = TAIEX.get_data()[:800]
test = TAIEX.get_data()[800:1000]
@ -51,24 +52,27 @@ datasets['TAIEX'] = TAIEX.get_data()[:5000]
datasets['NASDAQ'] = NASDAQ.get_data()[:5000]
datasets['SP500'] = SP500.get_data()[10000:15000]
methods = [ensemble.SimpleEnsembleFTS]*4
methods = [arima.ARIMA, quantreg.QuantileRegression, BSTS.ARIMA, knn.KNearestNeighbors]
methods_parameters = [
{'name': 'EnsembleFTS-HOFTS-10', 'fts_method': hofts.HighOrderFTS, 'partitions': np.arange(20,50,10)},
{'name': 'EnsembleFTS-HOFTS-5', 'fts_method': hofts.HighOrderFTS, 'partitions': np.arange(20,50,5)},
{'name': 'EnsembleFTS-WHOFTS-10', 'fts_method': hofts.WeightedHighOrderFTS, 'partitions': np.arange(20,50,10)},
{'name': 'EnsembleFTS-WHOFTS-5', 'fts_method': hofts.WeightedHighOrderFTS, 'partitions': np.arange(20,50,5)}
{'order':(2,0,0)},
{'order':2, 'dist': True},
{'order':(2,0,0)},
{'order':2 }
]
for dataset_name, dataset in datasets.items():
bchmk.sliding_window_benchmarks2(dataset, 1000, train=0.8, inc=0.2,
methods=methods,
methods_parameters=methods_parameters,
benchmark_models=False,
benchmark_models=True,
benchmark_methods=methods,
benchmark_methods_parameters=methods_parameters,
methods=[],
methods_parameters=[],
transformations=[None],
orders=[3],
steps_ahead=[10],
partitions=[None],
type='distribution',
distributed=True, nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'],
file="experiments.db", dataset=dataset_name, tag="gridsearch")
'''
type='interval',
#distributed=True, nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'],
file="tmp.db", dataset=dataset_name, tag="experiments")
#'''