
Source code for pyFTS.hyperparam.GridSearch

from pyFTS.common import Util, Membership
from pyFTS.models import hofts
from pyFTS.partitioners import Grid, Entropy
from pyFTS.benchmarks import Measures
from pyFTS.hyperparam import Util as hUtil
import numpy as np
import dispy
from itertools import product
def dict_individual(mf, partitioner, partitions, order, lags, alpha_cut):
    return {
        'mf': mf,
        'partitioner': partitioner,
        'npart': partitions,
        'alpha': alpha_cut,
        'order': order,
        'lags': lags
    }
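For reference, the individual encoding used throughout this module can be built directly. The values in this sketch are purely illustrative assumptions (triangular membership function, Grid partitioner, 35 partitions, an order-2 model with lags 1 and 2, no alpha-cut):

# Hypothetical candidate; every value here is an illustrative assumption.
ind = dict_individual(mf=1, partitioner=1, partitions=35, order=2,
                      lags=[1, 2], alpha_cut=0.0)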
def cluster_method(individual, train, test):
    from pyFTS.common import Util, Membership
    from pyFTS.models import hofts
    from pyFTS.partitioners import Grid, Entropy
    from pyFTS.benchmarks import Measures

    # Decode the membership function (1: triangular, 2: trapezoidal, 3: Gaussian);
    # the Gaussian MF is not combined with the Entropy partitioner.
    if individual['mf'] == 1:
        mf = Membership.trimf
    elif individual['mf'] == 2:
        mf = Membership.trapmf
    elif individual['mf'] == 3 and individual['partitioner'] != 2:
        mf = Membership.gaussmf
    else:
        mf = Membership.trimf

    # Decode the partitioner (1: Grid, 2: Entropy with at least 10 partitions).
    if individual['partitioner'] == 1:
        partitioner = Grid.GridPartitioner(data=train, npart=individual['npart'], func=mf)
    elif individual['partitioner'] == 2:
        npart = individual['npart'] if individual['npart'] > 10 else 10
        partitioner = Entropy.EntropyPartitioner(data=train, npart=npart, func=mf)

    model = hofts.WeightedHighOrderFTS(partitioner=partitioner,
                                       lags=individual['lags'],
                                       alpha_cut=individual['alpha'],
                                       order=individual['order'])

    model.fit(train)

    rmse, mape, u = Measures.get_point_statistics(test, model)

    size = len(model)

    return individual, rmse, size, mape, u
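Although cluster_method is written to run as a dispy computation, it can also be called directly for a quick local sanity check. A minimal sketch, assuming the TAIEX dataset from pyFTS.data and an arbitrary train/test split (both are illustrative choices, not prescribed by this module):

from pyFTS.data import TAIEX

data = TAIEX.get_data()                     # any numeric series works here
train, test = data[:3000], data[3000:3200]  # illustrative split

ind = dict_individual(mf=1, partitioner=1, partitions=35, order=2,
                      lags=[1, 2], alpha_cut=0.0)

# Evaluate a single candidate locally, without a dispy cluster
individual, rmse, size, mape, u = cluster_method(ind, train, test)
print(rmse, size, mape, u)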
def process_jobs(jobs, datasetname, conn):
    for job in jobs:
        result, rmse, size, mape, u = job()
        if job.status == dispy.DispyJob.Finished and result is not None:
            print("Processing result of {}".format(result))

            metrics = {'rmse': rmse, 'size': size, 'mape': mape, 'u': u}

            # Store one row per metric in the hyperparameter database
            for metric in metrics.keys():

                record = (datasetname, 'GridSearch', 'WHOFTS', None, result['mf'],
                          result['order'], result['partitioner'], result['npart'],
                          result['alpha'], str(result['lags']), metric, metrics[metric])

                print(record)

                hUtil.insert_hyperparam(record, conn)

        else:
            print(job.exception)
            print(job.stdout)
def execute(hyperparams, datasetname, train, test, **kwargs):

    nodes = kwargs.get('nodes', ['127.0.0.1'])

    individuals = []

    if 'lags' in hyperparams:
        lags = hyperparams.pop('lags')
    else:
        lags = [k for k in np.arange(50)]

    keys_sorted = [k for k in sorted(hyperparams.keys())]

    index = {}
    for k in np.arange(len(keys_sorted)):
        index[keys_sorted[k]] = k

    print("Evaluation order: \n {}".format(index))

    hp_values = [
        [v for v in hyperparams[hp]]
        for hp in keys_sorted
    ]

    print("Evaluation values: \n {}".format(hp_values))

    cluster, http_server = Util.start_dispy_cluster(cluster_method, nodes=nodes)
    conn = hUtil.open_hyperparam_db('hyperparam.db')

    for instance in product(*hp_values):
        partitions = instance[index['partitions']]
        partitioner = instance[index['partitioner']]
        mf = instance[index['mf']]
        alpha_cut = instance[index['alpha']]
        order = instance[index['order']]
        count = 0
        for lag1 in lags:  # first lag
            _lags = [lag1]
            count += 1
            if order > 1:
                for lag2 in lags:  # second lag, as an offset from the first
                    _lags2 = [lag1, lag1 + lag2]
                    count += 1
                    if order > 2:
                        for lag3 in lags:  # third lag, as an offset from the second
                            count += 1
                            _lags3 = [lag1, lag1 + lag2, lag1 + lag2 + lag3]
                            individuals.append(dict_individual(mf, partitioner, partitions, order, _lags3, alpha_cut))
                    else:
                        individuals.append(
                            dict_individual(mf, partitioner, partitions, order, _lags2, alpha_cut))
            else:
                individuals.append(dict_individual(mf, partitioner, partitions, order, _lags, alpha_cut))

            # Submit the accumulated candidates to the cluster in batches
            if count > 50:
                jobs = []

                for ind in individuals:
                    print("Testing individual {}".format(ind))
                    job = cluster.submit(ind, train, test)
                    jobs.append(job)

                process_jobs(jobs, datasetname, conn)

                count = 0

                individuals = []

    Util.stop_dispy_cluster(cluster, http_server)
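A usage sketch for the grid search entry point. The hyperparameter grid below is an assumption chosen purely for illustration, not a recommended search space; the keys must match the names read inside execute ('alpha', 'mf', 'order', 'partitioner', 'partitions', plus the optional 'lags'). Running it requires a reachable dispy node (127.0.0.1 by default) and writes one row per metric to hyperparam.db in the working directory:

from pyFTS.hyperparam import GridSearch
from pyFTS.data import TAIEX

data = TAIEX.get_data()
train, test = data[:3000], data[3000:3200]

# Illustrative search space (all values are assumptions)
hyperparams = {
    'order': [1, 2],
    'partitions': [20, 30, 40],
    'partitioner': [1, 2],   # 1 = Grid, 2 = Entropy
    'mf': [1, 2],            # 1 = trimf, 2 = trapmf
    'alpha': [0.0, 0.3],
    'lags': [1, 2, 3]
}

GridSearch.execute(hyperparams, 'TAIEX', train, test, nodes=['127.0.0.1'])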