- Distributed sliding window benchmarks with dispy

This commit is contained in:
Petrônio Cândido de Lima e Silva 2017-04-06 19:23:29 -03:00
parent 97246f1510
commit 3ec95232bd
3 changed files with 124 additions and 48 deletions

View File

@ -177,3 +177,12 @@ def get_point_statistics(data, model, indexer=None):
ret.append(np.nan) ret.append(np.nan)
return ret return ret
def get_interval_statistics(original, model):
ret = list()
forecasts = model.forecastInterval(original)
ret.append(round(sharpness(forecasts), 2))
ret.append(round(resolution(forecasts), 2))
ret.append(round(coverage(original[model.order:], forecasts[:-1]), 2))
return ret

View File

@ -408,7 +408,7 @@ def interval_sliding_window(data, windowsize, train=0.8,models=None,partitioners
_tdiff = _end - _start _tdiff = _end - _start
_start = time.time() _start = time.time()
_sharp, _res, _cov = get_interval_statistics(test, mfts) _sharp, _res, _cov = Measures.get_interval_statistics(test, mfts)
_end = time.time() _end = time.time()
_tdiff += _end - _start _tdiff += _end - _start
sharpness[_key].append(_sharp) sharpness[_key].append(_sharp)
@ -443,7 +443,7 @@ def interval_sliding_window(data, windowsize, train=0.8,models=None,partitioners
_tdiff = _end - _start _tdiff = _end - _start
_start = time.time() _start = time.time()
_sharp, _res, _cov = get_interval_statistics(test, mfts) _sharp, _res, _cov = Measures.get_interval_statistics(test, mfts)
_end = time.time() _end = time.time()
_tdiff += _end - _start _tdiff += _end - _start
sharpness[_key].append(_sharp) sharpness[_key].append(_sharp)
@ -488,19 +488,10 @@ def all_interval_forecasters(data_train, data_test, partitions, max_order=3,save
plot_compared_series(data_test, objs, lcolors, typeonlegend=False, save=save, file=file, tam=tam, intervals=True) plot_compared_series(data_test, objs, lcolors, typeonlegend=False, save=save, file=file, tam=tam, intervals=True)
def get_interval_statistics(original, model):
ret = list()
forecasts = model.forecastInterval(original)
ret.append(round(Measures.sharpness(forecasts), 2))
ret.append(round(Measures.resolution(forecasts), 2))
ret.append(round(Measures.coverage(original[model.order:], forecasts[:-1]), 2))
return ret
def print_interval_statistics(original, models): def print_interval_statistics(original, models):
ret = "Model & Order & Sharpness & Resolution & Coverage \\\\ \n" ret = "Model & Order & Sharpness & Resolution & Coverage \\\\ \n"
for fts in models: for fts in models:
_sharp, _res, _cov = get_interval_statistics(original, fts) _sharp, _res, _cov = Measures.get_interval_statistics(original, fts)
ret += fts.shortname + " & " ret += fts.shortname + " & "
ret += str(fts.order) + " & " ret += str(fts.order) + " & "
ret += str(_sharp) + " & " ret += str(_sharp) + " & "

View File

@ -33,7 +33,6 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo
if transformation is not None: if transformation is not None:
mfts.appendTransformation(transformation) mfts.appendTransformation(transformation)
# try:
_start = time.time() _start = time.time()
mfts.train(train_data, partitioner.sets, order=mfts.order) mfts.train(train_data, partitioner.sets, order=mfts.order)
_end = time.time() _end = time.time()
@ -43,17 +42,9 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, transfo
_rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, indexer) _rmse, _smape, _u = Measures.get_point_statistics(test_data, mfts, indexer)
_end = time.time() _end = time.time()
times += _end - _start times += _end - _start
# except Exception as e:
# print(e)
# _rmse = np.nan
# _smape = np.nan
# _u = np.nan
# times = np.nan
ret = {'key': _key, 'obj': mfts, 'rmse': _rmse, 'smape': _smape, 'u': _u, 'time': times, 'window': window_key} ret = {'key': _key, 'obj': mfts, 'rmse': _rmse, 'smape': _smape, 'u': _u, 'time': times, 'window': window_key}
# print(ret)
return ret return ret
@ -61,22 +52,8 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False, partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
save=False, file=None, sintetic=False,nodes=None, depends=None): save=False, file=None, sintetic=False,nodes=None, depends=None):
# dependencies = [fts, Membership, benchmarks]
# if depends is not None: dependencies.extend(depends)
# if models is not None:
# dependencies.extend(models)
# else:
# dependencies.extend(benchmarks.get_point_methods())
# dependencies.extend(partitioners)
# if transformation is not None: dependencies.extend(transformation)
# if indexer is not None: dependencies.extend(indexer)
cluster = dispy.JobCluster(run_point, nodes=nodes) #, depends=dependencies) cluster = dispy.JobCluster(run_point, nodes=nodes) #, depends=dependencies)
# import dispy's httpd module, create http server for this cluster
http_server = dispy.httpd.DispyHTTPServer(cluster) http_server = dispy.httpd.DispyHTTPServer(cluster)
_process_start = time.time() _process_start = time.time()
@ -157,23 +134,122 @@ def point_sliding_window(data, windowsize, train=0.8, models=None, partitioners=
return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u) return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u)
def compute(data): def run_interval(mfts, partitioner, train_data, test_data, transformation=None, indexer=None):
import socket import time
return (socket.gethostname(), data) from pyFTS import hofts,ifts,pwfts
from pyFTS.partitioners import Grid, Entropy, FCM
from pyFTS.benchmarks import Measures
tmp = [hofts.HighOrderFTS, ifts.IntervalFTS, pwfts.ProbabilisticWeightedFTS]
tmp2 = [Grid.GridPartitioner, Entropy.EntropyPartitioner, FCM.FCMPartitioner]
tmp3 = [Measures.get_interval_statistics]
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if transformation is not None:
mfts.appendTransformation(transformation)
_start = time.time()
mfts.train(train_data, partitioner.sets, order=mfts.order)
_end = time.time()
times = _end - _start
_start = time.time()
_sharp, _res, _cov = Measures.get_interval_statistics(test_data, mfts)
_end = time.time()
times += _end - _start
ret = {'key': _key, 'obj': mfts, 'sharpness': _sharp, 'resolution': _res, 'coverage': _cov, 'time': times}
return ret
def teste(data,nodes): def interval_sliding_window(data, windowsize, train=0.8, models=None, partitioners=[Grid.GridPartitioner],
cluster = dispy.JobCluster(compute, nodes=nodes) partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
save=False, file=None, sintetic=False,nodes=None, depends=None):
cluster = dispy.JobCluster(run_point, nodes=nodes) #, depends=dependencies)
http_server = dispy.httpd.DispyHTTPServer(cluster)
_process_start = time.time()
print("Process Start: {0: %H:%M:%S}".format(datetime.datetime.now()))
pool = []
jobs = [] jobs = []
for ct, train, test in Util.sliding_window(data, 2000, 0.8): objs = {}
job = cluster.submit(ct) sharpness = {}
resolution = {}
coverage = {}
times = {}
if models is None:
models = benchmarks.get_interval_methods()
for model in models:
mfts = model("")
if mfts.isHighOrder:
for order in np.arange(1, max_order + 1):
if order >= mfts.minOrder:
mfts = model("")
mfts.order = order
pool.append(mfts)
else:
pool.append(mfts)
experiments = 0
for ct, train, test in Util.sliding_window(data, windowsize, train):
experiments += 1
if dump: print('\nWindow: {0}\n'.format(ct))
for partition in partitions:
for partitioner in partitioners:
data_train_fs = partitioner(train, partition, transformation=transformation)
for id, m in enumerate(pool,start=0):
job = cluster.submit(m, data_train_fs, train, test, ct, transformation)
job.id = id # associate an ID to identify jobs (if needed later)
jobs.append(job) jobs.append(job)
for job in jobs: for job in jobs:
x = job() tmp = job()
print(x) if job.status == dispy.DispyJob.Finished and tmp is not None:
if tmp['key'] not in objs:
objs[tmp['key']] = tmp['obj']
sharpness[tmp['key']] = []
resolution[tmp['key']] = []
coverage[tmp['key']] = []
times[tmp['key']] = []
cluster.wait() sharpness[tmp['key']].append(tmp['sharpness'])
resolution[tmp['key']].append(tmp['resolution'])
coverage[tmp['key']].append(tmp['coverage'])
times[tmp['key']].append(tmp['time'])
print(tmp['key'])
else:
print(job.exception)
print(job.stdout)
_process_end = time.time()
print("Process End: {0: %H:%M:%S}".format(datetime.datetime.now()))
print("Process Duration: {0}".format(_process_end - _process_start))
cluster.wait() # wait for all jobs to finish
cluster.print_status()
http_server.shutdown() # this waits until browser gets all updates
cluster.close() cluster.close()
return benchmarks.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic,
times)