From 2ce04b1031257d481336cf3d3cb631aa4cede8aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=C3=B4nio=20C=C3=A2ndido?= Date: Fri, 18 Jan 2019 13:14:33 -0200 Subject: [PATCH] Optimizations on hyperparam and spark --- pyFTS/distributed/spark.py | 2 +- pyFTS/hyperparam/Evolutionary.py | 16 +++++++-------- pyFTS/tests/hyperparam.py | 35 +++++++++++++++++++++++++++----- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/pyFTS/distributed/spark.py b/pyFTS/distributed/spark.py index d24a537..c822574 100644 --- a/pyFTS/distributed/spark.py +++ b/pyFTS/distributed/spark.py @@ -180,7 +180,7 @@ def distributed_train(model, data, url=SPARK_ADDR, app='pyFTS'): func = lambda x: slave_train_univariate(x, **parameters) - flrgs = context.parallelize(data).repartition(nodes*2).mapPartitions(func) + flrgs = context.parallelize(data).repartition(nodes*4).mapPartitions(func) for k in flrgs.collect(): model.append_rule(k[1]) diff --git a/pyFTS/hyperparam/Evolutionary.py b/pyFTS/hyperparam/Evolutionary.py index e9a806d..59f4d1c 100644 --- a/pyFTS/hyperparam/Evolutionary.py +++ b/pyFTS/hyperparam/Evolutionary.py @@ -88,10 +88,10 @@ def phenotype(individual, train, parameters={}): else: mf = Membership.trimf - if individual['partitioner'] == 1: - partitioner = Grid.GridPartitioner(data=train, npart=individual['npart'], func=mf) - elif individual['partitioner'] == 2: - partitioner = Entropy.EntropyPartitioner(data=train, npart=individual['npart'], func=mf) + #if individual['partitioner'] == 1: + partitioner = Grid.GridPartitioner(data=train, npart=individual['npart'], func=mf) + #elif individual['partitioner'] == 2: + # partitioner = Entropy.EntropyPartitioner(data=train, npart=individual['npart'], func=mf) model = hofts.WeightedHighOrderFTS(partitioner=partitioner, lags=individual['lags'], @@ -372,11 +372,12 @@ def GeneticAlgorithm(dataset, **kwargs): last_best = population[0] best = population[1] + print("Evaluating initial population {}".format(time.time())) for individual in population: individual['len_lags'], individual['rmse'] = evaluate(dataset, individual, **kwargs) for i in range(ngen): - print("GENERATION {}".format(i)) + print("GENERATION {} {}".format(i, time.time())) generation_statistics = {} @@ -403,7 +404,6 @@ def GeneticAlgorithm(dataset, **kwargs): if collect_statistics: _f1.append(f1) _f2.append(f2) - #print('eval {}'.format(individual)) if collect_statistics: generation_statistics['population'] = {'f1': np.nanmedian(_f1), 'f2': np.nanmedian(_f2)} @@ -426,13 +426,13 @@ def GeneticAlgorithm(dataset, **kwargs): if last_best['rmse'] <= best['rmse'] and last_best['len_lags'] <= best['len_lags']: no_improvement_count += 1 - #print("WITHOUT IMPROVEMENT {}".format(no_improvement_count)) + print("WITHOUT IMPROVEMENT {}".format(no_improvement_count)) pmut += .05 else: no_improvement_count = 0 pcruz = kwargs.get('pcruz', .5) pmut = kwargs.get('pmut', .3) - #print(best) + print(best) if no_improvement_count == mgen: break diff --git a/pyFTS/tests/hyperparam.py b/pyFTS/tests/hyperparam.py index 34f87f9..a50edb2 100644 --- a/pyFTS/tests/hyperparam.py +++ b/pyFTS/tests/hyperparam.py @@ -30,7 +30,7 @@ hyperparams = { 'lags': np.arange(1, 10), 'alpha': [.0, .3, .5] } -""" + nodes = ['192.168.0.106', '192.168.0.110', '192.168.0.107'] datsetname, dataset = get_dataset() @@ -39,7 +39,7 @@ datsetname, dataset = get_dataset() #Evolutionary.cluster_method(dataset, 70, 20, .8, .3, 1) -''' + from pyFTS.models import hofts from pyFTS.partitioners import Grid from pyFTS.benchmarks import Measures @@ -56,11 +56,36 @@ Measures.get_point_statistics(dataset[800:1000], model) print(model) -''' + ret = Evolutionary.execute(datsetname, dataset, ngen=30, npop=20, pcruz=.5, pmut=.3, window_size=800, experiments=30) - #parameters={'distributed': 'spark', 'url': 'spark://192.168.0.106:7077'}) + parameters={'distributed': 'spark', 'url': 'spark://192.168.0.106:7077'}) print(ret) -#''' \ No newline at end of file +""" + +from pyFTS.hyperparam import Evolutionary + +from pyFTS.data import SONDA + +data = np.array(SONDA.get_data('glo_avg')) + +data = data[~(np.isnan(data) | np.equal(data, 0.0))] + +dataset = data[:1000000] + +del(data) + +ret, statistics = Evolutionary.GeneticAlgorithm(dataset, ngen=30, npop=20, pcruz=.5, + pmut=.3, window_size=800000, collect_statistics=True, + parameters={'distributed': 'spark', + 'url': 'spark://192.168.0.106:7077'}) + +import json + +print(ret) + +with open('statistics.txt', 'w') as file: + file.write(json.dumps(statistics)) # use `json.loads` to do the reverse +