SQLite3 interface for benchmarks; support for ProbabilityDistribution on Differentiation

Petrônio Cândido 2018-04-22 16:59:17 -03:00
parent 6adf4710b6
commit f3c6eda2ec
6 changed files with 198 additions and 300 deletions

View File

@ -288,7 +288,7 @@ def get_point_statistics(data, model, **kwargs):
ret = list()
if steps_ahead == 1:
forecasts = model.forecast(data, **kwargs)
forecasts = model.predict(data, **kwargs)
if model.has_seasonality:
nforecasts = np.array(forecasts)
else:
@ -304,7 +304,7 @@ def get_point_statistics(data, model, **kwargs):
tmp = model.forecast_ahead(sample, steps_ahead, **kwargs)
nforecasts.append(tmp[-1])
start = model.order + steps_ahead
start = model.order + steps_ahead -1
ret.append(np.round(rmse(ndata[start:-1:steps_ahead_sampler], nforecasts), 2))
ret.append(np.round(smape(ndata[start:-1:steps_ahead_sampler], nforecasts), 2))
ret.append(np.round(UStatistic(ndata[start:-1:steps_ahead_sampler], nforecasts), 2))
@ -327,7 +327,7 @@ def get_interval_statistics(data, model, **kwargs):
ret = list()
if steps_ahead == 1:
forecasts = model.forecast_interval(data, **kwargs)
forecasts = model.predict(data, **kwargs)
ret.append(round(sharpness(forecasts), 2))
ret.append(round(resolution(forecasts), 2))
ret.append(round(coverage(data[model.order:], forecasts[:-1]), 2))
@ -339,10 +339,10 @@ def get_interval_statistics(data, model, **kwargs):
forecasts = []
for k in np.arange(model.order, len(data) - steps_ahead):
sample = data[k - model.order: k]
tmp = model.forecast_ahead_interval(sample, steps_ahead, **kwargs)
tmp = model.predict(sample, steps_ahead, **kwargs)
forecasts.append(tmp[-1])
start = model.order + steps_ahead
start = model.order + steps_ahead -1
ret.append(round(sharpness(forecasts), 2))
ret.append(round(resolution(forecasts), 2))
ret.append(round(coverage(data[model.order:], forecasts), 2))
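A minimal arithmetic sketch (not pyFTS code) of the index fix in this hunk: a forecast built from data[k - order : k] and projected steps_ahead points forward targets index (k - 1) + steps_ahead, so the first target (at k = order) sits at order + steps_ahead - 1, which is the new start offset.

order, steps_ahead = 2, 4             # illustrative values
k = order                             # first iteration of the evaluation loop
first_target = (k - 1) + steps_ahead  # index the first multi-step forecast refers to
start = order + steps_ahead - 1       # the corrected offset used above
assert first_target == start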

View File

@ -8,6 +8,7 @@ import matplotlib.colors as pltcolors
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sqlite3
#from mpl_toolkits.mplot3d import Axes3D
@ -15,6 +16,57 @@ from copy import deepcopy
from pyFTS.common import Util
def open_benchmark_db(name):
conn = sqlite3.connect(name)
create_benchmark_tables(conn)
return conn
def create_benchmark_tables(conn):
c = conn.cursor()
c.execute('''CREATE TABLE if not exists benchmarks(
ID integer primary key, Date int, Dataset text, Tag text,
Type text, Model text, Transformation text, 'Order' int,
Scheme text, Partitions int,
Size int, Steps int, Method text, Measure text, Value real)''')
# Save (commit) the changes
conn.commit()
def insert_benchmark(data, conn):
c = conn.cursor()
c.execute("INSERT INTO benchmarks(Date, Dataset, Tag, Type, Model, "
+ "Transformation, 'Order', Scheme, Partitions, "
+ "Size, Steps, Method, Measure, Value) "
+ "VALUES(datetime('now'),?,?,?,?,?,?,?,?,?,?,?,?,?)", data)
conn.commit()
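A hedged usage sketch of the two helpers above, assuming they live in pyFTS.benchmarks.Util as the bUtil calls later in this commit suggest; the file name and every value in the row are made up.

from pyFTS.benchmarks import Util as bUtil

conn = bUtil.open_benchmark_db('benchmarks.db')    # creates the benchmarks table on first use
row = ['TAIEX', 'partitioning', 'point', 'PWFTS',  # Dataset, Tag, Type, Model
       None, 1, 'Grid', '10',                      # Transformation, Order, Scheme, Partitions
       35, 1, None,                                # Size, Steps, Method
       'rmse', 42.37]                              # Measure, Value: 13 values for the 13 placeholders
bUtil.insert_benchmark(row, conn)
conn.close()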
def process_common_data(dataset, tag, type, job):
model = job["obj"]
if not model.benchmark_only:
data = [dataset, tag, type, model.shortname,
str(model.partitioner.transformation) if model.partitioner.transformation is not None else None,
model.order, model.partitioner.name, str(model.partitioner.partitions),
len(model), job['steps'], job['method']]
else:
data = [dataset, tag, type, model.shortname, None, model.order, None, None,
None, job['steps'], job['method']]
return data
def get_dataframe_from_bd(file, filter):
con = sqlite3.connect(file)
sql = "SELECT * from benchmarks"
if filter is not None:
sql += " WHERE " + filter
return pd.read_sql_query(sql, con)
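A hedged sketch of querying the stored results with this helper (same module assumption as above; the filter string is illustrative). Since the filter is concatenated directly into the SQL text, it should only come from trusted code, never from user input.

from pyFTS.benchmarks import Util as bUtil

df = bUtil.get_dataframe_from_bd('benchmarks.db',
                                 "Dataset = 'TAIEX' AND Measure = 'rmse'")
print(df.groupby(['Model', 'Order'])['Value'].mean())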
def extract_measure(dataframe, measure, data_columns):
if not dataframe.empty:
df = dataframe[(dataframe.Measure == measure)][data_columns]
@ -45,6 +97,7 @@ def find_best(dataframe, criteria, ascending):
return ret
def analytic_tabular_dataframe(dataframe):
experiments = len(dataframe.columns) - len(base_dataframe_columns()) - 1
models = dataframe.Model.unique()

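A self-contained pandas sketch of the Measure filtering that extract_measure performs above; column names follow the benchmarks table and the values are made up.

import pandas as pd

df = pd.DataFrame([['PWFTS', 1, 'rmse', 42.0], ['PWFTS', 1, 'smape', 1.1]],
                  columns=['Model', 'Order', 'Measure', 'Value'])
print(df[df.Measure == 'rmse'][['Model', 'Order', 'Value']])   # keep only the chosen measure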
View File

@ -90,10 +90,14 @@ def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs):
:return: DataFrame with the benchmark results
"""
tag = __pop('tag', None, kwargs)
dataset = __pop('dataset', None, kwargs)
distributed = __pop('distributed', False, kwargs)
save = __pop('save', False, kwargs)
transformation = kwargs.get('transformation', None)
transformations = kwargs.get('transformations', [None])
progress = kwargs.get('progress', None)
type = kwargs.get("type", 'point')
@ -192,6 +196,8 @@ def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs):
if partitioners_models is None:
for transformation in transformations:
for partition in partitions:
for partitioner in partitioners_methods:
@ -206,6 +212,10 @@ def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs):
if progress:
rng1 = tqdm(steps_ahead, desc="Steps")
file = kwargs.get('file', "benchmarks.db")
conn = bUtil.open_benchmark_db(file)
for step in rng1:
rng2 = partitioners_pool
@ -225,7 +235,7 @@ def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs):
if not distributed:
job = experiment_method(deepcopy(model), deepcopy(partitioner), train, test, **kwargs)
jobs.append(job)
synthesis_method(dataset, tag, job, conn)
else:
job = cluster.submit(deepcopy(model), deepcopy(partitioner), train, test, **kwargs)
job.id = id # associate an ID to identify jobs (if needed later)
@ -239,29 +249,29 @@ def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs):
rng = jobs
cluster.wait() # wait for all jobs to finish
if progress:
rng = tqdm(jobs)
for job in rng:
job()
if job is not None and job.status == dispy.DispyJob.Finished:
tmp = job()
jobs2.append(tmp)
tmp = job.result
synthesis_method(dataset, tag, tmp, conn)
else:
print("status",job.status)
print("result",job.result)
print("stdout",job.stdout)
print("stderr",job.exception)
jobs = deepcopy(jobs2)
cluster.wait() # wait for all jobs to finish
cUtil.stop_dispy_cluster(cluster, http_server)
file = kwargs.get('file', None)
conn.close()
sintetic = kwargs.get('sintetic', False)
return synthesis_method(jobs, experiments, save, file, sintetic)
#return synthesis_method(jobs, experiments, save, file, sintetic)
def get_benchmark_point_methods():
@ -326,7 +336,6 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, **kwarg
tmp5 = [Transformations.Differential]
transformation = kwargs.get('transformation', None)
indexer = kwargs.get('indexer', None)
steps_ahead = kwargs.get('steps_ahead', 1)
@ -338,13 +347,11 @@ def run_point(mfts, partitioner, train_data, test_data, window_key=None, **kwarg
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
mfts.append_transformation(partitioner.transformation)
_key += str(steps_ahead)
_key += str(method) if method is not None else ""
if transformation is not None:
mfts.append_transformation(transformation)
_start = time.time()
mfts.fit(train_data, order=mfts.order, **kwargs)
_end = time.time()
@ -386,9 +393,6 @@ def run_interval(mfts, partitioner, train_data, test_data, window_key=None, **kw
tmp3 = [Measures.get_interval_statistics]
transformation = kwargs.get('transformation', None)
indexer = kwargs.get('indexer', None)
steps_ahead = kwargs.get('steps_ahead', 1)
method = kwargs.get('method', None)
@ -398,9 +402,7 @@ def run_interval(mfts, partitioner, train_data, test_data, window_key=None, **kw
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
if transformation is not None:
mfts.append_transformation(transformation)
mfts.append_transformation(partitioner.transformation)
_key += str(steps_ahead)
_key += str(method) if method is not None else ""
@ -450,7 +452,6 @@ def run_probabilistic(mfts, partitioner, train_data, test_data, window_key=None,
tmp3 = [Measures.get_distribution_statistics, SeasonalIndexer.SeasonalIndexer, SeasonalIndexer.LinearSeasonalIndexer]
transformation = kwargs.get('transformation', None)
indexer = kwargs.get('indexer', None)
steps_ahead = kwargs.get('steps_ahead', 1)
@ -462,13 +463,11 @@ def run_probabilistic(mfts, partitioner, train_data, test_data, window_key=None,
pttr = str(partitioner.__module__).split('.')[-1]
_key = mfts.shortname + " n = " + str(mfts.order) + " " + pttr + " q = " + str(partitioner.partitions)
mfts.partitioner = partitioner
mfts.append_transformation(partitioner.transformation)
_key += str(steps_ahead)
_key += str(method) if method is not None else ""
if transformation is not None:
mfts.append_transformation(transformation)
if mfts.has_seasonality:
mfts.indexer = indexer
@ -491,126 +490,64 @@ def run_probabilistic(mfts, partitioner, train_data, test_data, window_key=None,
return ret
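The run_point, run_interval and run_probabilistic changes above drop the separate transformation kwarg and take the transformation from the partitioner via mfts.append_transformation(partitioner.transformation). A hedged sketch of that convention; the series and the partition count are arbitrary.

import numpy as np
from pyFTS.common import Transformations
from pyFTS.partitioners import Grid

tdiff = Transformations.Differential(1)
data = np.random.normal(0, 1, 200)                                     # made-up series
part = Grid.GridPartitioner(data=data, npart=10, transformation=tdiff)
print(part.transformation is tdiff)   # True: models built on this partitioner inherit tdiff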
def build_model_pool_point(models, max_order, benchmark_models, benchmark_models_parameters):
pool = []
if models is None:
models = get_point_methods()
for model in models:
mfts = model("")
def process_point_jobs(dataset, tag, job, conn):
if mfts.is_high_order:
for order in np.arange(1, max_order + 1):
if order >= mfts.min_order:
mfts = model("")
mfts.order = order
pool.append(mfts)
else:
mfts.order = 1
pool.append(mfts)
data = bUtil.process_common_data(dataset, tag, 'point',job)
if benchmark_models is not None:
for count, model in enumerate(benchmark_models, start=0):
par = benchmark_models_parameters[count]
mfts = model(str(par if par is not None else ""))
mfts.order = par
pool.append(mfts)
return pool
rmse = deepcopy(data)
rmse.extend(["rmse", job["rmse"]])
bUtil.insert_benchmark(rmse, conn)
smape = deepcopy(data)
smape.extend(["smape", job["smape"]])
bUtil.insert_benchmark(smape, conn)
u = deepcopy(data)
u.extend(["u", job["u"]])
bUtil.insert_benchmark(u, conn)
time = deepcopy(data)
time.extend(["time", job["time"]])
bUtil.insert_benchmark(time, conn)
def process_point_jobs(jobs, experiments, save=False, file=None, sintetic=False):
objs = {}
rmse = {}
smape = {}
u = {}
times = {}
steps = {}
method = {}
def process_interval_jobs(dataset, tag, job, conn):
for job in jobs:
_key = job['key']
if _key not in objs:
objs[_key] = job['obj']
rmse[_key] = []
smape[_key] = []
u[_key] = []
times[_key] = []
steps[_key] = []
method[_key] = []
steps[_key] = job['steps']
method[_key] = job['method']
rmse[_key].append(job['rmse'])
smape[_key].append(job['smape'])
u[_key].append(job['u'])
times[_key].append(job['time'])
data = bUtil.process_common_data(dataset, tag, 'interval', job)
return bUtil.save_dataframe_point(experiments, file, objs, rmse, save, sintetic, smape, times, u, steps, method)
sharpness = deepcopy(data)
sharpness.extend(["sharpness", job["sharpness"]])
bUtil.insert_benchmark(sharpness, conn)
resolution = deepcopy(data)
resolution.extend(["resolution", job["resolution"]])
bUtil.insert_benchmark(resolution, conn)
coverage = deepcopy(data)
coverage.extend(["coverage", job["coverage"]])
bUtil.insert_benchmark(coverage, conn)
time = deepcopy(data)
time.extend(["time", job["time"]])
bUtil.insert_benchmark(time, conn)
Q05 = deepcopy(data)
Q05.extend(["Q05", job["Q05"]])
bUtil.insert_benchmark(Q05, conn)
Q25 = deepcopy(data)
Q25.extend(["Q25", job["Q25"]])
bUtil.insert_benchmark(Q25, conn)
Q75 = deepcopy(data)
Q75.extend(["Q75", job["Q75"]])
bUtil.insert_benchmark(Q75, conn)
Q95 = deepcopy(data)
Q95.extend(["Q95", job["Q95"]])
bUtil.insert_benchmark(Q95, conn)
def process_interval_jobs(jobs, experiments, save=False, file=None, sintetic=False):
objs = {}
sharpness = {}
resolution = {}
coverage = {}
q05 = {}
q25 = {}
q75 = {}
q95 = {}
times = {}
steps = {}
method = {}
def process_probabilistic_jobs(dataset, tag, job, conn):
for job in jobs:
_key = job['key']
if _key not in objs:
objs[_key] = job['obj']
sharpness[_key] = []
resolution[_key] = []
coverage[_key] = []
times[_key] = []
q05[_key] = []
q25[_key] = []
q75[_key] = []
q95[_key] = []
steps[_key] = []
method[_key] = []
sharpness[_key].append(job['sharpness'])
resolution[_key].append(job['resolution'])
coverage[_key].append(job['coverage'])
times[_key].append(job['time'])
q05[_key].append(job['Q05'])
q25[_key].append(job['Q25'])
q75[_key].append(job['Q75'])
q95[_key].append(job['Q95'])
steps[_key] = job['steps']
method[_key] = job['method']
return bUtil.save_dataframe_interval(coverage, experiments, file, objs, resolution, save, sharpness, sintetic,
times, q05, q25, q75, q95, steps, method)
def process_probabilistic_jobs(jobs, experiments, save=False, file=None, sintetic=False):
objs = {}
crps = {}
times = {}
steps = {}
method = {}
for job in jobs:
_key = job['key']
if _key not in objs:
objs[_key] = job['obj']
crps[_key] = []
times[_key] = []
steps[_key] = []
method[_key] = []
crps[_key].append(job['CRPS'])
times[_key].append(job['time'])
steps[_key] = job['steps']
method[_key] = job['method']
return bUtil.save_dataframe_probabilistic(experiments, file, objs, crps, times, save, sintetic, steps, method)
data = bUtil.process_common_data(dataset, tag, 'density', job)
crps = deepcopy(data)
crps.extend(["CRPS",job["CRPS"]])
bUtil.insert_benchmark(crps, conn)
time = deepcopy(data)
time.extend(["time", job["time"]])
bUtil.insert_benchmark(time, conn)
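The three process_*_jobs callbacks above share one pattern: take the common row from process_common_data, append one (Measure, Value) pair per metric, and insert one database row per measure; only the metric names differ between the point, interval and probabilistic variants. A self-contained sketch of that row layout, with made-up values:

from copy import deepcopy

common = ['TAIEX', 'partitioning', 'point', 'PWFTS', None, 1,
          'Grid', '10', 35, 1, None]                          # as process_common_data would return
job = {'rmse': 42.37, 'smape': 1.05, 'u': 0.98, 'time': 0.3}  # hypothetical point-forecast job
for measure in ('rmse', 'smape', 'u', 'time'):
    row = deepcopy(common)
    row.extend([measure, job[measure]])
    print(row)    # each such row would go to bUtil.insert_benchmark(row, conn)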
def print_point_statistics(data, models, externalmodels = None, externalforecasts = None, indexers=None):
@ -636,7 +573,6 @@ def print_point_statistics(data, models, externalmodels = None, externalforecast
print(ret)
def print_interval_statistics(original, models):
ret = "Model & Order & Sharpness & Resolution & Coverage & .05 & .25 & .75 & .95 \\\\ \n"
for fts in models:
@ -653,151 +589,6 @@ def print_interval_statistics(original, models):
print(ret)
def ahead_sliding_window(data, windowsize, train, steps, models=None, resolution = None, partitioners=[Grid.GridPartitioner],
partitions=[10], max_order=3, transformation=None, indexer=None, dump=False,
save=False, file=None, synthetic=False):
if models is None:
models = [pwfts.ProbabilisticWeightedFTS]
objs = {}
lcolors = {}
crps_interval = {}
crps_distr = {}
times1 = {}
times2 = {}
experiments = 0
for ct, train,test in cUtil.sliding_window(data, windowsize, train):
experiments += 1
for partition in partitions:
for partitioner in partitioners:
pttr = str(partitioner.__module__).split('.')[-1]
data_train_fs = partitioner(data=train, npart=partition, transformation=transformation)
for count, model in enumerate(models, start=0):
mfts = model("")
_key = mfts.shortname + " " + pttr+ " q = " +str(partition)
mfts.partitioner = data_train_fs
if not mfts.is_high_order:
if dump: print(ct,_key)
if _key not in objs:
objs[_key] = mfts
lcolors[_key] = colors[count % ncol]
crps_interval[_key] = []
crps_distr[_key] = []
times1[_key] = []
times2[_key] = []
if transformation is not None:
mfts.append_transformation(transformation)
_start = time.time()
mfts.train(train, sets=data_train_fs.sets)
_end = time.time()
_tdiff = _end - _start
_crps1, _crps2, _t1, _t2 = Measures.get_distribution_statistics(test,mfts,steps=steps,resolution=resolution)
crps_interval[_key].append_rhs(_crps1)
crps_distr[_key].append_rhs(_crps2)
times1[_key] = _tdiff + _t1
times2[_key] = _tdiff + _t2
if dump: print(_crps1, _crps2, _tdiff, _t1, _t2)
else:
for order in np.arange(1, max_order + 1):
if order >= mfts.min_order:
mfts = model("")
_key = mfts.shortname + " n = " + str(order) + " " + pttr + " q = " + str(partition)
mfts.partitioner = data_train_fs
if dump: print(ct,_key)
if _key not in objs:
objs[_key] = mfts
lcolors[_key] = colors[count % ncol]
crps_interval[_key] = []
crps_distr[_key] = []
times1[_key] = []
times2[_key] = []
if transformation is not None:
mfts.append_transformation(transformation)
_start = time.time()
mfts.train(train, sets=data_train_fs.sets, order=order)
_end = time.time()
_tdiff = _end - _start
_crps1, _crps2, _t1, _t2 = Measures.get_distribution_statistics(test, mfts, steps=steps,
resolution=resolution)
crps_interval[_key].append_rhs(_crps1)
crps_distr[_key].append_rhs(_crps2)
times1[_key] = _tdiff + _t1
times2[_key] = _tdiff + _t2
if dump: print(_crps1, _crps2, _tdiff, _t1, _t2)
return bUtil.save_dataframe_ahead(experiments, file, objs, crps_interval, crps_distr, times1, times2, save, synthetic)
def all_ahead_forecasters(data_train, data_test, partitions, start, steps, resolution = None, max_order=3,save=False, file=None, tam=[20, 5],
models=None, transformation=None, option=2):
if models is None:
models = [pwfts.ProbabilisticWeightedFTS]
if resolution is None: resolution = (max(data_train) - min(data_train)) / 100
objs = []
data_train_fs = Grid.GridPartitioner(data=data_train, npart=partitions, transformation=transformation).sets
lcolors = []
for count, model in cUtil.enumerate2(models, start=0, step=2):
mfts = model("")
if not mfts.is_high_order:
if transformation is not None:
mfts.append_transformation(transformation)
mfts.train(data_train, sets=data_train_fs.sets)
objs.append(mfts)
lcolors.append( colors[count % ncol] )
else:
for order in np.arange(1,max_order+1):
if order >= mfts.min_order:
mfts = model(" n = " + str(order))
if transformation is not None:
mfts.append_transformation(transformation)
mfts.train(data_train, sets=data_train_fs.sets, order=order)
objs.append(mfts)
lcolors.append(colors[count % ncol])
distributions = [False for k in objs]
distributions[0] = True
print_distribution_statistics(data_test[start:], objs, steps, resolution)
plot_compared_intervals_ahead(data_test, objs, lcolors, distributions=distributions, time_from=start, time_to=steps,
interpol=False, save=save, file=file, tam=tam, resolution=resolution, option=option)
def print_distribution_statistics(original, models, steps, resolution):
ret = "Model & Order & Interval & Distribution \\\\ \n"
for fts in models:

View File

@ -44,11 +44,15 @@ class Differential(Transformation):
"""
Differentiation data transform
"""
def __init__(self, parameters):
def __init__(self, lag):
super(Differential, self).__init__()
self.lag = parameters
self.lag = lag
self.minimal_length = 2
@property
def parameters(self):
return self.lag
def apply(self, data, param=None, **kwargs):
if param is not None:
self.lag = param
@ -66,7 +70,7 @@ class Differential(Transformation):
def inverse(self, data, param, **kwargs):
interval = kwargs.get("point_to_interval",False)
type = kwargs.get("type","point")
if isinstance(data, (np.ndarray, np.generic)):
data = data.tolist()
@ -79,10 +83,14 @@ class Differential(Transformation):
# print(n)
# print(len(param))
if not interval:
if type == "point":
inc = [data[t] + param[t] for t in np.arange(0, n)]
else:
elif type == "interval":
inc = [[data[t][0] + param[t], data[t][1] + param[t]] for t in np.arange(0, n)]
elif type == "distribution":
for t in np.arange(0, n):
data[t].differential_offset(param[t])
inc = data
if n == 1:
return inc[0]
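A pure-Python sketch (illustrative numbers only) of the three inverse branches above: a point forecast gets the offset added back, an interval shifts both bounds, and a distribution object is shifted in place through its new differential_offset method.

param = [11.0]                 # last original value before differencing
point_diff = [1.5]             # differenced point forecast
interval_diff = [[1.0, 2.0]]   # differenced interval forecast

point_back = [point_diff[t] + param[t] for t in range(1)]             # [12.5]
interval_back = [[interval_diff[t][0] + param[t],
                  interval_diff[t][1] + param[t]] for t in range(1)]  # [[12.0, 13.0]]
print(point_back, interval_back)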
@ -103,6 +111,10 @@ class Scale(Transformation):
self.transf_max = max
self.transf_min = min
@property
def parameters(self):
return [self.transf_max, self.transf_min]
def apply(self, data, param=None,**kwargs):
if self.data_max is None:
self.data_max = np.nanmax(data)
@ -138,6 +150,10 @@ class AdaptiveExpectation(Transformation):
super(AdaptiveExpectation, self).__init__(parameters)
self.h = parameters
@property
def parameters(self):
return self.h
def apply(self, data, param=None,**kwargs):
return data
@ -160,6 +176,10 @@ class BoxCox(Transformation):
super(BoxCox, self).__init__()
self.plambda = plambda
@property
def parameters(self):
return self.plambda
def apply(self, data, param=None, **kwargs):
if self.plambda != 0:
modified = [(dat ** self.plambda - 1) / self.plambda for dat in data]

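A hedged sketch of the new read-only parameters properties, assuming the classes live in pyFTS.common.Transformations as this file's diff indicates; the constructor arguments are illustrative.

from pyFTS.common.Transformations import Differential, BoxCox

print(Differential(1).parameters)   # 1: the lag
print(BoxCox(0.5).parameters)       # 0.5: the lambda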
View File

@ -95,6 +95,25 @@ class ProbabilityDistribution(object):
return ret
def differential_offset(self, value):
nbins = []
dist = {}
for k in self.bins:
nk = k+value
nbins.append(nk)
dist[nk] = self.distribution[k]
self.bins = nbins
self.distribution = dist
self.labels = [str(k) for k in self.bins]
self.bin_index = SortedCollection.SortedCollection(iterable=sorted(self.bins))
self.quantile_index = None
self.cdf = None
self.qtl = None
def expected_value(self):
return np.nansum([v * self.distribution[v] for v in self.bins])

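A stand-alone sketch of what differential_offset does to the histogram above: every bin is translated by value while its probability mass is kept, and the cached indexes, CDF and quantiles are invalidated (set to None).

bins = [-1.0, 0.0, 1.0]                        # made-up bin centres
distribution = {-1.0: 0.2, 0.0: 0.5, 1.0: 0.3}
value = 11.0                                   # e.g. the last pre-differencing observation

nbins = [k + value for k in bins]
dist = {k + value: distribution[k] for k in bins}
print(nbins)   # [10.0, 11.0, 12.0]
print(dist)    # probabilities unchanged, bins translated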
View File

@ -19,19 +19,34 @@ from pyFTS.benchmarks import benchmarks as bchmk, Util as bUtil
from pyFTS.models import pwfts
from pyFTS.partitioners import Grid, Util as pUtil
partitioner = Grid.GridPartitioner(data=dataset[:800], npart=10, transformation=tdiff)
model = pwfts.ProbabilisticWeightedFTS('',partitioner=partitioner)
#model.append_transformation(tdiff)
model.fit(dataset[:800])
print(model.predict(dataset[800:1000], type='interval'))
'''
bchmk.sliding_window_benchmarks(dataset, 1000, train=0.8, inc=0.2, methods=[pwfts.ProbabilisticWeightedFTS],
benchmark_models=False, orders=[1,2,3], partitions=np.arange(10,100,5),
progress=False, type='point',
#steps_ahead=[1,4,7,10], steps_ahead_sampler=10,
distributed=True, nodes=['192.168.0.102','192.168.0.106','192.168.0.110'],
save=True, file="pwfts_taiex_partitioning.csv")
'''
benchmark_models=False,
#transformations=[tdiff],
orders=[1, 2, 3],
partitions=np.arange(10, 100, 5),
progress=False, type='distribution',
#steps_ahead=[1,4,7,10], #steps_ahead=[1]
distributed=True, nodes=['192.168.0.110', '192.168.0.100','192.168.0.106'],
file="benchmarks.db", dataset="TAIEX", tag="partitioning")
#save=True, file="tmp.db")
'''
'''
dat = pd.read_csv('pwfts_taiex_partitioning.csv', sep=';')
print(bUtil.analytic_tabular_dataframe(dat))
#print(dat["Size"].values[0])
'''
'''
train_split = 2000
test_length = 200