diff --git a/pyFTS/common/Composite.py b/pyFTS/common/Composite.py index 1a4083a..c381bab 100644 --- a/pyFTS/common/Composite.py +++ b/pyFTS/common/Composite.py @@ -11,12 +11,14 @@ class FuzzySet(FuzzySet.FuzzySet): """ Composite Fuzzy Set """ - def __init__(self, name, superset=False): + def __init__(self, name, superset=False, **kwargs): """ Create an empty composite fuzzy set :param name: fuzzy set name """ - super(FuzzySet, self).__init__(name, None, None, None, type='composite') + if 'type' in kwargs: + kwargs.pop('type') + super(FuzzySet, self).__init__(name, None, None, None, type='composite', **kwargs) self.superset = superset if self.superset: self.sets = [] diff --git a/pyFTS/common/Util.py b/pyFTS/common/Util.py index 352d7bc..e24afb6 100644 --- a/pyFTS/common/Util.py +++ b/pyFTS/common/Util.py @@ -233,7 +233,6 @@ def simple_model_train(model, data, parameters): return model - def distributed_train(model, train_method, nodes, fts_method, data, num_batches=10, train_parameters={}, **kwargs): import dispy, dispy.httpd, datetime diff --git a/pyFTS/hyperparam/GridSearch.py b/pyFTS/hyperparam/GridSearch.py new file mode 100644 index 0000000..302d024 --- /dev/null +++ b/pyFTS/hyperparam/GridSearch.py @@ -0,0 +1,127 @@ + +from pyFTS.common import Util, Membership +from pyFTS.models import hofts +from pyFTS.partitioners import Grid, Entropy +from pyFTS.benchmarks import Measures +from pyFTS.hyperparam import Util as hUtil +import numpy as np +import dispy +from itertools import product + + +def dict_individual(mf, partitioner, partitions, order, lags, alpha_cut): + return { + 'mf': mf, + 'partitioner': partitioner, + 'npart': partitions, + 'alpha': alpha_cut, + 'order': order, + 'lags': lags + } + + +def metodo_cluster(individual, train, test): + from pyFTS.common import Util, Membership + from pyFTS.models import hofts + from pyFTS.partitioners import Grid, Entropy + from pyFTS.benchmarks import Measures + + if individual['mf'] == 1: + mf = Membership.trimf + elif individual['mf'] == 2: + mf = Membership.trapmf + elif individual['mf'] == 3 and individual['partitioner'] != 2: + mf = Membership.gaussmf + else: + mf = Membership.trimf + + if individual['partitioner'] == 1: + partitioner = Grid.GridPartitioner(data=train, npart=individual['npart'], func=mf) + elif individual['partitioner'] == 2: + npart = individual['npart'] if individual['npart'] > 10 else 10 + partitioner = Entropy.EntropyPartitioner(data=train, npart=npart, func=mf) + + + model = hofts.HighOrderFTS(partitioner=partitioner, + lags=individual['lags'], + alpha_cut=individual['alpha'], + order=individual['order']) + + model.fit(train) + + rmse, mape, u = Measures.get_point_statistics(test, model) + + return individual, rmse + + +def execute(hyperparams, datasetname, train, test, **kwargs): + + nodes = kwargs.get('nodes',['127.0.0.1']) + + individuals = [] + + if 'lags' in hyperparams: + lags = hyperparams.pop('lags') + else: + lags = [k for k in np.arange(50)] + + keys_sorted = [k for k in sorted(hyperparams.keys())] + + index = {} + for k in np.arange(len(keys_sorted)): + index[keys_sorted[k]] = k + + hp_values = [ + [v for v in hyperparams[hp]] + for hp in keys_sorted + ] + + for instance in product(*hp_values): + partitions = instance[index['partitions']] + partitioner = instance[index['partitioner']] + mf = instance[index['mf']] + alpha_cut = instance[index['alpha']] + order = instance[index['order']] + for lag1 in lags: # o é o lag1 + _lags = [lag1] + if order > 1: + for lag2 in lags: # o é o lag1 + _lags2 = [lag1, lag1+lag2] + if order > 2: + for lag3 in lags: # o é o lag1 + _lags3 = [lag1, lag1 + lag2, lag1 + lag2+lag3 ] + individuals.append(dict_individual(mf, partitioner, partitions, order, _lags3, alpha_cut)) + else: + individuals.append( + dict_individual(mf, partitioner, partitions, order, _lags2, alpha_cut)) + else: + individuals.append(dict_individual(mf, partitioner, partitions, order, _lags, alpha_cut)) + + + cluster, http_server = Util.start_dispy_cluster(metodo_cluster, nodes=nodes) + + jobs = [] + + for ind in individuals: + job = cluster.submit(ind, train, test) + jobs.append(job) + + + conn = hUtil.open_hyperparam_db('hyperparam.db') + + for job in jobs: + result, rmse = job() + if job.status == dispy.DispyJob.Finished and result is not None: + print(result) + + record = (datasetname, 'GridSearch', 'HOFTS', None, result['mf'], + result['order'], result['partitioner'], result['npart'], + result['alpha'], str(result['lags']), 'rmse', rmse) + + hUtil.insert_hyperparam(record, conn) + + else: + print(job.exception) + print(job.stdout) + + Util.stop_dispy_cluster(cluster, http_server) \ No newline at end of file diff --git a/pyFTS/hyperparam/Util.py b/pyFTS/hyperparam/Util.py new file mode 100644 index 0000000..48be9c6 --- /dev/null +++ b/pyFTS/hyperparam/Util.py @@ -0,0 +1,69 @@ +""" +Common facilities for hyperparameter tunning +""" + +import sqlite3 + +def open_hyperparam_db(name): + """ + Open a connection with a Sqlite database designed to store benchmark results. + + :param name: database filenem + :return: a sqlite3 database connection + """ + conn = sqlite3.connect(name) + + #performance optimizations + conn.execute("PRAGMA journal_mode = WAL") + conn.execute("PRAGMA synchronous = NORMAL") + + create_hyperparam_tables(conn) + return conn + + +def create_hyperparam_tables(conn): + """ + Create a sqlite3 table designed to store benchmark results. + + :param conn: a sqlite3 database connection + """ + c = conn.cursor() + + c.execute('''CREATE TABLE if not exists hyperparam( + ID integer primary key, Date int, Dataset text, Tag text, + Model text, Transformation text, mf text, 'Order' int, + Partitioner text, Partitions int, alpha real, lags text, + Measure text, Value real)''') + + conn.commit() + + +def insert_hyperparam(data, conn): + """ + Insert benchmark data on database + + :param data: a tuple with the benchmark data with format: + + Dataset: Identify on which dataset the dataset was performed + Tag: a user defined word that indentify a benchmark set + Model: FTS model + Transformation: The name of data transformation, if one was used + mf: membership function + Order: the order of the FTS method + Partitioner: UoD partitioning scheme + Partitions: Number of partitions + alpha: alpha cut + lags: lags + Measure: accuracy measure + Value: the measure value + + :param conn: a sqlite3 database connection + :return: + """ + c = conn.cursor() + + c.execute("INSERT INTO hyperparam(Date, Dataset, Tag, Model, " + + "Transformation, mf, 'Order', Partitioner, Partitions, " + + "alpha, lags, Measure, Value) " + + "VALUES(datetime('now'),?,?,?,?,?,?,?,?,?,?,?,?)", data) + conn.commit() \ No newline at end of file diff --git a/pyFTS/hyperparam/__init__.py b/pyFTS/hyperparam/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyFTS/models/hofts.py b/pyFTS/models/hofts.py index 9aa4bbd..4227dc7 100644 --- a/pyFTS/models/hofts.py +++ b/pyFTS/models/hofts.py @@ -63,13 +63,19 @@ class HighOrderFTS(fts.FTS): self.lags = np.arange(1, self.order+1) def generate_lhs_flrg(self, sample, explain=False): + + nsample = [FuzzySet.fuzzyfy(k, partitioner=self.partitioner, mode="sets", alpha_cut=self.alpha_cut) + for k in sample] + + return self.generate_lhs_flrg_fuzzyfied(nsample, explain) + + def generate_lhs_flrg_fuzzyfied(self, sample, explain=False): lags = {} flrgs = [] for ct, o in enumerate(self.lags): - lhs = FuzzySet.fuzzyfy(sample[o-1], partitioner=self.partitioner, mode="sets", alpha_cut=self.alpha_cut) - lags[ct] = lhs + lags[ct] = sample[o-1] if explain: print("\t (Lag {}) {} -> {} \n".format(o, sample[o-1], lhs)) @@ -93,15 +99,39 @@ class HighOrderFTS(fts.FTS): def generate_flrg(self, data): l = len(data) for k in np.arange(self.max_lag, l): + lags = {} + if self.dump: print("FLR: " + str(k)) sample = data[k - self.max_lag: k] + print(sample) rhs = FuzzySet.fuzzyfy(data[k], partitioner=self.partitioner, mode="sets", alpha_cut=self.alpha_cut) flrgs = self.generate_lhs_flrg(sample) for flrg in flrgs: + print('key', flrg.get_key()) + if flrg.get_key() not in self.flrgs: + self.flrgs[flrg.get_key()] = flrg; + + for st in rhs: + self.flrgs[flrg.get_key()].append_rhs(st) + + def generate_flrg_fuzzyfied(self, data): + l = len(data) + for k in np.arange(self.max_lag, l): + if self.dump: print("FLR: " + str(k)) + + sample = data[k - self.max_lag: k] + + + rhs = data[k] + + flrgs = self.generate_lhs_flrg_fuzzyfied(sample) + + for flrg in flrgs: + if flrg.get_key() not in self.flrgs: self.flrgs[flrg.get_key()] = flrg; @@ -110,7 +140,11 @@ class HighOrderFTS(fts.FTS): def train(self, data, **kwargs): self.configure_lags(**kwargs) - self.generate_flrg(data) + if not kwargs.get('fuzzyfied',False): + self.generate_flrg(data) + else: + self.generate_flrg_fuzzyfied(data) + def forecast(self, ndata, **kwargs): diff --git a/pyFTS/models/multivariate/cmvfts.py b/pyFTS/models/multivariate/cmvfts.py new file mode 100644 index 0000000..738a8d4 --- /dev/null +++ b/pyFTS/models/multivariate/cmvfts.py @@ -0,0 +1,69 @@ + +import numpy as np +from pyFTS.common import FuzzySet, FLR, fts, flrg +from pyFTS.models import hofts +from pyFTS.models.multivariate import mvfts, grid, common + + +class ClusteredMVFTS(mvfts.MVFTS): + """ + Meta model for multivariate, high order, clustered multivariate FTS + """ + def __init__(self, **kwargs): + super(ClusteredMVFTS, self).__init__(**kwargs) + + self.cluster_method = kwargs.get('cluster_method', grid.GridCluster) + """The cluster method to be called when a new model is build""" + self.cluster_params = kwargs.get('cluster_params', {}) + """The cluster method parameters""" + self.cluster = None + """The most recent trained clusterer""" + + self.fts_method = kwargs.get('fts_method', hofts.HighOrderFTS) + """The FTS method to be called when a new model is build""" + self.fts_params = kwargs.get('fts_params', {}) + """The FTS method specific parameters""" + self.model = None + """The most recent trained model""" + + self.is_high_order = True + + self.order = kwargs.get("order", 2) + self.lags = kwargs.get("lags", None) + self.alpha_cut = kwargs.get('alpha_cut', 0.25) + + + def train(self, data, **kwargs): + + self.cluster = self.cluster_method(data=data, mvfts=self) + + self.model = self.fts_method(partitioner=self.cluster, **self.fts_params) + if self.model.is_high_order: + self.model.order = self.model = self.fts_method(partitioner=self.partitioner, + order=self.order, **self.fts_params) + + ndata = [] + for ct in range(1, len(data.index)): + ix = data.index[ct-1] + data_point = self.format_data(data.loc[ix]) + ndata.append(common.fuzzyfy_instance_clustered(data_point, self.cluster, self.alpha_cut)) + + self.model.train(ndata, fuzzyfied=True) + self.shortname = self.model.shortname + + + + + def __str__(self): + """String representation of the model""" + + return str(self.model) + + def __len__(self): + """ + The length (number of rules) of the model + + :return: number of rules + """ + return len(self.model) + diff --git a/pyFTS/models/multivariate/common.py b/pyFTS/models/multivariate/common.py index c167845..2b28701 100644 --- a/pyFTS/models/multivariate/common.py +++ b/pyFTS/models/multivariate/common.py @@ -1,10 +1,47 @@ import numpy as np import pandas as pd -from pyFTS.common import FuzzySet +from pyFTS.common import FuzzySet, Composite + +class MultivariateFuzzySet(Composite.FuzzySet): + """ + Multivariate Composite Fuzzy Set + """ + def __init__(self, name): + """ + Create an empty composite fuzzy set + :param name: fuzzy set name + """ + super(MultivariateFuzzySet, self).__init__(name) + self.sets = {} + + def append_set(self, variable, set): + """ + Appends a new fuzzy set from a new variable + + :param variable: an multivariate.variable instance + :param set: an common.FuzzySet instance + """ + self.sets[variable] = set + + def membership(self, x): + mv = [] + for var in self.sets.keys(): + data = x[var] + mv.append(self.sets[var].membership(data)) + return np.nanmin(mv) + + def fuzzyfy_instance(data_point, var): fsets = FuzzySet.fuzzyfy(data_point, var.partitioner, mode='sets', method='fuzzy', alpha_cut=var.alpha_cut) return [(var.name, fs) for fs in fsets] +def fuzzyfy_instance_clustered(data_point, cluster, alpha_cut=0.0): + fsets = [] + for fset in cluster.sets: + if cluster.sets[fset].membership(data_point) > alpha_cut: + fsets.append(fset) + return fsets + diff --git a/pyFTS/models/multivariate/grid.py b/pyFTS/models/multivariate/grid.py new file mode 100644 index 0000000..74a65ed --- /dev/null +++ b/pyFTS/models/multivariate/grid.py @@ -0,0 +1,29 @@ +from pyFTS.partitioners import partitioner +from pyFTS.models.multivariate.common import MultivariateFuzzySet +from itertools import product + +class GridCluster(partitioner.Partitioner): + """ + A cartesian product of all fuzzy sets of all variables + """ + + def __init__(self, **kwargs): + super(GridCluster, self).__init__(name="GridCluster", preprocess=False, **kwargs) + + self.mvfts = kwargs.get('mvfts', None) + self.sets = {} + self.build(None) + + def build(self, data): + fsets = [[x for x in k.partitioner.sets.values()] + for k in self.mvfts.explanatory_variables] + + c = 0 + for k in product(*fsets): + key = self.prefix+str(c) + mvfset = MultivariateFuzzySet(name=key) + c += 1 + for fset in k: + mvfset.append_set(fset.variable, fset) + self.sets[key] = mvfset + diff --git a/pyFTS/models/seasonal/partitioner.py b/pyFTS/models/seasonal/partitioner.py index 38c110e..2f3a6e8 100644 --- a/pyFTS/models/seasonal/partitioner.py +++ b/pyFTS/models/seasonal/partitioner.py @@ -56,7 +56,7 @@ class TimeGridPartitioner(partitioner.Partitioner): set_name = self.get_name(count) if self.membership_function == Membership.trimf: if c == self.min: - tmp = Composite(set_name, superset=True) + tmp = Composite(set_name, superset=True, **kwargs) tmp.append_set(FuzzySet(self.season, set_name, Membership.trimf, [self.season.value - pl2, self.season.value, self.season.value + 0.0000001], self.season.value, alpha=.5, @@ -67,7 +67,7 @@ class TimeGridPartitioner(partitioner.Partitioner): tmp.centroid = c sets[set_name] = tmp elif c == self.max - partlen: - tmp = Composite(set_name, superset=True) + tmp = Composite(set_name, superset=True, **kwargs) tmp.append_set(FuzzySet(self.season, set_name, Membership.trimf, [0.0000001, 0.0, pl2], 0.0, alpha=.5, diff --git a/pyFTS/partitioners/partitioner.py b/pyFTS/partitioners/partitioner.py index fe58aea..566cb65 100644 --- a/pyFTS/partitioners/partitioner.py +++ b/pyFTS/partitioners/partitioner.py @@ -27,7 +27,9 @@ class Partitioner(object): """data transformation to be applied on data""" self.indexer = kwargs.get('indexer', None) self.variable = kwargs.get('variable', None) + """In a multivariate context, the variable that contains this partitioner""" self.type = kwargs.get('type', 'common') + """The type of fuzzy sets that are generated by this partitioner""" self.ordered_sets = None if kwargs.get('preprocess',True): diff --git a/pyFTS/tests/general.py b/pyFTS/tests/general.py index 1b8d1b1..c43f393 100644 --- a/pyFTS/tests/general.py +++ b/pyFTS/tests/general.py @@ -21,13 +21,15 @@ from pyFTS.data import TAIEX, SP500, NASDAQ, Malaysia dataset = Malaysia.get_data('temperature')[:1000] -p = Entropy.EntropyPartitioner(data=dataset, npart=3) +p = Grid.GridPartitioner(data=dataset, npart=20) print(p) -model = hofts.HighOrderFTS(partitioner=p, order=2, lags=[34, 47], alpha_cut=0.31390672707694006) +model = hofts.HighOrderFTS(partitioner=p, order=2) -model.fit(dataset) +model.fit(dataset) #[22, 22, 23, 23, 24]) + +print(model) ''' #dataset = SP500.get_data()[11500:16000] diff --git a/pyFTS/tests/hyperparam.py b/pyFTS/tests/hyperparam.py new file mode 100644 index 0000000..5e00c60 --- /dev/null +++ b/pyFTS/tests/hyperparam.py @@ -0,0 +1,27 @@ + +from pyFTS.hyperparam import GridSearch + +def get_train_test(): + from pyFTS.data import Malaysia + + ds = Malaysia.get_data('temperature')[:2000] + # ds = pd.read_csv('Malaysia.csv',delimiter=',' )[['temperature']].values[:2000].flatten().tolist() + train = ds[:1000] + test = ds[1000:] + + return 'Malaysia.temperature', train, test + +hyperparams = { + 'order':[1], + 'partitions':[10, 15], + 'partitioner': [1], + 'mf': [1], + 'lags': [1, 2, 3], + 'alpha': [.1, .2, .5] +} + +nodes = ['192.168.0.110','192.168.0.106'] + +ds, train, test = get_train_test() + +GridSearch.execute(hyperparams, ds, train, test, nodes=nodes) \ No newline at end of file diff --git a/pyFTS/tests/multivariate.py b/pyFTS/tests/multivariate.py index 81178ff..9dd1db1 100644 --- a/pyFTS/tests/multivariate.py +++ b/pyFTS/tests/multivariate.py @@ -12,7 +12,7 @@ from pyFTS.models.seasonal.common import DateTime bc = Transformations.BoxCox(0) tdiff = Transformations.Differential(1) -from pyFTS.models.multivariate import common, variable, mvfts +from pyFTS.models.multivariate import common, variable, mvfts, cmvfts from pyFTS.models.seasonal import partitioner as seasonal from pyFTS.models.seasonal.common import DateTime @@ -89,10 +89,10 @@ test_mv = dataset.iloc[train_split:] vhour = variable.Variable("Hour", data_label="hour", partitioner=seasonal.TimeGridPartitioner, npart=24, data=dataset, partitioner_specific={'seasonality': DateTime.hour_of_day, 'type': 'common'}) -vprice = variable.Variable("Price", data_label="price", partitioner=Grid.GridPartitioner, npart=25, +vprice = variable.Variable("Price", data_label="price", partitioner=Grid.GridPartitioner, npart=10, data=train_mv) -model1 = wmvfts.WeightedMVFTS() +model1 = cmvfts.ClusteredMVFTS(order=2) model1.shortname += "1" model1.append_variable(vhour) model1.append_variable(vprice) diff --git a/setup.py b/setup.py index b0cef56..d06a39d 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( packages=['pyFTS', 'pyFTS.benchmarks', 'pyFTS.common', 'pyFTS.data', 'pyFTS.models.ensemble', 'pyFTS.models', 'pyFTS.models.seasonal', 'pyFTS.partitioners', 'pyFTS.probabilistic', 'pyFTS.tests', 'pyFTS.models.nonstationary', 'pyFTS.models.multivariate', - 'pyFTS.models.incremental'], + 'pyFTS.models.incremental', 'pyFTS.hyperparam'], version='1.2.3', description='Fuzzy Time Series for Python', author='Petronio Candido L. e Silva',