From 876de2721d4163662f43952525230b4d08cdee6d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=C3=B4nio=20C=C3=A2ndido?=
Date: Mon, 5 Aug 2019 10:24:29 -0300
Subject: [PATCH] Bugfixes and improvements in MVFTS and DEHO

---
 pyFTS/common/fts.py                  |   4 +-
 pyFTS/data/Malaysia.py               |   1 -
 pyFTS/hyperparam/Evolutionary.py     |  25 +-
 pyFTS/hyperparam/Util.py             |   5 +-
 pyFTS/hyperparam/mvfts.py            | 407 ++++++++++++++++++++++++++-
 pyFTS/models/multivariate/FLR.py     |   2 +-
 pyFTS/models/multivariate/mvfts.py   |  17 +-
 pyFTS/models/seasonal/partitioner.py |  27 +-
 pyFTS/partitioners/partitioner.py    |   9 +-
 pyFTS/tests/hyperparam.py            |  45 ++-
 pyFTS/tests/multivariate.py          |  38 +--
 11 files changed, 499 insertions(+), 81 deletions(-)

diff --git a/pyFTS/common/fts.py b/pyFTS/common/fts.py
index 25a2d9f..4acee86 100644
--- a/pyFTS/common/fts.py
+++ b/pyFTS/common/fts.py
@@ -332,7 +332,7 @@ class FTS(object):
 
         dump = kwargs.get('dump', None)
 
-        num_batches = kwargs.get('num_batches', 10)
+        num_batches = kwargs.get('num_batches', None)
 
         save = kwargs.get('save_model', False)  # save model on disk
@@ -345,6 +345,8 @@ class FTS(object):
         batch_save_interval = kwargs.get('batch_save_interval', 10)
 
         if distributed is not None and distributed:
+            if num_batches is None:
+                num_batches = 10
 
             if distributed == 'dispy':
                 from pyFTS.distributed import dispy
diff --git a/pyFTS/data/Malaysia.py b/pyFTS/data/Malaysia.py
index 3d79d5a..0d794b1 100644
--- a/pyFTS/data/Malaysia.py
+++ b/pyFTS/data/Malaysia.py
@@ -30,5 +30,4 @@ def get_dataframe():
 
     return df
 
-    return df
diff --git a/pyFTS/hyperparam/Evolutionary.py b/pyFTS/hyperparam/Evolutionary.py
index 8bb7f7b..3bc968b 100644
--- a/pyFTS/hyperparam/Evolutionary.py
+++ b/pyFTS/hyperparam/Evolutionary.py
@@ -69,13 +69,16 @@ def initial_population(n, **kwargs):
     :param n: the size of the population
     :return: a list with n random individuals
     """
+
+    create_random_individual = kwargs.get('random_individual', random_genotype)
+
     pop = []
     for i in range(n):
-        pop.append(random_genotype(**kwargs))
+        pop.append(create_random_individual(**kwargs))
     return pop
 
 
-def phenotype(individual, train, fts_method, parameters={}):
+def phenotype(individual, train, fts_method, parameters={}, **kwargs):
     """
     Instantiate the genotype, creating a fitted model with the genotype hyperparameters
@@ -96,10 +99,10 @@
     else:
         mf = Membership.trimf
 
-    #if individual['partitioner'] == 1:
-    partitioner = Grid.GridPartitioner(data=train, npart=individual['npart'], func=mf)
-    #elif individual['partitioner'] == 2:
-    #    partitioner = Entropy.EntropyPartitioner(data=train, npart=individual['npart'], func=mf)
+    if individual['partitioner'] == 1:
+        partitioner = Grid.GridPartitioner(data=train, npart=individual['npart'], func=mf)
+    elif individual['partitioner'] == 2:
+        partitioner = Entropy.EntropyPartitioner(data=train, npart=individual['npart'], func=mf)
 
     model = fts_method(partitioner=partitioner,
                        lags=individual['lags'],
@@ -243,8 +246,10 @@ def crossover(population, **kwargs):
 
     n = len(population) - 1
 
-    r1 = random.randint(0, n)
-    r2 = random.randint(0, n)
+    r1, r2 = 0, 0
+    while r1 == r2:
+        r1 = random.randint(0, n)
+        r2 = random.randint(0, n)
 
     if population[r1]['f1'] < population[r2]['f1']:
         best = population[r1]
@@ -304,9 +309,6 @@ def mutation(individual, **kwargs):
     :param pmut: individual probability of mutation
     :return:
     """
-    import numpy.random
-
-    print('mutation')
 
     individual['npart'] = min(50, max(3, int(individual['npart'] + np.random.normal(0, 4))))
     individual['alpha'] = min(.5, max(0, individual['alpha'] + np.random.normal(0, .5)))
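A quick illustration of the new 'random_individual' hook introduced above in initial_population(): any genotype generator can now be plugged in. A minimal sketch (not part of the patch), assuming variable specs shaped as documented in pyFTS/hyperparam/mvfts.py below:

    from pyFTS.hyperparam import Evolutionary
    from pyFTS.hyperparam import mvfts as deho_mv

    # Hypothetical specs, for illustration only; at least two variables are
    # needed, since deho_mv.random_genotype samples the variable count in [1, len).
    variables = [{'name': 'Load', 'data_label': 'load', 'type': 'common'},
                 {'name': 'Temperature', 'data_label': 'temperature', 'type': 'common'}]
    target_variable = {'name': 'Load', 'data_label': 'load', 'type': 'common'}

    # initial_population() forwards its kwargs to the generator it was given.
    pop = Evolutionary.initial_population(20,
                                          random_individual=deho_mv.random_genotype,
                                          variables=variables,
                                          target_variable=target_variable)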
@@ -572,6 +574,7 @@ def execute(datasetname, dataset, **kwargs):
     :keyword parameters: dict with model specific arguments for fts_method
     :keyword elitism: A boolean value indicating if the best individual must always survive to next population
     :keyword initial_operator: a function that receives npop and return a random population with size npop
+    :keyword random_individual: a function that creates a random genotype
     :keyword evaluation_operator: a function that receives a dataset and an individual and return its fitness
     :keyword selection_operator: a function that receives the whole population and return a selected individual
     :keyword crossover_operator: a function that receives the whole population and return a descendent individual
diff --git a/pyFTS/hyperparam/Util.py b/pyFTS/hyperparam/Util.py
index 48be9c6..7cc4252 100644
--- a/pyFTS/hyperparam/Util.py
+++ b/pyFTS/hyperparam/Util.py
@@ -1,9 +1,10 @@
 """
-Common facilities for hyperparameter tunning
+Common facilities for hyperparameter optimization
 """
 
 import sqlite3
 
+
 def open_hyperparam_db(name):
     """
     Open a connection with a Sqlite database designed to store benchmark results.
@@ -66,4 +67,4 @@ def insert_hyperparam(data, conn):
               + "Transformation, mf, 'Order', Partitioner, Partitions, "
               + "alpha, lags, Measure, Value) "
               + "VALUES(datetime('now'),?,?,?,?,?,?,?,?,?,?,?,?)", data)
-    conn.commit()
\ No newline at end of file
+    conn.commit()
diff --git a/pyFTS/hyperparam/mvfts.py b/pyFTS/hyperparam/mvfts.py
index f1341ae..27bbaf3 100644
--- a/pyFTS/hyperparam/mvfts.py
+++ b/pyFTS/hyperparam/mvfts.py
@@ -1,25 +1,58 @@
 """
 Distributed Evolutionary Hyperparameter Optimization (DEHO) for MVFTS
+
+variables: A list of dictionaries, where each dictionary contains
+- name: Variable name
+- data_label: data label
+- type: common | seasonal
+- seasonality:
+
+target_variable: a dictionary with the same fields, describing the forecasting target
+
+genotype: A dictionary containing
+- explanatory_variables: a list with the selected variables, each instance is the index of a variable in 'variables'
+- explanatory_params: a list of dictionaries, one per selected variable, each containing {mf, npart, partitioner, alpha}
+- target_params: a dictionary {mf, npart, partitioner, alpha} for the target variable
+
 """
+
 import numpy as np
 import pandas as pd
 import math
+import time
 import random
+import logging
 
+from pyFTS.common import Util
+from pyFTS.benchmarks import Measures
+from pyFTS.partitioners import Grid, Entropy  # , Huarng
+from pyFTS.common import Membership
+from pyFTS.models import hofts, ifts, pwfts
+from pyFTS.hyperparam import Util as hUtil
+from pyFTS.distributed import dispy as dUtil
 from pyFTS.hyperparam import Evolutionary
+from pyFTS.models.multivariate import mvfts, wmvfts, variable
+from pyFTS.models.seasonal import partitioner as seasonal
+from pyFTS.models.seasonal.common import DateTime
 
 
-def genotype(vars, params, f1, f2):
+def genotype(vars, params, tparams, f1=None, f2=None):
     """
     Create the individual genotype
 
-    :param vars: dictionary with variable names, types, and other parameters
+    :param vars: dictionary with explanatory variable names, types, and other parameters
     :param params: dictionary with variable hyperparameters var: {mf, npart, partitioner, alpha}
+    :param tparams: dictionary with target variable hyperparameters var: {mf, npart, partitioner, alpha}
     :param f1: accuracy fitness value
     :param f2: parsimony fitness value
     :return: the genotype, a dictionary with all hyperparameters
     """
-    ind = dict(vars=vars, params=params, f1=f1, f2=f2)
+    ind = dict(
+        explanatory_variables=vars,
+        explanatory_params=params,
+        target_params=tparams,
+        f1=f1,
+        f2=f2
+    )
 
     return ind
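For reference, the genotype built by genotype() above is a plain dict. A sketch of one possible instance (the values are illustrative only):

    # Indexes in 'explanatory_variables' point into the user-supplied
    # 'variables' list; there is one params dict per selected variable.
    individual = {
        'explanatory_variables': [0, 2],
        'explanatory_params': [
            {'mf': 1, 'npart': 20, 'partitioner': 1, 'alpha': 0.1},
            {'mf': 2, 'npart': 35, 'partitioner': 1, 'alpha': 0.3},
        ],
        'target_params': {'mf': 1, 'npart': 25, 'partitioner': 1, 'alpha': 0.2},
        'f1': None,  # accuracy fitness, filled later by evaluate()
        'f2': None,  # parsimony fitness, filled later by evaluate()
    }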
@@ -29,21 +62,365 @@ def random_genotype(**kwargs):
 
     :return: the genotype, a dictionary with all hyperparameters
     """
-    order = random.randint(1, 3)
-    lags = [k for k in np.arange(1, order+1)]
+    vars = kwargs.get('variables', None)
+
+    tvar = kwargs.get('target_variable', None)
+
+    l = len(vars)
+
+    nvar = np.random.randint(1, l, 1)  # the number of variables
+
+    explanatory_variables = np.unique(np.random.randint(0, l, nvar)).tolist()  # indexes of the variables
+
+    explanatory_params = []
+
+    for v in explanatory_variables:
+        param = {
+            'mf': random.randint(1, 4),
+            'npart': random.randint(10, 50),
+            'partitioner': 1,  # random.randint(1, 2),
+            'alpha': random.uniform(0, .5)
+        }
+        explanatory_params.append(param)
+
+    target_params = {
+        'mf': random.randint(1, 4),
+        'npart': random.randint(10, 50),
+        'partitioner': 1,  # random.randint(1, 2),
+        'alpha': random.uniform(0, .5)
+    }
+
     return genotype(
-        random.randint(1, 4),
-        random.randint(10, 100),
-        random.randint(1, 2),
-        order,
-        random.uniform(0, .5),
-        lags,
-        None,
-        None
+        explanatory_variables,
+        explanatory_params,
+        target_params
     )
 
 
+def phenotype(individual, train, fts_method, parameters={}, **kwargs):
+    vars = kwargs.get('variables', None)
+    tvar = kwargs.get('target_variable', None)
 
-def phenotype(individual, train, fts_method, parameters={}):
-    pass
+    explanatory_vars = []
+
+    for ct, vix in enumerate(individual['explanatory_variables']):
+        var = vars[vix]
+        params = individual['explanatory_params'][ct]
+
+        mf = phenotype_mf(params)
+
+        partitioner = phenotype_partitioner(params)
+
+        if var['type'] == 'common':
+            tmp = variable.Variable(var['name'], data_label=var['data_label'], alias=var['name'], partitioner=partitioner,
+                                    partitioner_specific={'mf': mf}, npart=params['npart'], alpha_cut=params['alpha'],
+                                    data=train)
+        elif var['type'] == 'seasonal':
+            sp = {'seasonality': var['seasonality'], 'mf': mf}
+            tmp = variable.Variable(var['name'], data_label=var['data_label'], alias=var['name'],
+                                    partitioner=seasonal.TimeGridPartitioner,
+                                    partitioner_specific=sp, npart=params['npart'], alpha_cut=params['alpha'],
+                                    data=train)
+
+        explanatory_vars.append(tmp)
+
+    tparams = individual['target_params']
+
+    partitioner = phenotype_partitioner(tparams)
+    mf = phenotype_mf(tparams)
+
+    target_var = variable.Variable(tvar['name'], data_label=tvar['data_label'], alias=tvar['name'], partitioner=partitioner,
+                                   partitioner_specific={'mf': mf}, npart=tparams['npart'], alpha_cut=tparams['alpha'],
+                                   data=train)
+
+    model = fts_method(explanatory_variables=explanatory_vars, target_variable=target_var, **parameters)
+    model.fit(train, **parameters)
+
+    return model
+
+
+def phenotype_partitioner(params):
+    if params['partitioner'] == 1:
+        partitioner = Grid.GridPartitioner
+    elif params['partitioner'] == 2:
+        partitioner = Entropy.EntropyPartitioner
+    return partitioner
+
+
+def phenotype_mf(params):
+    if params['mf'] == 1:
+        mf = Membership.trimf
+    elif params['mf'] == 2:
+        mf = Membership.trapmf
+    elif params['mf'] == 3 and params['partitioner'] != 2:
+        mf = Membership.gaussmf
+    else:
+        mf = Membership.trimf
+    return mf
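A minimal usage sketch of the phenotype() added above. It assumes `train` is a pandas DataFrame containing the data_labels referenced by the hypothetical `variables` and `target_variable` specs, and `individual` a genotype like the one sketched earlier:

    from pyFTS.models.multivariate import wmvfts

    # phenotype() builds one multivariate Variable per selected index and
    # then fits the chosen MVFTS method on the training frame.
    model = phenotype(individual, train,
                      fts_method=wmvfts.WeightedMVFTS,
                      variables=variables,
                      target_variable=target_variable)
    forecasts = model.predict(train)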
+def evaluate(dataset, individual, **kwargs):
+    """
+    Evaluate an individual using a sliding window cross validation over the dataset.
+
+    :param dataset: Evaluation dataset
+    :param individual: genotype to be tested
+    :param window_size: The length of scrolling window for train/test on dataset
+    :param train_rate: The train/test split ([0,1])
+    :param increment_rate: The increment of the scrolling window, relative to the window_size ([0,1])
+    :param parameters: dict with model specific arguments for fit method.
+    :return: a dictionary with the accuracy fitness value f1, the parsimony fitness value f2, plus rmse and size
+    """
+    from pyFTS.models import hofts, ifts, pwfts
+    from pyFTS.common import Util
+    from pyFTS.benchmarks import Measures
+    from pyFTS.hyperparam.Evolutionary import __measures
+    from pyFTS.hyperparam.mvfts import phenotype
+    from pyFTS.models.multivariate import mvfts, wmvfts, partitioner, variable, cmvfts, grid, granular, common
+    import numpy as np
+
+    window_size = kwargs.get('window_size', 800)
+    train_rate = kwargs.get('train_rate', .8)
+    increment_rate = kwargs.get('increment_rate', .2)
+    fts_method = kwargs.get('fts_method', wmvfts.WeightedMVFTS)
+    parameters = kwargs.get('parameters', {})
+    tvar = kwargs.get('target_variable', None)
+
+    if individual['f1'] is not None and individual['f2'] is not None:
+        return {key: individual[key] for key in __measures}
+
+    errors = []
+    lengths = []
+
+    kwargs2 = kwargs.copy()
+    kwargs2.pop('fts_method', None)
+    if 'parameters' in kwargs2:
+        kwargs2.pop('parameters')
+
+    for count, train, test in Util.sliding_window(dataset, window_size, train=train_rate, inc=increment_rate):
+
+        try:
+
+            model = phenotype(individual, train, fts_method=fts_method, parameters=parameters, **kwargs2)
+
+            forecasts = model.predict(test)
+
+            rmse = Measures.rmse(test[tvar['data_label']].values[model.max_lag:], forecasts[:-1])
+            lengths.append(len(model))
+
+            errors.append(rmse)
+
+        except Exception as ex:
+            logging.exception("Error")
+
+            lengths.append(np.nan)
+            errors.append(np.nan)
+
+    try:
+        _rmse = np.nanmean(errors)
+        _len = np.nanmean(lengths)
+
+        f1 = np.nansum([.6 * _rmse, .4 * np.nanstd(errors)])
+        f2 = np.nansum([.9 * _len, .1 * np.nanstd(lengths)])
+
+        return {'f1': f1, 'f2': f2, 'rmse': _rmse, 'size': _len}
+    except Exception as ex:
+        logging.exception("Error")
+        return {'f1': np.inf, 'f2': np.inf, 'rmse': np.inf, 'size': np.inf}
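Calling evaluate() directly follows the same keyword contract execute() uses further below. A sketch, assuming `df` is a DataFrame holding the series and reusing the hypothetical `variables`/`target_variable` specs:

    from pyFTS.models.multivariate import wmvfts

    ind = random_genotype(variables=variables, target_variable=target_variable)
    fitness = evaluate(df, ind,
                       window_size=1000, train_rate=.8, increment_rate=.2,
                       fts_method=wmvfts.WeightedMVFTS,
                       variables=variables, target_variable=target_variable)
    print(fitness)  # {'f1': ..., 'f2': ..., 'rmse': ..., 'size': ...}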
+def crossover(population, **kwargs):
+    """
+    Crossover operation between two parents
+
+    :param population: the original population
+    :return: a genotype
+    """
+    import random
+
+    n = len(population) - 1
+
+    r1, r2 = 0, 0
+    while r1 == r2:
+        r1 = random.randint(0, n)
+        r2 = random.randint(0, n)
+
+    if population[r1]['f1'] < population[r2]['f1']:
+        best = population[r1]
+        worst = population[r2]
+    else:
+        best = population[r2]
+        worst = population[r1]
+
+    rnd = random.uniform(0, 1)
+    nvar = len(best['explanatory_variables']) if rnd < .7 else len(worst['explanatory_variables'])
+
+    explanatory_variables = []
+    explanatory_params = []
+
+    for ct in np.arange(nvar):
+        if ct < len(best['explanatory_variables']) and ct < len(worst['explanatory_variables']):
+            rnd = random.uniform(0, 1)
+            ix = best['explanatory_variables'][ct] if rnd < .7 else worst['explanatory_variables'][ct]
+        elif ct < len(best['explanatory_variables']):
+            ix = best['explanatory_variables'][ct]
+        elif ct < len(worst['explanatory_variables']):
+            ix = worst['explanatory_variables'][ct]
+
+        if ix in explanatory_variables:
+            continue
+
+        if ix in best['explanatory_variables'] and ix in worst['explanatory_variables']:
+            bix = best['explanatory_variables'].index(ix)
+            wix = worst['explanatory_variables'].index(ix)
+            param = crossover_variable_params(best['explanatory_params'][bix], worst['explanatory_params'][wix])
+        elif ix in best['explanatory_variables']:
+            bix = best['explanatory_variables'].index(ix)
+            param = best['explanatory_params'][bix]
+        elif ix in worst['explanatory_variables']:
+            wix = worst['explanatory_variables'].index(ix)
+            param = worst['explanatory_params'][wix]
+
+        explanatory_variables.append(ix)
+        explanatory_params.append(param)
+
+    tparams = crossover_variable_params(best['target_params'], worst['target_params'])
+
+    descendent = genotype(explanatory_variables, explanatory_params, tparams, None, None)
+
+    return descendent
+
+
+def crossover_variable_params(best, worst):
+    npart = int(round(.7 * best['npart'] + .3 * worst['npart']))
+    alpha = float(.7 * best['alpha'] + .3 * worst['alpha'])
+    rnd = random.uniform(0, 1)
+    mf = best['mf'] if rnd < .7 else worst['mf']
+    rnd = random.uniform(0, 1)
+    partitioner = best['partitioner'] if rnd < .7 else worst['partitioner']
+    param = {'partitioner': partitioner, 'npart': npart, 'alpha': alpha, 'mf': mf}
+    return param
+
+
+def mutation(individual, **kwargs):
+    """
+    Mutation operator
+
+    :param individual: an individual genotype
+    :param pmut: individual probability of mutation
+    :return: the mutated genotype
+    """
+
+    for ct in np.arange(len(individual['explanatory_variables'])):
+        rnd = random.uniform(0, 1)
+        if rnd > .5:
+            mutate_variable_params(individual['explanatory_params'][ct])
+
+    rnd = random.uniform(0, 1)
+    if rnd > .5:
+        mutate_variable_params(individual['target_params'])
+
+    individual['f1'] = None
+    individual['f2'] = None
+
+    return individual
+
+
+def mutate_variable_params(param):
+    param['npart'] = min(50, max(3, int(param['npart'] + np.random.normal(0, 4))))
+    param['alpha'] = min(.5, max(0, param['alpha'] + np.random.normal(0, .5)))
+    param['mf'] = random.randint(1, 4)
+    param['partitioner'] = random.randint(1, 2)
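A sketch of one reproduction step with the operators above. Note that crossover() compares the parents' f1 values, so the population must already carry fitness scores (stand-in values here; normally they are filled by evaluate()):

    population = [random_genotype(variables=variables, target_variable=target_variable)
                  for _ in range(5)]
    for rank, ind in enumerate(population):
        ind['f1'], ind['f2'] = float(rank + 1), 1.0  # stand-in fitness values

    child = crossover(population)  # parameters biased (70/30) toward the fitter parent
    child = mutation(child)        # perturbs params and resets f1/f2 to None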
+def execute(datasetname, dataset, **kwargs):
+    """
+    Batch execution of Distributed Evolutionary Hyperparameter Optimization (DEHO) for multivariate methods
+
+    :param datasetname:
+    :param dataset: The time series to optimize the FTS
+    :keyword database_file:
+    :keyword experiments: An integer value with the number of independent executions, default value: 30
+    :keyword ngen: An integer value with the maximum number of generations, default value: 30
+    :keyword mgen: An integer value with the maximum number of generations without improvement to stop, default value 7
+    :keyword npop: An integer value with the population size, default value: 20
+    :keyword pcross: A float value between 0 and 1 with the probability of crossover, default: .5
+    :keyword psel: A float value between 0 and 1 with the probability of selection, default: .5
+    :keyword pmut: A float value between 0 and 1 with the probability of mutation, default: .3
+    :keyword fts_method: The MVFTS method to optimize
+    :keyword parameters: dict with model specific arguments for fts_method
+    :keyword elitism: A boolean value indicating if the best individual must always survive to next population
+    :keyword selection_operator: a function that receives the whole population and return a selected individual
+    :keyword window_size: An integer value with the length of the scrolling window for train/test on dataset
+    :keyword train_rate: A float value between 0 and 1 with the train/test split ([0,1])
+    :keyword increment_rate: A float value between 0 and 1 with the increment of the scrolling window,
+             relative to the window_size ([0,1])
+    :keyword collect_statistics: A boolean value indicating whether to collect statistics for each generation
+    :keyword distributed: A value indicating if the execution will be local and sequential (distributed=False),
+             or parallel and distributed (distributed='dispy' or distributed='spark')
+    :keyword cluster: If distributed='dispy' the list of cluster nodes, else if distributed='spark' it is the master node
+    :return: the best genotype
+    """
+
+    experiments = kwargs.get('experiments', 30)
+
+    distributed = kwargs.get('distributed', False)
+
+    fts_method = kwargs.get('fts_method', wmvfts.WeightedMVFTS)
+    shortname = str(fts_method.__module__).split('.')[-1]
+
+    kwargs['mutation_operator'] = mutation
+    kwargs['crossover_operator'] = crossover
+    kwargs['evaluation_operator'] = evaluate
+    kwargs['random_individual'] = random_genotype
+
+    if distributed == 'dispy':
+        nodes = kwargs.get('nodes', ['127.0.0.1'])
+        cluster, http_server = dUtil.start_dispy_cluster(evaluate, nodes=nodes)
+        kwargs['cluster'] = cluster
+
+    ret = []
+    for i in np.arange(experiments):
+        print("Experiment {}".format(i))
+
+        start = time.time()
+        ret, statistics = Evolutionary.GeneticAlgorithm(dataset, **kwargs)
+        end = time.time()
+        ret['time'] = end - start
+        experiment = {'individual': ret, 'statistics': statistics}
+
+        ret = process_experiment(shortname, experiment, datasetname)
+
+    if distributed == 'dispy':
+        dUtil.stop_dispy_cluster(cluster, http_server)
+
+    return ret
+
+
+def process_experiment(fts_method, result, datasetname):
+    """
+    Persist the results of a DEHO execution: the best hyperparameters and the generation statistics as json files
+
+    :param fts_method: the short name of the optimized MVFTS method
+    :param result: a dictionary with the best individual and the execution statistics
+    :param datasetname: the name of the dataset
+    :return: the best individual
+    """
+
+    log_result(datasetname, fts_method, result['individual'])
+    persist_statistics(datasetname, result['statistics'])
+
+    return result['individual']
+
+
+def persist_statistics(datasetname, statistics):
+    import json
+    with open('statistics_{}.json'.format(datasetname), 'w') as file:
+        file.write(json.dumps(statistics))
+
+
+def log_result(datasetname, fts_method, result):
+    import json
+    with open('result_{}{}.json'.format(fts_method, datasetname), 'w') as file:
+        file.write(json.dumps(result))
+
+    print(result)
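The artifacts written by process_experiment() above are plain JSON files, so a finished run can be inspected afterwards. A sketch, assuming datasetname='Malaysia' and the wmvfts method (the file names follow the patterns in log_result() and persist_statistics()):

    import json

    with open('result_wmvftsMalaysia.json') as f:
        best = json.load(f)   # the best genotype found, with f1/f2/rmse/size/time
    with open('statistics_Malaysia.json') as f:
        stats = json.load(f)  # per-generation statistics

    print(best['f1'], best['f2'])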
diff --git a/pyFTS/models/multivariate/FLR.py b/pyFTS/models/multivariate/FLR.py
index d5ace61..c50b054 100644
--- a/pyFTS/models/multivariate/FLR.py
+++ b/pyFTS/models/multivariate/FLR.py
@@ -19,7 +19,7 @@ class FLR(object):
         self.RHS = set
 
     def __str__(self):
-        return "{} -> {}".format([self.LHS[k] for k in self.LHS.keys()], self.RHS)
+        return "{} -> {}".format([k for k in self.LHS.values()], self.RHS)
diff --git a/pyFTS/models/multivariate/mvfts.py b/pyFTS/models/multivariate/mvfts.py
index f688638..632b5db 100644
--- a/pyFTS/models/multivariate/mvfts.py
+++ b/pyFTS/models/multivariate/mvfts.py
@@ -3,6 +3,7 @@ from pyFTS.partitioners import Grid
 from pyFTS.models.multivariate import FLR as MVFLR, common, flrg as mvflrg
 from itertools import product
 from types import LambdaType
+from copy import deepcopy
 
 import numpy as np
 import pandas as pd
@@ -75,19 +76,19 @@ class MVFTS(fts.FTS):
         for path in product_dict(**lags):
             flr = MVFLR.FLR()
 
-            for var, fset in path.items():
-                flr.set_lhs(var, fset)
+            flr.LHS = path
+
+            #for var, fset in path.items():
+            #    flr.set_lhs(var, fset)
 
             if len(flr.LHS.keys()) == len(self.explanatory_variables):
                 flrs.append(flr)
-            else:
-                print(flr)
 
         return flrs
 
     def generate_flrs(self, data):
         flrs = []
-        for ct in range(1, len(data.index)):
+        for ct in np.arange(1, len(data.index)):
             ix = data.index[ct-1]
             data_point = self.format_data( data.loc[ix] )
@@ -99,8 +100,9 @@ class MVFTS(fts.FTS):
 
             for flr in tmp_flrs:
                 for v, s in target:
-                    flr.set_rhs(s)
-                    flrs.append(flr)
+                    new_flr = deepcopy(flr)
+                    new_flr.set_rhs(s)
+                    flrs.append(new_flr)
 
         return flrs
 
@@ -113,7 +115,6 @@ class MVFTS(fts.FTS):
 
             self.flrgs[flrg.get_key()].append_rhs(flr.RHS)
 
-
     def train(self, data, **kwargs):
 
         ndata = self.apply_transformations(data)
diff --git a/pyFTS/models/seasonal/partitioner.py b/pyFTS/models/seasonal/partitioner.py
index 144e227..9af0fd5 100644
--- a/pyFTS/models/seasonal/partitioner.py
+++ b/pyFTS/models/seasonal/partitioner.py
@@ -72,55 +72,54 @@ class TimeGridPartitioner(partitioner.Partitioner):
         partlen = self.season.value / self.partitions
         pl2 = partlen / 2
 
-        count = 0
-        for c in np.arange(self.min, self.max, partlen):
+        for count, midpoint in enumerate(np.arange(self.min, self.max, partlen)):
             set_name = self.get_name(count)
             if self.membership_function == Membership.trimf:
-                if c == self.min:
+                if midpoint == self.min or count == 0:
                     tmp = Composite(set_name, superset=True, **kwargs)
                     tmp.append_set(FuzzySet(self.season, set_name, Membership.trimf,
                                             [self.season.value - pl2, self.season.value,
                                              self.season.value + pl2], self.season.value, alpha=1,
                                             **kwargs))
                     tmp.append_set(FuzzySet(self.season, set_name, Membership.trimf,
-                                            [c - partlen, c, c + partlen], c,
+                                            [midpoint - partlen, midpoint, midpoint + partlen], midpoint,
                                             **kwargs))
-                    tmp.centroid = c
+                    tmp.centroid = midpoint
                     sets[set_name] = tmp
-                elif c == self.max - partlen:
+                elif midpoint == self.max - partlen or count == self.partitions - 1:
                     tmp = Composite(set_name, superset=True, **kwargs)
                     tmp.append_set(FuzzySet(self.season, set_name, Membership.trimf,
                                             [-pl2, 0.0, pl2], 0.0, alpha=1,
                                             **kwargs))
                     tmp.append_set(FuzzySet(self.season, set_name, Membership.trimf,
-                                            [c - partlen, c, c + partlen], c,
+                                            [midpoint - partlen, midpoint, midpoint + partlen], midpoint,
                                             **kwargs))
-                    tmp.centroid = c
+                    tmp.centroid = midpoint
                     sets[set_name] = tmp
                 else:
                     sets[set_name] = FuzzySet(self.season, set_name, Membership.trimf,
-                                              [c - partlen, c, c + partlen], c,
+                                              [midpoint - partlen, midpoint, midpoint + partlen], midpoint,
                                               **kwargs)
             elif self.membership_function == Membership.gaussmf:
-                sets[set_name] = FuzzySet(self.season, set_name, Membership.gaussmf, [c, partlen / 3], c,
+                sets[set_name] = FuzzySet(self.season, set_name, Membership.gaussmf, [midpoint, partlen / 3], midpoint,
                                           **kwargs)
             elif self.membership_function == Membership.trapmf:
                 q = partlen / 4
-                if c == self.min:
+                if midpoint == self.min:
                     tmp = Composite(set_name, superset=True)
                     tmp.append_set(FuzzySet(self.season, set_name, Membership.trimf,
                                             [self.season.value - pl2, self.season.value,
                                              self.season.value + 0.0000001], 0, **kwargs))
                     tmp.append_set(FuzzySet(self.season, set_name, Membership.trapmf,
-                                            [c - partlen, c - q, c + q, c + partlen], c,
+                                            [midpoint - partlen, midpoint - q, midpoint + q, midpoint + partlen], midpoint,
                                             **kwargs))
-                    tmp.centroid = c
+                    tmp.centroid = midpoint
                     sets[set_name] = tmp
                 else:
                     sets[set_name] = FuzzySet(self.season, set_name, Membership.trapmf,
-                                              [c - partlen, c - q, c + q, c + partlen], c,
+                                              [midpoint - partlen, midpoint - q, midpoint + q, midpoint + partlen], midpoint,
                                               **kwargs)
             count += 1
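A sketch of the seasonal partitioner touched above, wired through a multivariate Variable in the same way as pyFTS/tests/multivariate.py below; `train_mv` is assumed to be a DataFrame with a datetime column named 'time':

    from pyFTS.models.multivariate import variable
    from pyFTS.models.seasonal import partitioner as seasonal
    from pyFTS.models.seasonal.common import DateTime

    # 24 partitions over the minute-of-day season, one fuzzy set per hour.
    sp = {'seasonality': DateTime.minute_of_day}
    vhour = variable.Variable("Hour", data_label="time",
                              partitioner=seasonal.TimeGridPartitioner, npart=24,
                              partitioner_specific=sp, data=train_mv)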
diff --git a/pyFTS/partitioners/partitioner.py b/pyFTS/partitioners/partitioner.py
index 0c4ea96..5bd991f 100644
--- a/pyFTS/partitioners/partitioner.py
+++ b/pyFTS/partitioners/partitioner.py
@@ -169,9 +169,12 @@ class Partitioner(object):
         if method == 'fuzzy' and mode == 'vector':
             return mv
         elif method == 'fuzzy' and mode == 'sets':
-            ix = np.ravel(np.argwhere(mv > 0.))
-            sets = [self.ordered_sets[i] for i in ix]
-            return sets
+            try:
+                ix = np.ravel(np.argwhere(mv > 0.))
+                sets = [self.ordered_sets[i] for i in ix if i < self.partitions]
+                return sets
+            except Exception as ex:
+                return None
         elif method == 'maximum' and mode == 'sets':
             mx = max(mv)
             ix = np.ravel(np.argwhere(mv == mx))
diff --git a/pyFTS/tests/hyperparam.py b/pyFTS/tests/hyperparam.py
index 899299a..5a7ef69 100644
--- a/pyFTS/tests/hyperparam.py
+++ b/pyFTS/tests/hyperparam.py
@@ -1,20 +1,23 @@
 import numpy as np
 import pandas as pd
-from pyFTS.hyperparam import GridSearch, Evolutionary
+from pyFTS.hyperparam import GridSearch, Evolutionary, mvfts as deho_mv
 from pyFTS.models import pwfts
+from pyFTS.models.multivariate import mvfts, wmvfts
+from pyFTS.models.seasonal.common import DateTime
 
 
 def get_dataset():
-    from pyFTS.data import SONDA
-    #from pyFTS.data import Malaysia
+    #from pyFTS.data import SONDA
+    from pyFTS.data import Malaysia
 
-    data = [k for k in SONDA.get_data('ws_10m') if k > 0.1 and k != np.nan and k is not None]
-    data = [np.nanmean(data[k:k+60]) for k in np.arange(0,len(data),60)]
+    #data = [k for k in SONDA.get_data('ws_10m') if k > 0.1 and k != np.nan and k is not None]
+    #data = [np.nanmean(data[k:k+60]) for k in np.arange(0,len(data),60)]
     #data = pd.read_csv('https://query.data.world/s/6xfb5useuotbbgpsnm5b2l3wzhvw2i', sep=';')
-    #data = Malaysia.get_data('temperature')
+    data = Malaysia.get_dataframe()
+    data['time'] = pd.to_datetime(data["time"], format='%m/%d/%y %I:%M %p')
 
-    return 'SONDA.ws_10m', data
-    #return 'Malaysia.temperature', data #train, test
+    #return 'SONDA.ws_10m', data
+    return 'Malaysia', data.iloc[:5000] #train, test
     #return 'Malaysia.temperature', data # train, test
 
 '''
@@ -43,6 +46,30 @@ datsetname, dataset = get_dataset()
 #GridSearch.execute(hyperparams, datsetname, dataset, nodes=nodes,
 #                   window_size=10000, train_rate=.9, increment_rate=1,)
 
+explanatory_variables = [
+    {'name': 'Load', 'data_label': 'load', 'type': 'common'},
+    {'name': 'Temperature', 'data_label': 'temperature', 'type': 'common'},
+    {'name': 'Daily', 'data_label': 'time', 'type': 'seasonal', 'seasonality': DateTime.minute_of_day, 'npart': 24},
+    {'name': 'Weekly', 'data_label': 'time', 'type': 'seasonal', 'seasonality': DateTime.day_of_week, 'npart': 7},
+    #{'name': 'Monthly', 'data_label': 'time', 'type': 'seasonal', 'seasonality': DateTime.day_of_month, 'npart': 4},
+    {'name': 'Yearly', 'data_label': 'time', 'type': 'seasonal', 'seasonality': DateTime.day_of_year, 'npart': 12}
+]
+
+target_variable = {'name': 'Load', 'data_label': 'load', 'type': 'common'}
+
+nodes = ['192.168.28.38']
+
+deho_mv.execute(datsetname, dataset,
+                ngen=10, npop=10, psel=0.6, pcross=.5, pmut=.3,
+                window_size=5000, train_rate=.9, increment_rate=1,
+                experiments=1,
+                fts_method=wmvfts.WeightedMVFTS,
+                variables=explanatory_variables,
+                target_variable=target_variable,
+                distributed='dispy', nodes=nodes,
+                #parameters=dict(num_batches=5)
+                #parameters=dict(distributed='dispy', nodes=nodes, num_batches=5)
+                )
+
+'''
 ret = Evolutionary.execute(datsetname, dataset,
                            ngen=30, npop=20,psel=0.6, pcross=.5, pmut=.3,
                            window_size=10000, train_rate=.9, increment_rate=.3,
@@ -50,7 +77,7 @@ ret = Evolutionary.execute(datsetname, dataset,
                            fts_method=pwfts.ProbabilisticWeightedFTS,
                            database_file='experiments.db',
                            distributed='dispy', nodes=nodes)
-
+'''
 
 #res = GridSearch.cluster_method({'mf': 1, 'partitioner': 1, 'npart': 10, 'lags': [1], 'alpha': 0.0, 'order': 1},
 #                                dataset, window_size=10000, train_rate=.9, increment_rate=1)
diff --git a/pyFTS/tests/multivariate.py b/pyFTS/tests/multivariate.py
index ec90ae5..e8510ec 100644
--- a/pyFTS/tests/multivariate.py
+++ b/pyFTS/tests/multivariate.py
@@ -16,33 +16,39 @@ from pyFTS.models.seasonal.common import DateTime
 from pyFTS.models.multivariate import common, variable, mvfts
 from pyFTS.partitioners import Grid
 from pyFTS.common import Membership
-
-
 import os
 
-from pyFTS.data import NASDAQ
+from pyFTS.data import Malaysia, Enrollments
 
-train_data = NASDAQ.get_data()[:2000]
-test_data = NASDAQ.get_data()[2000:3000]
+df = Malaysia.get_dataframe()
+df['time'] = pd.to_datetime(df["time"], format='%m/%d/%y %I:%M %p')
 
-from pyFTS.partitioners import Grid
+train_mv = df.iloc[:4500]
+test_mv = df.iloc[4500:5000]
 
-partitioner = Grid.GridPartitioner(data=train_data, npart=35)
+del(df)
 
-from pyFTS.models import pwfts, hofts
+sp = {'seasonality': DateTime.minute_of_day, 'names': [str(k)+'hs' for k in range(0,24)]}
 
-#model = pwfts.ProbabilisticWeightedFTS(partitioner=partitioner, order=2)
-#from pyFTS.models.incremental import TimeVariant
+vhour = variable.Variable("Hour", data_label="time", partitioner=seasonal.TimeGridPartitioner, npart=24,
+                          data=train_mv, partitioner_specific=sp, alpha_cut=.3)
 
-#model = TimeVariant.Retrainer(partitioner_method=Grid.GridPartitioner, partitioner_params={'npart': 35},
-#                              fts_method=pwfts.ProbabilisticWeightedFTS, fts_params={}, order=2,
-#                              batch_size=100, window_length=500)
+vtemp = variable.Variable("Temperature", data_label="temperature", alias='temp',
+                          partitioner=Grid.GridPartitioner, npart=15, func=Membership.gaussmf,
+                          data=train_mv, alpha_cut=.3)
 
-model = hofts.HighOrderFTS(partitioner=partitioner, order=2)
-model.fit(train_data)
+vload = variable.Variable("Load", data_label="load", alias='load',
+                          partitioner=Grid.GridPartitioner, npart=20, func=Membership.trimf,
+                          data=train_mv, alpha_cut=.3)
 
-print(model.predict(test_data, steps_ahead=10))
+model = mvfts.MVFTS(explanatory_variables=[vhour, vtemp, vload], target_variable=vload)
 
+#fs = Grid.GridPartitioner(data=Enrollments.get_data(), npart=10)
+#print(fs)
+#model = pwfts.ProbabilisticWeightedFTS(partitioner=vload.partitioner, order=2)
+
+model.fit(train_mv) #, num_batches=10) #, distributed='dispy',nodes=['192.168.0.110'])
+#model.fit(Enrollments.get_data()) #, num_batches=20) #, distributed='dispy',nodes=['192.168.0.110'])
+
+print(model)
 
 '''
 def sample_by_hour(data):
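Once the MVFTS above is fitted, held-out forecasts and an error measure follow the same alignment that evaluate() in pyFTS/hyperparam/mvfts.py uses. A sketch:

    from pyFTS.benchmarks import Measures

    forecasts = model.predict(test_mv)
    rmse = Measures.rmse(test_mv['load'].values[model.max_lag:], forecasts[:-1])
    print(rmse)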