From 3ec95232bdb60f2799097f9b09fb4002960effa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=C3=B4nio=20C=C3=A2ndido=20de=20Lima=20e=20Silva?= Date: Thu, 6 Apr 2017 19:23:29 -0300 Subject: [PATCH] - Distributed sliding window benchmarks with dispy --- benchmarks/Measures.py | 11 +- benchmarks/benchmarks.py | 15 +-- benchmarks/distributed_benchmarks.py | 146 ++++++++++++++++++++------- 3 files changed, 124 insertions(+), 48 deletions(-) diff --git a/benchmarks/Measures.py b/benchmarks/Measures.py index 526915f..a13ba64 100644 --- a/benchmarks/Measures.py +++ b/benchmarks/Measures.py @@ -176,4 +176,13 @@ def get_point_statistics(data, model, indexer=None): except: ret.append(np.nan) - return ret \ No newline at end of file + return ret + + +def get_interval_statistics(original, model): + ret = list() + forecasts = model.forecastInterval(original) + ret.append(round(sharpness(forecasts), 2)) + ret.append(round(resolution(forecasts), 2)) + ret.append(round(coverage(original[model.order:], forecasts[:-1]), 2)) + return ret diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index 0d71d09..2fb3419 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -408,7 +408,7 @@ def interval_sliding_window(data, windowsize, train=0.8,models=None,partitioners _tdiff = _end - _start _start = time.time() - _sharp, _res, _cov = get_interval_statistics(test, mfts) + _sharp, _res, _cov = Measures.get_interval_statistics(test, mfts) _end = time.time() _tdiff += _end - _start sharpness[_key].append(_sharp) @@ -443,7 +443,7 @@ def interval_sliding_window(data, windowsize, train=0.8,models=None,partitioners _tdiff = _end - _start _start = time.time() - _sharp, _res, _cov = get_interval_statistics(test, mfts) + _sharp, _res, _cov = Measures.get_interval_statistics(test, mfts) _end = time.time() _tdiff += _end - _start sharpness[_key].append(_sharp) @@ -488,19 +488,10 @@ def all_interval_forecasters(data_train, data_test, partitions, max_order=3,save plot_compared_series(data_test, objs, lcolors, typeonlegend=False, save=save, file=file, tam=tam, intervals=True) -def get_interval_statistics(original, model): - ret = list() - forecasts = model.forecastInterval(original) - ret.append(round(Measures.sharpness(forecasts), 2)) - ret.append(round(Measures.resolution(forecasts), 2)) - ret.append(round(Measures.coverage(original[model.order:], forecasts[:-1]), 2)) - return ret - - def print_interval_statistics(original, models): ret = "Model & Order & Sharpness & Resolution & Coverage \\\\ \n" for fts in models: - _sharp, _res, _cov = get_interval_statistics(original, fts) + _sharp, _res, _cov = Measures.get_interval_statistics(original, fts) ret += fts.shortname + " & " ret += str(fts.order) + " & " ret += str(_sharp) + " & " diff --git a/benchmarks/distributed_benchmarks.py b/benchmarks/distributed_benchmarks.py index 6859d5e..58e26b5 100644 --- a/benchmarks/distributed_benchmarks.py +++ b/benchmarks/distributed_benchmarks.py @@ -33,7 +33,6 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo if transformation is not None: mfts.appendTransformation(transformation) -# try: _start = time.time() mfts.train(train_data, partitioner.sets, order=mfts.order) _end = time.time() @@ -43,17 +42,9 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo _rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, indexer) _end = time.time() times += _end - _start - # except Exception as e: - # print(e) - # _rmse = np.nan - # _smape = np.nan - # 
_u = np.nan
-    #     times = np.nan
 
     ret = {'key': _key, 'obj': mfts, 'rmse': _rmse, 'smape': _smape, 'u': _u, 'time': times, 'window': window_key}
 
-    # print(ret)
-
     return ret
 
 
@@ -61,22 +52,8 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=
                          partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
                          save=False, file=None, sintetic=False,nodes=None, depends=None):
 
-#    dependencies = [fts, Membership, benchmarks]
-#    if depends is not None: dependencies.extend(depends)
-
-#    if models is not None:
-#        dependencies.extend(models)
-#    else:
-#        dependencies.extend(benchmarks.get_point_methods())
-
-#    dependencies.extend(partitioners)
-#    if transformation is not None: dependencies.extend(transformation)
-#    if indexer is not None: dependencies.extend(indexer)
-
     cluster = dispy.JobCluster(run_point, nodes=nodes) #, depends=dependencies)
 
-    # import dispy's httpd module, create http server for this cluster
-
     http_server = dispy.httpd.DispyHTTPServer(cluster)
 
     _process_start = time.time()
@@ -157,23 +134,122 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=
     return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
 
 
-def compute(data):
-    import socket
-    return (socket.gethostname(), data)
+def run_interval(mfts, partitioner, train_data, test_data, window_key=None, transformation=None, indexer=None):
+    import time
+    from pyFTS import hofts, ifts, pwfts
+    from pyFTS.partitioners import Grid, Entropy, FCM
+    from pyFTS.benchmarks import Measures
+
+    tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS]
+
+    tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner]
+
+    tmp3 = [Measures.get_interval_statistics]
+
+    pttr = str(partitioner.__module__).split('.')[-1]
+    _key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
+    mfts.partitioner = partitioner
+    if transformation is not None:
+        mfts.appendTransformation(transformation)
+
+    _start = time.time()
+    mfts.train(train_data, partitioner.sets, order=mfts.order)
+    _end = time.time()
+    times = _end - _start
+
+    _start = time.time()
+    _sharp, _res, _cov = Measures.get_interval_statistics(test_data, mfts)
+    _end = time.time()
+    times += _end - _start
+
+    ret = {'key': _key, 'obj': mfts, 'sharpness': _sharp, 'resolution': _res, 'coverage': _cov, 'time': times, 'window': window_key}
+
+    return ret
 
-def teste(data,nodes):
-    cluster = dispy.JobCluster(compute, nodes=nodes)
+def interval_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner],
+                            partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
+                            save=False, file=None, sintetic=False, nodes=None, depends=None):
+
+    cluster = dispy.JobCluster(run_interval, nodes=nodes)  # each job executes run_interval on a worker node
+
+    http_server = dispy.httpd.DispyHTTPServer(cluster)
+
+    _process_start = time.time()
+
+    print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now()))
+
+    pool = []
     jobs = []
-    for ct, train, test in Util.sliding_window(data, 2000, 0.8):
-        job = cluster.submit(ct)
-        jobs.append(job)
+    objs = {}
+    sharpness = {}
+    resolution = {}
+    coverage = {}
+    times = {}
+
+    if models is None:
+        models = benchmarks.get_interval_methods()
+
+    for model in models:
+        mfts = model("")
+
+        if mfts.isHighOrder:
+            for order in np.arange(1, max_order + 1):
+                if order >= mfts.minOrder:
+                    mfts = model("")
+                    mfts.order = order
+                    pool.append(mfts)
+        else:
+            pool.append(mfts)
+
+    experiments = 0
+    for ct, train, test in Util.sliding_window(data, windowsize, train):
+        experiments += 1
+
+        if dump: print('\nWindow: {0}\n'.format(ct))
+
+        for partition in partitions:
+
+            for partitioner in partitioners:
+
+                data_train_fs = partitioner(train, partition, transformation=transformation)
+
+                for id, m in enumerate(pool,start=0):
+                    job = cluster.submit(m, data_train_fs, train, test, ct, transformation)
+                    job.id = id  # associate an ID to identify jobs (if needed later)
+                    jobs.append(job)
 
     for job in jobs:
-        x = job()
-        print(x)
+        tmp = job()
+        if job.status == dispy.DispyJob.Finished and tmp is not None:
+            if tmp['key'] not in objs:
+                objs[tmp['key']] = tmp['obj']
+                sharpness[tmp['key']] = []
+                resolution[tmp['key']] = []
+                coverage[tmp['key']] = []
+                times[tmp['key']] = []
 
-    cluster.wait()
+            sharpness[tmp['key']].append(tmp['sharpness'])
+            resolution[tmp['key']].append(tmp['resolution'])
+            coverage[tmp['key']].append(tmp['coverage'])
+            times[tmp['key']].append(tmp['time'])
+            print(tmp['key'])
+        else:
+            print(job.exception)
+            print(job.stdout)
+
+    _process_end = time.time()
+
+    print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now()))
+
+    print("Process Duration: {0}".format(_process_end - _process_start))
+
+    cluster.wait()  # wait for all jobs to finish
+
+    cluster.print_status()
+
+    http_server.shutdown()  # this waits until browser gets all updates
     cluster.close()
-
+    return benchmarks.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic,
                                               times)
\ No newline at end of file
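
Usage note (illustrative, not part of the patch): the Measures.get_interval_statistics helper added above can also be exercised outside the sliding-window benchmark once an interval-capable model has been trained. A minimal sketch, mirroring the calls run_interval itself makes; the synthetic series, the choice of GridPartitioner/IntervalFTS and order=2 are placeholder assumptions, not values prescribed by the patch:

    import numpy as np
    from pyFTS import ifts
    from pyFTS.partitioners import Grid
    from pyFTS.benchmarks import Measures

    # placeholder data: any one-dimensional numeric sequence works
    data = np.random.uniform(2000, 3000, 1000)
    train, test = data[:800], data[800:]

    fs = Grid.GridPartitioner(train, 10)        # grid partitioner with 10 fuzzy sets
    model = ifts.IntervalFTS("")
    model.order = 2
    model.train(train, fs.sets, order=model.order)

    # returns [sharpness, resolution, coverage], each rounded to 2 decimals
    sharp, res, cov = Measures.get_interval_statistics(test, model)
    print(sharp, res, cov)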
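
Driving the distributed benchmark (illustrative, not part of the patch): interval_sliding_window follows the same pattern as point_sliding_window, with dispy shipping run_interval to every host listed in nodes, and the DispyHTTPServer monitor can be watched from a browser on the submitting machine while jobs run. A minimal driving sketch, assuming the patch is applied, dispy is installed and a dispynode process is running on each worker; the data series, node address and output file name are placeholders:

    import numpy as np
    from pyFTS.partitioners import Grid
    from pyFTS.benchmarks import distributed_benchmarks as bchmk

    # placeholder series; replace with a real time series
    data = np.random.uniform(2000, 3000, 5000)

    bchmk.interval_sliding_window(data, 2000, train=0.8,
                                  partitioners=[Grid.GridPartitioner],
                                  partitions=[10, 20],
                                  max_order=3,
                                  dump=True,
                                  save=True, file="interval_benchmarks.csv",
                                  nodes=['192.168.0.110'])  # hosts running dispynode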