Refactoring to remove the FTS.sets attribute; refactoring to move Partitioner.extractor to a class method; bugfixes in distributed fit and predict

Petrônio Cândido 2019-06-20 17:38:36 -03:00
parent f2bf2b5d87
commit 354a3131c9
18 changed files with 266 additions and 210 deletions

View File

@ -824,6 +824,17 @@ def run_probabilistic2(fts_method, order, partitioner_method, partitions, transf
return ret
def common_process_time_jobs(conn, data, job):
dta = deepcopy(data)
dta.append(job['steps'])
dta.append(job['method'])
for key in ["time"]:
if key in job:
data2 = deepcopy(dta)
data2.extend([key, job[key]])
bUtil.insert_benchmark(data2, conn)
def common_process_point_jobs(conn, data, job):
dta = deepcopy(data)
dta.append(job['steps'])
@ -1416,3 +1427,50 @@ def pftsExploreOrderAndPartitions(data,save=False, file=None):
cUtil.show_and_save_image(fig, file, save)
def train_test_time(data, windowsize, train=0.8, **kwargs):
import time
tag = __pop('tag', None, kwargs)
steps = __pop('steps', 0, kwargs)
dataset = __pop('dataset', None, kwargs)
partitions = __pop('partitions', 10, kwargs)
fts_methods = __pop('methods', [], kwargs)
file = kwargs.get('file', "benchmarks.db")
inc = __pop("inc", 0.1, kwargs)
conn = bUtil.open_benchmark_db(file)
for ct, train, test in cUtil.sliding_window(data, windowsize, train, inc=inc, **kwargs):
partitioner = Grid.GridPartitioner(data=train, npart=partitions)
for id, fts_method in enumerate(fts_methods):
print(dataset, fts_method, ct)
times = []
model = fts_method(partitioner = partitioner, **kwargs)
_start = time.time()
model.fit(train, **kwargs)
_end = time.time()
times.append( _end - _start )
_start = time.time()
model.predict(train, **kwargs)
_end = time.time()
times.append(_end - _start)
for ct, method in enumerate(['train','test']):
job = {
'steps': steps, 'method': method, 'time': times[ct],
'model': model.shortname, 'transformation': None,
'order': model.order, 'partitioner': partitioner.name,
'partitions': partitions, 'size': len(model)
}
data = bUtil.process_common_data2(dataset, tag, 'train', job)
common_process_time_jobs(conn, data, job)
conn.close()
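For context, a minimal local usage sketch of the new train_test_time helper; the synthetic series and parameter values are illustrative, not part of this commit:

import numpy as np
from pyFTS.benchmarks import benchmarks as bchmk
from pyFTS.models import hofts

# Illustrative synthetic series; any univariate numeric sequence works
data = np.sin(np.linspace(0, 50, 3000)) + np.random.normal(0, .1, 3000)

# Times fit() and predict() on each sliding window and stores both
# measurements under the 'time' metric in the benchmarks database
bchmk.train_test_time(data, windowsize=1000, train=0.8, inc=0.5,
                      methods=[hofts.HighOrderFTS],
                      order=2, partitions=35, steps=1,
                      dataset='synthetic', tag='timing',
                      file='benchmarks.db')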

View File

@ -13,8 +13,6 @@ from pyFTS.probabilistic import ProbabilityDistribution
from pyFTS.common import Transformations
def plot_compared_intervals_ahead(original, models, colors, distributions, time_from, time_to, intervals = True,
save=False, file=None, tam=[20, 5], resolution=None,
cmap='Blues', linewidth=1.5):
@ -126,8 +124,9 @@ def plot_probability_distributions(pmfs, lcolors, tam=[15, 7]):
handles0, labels0 = ax.get_legend_handles_labels()
ax.legend(handles0, labels0)
def plot_distribution(ax, cmap, probabilitydist, fig, time_from, reference_data=None):
'''
"""
Plot forecasted ProbabilityDistribution objects on a matplotlib axis
:param ax: matplotlib axis
@ -137,7 +136,7 @@ def plot_distribution(ax, cmap, probabilitydist, fig, time_from, reference_data=
:param time_from: starting time (on x axis) to begin the plots
:param reference_data: observed values to plot alongside the forecasted distributions
:return:
'''
"""
from matplotlib.patches import Rectangle
from matplotlib.collections import PatchCollection
patches = []
@ -163,7 +162,7 @@ def plot_distribution(ax, cmap, probabilitydist, fig, time_from, reference_data=
def plot_distribution2(probabilitydist, data, **kwargs):
'''
"""
Plot distributions over time (x-axis)
:param probabilitydist: the forecasted probability distributions to plot
@ -173,7 +172,7 @@ def plot_distribution2(probabilitydist, data, **kwargs):
:keyword cmap: a matplotlib colormap name, the default value is 'Blues'
:keyword quantiles: the list of quantile intervals to plot, e.g. [.05, .25, .75, .95]
:keyword median: a boolean value indicating if the median value will be plotted.
'''
"""
import matplotlib.colorbar as cbar
import matplotlib.cm as cm
@ -225,7 +224,7 @@ def plot_distribution2(probabilitydist, data, **kwargs):
def plot_interval(axis, intervals, order, label, color='red', typeonlegend=False, ls='-', linewidth=1):
'''
"""
Plot forecasted intervals on matplotlib
:param axis: matplotlib axis
@ -237,7 +236,7 @@ def plot_interval(axis, intervals, order, label, color='red', typeonlegend=False
:param ls: matplotlib line style
:param linewidth: matplotlib width
:return:
'''
"""
lower = [kk[0] for kk in intervals]
upper = [kk[1] for kk in intervals]
mi = min(lower) * 0.95
@ -252,7 +251,7 @@ def plot_interval(axis, intervals, order, label, color='red', typeonlegend=False
def plot_interval2(intervals, data, **kwargs):
'''
"""
Plot forecasted intervals on matplotlib
:param intervals: list of forecasted intervals
@ -263,7 +262,7 @@ def plot_interval2(intervals, data, **kwargs):
:keyword typeonlegend:
:keyword ls: matplotlib line style
:keyword linewidth: matplotlib width
'''
"""
l = len(intervals)
@ -296,7 +295,7 @@ def plot_interval2(intervals, data, **kwargs):
def plot_rules(model, size=[5, 5], axis=None, rules_by_axis=None, columns=1):
'''
"""
Plot the FLRG rules of an FTS model on a matplotlib axis
:param model: FTS model
@ -305,7 +304,7 @@ def plot_rules(model, size=[5, 5], axis=None, rules_by_axis=None, columns=1):
:param rules_by_axis: number of rules plotted by column
:param columns: number of columns
:return:
'''
"""
if axis is None and rules_by_axis is None:
rows = 1
elif axis is None and rules_by_axis is not None:
@ -412,7 +411,6 @@ def uniquefilename(name):
return name + str(current_milli_time())
def show_and_save_image(fig, file, flag, lgd=None):
"""
Show an image and save it to a file
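As a usage sketch for the plotting helpers above, assuming a Chen model fitted on illustrative data (the dataset and figure size are arbitrary):

import numpy as np
from pyFTS.partitioners import Grid
from pyFTS.models import chen
from pyFTS.common import Util

data = np.random.normal(10, 2, 500)  # illustrative data

partitioner = Grid.GridPartitioner(data=data, npart=10)
model = chen.ConventionalFTS(partitioner=partitioner)
model.fit(data)

# Render the learned FLRG rules, at most ten rules per axis
Util.plot_rules(model, size=[8, 4], rules_by_axis=10, columns=1)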

View File

@ -11,9 +11,6 @@ class FTS(object):
"""
Create a Fuzzy Time Series model
"""
self.sets = {}
"""The list of fuzzy sets used on this model"""
self.flrgs = {}
"""The list of Fuzzy Logical Relationship Groups - FLRG"""
self.order = kwargs.get('order',1)
@ -83,8 +80,8 @@ class FTS(object):
"""
best = {"fuzzyset": "", "membership": 0.0}
for f in self.sets:
fset = self.sets[f]
for f in self.partitioner.sets:
fset = self.partitioner.sets[f]
if best["membership"] <= fset.membership(data):
best["fuzzyset"] = fset.name
best["membership"] = fset.membership(data)
@ -129,13 +126,13 @@ class FTS(object):
else:
distributed = False
if distributed is None or distributed == False:
if 'type' in kwargs:
type = kwargs.pop("type")
else:
type = 'point'
if distributed is None or distributed == False:
steps_ahead = kwargs.get("steps_ahead", None)
if steps_ahead == None or steps_ahead == 1:
@ -321,25 +318,19 @@ class FTS(object):
self.original_min = np.nanmin(data)
self.original_max = np.nanmax(data)
if 'sets' in kwargs:
self.sets = kwargs.pop('sets')
if 'partitioner' in kwargs:
self.partitioner = kwargs.pop('partitioner')
if not self.is_wrapper:
if (self.sets is None or len(self.sets) == 0) and not self.benchmark_only and not self.is_multivariate:
if self.partitioner is not None:
self.sets = self.partitioner.sets
else:
raise Exception("Fuzzy sets were not provided for the model. Use 'sets' parameter or 'partitioner'. ")
if not self.is_wrapper and not self.benchmark_only:
if self.partitioner is None:
raise Exception("Fuzzy sets were not provided for the model. Use 'partitioner' parameter. ")
if 'order' in kwargs:
self.order = kwargs.pop('order')
dump = kwargs.get('dump', None)
num_batches = kwargs.get('num_batches', 1)
num_batches = kwargs.get('num_batches', 10)
save = kwargs.get('save_model', False) # save model on disk
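With the 'sets' kwarg gone, a model must receive its fuzzy sets through a partitioner; a minimal sketch of the supported construction path (data is illustrative):

import numpy as np
from pyFTS.partitioners import Grid
from pyFTS.models import hofts

data = np.random.normal(10, 2, 1000)  # illustrative data

partitioner = Grid.GridPartitioner(data=data, npart=20)
model = hofts.HighOrderFTS(partitioner=partitioner, order=2)
model.fit(data)            # raises an Exception when no partitioner was given
forecasts = model.predict(data[-10:])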
@ -419,6 +410,8 @@ class FTS(object):
"""
self.order = model.order
self.partitioner = model.partitioner
self.lags = model.lags
self.shortname = model.shortname
self.name = model.name
self.detail = model.detail
@ -434,8 +427,6 @@ class FTS(object):
self.transformations_param = model.transformations_param
self.original_max = model.original_max
self.original_min = model.original_min
self.partitioner = model.partitioner
self.sets = model.sets
self.auto_update = model.auto_update
self.benchmark_only = model.benchmark_only
self.indexer = model.indexer

View File

@ -4,6 +4,13 @@ import numpy as np
def start_dispy_cluster(method, nodes):
"""
Start a new Dispy cluster on 'nodes' to execute the method 'method'
:param method: function to be executed on each cluster node
:param nodes: list of node names or IPs.
:return: the dispy cluster instance and the http_server for monitoring
"""
cluster = dispy.JobCluster(method, nodes=nodes, loglevel=logging.DEBUG, ping_interval=1000)
@ -13,6 +20,13 @@ def start_dispy_cluster(method, nodes):
def stop_dispy_cluster(cluster, http_server):
"""
Stop a dispy cluster and http_server
:param cluster: the dispy cluster instance to stop
:param http_server: the http monitoring server attached to the cluster
:return:
"""
cluster.wait() # wait for all jobs to finish
cluster.print_status()
@ -21,7 +35,23 @@ def stop_dispy_cluster(cluster, http_server):
cluster.close()
def get_number_of_cpus(cluster):
cpus = 0
for dispy_node in cluster.status().nodes:
cpus += dispy_node.cpus
return cpus
def simple_model_train(model, data, parameters):
"""
Cluster function that receives an FTS instance 'model' and trains it using 'data' and 'parameters'
:param model: a FTS instance
:param data: training dataset
:param parameters: parameters for the training process
:return: the trained model
"""
model.train(data, **parameters)
return model
@ -38,7 +68,8 @@ def distributed_train(model, train_method, nodes, fts_method, data, num_batches=
cluster, http_server = start_dispy_cluster(train_method, nodes)
print("[{0: %H:%M:%S}] Distrituted Train Started".format(datetime.datetime.now()))
print("[{0: %H:%M:%S}] Distrituted Train Started with {1} CPU's"
.format(datetime.datetime.now(), get_number_of_cpus(cluster)))
jobs = []
n = len(data)
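A hedged sketch of how these helpers fit together, assuming dispy is installed and the listed node is reachable:

from pyFTS.distributed import dispy as dUtil

def square(x):
    # trivial payload, just to exercise the cluster
    return x * x

cluster, http_server = dUtil.start_dispy_cluster(square, nodes=['127.0.0.1'])
print("Available CPUs:", dUtil.get_number_of_cpus(cluster))

jobs = [cluster.submit(k) for k in range(10)]
results = [job() for job in jobs]               # job() blocks until the result arrives
print(results)

dUtil.stop_dispy_cluster(cluster, http_server)  # waits, prints status, shuts down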

View File

@ -10,8 +10,8 @@ import random
from pyFTS.common import Util
from pyFTS.benchmarks import Measures
from pyFTS.partitioners import Grid, Entropy # , Huarng
from pyFTS.models import hofts
from pyFTS.common import Membership
from pyFTS.models import hofts, ifts, pwfts
from pyFTS.hyperparam import Util as hUtil
from pyFTS.distributed import dispy as dUtil
@ -71,7 +71,7 @@ def initial_population(n):
return pop
def phenotype(individual, train, fts_method=hofts.WeightedHighOrderFTS, parameters={}):
def phenotype(individual, train, fts_method, parameters={}):
"""
Instantiate the genotype, creating a fitted model with the genotype hyperparameters
@ -81,6 +81,8 @@ def phenotype(individual, train, fts_method=hofts.WeightedHighOrderFTS, paramete
:param parameters: dict with model specific arguments for fit method.
:return: a fitted FTS model
"""
from pyFTS.models import hofts, ifts, pwfts
if individual['mf'] == 1:
mf = Membership.trimf
elif individual['mf'] == 2:
@ -117,6 +119,7 @@ def evaluate(dataset, individual, **kwargs):
:param parameters: dict with model specific arguments for fit method.
:return: a tuple (len_lags, rmse) with the parsimony fitness value and the accuracy fitness value
"""
from pyFTS.models import hofts, ifts, pwfts
from pyFTS.common import Util
from pyFTS.benchmarks import Measures
from pyFTS.hyperparam.Evolutionary import phenotype, __measures
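Since phenotype no longer defaults to WeightedHighOrderFTS, callers pass the model class explicitly. A sketch using the genotype layout from the GridSearch example at the end of this commit (data is illustrative):

import numpy as np
from pyFTS.hyperparam import Evolutionary
from pyFTS.models import pwfts

data = np.random.normal(10, 2, 2000)  # illustrative training data

individual = {'mf': 1, 'partitioner': 1, 'npart': 30,
              'order': 2, 'alpha': 0.2, 'lags': [1, 2]}

# fts_method is now required instead of defaulting to WHOFTS
model = Evolutionary.phenotype(individual, data,
                               fts_method=pwfts.ProbabilisticWeightedFTS)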
@ -306,7 +309,7 @@ def mutation(individual, **kwargs):
return individual
def elitism(population, new_population):
def elitism(population, new_population, **kwargs):
"""
Elitism operation: always selects the best individual of the population and discards the worst
@ -418,12 +421,12 @@ def GeneticAlgorithm(dataset, **kwargs):
# Selection
for j in range(int(npop * psel)):
new_population.append(selection_operator(population))
new_population.append(selection_operator(population, **kwargs))
# Crossover
new = []
for j in range(int(npop * pcross)):
new.append(crossover_operator(new_population))
new.append(crossover_operator(new_population, **kwargs))
new_population.extend(new)
@ -431,7 +434,7 @@ def GeneticAlgorithm(dataset, **kwargs):
for ct, individual in enumerate(new_population):
rnd = random.uniform(0, 1)
if rnd < pmut:
new_population[ct] = mutation_operator(individual)
new_population[ct] = mutation_operator(individual, **kwargs)
# Evaluation
if collect_statistics:
@ -472,7 +475,7 @@ def GeneticAlgorithm(dataset, **kwargs):
# Elitism
if _elitism:
population = elitism_operator(population, new_population)
population = elitism_operator(population, new_population, **kwargs)
population = population[:npop]
@ -503,22 +506,22 @@ def GeneticAlgorithm(dataset, **kwargs):
return best, statistics
def process_experiment(result, datasetname, conn):
log_result(conn, datasetname, result['individual'])
persist_statistics(result['statistics'])
def process_experiment(fts_method, result, datasetname, conn):
log_result(conn, datasetname, fts_method, result['individual'])
persist_statistics(datasetname, result['statistics'])
return result['individual']
def persist_statistics(statistics):
def persist_statistics(datasetname, statistics):
import json
with open('statistics{}.json'.format(time.time()), 'w') as file:
with open('statistics_{}.json'.format(datasetname), 'w') as file:
file.write(json.dumps(statistics))
def log_result(conn, datasetname, result):
def log_result(conn, datasetname, fts_method, result):
metrics = ['rmse', 'size', 'time']
for metric in metrics:
record = (datasetname, 'Evolutive', 'WHOFTS', None, result['mf'],
record = (datasetname, 'Evolutive', fts_method, None, result['mf'],
result['order'], result['partitioner'], result['npart'],
result['alpha'], str(result['lags']), metric, result[metric])
@ -568,6 +571,9 @@ def execute(datasetname, dataset, **kwargs):
distributed = kwargs.get('distributed', False)
fts_method = kwargs.get('fts_method', hofts.WeightedHighOrderFTS)
shortname = str(fts_method.__module__).split('.')[-1]
if distributed == 'dispy':
nodes = kwargs.get('nodes', ['127.0.0.1'])
cluster, http_server = dUtil.start_dispy_cluster(evaluate, nodes=nodes)
@ -583,7 +589,7 @@ def execute(datasetname, dataset, **kwargs):
ret['time'] = end - start
experiment = {'individual': ret, 'statistics': statistics}
ret = process_experiment(experiment, datasetname, conn)
ret = process_experiment(shortname, experiment, datasetname, conn)
if distributed == 'dispy':
dUtil.stop_dispy_cluster(cluster, http_server)
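End to end, the evolutionary search now logs the FTS method name with each result; a local (non-distributed) sketch with deliberately small, illustrative parameters:

import numpy as np
from pyFTS.hyperparam import Evolutionary
from pyFTS.models import hofts

data = np.random.normal(10, 2, 5000)  # illustrative dataset

best = Evolutionary.execute('synthetic', data,
                            ngen=10, npop=10, psel=0.6, pcross=.5, pmut=.3,
                            window_size=2000, train_rate=.8, increment_rate=.5,
                            fts_method=hofts.WeightedHighOrderFTS,
                            database_file='experiments.db',
                            distributed=False)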

View File

@ -64,7 +64,7 @@ class ConventionalFTS(fts.FTS):
for k in np.arange(0, l):
actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.sets)
actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.partitioner.sets)
if explain:
print("Fuzzyfication:\n\n {} -> {} \n".format(ndata[k], actual.name))
@ -78,7 +78,7 @@ class ConventionalFTS(fts.FTS):
else:
_flrg = self.flrgs[actual.name]
mp = _flrg.get_midpoint(self.sets)
mp = _flrg.get_midpoint(self.partitioner.sets)
ret.append(mp)

View File

@ -33,9 +33,9 @@ class IntervalFTS(hofts.HighOrderFTS):
if len(flrg.LHS) > 0:
if flrg.get_key() in self.flrgs:
tmp = self.flrgs[flrg.get_key()]
ret = tmp.get_upper(self.sets)
ret = tmp.get_upper(self.partitioner.sets)
else:
ret = self.sets[flrg.LHS[-1]].upper
ret = self.partitioner.sets[flrg.LHS[-1]].upper
return ret
def get_lower(self, flrg):
@ -74,7 +74,7 @@ class IntervalFTS(hofts.HighOrderFTS):
for flrg in flrgs:
if len(flrg.LHS) > 0:
mv = flrg.get_membership(sample, self.sets)
mv = flrg.get_membership(sample, self.partitioner.sets)
up.append(mv * self.get_upper(flrg))
lo.append(mv * self.get_lower(flrg))
affected_flrgs_memberships.append(mv)
@ -119,9 +119,9 @@ class WeightedIntervalFTS(hofts.WeightedHighOrderFTS):
if len(flrg.LHS) > 0:
if flrg.get_key() in self.flrgs:
tmp = self.flrgs[flrg.get_key()]
ret = tmp.get_upper(self.sets)
ret = tmp.get_upper(self.partitioner.sets)
else:
ret = self.sets[flrg.LHS[-1]].upper
ret = self.partitioner.sets[flrg.LHS[-1]].upper
return ret
def get_lower(self, flrg):
@ -159,7 +159,7 @@ class WeightedIntervalFTS(hofts.WeightedHighOrderFTS):
for flrg in flrgs:
if len(flrg.LHS) > 0:
mv = flrg.get_membership(sample, self.sets)
mv = flrg.get_membership(sample, self.partitioner.sets)
up.append(mv * self.get_upper(flrg))
lo.append(mv * self.get_lower(flrg))
affected_flrgs_memberships.append(mv)
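These interval models now read fuzzy sets exclusively from the partitioner; a minimal interval-forecasting sketch (data is illustrative):

import numpy as np
from pyFTS.partitioners import Grid
from pyFTS.models import ifts

data = np.random.normal(10, 2, 1000)  # illustrative data

partitioner = Grid.GridPartitioner(data=data, npart=30)
model = ifts.IntervalFTS(partitioner=partitioner, order=2)
model.fit(data)

# Each forecast is a [lower, upper] interval
intervals = model.predict(data[-20:], type='interval')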

View File

@ -74,7 +74,7 @@ class ImprovedWeightedFTS(fts.FTS):
if self.partitioner is not None:
ordered_sets = self.partitioner.ordered_sets
else:
ordered_sets = FuzzySet.set_ordered(self.sets)
ordered_sets = FuzzySet.set_ordered(self.partitioner.sets)
ndata = np.array(ndata)
@ -84,7 +84,7 @@ class ImprovedWeightedFTS(fts.FTS):
for k in np.arange(0, l):
actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.sets, ordered_sets)
actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.partitioner.sets, ordered_sets)
if explain:
print("Fuzzyfication:\n\n {} -> {} \n".format(ndata[k], actual.name))
@ -97,7 +97,7 @@ class ImprovedWeightedFTS(fts.FTS):
else:
flrg = self.flrgs[actual.name]
mp = flrg.get_midpoints(self.sets)
mp = flrg.get_midpoints(self.partitioner.sets)
final = mp.dot(flrg.weights())

View File

@ -55,6 +55,7 @@ class ConditionalVarianceFTS(hofts.HighOrderFTS):
self.variance_residual = 0.
self.mean_residual = 0.
self.memory_window = kwargs.get("memory_window",5)
self.sets = {}
def train(self, ndata, **kwargs):

View File

@ -69,11 +69,11 @@ class NonStationaryFTS(fts.FTS):
if self.method == 'unconditional':
window_size = kwargs.get('parameters', 1)
tmpdata = common.fuzzySeries(data, self.sets,
tmpdata = common.fuzzySeries(data, self.partitioner.sets,
self.partitioner.ordered_sets,
window_size, method='fuzzy')
else:
tmpdata = common.fuzzySeries(data, self.sets,
tmpdata = common.fuzzySeries(data, self.partitioner.sets,
self.partitioner.ordered_sets,
method='fuzzy', const_t=0)
@ -190,9 +190,9 @@ class NonStationaryFTS(fts.FTS):
ix = affected_sets[0][0]
aset = self.partitioner.ordered_sets[ix]
if aset in self.flrgs:
numerator.append(self.flrgs[aset].get_midpoint(self.sets, perturb[ix]))
numerator.append(self.flrgs[aset].get_midpoint(self.partitioner.sets, perturb[ix]))
else:
fuzzy_set = self.sets[aset]
fuzzy_set = self.partitioner.sets[aset]
numerator.append(fuzzy_set.get_midpoint(perturb[ix]))
denominator.append(1)
else:
@ -201,9 +201,9 @@ class NonStationaryFTS(fts.FTS):
fs = self.partitioner.ordered_sets[ix]
tdisp = perturb[ix]
if fs in self.flrgs:
numerator.append(self.flrgs[fs].get_midpoint(self.sets, tdisp) * aset[1])
numerator.append(self.flrgs[fs].get_midpoint(self.partitioner.sets, tdisp) * aset[1])
else:
fuzzy_set = self.sets[fs]
fuzzy_set = self.partitioner.sets[fs]
numerator.append(fuzzy_set.get_midpoint(tdisp) * aset[1])
denominator.append(aset[1])
@ -241,9 +241,9 @@ class NonStationaryFTS(fts.FTS):
tdisp = common.window_index(k + time_displacement, window_size)
affected_sets = [[self.sets[key], self.sets[key].membership(ndata[k], tdisp)]
affected_sets = [[self.partitioner.sets[key], self.partitioner.sets[key].membership(ndata[k], tdisp)]
for key in self.partitioner.ordered_sets
if self.sets[key].membership(ndata[k], tdisp) > 0.0]
if self.partitioner.sets[key].membership(ndata[k], tdisp) > 0.0]
if len(affected_sets) == 0:
affected_sets.append([common.check_bounds(ndata[k], self.partitioner, tdisp), 1.0])

View File

@ -78,7 +78,7 @@ class ExponentialyWeightedFTS(fts.FTS):
if self.partitioner is not None:
ordered_sets = self.partitioner.ordered_sets
else:
ordered_sets = FuzzySet.set_ordered(self.sets)
ordered_sets = FuzzySet.set_ordered(self.partitioner.sets)
data = np.array(ndata)
@ -88,7 +88,7 @@ class ExponentialyWeightedFTS(fts.FTS):
for k in np.arange(0, l):
actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.sets, ordered_sets)
actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.partitioner.sets, ordered_sets)
if explain:
print("Fuzzyfication:\n\n {} -> {} \n".format(ndata[k], actual.name))
@ -101,7 +101,7 @@ class ExponentialyWeightedFTS(fts.FTS):
else:
flrg = self.flrgs[actual.name]
mp = flrg.get_midpoints(self.sets)
mp = flrg.get_midpoints(self.partitioner.sets)
final = mp.dot(flrg.weights())

View File

@ -44,8 +44,11 @@ class TimeGridPartitioner(partitioner.Partitioner):
else:
self.ordered_sets = FS.set_ordered(self.sets)
def extractor(self,x):
if self.type == 'seasonal':
self.extractor = lambda x: strip_datepart(x, self.season, self.mask)
return strip_datepart(x, self.season, self.mask)
else:
return x
def build(self, data):
sets = {}

View File

@ -79,7 +79,7 @@ class MarkovWeightedFTS(fts.FTS):
if self.partitioner is not None:
ordered_sets = self.partitioner.ordered_sets
else:
ordered_sets = FuzzySet.set_ordered(self.sets)
ordered_sets = FuzzySet.set_ordered(self.partitioner.sets)
data = np.array(ndata)
@ -89,7 +89,7 @@ class MarkovWeightedFTS(fts.FTS):
for k in np.arange(0, l):
actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.sets, ordered_sets)
actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.partitioner.sets, ordered_sets)
if explain:
print("Fuzzyfication:\n\n {} -> {} \n".format(ndata[k], actual.name))
@ -102,7 +102,7 @@ class MarkovWeightedFTS(fts.FTS):
else:
flrg = self.flrgs[actual.name]
mp = flrg.get_midpoints(self.sets)
mp = flrg.get_midpoints(self.partitioner.sets)
final = mp.dot(flrg.weights())

View File

@ -70,7 +70,7 @@ class WeightedFTS(fts.FTS):
if self.partitioner is not None:
ordered_sets = self.partitioner.ordered_sets
else:
ordered_sets = FuzzySet.set_ordered(self.sets)
ordered_sets = FuzzySet.set_ordered(self.partitioner.sets)
ndata = np.array(ndata)
@ -80,7 +80,7 @@ class WeightedFTS(fts.FTS):
for k in np.arange(0, l):
actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.sets, ordered_sets)
actual = FuzzySet.get_maximum_membership_fuzzyset(ndata[k], self.partitioner.sets, ordered_sets)
if explain:
print("Fuzzyfication:\n\n {} -> {} \n\n".format(ndata[k], actual.name))
@ -93,9 +93,9 @@ class WeightedFTS(fts.FTS):
else:
flrg = self.flrgs[actual.name]
mp = flrg.get_midpoints(self.sets)
mp = flrg.get_midpoints(self.partitioner.sets)
final = mp.dot(flrg.weights(self.sets))
final = mp.dot(flrg.weights(self.partitioner.sets))
ret.append(final)

View File

@ -31,8 +31,6 @@ class Partitioner(object):
"""In a multivariate context, the variable that contains this partitioner"""
self.type = kwargs.get('type', 'common')
"""The type of fuzzy sets that are generated by this partitioner"""
self.extractor = kwargs.get('extractor', lambda x: x)
"""Anonymous function used to extract a single primitive type from an object instance"""
self.ordered_sets = None
"""A ordered list of the fuzzy sets names, sorted by their middle point"""
self.kdtree = None
@ -76,6 +74,10 @@ class Partitioner(object):
del(ndata)
def extractor(self,x):
"""Extract a single primitive type from an structured instance"""
return x
def build(self, data):
"""
Perform the partitioning of the Universe of Discourse
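Since extractor is now an ordinary method, subclasses override it instead of assigning a lambda in __init__. A hypothetical subclass sketch (DictFieldPartitioner is not part of pyFTS):

from pyFTS.partitioners import partitioner

class DictFieldPartitioner(partitioner.Partitioner):
    """Hypothetical partitioner whose inputs are dict records."""

    def extractor(self, x):
        # Pull the primitive value out of a structured record,
        # mirroring how TimeGridPartitioner strips the date part
        return x['value'] if isinstance(x, dict) else x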

View File

@ -7,122 +7,39 @@ import numpy as np
import os
from pyFTS.common import Transformations
from copy import deepcopy
from pyFTS.nonstationary import flrg, util, honsfts, partitioners
from pyFTS.models.nonstationary import nsfts
from pyFTS.models import pwfts
from pyFTS.benchmarks import benchmarks as bchmk, Measures
bc = Transformations.BoxCox(0)
import time
import dispy
import dispy.httpd
from pyFTS.data import SONDA, Malaysia
os.chdir("/home/petronio/Dropbox/Doutorado/Codigos/")
datasets = {}
sonda = SONDA.get_dataframe()[['datahora','glo_avg','ws_10m']]
def evaluate_individual_model(model, partitioner, train, test, window_size, time_displacement):
import numpy as np
from pyFTS.partitioners import Grid
from pyFTS.benchmarks import Measures
sonda = sonda.drop(sonda.index[np.where(sonda["ws_10m"] <= 0.01)])
sonda = sonda.drop(sonda.index[np.where(sonda["glo_avg"] <= 0.01)])
sonda = sonda.dropna()
try:
model.train(train, sets=partitioner.sets, order=model.order, parameters=window_size)
forecasts = model.forecast(test, time_displacement=time_displacement, window_size=window_size)
_rmse = Measures.rmse(test[model.order:], forecasts[:-1])
_mape = Measures.mape(test[model.order:], forecasts[:-1])
_u = Measures.UStatistic(test[model.order:], forecasts[:-1])
except Exception as e:
print(e)
_rmse = np.nan
_mape = np.nan
_u = np.nan
malaysia = Malaysia.get_dataframe()
return {'model': model.shortname, 'partitions': partitioner.partitions, 'order': model.order,
'rmse': _rmse, 'mape': _mape, 'u': _u}
datasets['SONDA.ws_10m'] = sonda["ws_10m"].values
datasets['SONDA.glo_avg'] = sonda["glo_avg"].values
datasets['Malaysia.temperature'] = malaysia["temperature"].values
datasets['Malaysia.load'] = malaysia["load"].values
windows = [600000, 600000, 10000, 10000]
data = pd.read_csv("DataSets/synthetic_nonstationary_dataset_A.csv", sep=";")
data = np.array(data["0"][:])
cpus = 3
cluster = dispy.JobCluster(evaluate_individual_model, nodes=['192.168.0.108', '192.168.0.110'])
http_server = dispy.httpd.DispyHTTPServer(cluster)
jobs = []
models = []
for order in [1, 2, 3]:
if order == 1:
model = nsfts.NonStationaryFTS("")
else:
model = honsfts.HighOrderNonStationaryFTS("")
model.order = order
models.append(model)
for ct, train, test in cUtil.sliding_window(data, 300):
for partition in np.arange(5, 100, 1):
tmp_partitioner = Grid.GridPartitioner(train, partition)
partitioner = partitioners.PolynomialNonStationaryPartitioner(train, tmp_partitioner,
window_size=35, degree=1)
for model in models:
# print(model.shortname, partition, model.order)
#job = evaluate_individual_model(model, train, test)
job = cluster.submit(deepcopy(model), deepcopy(partitioner), train, test, 35, 240)
job.id = ct + model.order*100
jobs.append(job)
results = {}
for job in jobs:
tmp = job()
# print(tmp)
if job.status == dispy.DispyJob.Finished and tmp is not None:
_m = tmp['model']
_o = tmp['order']
_p = tmp['partitions']
if _m not in results:
results[_m] = {}
if _o not in results[_m]:
results[_m][_o] = {}
if _p not in results[_m][_o]:
results[_m][_o][_p] = {}
results[_m][_o][_p]['rmse'] = []
results[_m][_o][_p]['mape'] = []
results[_m][_o][_p]['u'] = []
results[_m][_o][_p]['rmse'].append(tmp['rmse'])
results[_m][_o][_p]['mape'].append(tmp['mape'])
results[_m][_o][_p]['u'].append(tmp['u'])
cluster.wait() # wait for all jobs to finish
cluster.print_status()
http_server.shutdown() # this waits until browser gets all updates
cluster.close()
dados = []
ncolunas = None
for m in sorted(results.keys()):
for o in sorted(results[m].keys()):
for p in sorted(results[m][o].keys()):
for r in ['rmse', 'mape', 'u']:
tmp = []
tmp.append(m)
tmp.append(o)
tmp.append(p)
tmp.append(r)
tmp.extend(results[m][o][p][r])
dados.append(tmp)
if ncolunas is None:
ncolunas = len(results[m][o][p][r])
colunas = ["model", "order", "partitions", "metric"]
for k in np.arange(0, ncolunas):
colunas.append(str(k))
dat = pd.DataFrame(dados, columns=colunas)
dat.to_csv("experiments/nsfts_partitioning_A.csv", sep=";")
for ct, (dataset_name, dataset) in enumerate(datasets.items()):
bchmk.train_test_time(dataset, windowsize=windows[ct], train=0.9, inc=.5,
methods=[pwfts.ProbabilisticWeightedFTS],
order=2,
partitions=50,
steps=cpus,
num_batches=cpus,
distributed='dispy', nodes=['192.168.0.110'], #, '192.168.0.107','192.168.0.106'],
file="experiments.db", dataset=dataset_name,
tag="speedup")

View File

@ -46,24 +46,71 @@ Util.plot_distribution2(distributions, test[:10], start_at=model.order, ax=ax, c
print("")
'''
from pyFTS.data import SONDA, Malaysia
def sample_by_hour(data):
return [np.nanmean(data[k:k+60]) for k in np.arange(0,len(data),60)]
datasets = {}
datasets['TAIEX'] = TAIEX.get_data()[:5000]
datasets['NASDAQ'] = NASDAQ.get_data()[:5000]
datasets['SP500'] = SP500.get_data()[10000:15000]
sonda = SONDA.get_dataframe()[['datahora','glo_avg','ws_10m']]
sonda = sonda.drop(sonda.index[np.where(sonda["ws_10m"] <= 0.01)])
sonda = sonda.drop(sonda.index[np.where(sonda["glo_avg"] <= 0.01)])
sonda = sonda.dropna()
malaysia = Malaysia.get_dataframe()
datasets['SONDA.ws_10m'] = sample_by_hour(sonda["ws_10m"].values)
datasets['SONDA.glo_avg'] = sample_by_hour(sonda["glo_avg"].values)
datasets['Malaysia.temperature'] = malaysia["temperature"].values
datasets['Malaysia.load'] = malaysia["load"].values
#'''
for dataset_name, dataset in datasets.items():
bchmk.sliding_window_benchmarks2(dataset, 1000, train=0.8, inc=0.2,
methods=[pwfts.ProbabilisticWeightedFTS],
bchmk.sliding_window_benchmarks2(dataset, 10000, train=0.9, inc=0.25,
methods=[hofts.HighOrderFTS, hofts.WeightedHighOrderFTS, pwfts.ProbabilisticWeightedFTS],
benchmark_models=False,
transformations=[None],
orders=[1, 2, 3],
partitions=np.arange(10, 100, 5),
orders=[2],
partitions=[50],
progress=False, type='point',
distributed=True, nodes=['192.168.254.113'],
distributed=True, nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'],
file="experiments.db", dataset=dataset_name,
tag="gridsearch")
tag="experiments")
for dataset_name, dataset in datasets.items():
bchmk.sliding_window_benchmarks2(dataset, 10000, train=0.9, inc=0.25,
methods=[ensemble.SimpleEnsembleFTS, ifts.IntervalFTS,
ifts.WeightedIntervalFTS, pwfts.ProbabilisticWeightedFTS],
methods_parameters=[{'partitions': [45, 50, 55], 'alpha':.05},
{},{},{}],
benchmark_models=False,
transformations=[None],
orders=[2],
partitions=[50],
progress=False, type='interval',
distributed=True, nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'],
file="experiments.db", dataset=dataset_name,
tag="experiments")
for dataset_name, dataset in datasets.items():
bchmk.sliding_window_benchmarks2(dataset, 10000, train=0.9, inc=0.25,
methods=[ensemble.SimpleEnsembleFTS, pwfts.ProbabilisticWeightedFTS],
methods_parameters=[{'partitions':[45,50,55]}, {}],
benchmark_models=False,
transformations=[None],
orders=[2],
partitions=[50],
progress=False, type='distribution',
distributed=True, nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'],
file="experiments.db", dataset=dataset_name,
tag="experiments")
'''

View File

@ -3,18 +3,21 @@ import pandas as pd
from pyFTS.hyperparam import GridSearch, Evolutionary
from pyFTS.models import pwfts
def get_dataset():
from pyFTS.data import SONDA
#from pyFTS.data import Malaysia
#data = SONDA.get_data('temperature')[:3000]
data = pd.read_csv('https://query.data.world/s/6xfb5useuotbbgpsnm5b2l3wzhvw2i', sep=';')
#data = Malaysia.get_data('temperature')[:1000]
data = [k for k in SONDA.get_data('ws_10m') if k is not None and not np.isnan(k) and k > 0.1]
data = [np.nanmean(data[k:k+60]) for k in np.arange(0,len(data),60)]
#data = pd.read_csv('https://query.data.world/s/6xfb5useuotbbgpsnm5b2l3wzhvw2i', sep=';')
#data = Malaysia.get_data('temperature')
return 'SONDA.glo_avg', data['glo_avg'].values #train, test
return 'SONDA.ws_10m', data
#return 'Malaysia.temperature', data #train, test
#return 'Malaysia.temperature', data # train, test
'''
hyperparams = {
'order':[3],
'partitions': np.arange(10,100,3),
@ -24,7 +27,6 @@ hyperparams = {
'alpha': np.arange(.0, .5, .05)
}
'''
hyperparams = {
'order':[3], #[1, 2],
'partitions': np.arange(10,100,10),
@ -43,11 +45,11 @@ datsetname, dataset = get_dataset()
ret = Evolutionary.execute(datsetname, dataset,
ngen=30, npop=20,psel=0.6, pcross=.5, pmut=.3,
window_size=10000, train_rate=.9, increment_rate=1,
experiments=1,
window_size=10000, train_rate=.9, increment_rate=.3,
experiments=2,
fts_method=pwfts.ProbabilisticWeightedFTS,
database_file='experiments.db',
distributed=False, nodes=nodes)
distributed='dispy', nodes=nodes)
#res = GridSearch.cluster_method({'mf':1, 'partitioner': 1, 'npart': 10, 'lags':[1], 'alpha': 0.0, 'order': 1},
# dataset, window_size = 10000, train_rate = .9, increment_rate = 1)