diff --git a/pyFTS/benchmarks/Measures.py b/pyFTS/benchmarks/Measures.py
index 3340ef1..fdf91a4 100644
--- a/pyFTS/benchmarks/Measures.py
+++ b/pyFTS/benchmarks/Measures.py
@@ -446,6 +446,8 @@ def get_interval_ahead_statistics(data, intervals, **kwargs):
         ret = {}
         datum = data[lag]
         interval = intervals[lag]
+        ret['steps'] = lag
+        ret['method'] = ''
         ret['sharpness'] = round(interval[1] - interval[0], 2)
         ret['coverage'] = 1 if interval[0] <= datum <= interval[1] else 0
         ret['pinball05'] = round(pinball(0.05, datum, interval[0]), 2)
@@ -518,6 +520,8 @@ def get_distribution_ahead_statistics(data, distributions):
         ret = {}
         datum = data[lag]
         dist = distributions[lag]
+        ret['steps'] = lag
+        ret['method'] = ''
         ret['crps'] = round(crps(datum, dist), 3)
         ret['brier'] = round(brier_score(datum, dist), 3)
         ret['log'] = round(logarithm_score(datum, dist), 3)
diff --git a/pyFTS/benchmarks/Util.py b/pyFTS/benchmarks/Util.py
index c51630b..6aa73b9 100644
--- a/pyFTS/benchmarks/Util.py
+++ b/pyFTS/benchmarks/Util.py
@@ -98,12 +98,12 @@ def process_common_data(dataset, tag, type, job):
         data = [dataset, tag, type, model.shortname,
                 str(model.transformations[0]) if len(model.transformations) > 0 else None,
                 model.order, None, None,
-                None, job['steps'], job['method']]
+                None]
     else:
         data = [dataset, tag, type, model.shortname,
                 str(model.partitioner.transformation) if model.partitioner.transformation is not None else None,
                 model.order, model.partitioner.name, str(model.partitioner.partitions),
-                len(model), job['steps'], job['method']]
+                len(model)]
 
     return data
 
@@ -124,9 +124,7 @@ def process_common_data2(dataset, tag, type, job):
             job['order'],
             job['partitioner'],
             job['partitions'],
-            job['size'],
-            job['steps'],
-            job['method']
+            job['size']
             ]
 
     return data
diff --git a/pyFTS/benchmarks/benchmarks.py b/pyFTS/benchmarks/benchmarks.py
index f6dfeee..4938724 100644
--- a/pyFTS/benchmarks/benchmarks.py
+++ b/pyFTS/benchmarks/benchmarks.py
@@ -149,26 +149,24 @@ def sliding_window_benchmarks2(data, windowsize, train=0.8, **kwargs):
 
         if benchmark_models:
             for bm, method in enumerate(benchmark_methods):
-                for step in steps_ahead:
+                kwargs['steps_ahead'] = max(steps_ahead)
+                kwargs['parameters'] = benchmark_methods_parameters[bm]
 
-                    kwargs['steps_ahead'] = step
-                    kwargs['parameters'] = benchmark_methods_parameters[bm]
-
-                    if not distributed:
-                        try:
-                            job = experiment_method(method, None, None, None, train, test, ct, **kwargs)
-                            synthesis_method(dataset, tag, job, conn)
-                        except Exception as ex:
-                            print('EXCEPTION! ', method, benchmark_methods_parameters[bm])
-                            traceback.print_exc()
-                    else:
-                        job = cluster.submit(method, None, None, None, train, test, ct, **kwargs)
-                        jobs.append(job)
+                if not distributed:
+                    try:
+                        job = experiment_method(method, None, None, None, None, train, test, ct, **kwargs)
+                        synthesis_method(dataset, tag, job, conn)
+                    except Exception as ex:
+                        print('EXCEPTION! ', method, benchmark_methods_parameters[bm])
+                        traceback.print_exc()
+                else:
+                    job = cluster.submit(method, None, None, None, None, train, test, ct, **kwargs)
+                    jobs.append(job)
         else:
-            params = [ix_methods, orders, partitioners_methods, partitions, transformations, steps_ahead]
+            params = [ix_methods, orders, partitioners_methods, partitions, transformations]
             for id, instance in enumerate(product(*params)):
                 fts_method = fts_methods[instance[0]]
-                kwargs['steps_ahead'] = instance[5]
+                kwargs['steps_ahead'] = max(steps_ahead)
                 if methods_parameters is not None:
                     kwargs['parameters'] = methods_parameters[instance[0]]
                 if not distributed:
@@ -615,7 +613,7 @@ def run_probabilistic(mfts, partitioner, train_data, test_data, window_key=None,
     _crps1, _t1, _brier = Measures.get_distribution_statistics(test_data, mfts, **kwargs)
     _t1 += times
 
-    ret = {'key': _key, 'obj': mfts, 'CRPS': _crps1, 'time': _t1, 'brier': _brier, 'window': window_key,
+    ret = {'key': _key, 'obj': mfts, 'crps': _crps1, 'time': _t1, 'brier': _brier, 'window': window_key,
            'steps': steps_ahead, 'method': method}
 
     return ret
@@ -712,19 +710,39 @@ def run_interval2(fts_method, order, partitioner_method, partitions, transformat
     _end = time.time()
     times = _end - _start
 
-    _start = time.time()
-    #_sharp, _res, _cov, _q05, _q25, _q75, _q95, _w05, _w25
-    metrics = Measures.get_interval_statistics(test_data, mfts, **kwargs)
-    _end = time.time()
-    times += _end - _start
+    if steps_ahead == 1:
+
+        _start = time.time()
+        metrics = Measures.get_interval_statistics(test_data, mfts, **kwargs)
+        _end = time.time()
+        times += _end - _start
+
+        ret = {'model': mfts.shortname, 'partitioner': pttr, 'order': order, 'partitions': partitions,
+               'transformation': '' if transformation is None else transformation.name,
+               'size': len(mfts), 'time': times,
+               'sharpness': metrics[0], 'resolution': metrics[1], 'coverage': metrics[2],
+               'time': times,'pinball05': metrics[3], 'pinball25': metrics[4], 'pinball75': metrics[5], 'pinball95': metrics[6],
+               'winkler05': metrics[7], 'winkler25': metrics[8],
+               'window': window_key,'steps': steps_ahead, 'method': method}
+    else:
+        _start = time.time()
+        intervals = mfts.predict(test_data, **kwargs)
+        _end = time.time()
+        times += _end - _start
+
+        eval = Measures.get_interval_ahead_statistics(test_data[mfts.order:mfts.order+steps_ahead], intervals)
+
+        for key in eval.keys():
+            eval[key]["time"] = times
+            eval[key]["method"] = method
+
+        ret = {'model': mfts.shortname, 'partitioner': pttr, 'order': order, 'partitions': partitions,
+               'transformation': '' if transformation is None else transformation.name,
+               'size': len(mfts), 'time': times,
+               'window': window_key, 'steps': steps_ahead, 'method': method,
+               'ahead_results': eval
+               }
 
-    ret = {'model': mfts.shortname, 'partitioner': pttr, 'order': order, 'partitions': partitions,
-           'transformation': '' if transformation is None else transformation.name,
-           'size': len(mfts), 'time': times,
-           'sharpness': metrics[0], 'resolution': metrics[1], 'coverage': metrics[2],
-           'time': times,'Q05': metrics[3], 'Q25': metrics[4], 'Q75': metrics[5], 'Q95': metrics[6],
-           'winkler05': metrics[7], 'winkler25': metrics[8],
-           'window': window_key,'steps': steps_ahead, 'method': method}
 
     return ret
 
@@ -761,18 +779,55 @@ def run_probabilistic2(fts_method, order, partitioner_method, partitions, transf
     _end = time.time()
     times = _end - _start
 
-    _crps1, _t1, _brier = Measures.get_distribution_statistics(test_data, mfts, **kwargs)
-    _t1 += times
+    if steps_ahead == 1:
 
-    ret = {'model': mfts.shortname, 'partitioner': pttr, 'order': order, 'partitions': partitions,
-           'transformation': '' if transformation is None else transformation.name,
-           'size': len(mfts), 'time': times,
-           'CRPS': _crps1, 'brier': _brier, 'window': window_key,
-           'steps': steps_ahead, 'method': method}
+        _crps1, _t1, _brier = Measures.get_distribution_statistics(test_data, mfts, **kwargs)
+        times += _t1
+
+        ret = {'model': mfts.shortname, 'partitioner': pttr, 'order': order, 'partitions': partitions,
+               'transformation': '' if transformation is None else transformation.name,
+               'size': len(mfts), 'time': times,
+               'crps': _crps1, 'brier': _brier, 'window': window_key,
+               'steps': steps_ahead, 'method': method}
+    else:
+        _start = time.time()
+        distributions = mfts.predict(test_data, **kwargs)
+        _end = time.time()
+        times += _end - _start
+
+        eval = Measures.get_distribution_ahead_statistics(test_data[mfts.order:mfts.order+steps_ahead], distributions)
+
+        for key in eval.keys():
+            eval[key]["time"] = times
+            eval[key]["method"] = method
+
+        ret = {'model': mfts.shortname, 'partitioner': pttr, 'order': order, 'partitions': partitions,
+               'transformation': '' if transformation is None else transformation.name,
+               'size': len(mfts), 'time': times,
+               'window': window_key, 'steps': steps_ahead, 'method': method,
+               'ahead_results': eval
+               }
 
     return ret
 
 
+def common_process_point_jobs(conn, data, job):
+    data.append(job['steps'])
+    data.append(job['method'])
+    rmse = deepcopy(data)
+    rmse.extend(["rmse", job["rmse"]])
+    bUtil.insert_benchmark(rmse, conn)
+    smape = deepcopy(data)
+    smape.extend(["smape", job["smape"]])
+    bUtil.insert_benchmark(smape, conn)
+    u = deepcopy(data)
+    u.extend(["u", job["u"]])
+    bUtil.insert_benchmark(u, conn)
+    time = deepcopy(data)
+    time.extend(["time", job["time"]])
+    bUtil.insert_benchmark(time, conn)
+
+
 def process_point_jobs(dataset, tag, job, conn):
     """
     Extract information from a dictionary with point benchmark results and save it on a database
@@ -786,18 +841,8 @@
 
     data = bUtil.process_common_data(dataset, tag, 'point',job)
 
-    rmse = deepcopy(data)
-    rmse.extend(["rmse", job["rmse"]])
-    bUtil.insert_benchmark(rmse, conn)
-    smape = deepcopy(data)
-    smape.extend(["smape", job["smape"]])
-    bUtil.insert_benchmark(smape, conn)
-    u = deepcopy(data)
-    u.extend(["u", job["u"]])
-    bUtil.insert_benchmark(u, conn)
-    time = deepcopy(data)
-    time.extend(["time", job["time"]])
-    bUtil.insert_benchmark(time, conn)
+    common_process_point_jobs(conn, data, job)
+
 
 def process_point_jobs2(dataset, tag, job, conn):
     """
     Extract information from a dictionary with point benchmark results and save it on a database
@@ -812,18 +857,24 @@
 
     data = bUtil.process_common_data2(dataset, tag, 'point',job)
 
-    rmse = deepcopy(data)
-    rmse.extend(["rmse", job["rmse"]])
-    bUtil.insert_benchmark(rmse, conn)
-    smape = deepcopy(data)
-    smape.extend(["smape", job["smape"]])
-    bUtil.insert_benchmark(smape, conn)
-    u = deepcopy(data)
-    u.extend(["u", job["u"]])
-    bUtil.insert_benchmark(u, conn)
-    time = deepcopy(data)
-    time.extend(["time", job["time"]])
-    bUtil.insert_benchmark(time, conn)
+    if job['steps'] == 1:
+        common_process_point_jobs(conn, data, job)
+    else:
+        for k in range(job['steps']):
+            j2 = job['ahead_results'][k]
+            common_process_point_jobs(conn, data, j2)
+
+
+def common_process_interval_jobs(conn, data, job):
+    dta = deepcopy(data)
+    dta.append(job['steps'])
+    dta.append(job['method'])
+    for key in ["sharpness","resolution","coverage","time","pinball05",
+                "pinball25","pinball75","pinball95", "winkler05", "winkler25"]:
+        if key in job:
+            data2 = deepcopy(dta)
+            data2.extend([key, job[key]])
+            bUtil.insert_benchmark(data2, conn)
 
 
 def process_interval_jobs(dataset, tag, job, conn):
@@ -839,72 +890,30 @@
 
     data = bUtil.process_common_data(dataset, tag, 'interval', job)
 
-    sharpness = deepcopy(data)
-    sharpness.extend(["sharpness", job["sharpness"]])
-    bUtil.insert_benchmark(sharpness, conn)
-    resolution = deepcopy(data)
-    resolution.extend(["resolution", job["resolution"]])
-    bUtil.insert_benchmark(resolution, conn)
-    coverage = deepcopy(data)
-    coverage.extend(["coverage", job["coverage"]])
-    bUtil.insert_benchmark(coverage, conn)
-    time = deepcopy(data)
-    time.extend(["time", job["time"]])
-    bUtil.insert_benchmark(time, conn)
-    Q05 = deepcopy(data)
-    Q05.extend(["Q05", job["Q05"]])
-    bUtil.insert_benchmark(Q05, conn)
-    Q25 = deepcopy(data)
-    Q25.extend(["Q25", job["Q25"]])
-    bUtil.insert_benchmark(Q25, conn)
-    Q75 = deepcopy(data)
-    Q75.extend(["Q75", job["Q75"]])
-    bUtil.insert_benchmark(Q75, conn)
-    Q95 = deepcopy(data)
-    Q95.extend(["Q95", job["Q95"]])
-    bUtil.insert_benchmark(Q95, conn)
-    W05 = deepcopy(data)
-    W05.extend(["winkler05", job["winkler05"]])
-    bUtil.insert_benchmark(W05, conn)
-    W25 = deepcopy(data)
-    W25.extend(["winkler25", job["winkler25"]])
-    bUtil.insert_benchmark(W25, conn)
+    common_process_interval_jobs(conn, data, job)
 
 
 def process_interval_jobs2(dataset, tag, job, conn):
 
     data = bUtil.process_common_data2(dataset, tag, 'interval', job)
 
-    sharpness = deepcopy(data)
-    sharpness.extend(["sharpness", job["sharpness"]])
-    bUtil.insert_benchmark(sharpness, conn)
-    resolution = deepcopy(data)
-    resolution.extend(["resolution", job["resolution"]])
-    bUtil.insert_benchmark(resolution, conn)
-    coverage = deepcopy(data)
-    coverage.extend(["coverage", job["coverage"]])
-    bUtil.insert_benchmark(coverage, conn)
-    time = deepcopy(data)
-    time.extend(["time", job["time"]])
-    bUtil.insert_benchmark(time, conn)
-    Q05 = deepcopy(data)
-    Q05.extend(["Q05", job["Q05"]])
-    bUtil.insert_benchmark(Q05, conn)
-    Q25 = deepcopy(data)
-    Q25.extend(["Q25", job["Q25"]])
-    bUtil.insert_benchmark(Q25, conn)
-    Q75 = deepcopy(data)
-    Q75.extend(["Q75", job["Q75"]])
-    bUtil.insert_benchmark(Q75, conn)
-    Q95 = deepcopy(data)
-    Q95.extend(["Q95", job["Q95"]])
-    bUtil.insert_benchmark(Q95, conn)
-    W05 = deepcopy(data)
-    W05.extend(["winkler05", job["winkler05"]])
-    bUtil.insert_benchmark(W05, conn)
-    W25 = deepcopy(data)
-    W25.extend(["winkler25", job["winkler25"]])
-    bUtil.insert_benchmark(W25, conn)
+    if job['steps'] == 1:
+        common_process_interval_jobs(conn, data, job)
+    else:
+        for k in range(job['steps']):
+            j2 = job['ahead_results'][k]
+            common_process_interval_jobs(conn, data, j2)
+
+
+def common_process_probabilistic_jobs(conn, data, job):
+    dta = deepcopy(data)
+    dta.append(job['steps'])
+    dta.append(job['method'])
+    for key in ["crps","time","brier"]:
+        if key in job:
+            data2 = deepcopy(dta)
+            data2.extend([key, job[key]])
+            bUtil.insert_benchmark(data2, conn)
 
 
 def process_probabilistic_jobs(dataset, tag, job, conn):
@@ -920,15 +929,7 @@
 
     data = bUtil.process_common_data(dataset, tag, 'density', job)
 
-    crps = deepcopy(data)
-    crps.extend(["crps",job["CRPS"]])
-    bUtil.insert_benchmark(crps, conn)
-    time = deepcopy(data)
-    time.extend(["time", job["time"]])
-    bUtil.insert_benchmark(time, conn)
-    brier = deepcopy(data)
-    brier.extend(["brier", job["brier"]])
-    bUtil.insert_benchmark(brier, conn)
+    common_process_probabilistic_jobs(conn, data, job)
 
 
 def process_probabilistic_jobs2(dataset, tag, job, conn):
@@ -942,17 +943,14 @@
     :return:
     """
 
-    data = bUtil.process_common_data2(dataset, tag, 'density', job)
+    data = bUtil.process_common_data2(dataset, tag, 'density', job)
 
-    crps = deepcopy(data)
-    crps.extend(["crps",job["CRPS"]])
-    bUtil.insert_benchmark(crps, conn)
-    time = deepcopy(data)
-    time.extend(["time", job["time"]])
-    bUtil.insert_benchmark(time, conn)
-    brier = deepcopy(data)
-    brier.extend(["brier", job["brier"]])
-    bUtil.insert_benchmark(brier, conn)
+    if job['steps'] == 1:
+        common_process_probabilistic_jobs(conn,data,job)
+    else:
+        for k in range(job['steps']):
+            j2 = job['ahead_results'][k]
+            common_process_probabilistic_jobs(conn, data, j2)
 
 
 def print_point_statistics(data, models, externalmodels = None, externalforecasts = None, indexers=None):
diff --git a/pyFTS/tests/general.py b/pyFTS/tests/general.py
index a666f5f..2b0456a 100644
--- a/pyFTS/tests/general.py
+++ b/pyFTS/tests/general.py
@@ -19,6 +19,7 @@
 from pyFTS.fcm import fts, common, GA
 
 from pyFTS.data import TAIEX, NASDAQ, SP500
 
+'''
 train = TAIEX.get_data()[:800]
 test = TAIEX.get_data()[800:1000]
@@ -51,24 +52,27 @@
 datasets['TAIEX'] = TAIEX.get_data()[:5000]
 datasets['NASDAQ'] = NASDAQ.get_data()[:5000]
 datasets['SP500'] = SP500.get_data()[10000:15000]
 
-methods = [ensemble.SimpleEnsembleFTS]*4
+methods = [arima.ARIMA, quantreg.QuantileRegression, BSTS.ARIMA, knn.KNearestNeighbors]
 
 methods_parameters = [
-    {'name': 'EnsembleFTS-HOFTS-10', 'fts_method': hofts.HighOrderFTS, 'partitions': np.arange(20,50,10)},
-    {'name': 'EnsembleFTS-HOFTS-5', 'fts_method': hofts.HighOrderFTS, 'partitions': np.arange(20,50,5)},
-    {'name': 'EnsembleFTS-WHOFTS-10', 'fts_method': hofts.WeightedHighOrderFTS, 'partitions': np.arange(20,50,10)},
-    {'name': 'EnsembleFTS-WHOFTS-5', 'fts_method': hofts.WeightedHighOrderFTS, 'partitions': np.arange(20,50,5)}
+    {'order':(2,0,0)},
+    {'order':2, 'dist': True},
+    {'order':(2,0,0)},
+    {'order':2 }
 ]
 
 for dataset_name, dataset in datasets.items():
     bchmk.sliding_window_benchmarks2(dataset, 1000, train=0.8, inc=0.2,
-                                     methods=methods,
-                                     methods_parameters=methods_parameters,
-                                     benchmark_models=False,
-                                     transformations=[None],
-                                     orders=[3],
-                                     partitions=[None],
-                                     type='distribution',
-                                     distributed=True, nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'],
-                                     file="experiments.db", dataset=dataset_name, tag="gridsearch")
-'''
\ No newline at end of file
+                                     benchmark_models=True,
+                                     benchmark_methods=methods,
+                                     benchmark_methods_parameters=methods_parameters,
+                                     methods=[],
+                                     methods_parameters=[],
+                                     transformations=[None],
+                                     orders=[3],
+                                     steps_ahead=[10],
+                                     partitions=[None],
+                                     type='interval',
+                                     #distributed=True, nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'],
+                                     file="tmp.db", dataset=dataset_name, tag="experiments")
+#'''
\ No newline at end of file
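
With these changes, multi-step jobs carry the per-step output of Measures.get_*_ahead_statistics under 'ahead_results' (one metrics dict per forecasting step), and the process_*_jobs2 functions flatten that into one database row per (step, measure). Below is a minimal, hedged sketch of how those rows might be read back for analysis; the 'benchmarks' table name and the Model/Type/Steps/Measure/Value column names are assumptions inferred from the row lists assembled by process_common_data2 and common_process_interval_jobs, not something this diff confirms, so adjust them to the actual schema written by bUtil.insert_benchmark.

# Sketch only: assumes the SQLite table written by bUtil.insert_benchmark is named
# 'benchmarks' and exposes Model, Type, Steps, Measure and Value columns mirroring
# the lists built in process_common_data2 / common_process_interval_jobs.
import sqlite3

import pandas as pd

conn = sqlite3.connect("tmp.db")

# With steps_ahead=[10], run_interval2 now stores one row per (window, step, measure)
# instead of a single row per window, so grouping by Steps shows how interval quality
# evolves along the forecasting horizon.
df = pd.read_sql_query(
    "SELECT Model, Steps, Measure, AVG(Value) AS Value "
    "FROM benchmarks WHERE Type = 'interval' GROUP BY Model, Steps, Measure",
    conn)

print(df.pivot_table(index=['Model', 'Steps'], columns='Measure', values='Value'))

conn.close()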