Optimizations on hyperparam and spark

Petrônio Cândido 2019-01-18 13:14:33 -02:00
parent a0af21d4b9
commit 2ce04b1031
3 changed files with 39 additions and 14 deletions


@@ -180,7 +180,7 @@ def distributed_train(model, data, url=SPARK_ADDR, app='pyFTS'):
     func = lambda x: slave_train_univariate(x, **parameters)
-    flrgs = context.parallelize(data).repartition(nodes*2).mapPartitions(func)
+    flrgs = context.parallelize(data).repartition(nodes*4).mapPartitions(func)
     for k in flrgs.collect():
         model.append_rule(k[1])
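
A note on the change above: Spark schedules one task per RDD partition, so moving from nodes*2 to nodes*4 partitions yields smaller, more numerous tasks that the scheduler can balance across executors. A minimal sketch of the same map-side training pattern, assuming pyspark is available; the worker body and the local master URL are illustrative stand-ins, not the real slave_train_univariate:

from pyspark import SparkContext

def slave_train_univariate(rows, **parameters):
    # hypothetical stand-in for the real worker: fit a local model on this
    # partition's values and yield (key, rules) pairs for the driver to merge
    yield ('rules', list(rows))

context = SparkContext(master='local[4]', appName='pyFTS-sketch')
nodes = 3                          # matches the three-node cluster in the test script below
rdd = context.parallelize(range(1000)).repartition(nodes * 4)
for key, rules in rdd.mapPartitions(slave_train_univariate).collect():
    pass                           # the driver-side merge (model.append_rule) would go here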


@@ -88,10 +88,10 @@ def phenotype(individual, train, parameters={}):
     else:
         mf = Membership.trimf

-    if individual['partitioner'] == 1:
+    #if individual['partitioner'] == 1:
     partitioner = Grid.GridPartitioner(data=train, npart=individual['npart'], func=mf)
-    elif individual['partitioner'] == 2:
-        partitioner = Entropy.EntropyPartitioner(data=train, npart=individual['npart'], func=mf)
+    #elif individual['partitioner'] == 2:
+    #    partitioner = Entropy.EntropyPartitioner(data=train, npart=individual['npart'], func=mf)

     model = hofts.WeightedHighOrderFTS(partitioner=partitioner,
                                        lags=individual['lags'],
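
For context, the edit above pins the phenotype to a Grid partitioner and disables the Entropy branch, so the partitioner gene no longer affects the phenotype. A rough sketch of what phenotype now builds per individual, using the pyFTS calls visible in the diff; the training series and gene values are placeholders:

import numpy as np
from pyFTS.common import Membership
from pyFTS.partitioners import Grid
from pyFTS.models import hofts

train = np.random.rand(1000)                    # placeholder training window
individual = {'npart': 20, 'lags': [1, 2, 3]}   # illustrative genes only

partitioner = Grid.GridPartitioner(data=train, npart=individual['npart'], func=Membership.trimf)
model = hofts.WeightedHighOrderFTS(partitioner=partitioner, lags=individual['lags'])
model.fit(train)                                # train the phenotype on the window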
@@ -372,11 +372,12 @@ def GeneticAlgorithm(dataset, **kwargs):
     last_best = population[0]
     best = population[1]

+    print("Evaluating initial population {}".format(time.time()))
     for individual in population:
         individual['len_lags'], individual['rmse'] = evaluate(dataset, individual, **kwargs)

     for i in range(ngen):
-        print("GENERATION {}".format(i))
+        print("GENERATION {} {}".format(i, time.time()))

         generation_statistics = {}
@@ -403,7 +404,6 @@ def GeneticAlgorithm(dataset, **kwargs):
             if collect_statistics:
                 _f1.append(f1)
                 _f2.append(f2)
-            #print('eval {}'.format(individual))

         if collect_statistics:
             generation_statistics['population'] = {'f1': np.nanmedian(_f1), 'f2': np.nanmedian(_f2)}
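
Since the population medians use np.nanmedian, individuals whose fitness came back as NaN are simply skipped rather than poisoning the generation statistics:

import numpy as np

_f1 = [1.2, np.nan, 0.9]      # RMSE per individual; NaN stands in for a failed evaluation
print(np.nanmedian(_f1))      # 1.05, the NaN entry is ignored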
@@ -426,13 +426,13 @@ def GeneticAlgorithm(dataset, **kwargs):
         if last_best['rmse'] <= best['rmse'] and last_best['len_lags'] <= best['len_lags']:
             no_improvement_count += 1
-            #print("WITHOUT IMPROVEMENT {}".format(no_improvement_count))
+            print("WITHOUT IMPROVEMENT {}".format(no_improvement_count))
             pmut += .05
         else:
             no_improvement_count = 0
             pcruz = kwargs.get('pcruz', .5)
             pmut = kwargs.get('pmut', .3)
-            #print(best)
+            print(best)

         if no_improvement_count == mgen:
             break
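
The hunk above is the stagnation handling: when the previous elite is at least as good as the current one on both objectives (rmse and len_lags), the generation counts as no improvement and mutation pressure grows by .05; any improvement resets the counter and the rates, and the search stops after mgen flat generations. The same logic in isolation, with a random stand-in for the elite comparison:

import random

no_improvement_count = 0
pmut, pcruz, mgen, ngen = .3, .5, 5, 30   # defaults mirroring the surrounding code

for i in range(ngen):
    improved = random.random() < .5       # stand-in for the last_best vs best comparison
    if not improved:
        no_improvement_count += 1
        pmut += .05                       # explore harder while the elite stagnates
    else:
        no_improvement_count = 0
        pmut, pcruz = .3, .5              # restore the configured rates
    if no_improvement_count == mgen:
        break                             # stop after mgen flat generations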


@@ -30,7 +30,7 @@ hyperparams = {
     'lags': np.arange(1, 10),
     'alpha': [.0, .3, .5]
 }
+"""
 nodes = ['192.168.0.106', '192.168.0.110', '192.168.0.107']

 datsetname, dataset = get_dataset()
@@ -39,7 +39,7 @@ datsetname, dataset = get_dataset()

 #Evolutionary.cluster_method(dataset, 70, 20, .8, .3, 1)

 '''
 from pyFTS.models import hofts
 from pyFTS.partitioners import Grid
 from pyFTS.benchmarks import Measures
@@ -56,11 +56,36 @@ Measures.get_point_statistics(dataset[800:1000], model)
 print(model)
 '''

 ret = Evolutionary.execute(datsetname, dataset,
                            ngen=30, npop=20, pcruz=.5, pmut=.3,
                            window_size=800, experiments=30)
-                           #parameters={'distributed': 'spark', 'url': 'spark://192.168.0.106:7077'})
+                           parameters={'distributed': 'spark', 'url': 'spark://192.168.0.106:7077'})

 print(ret)
 #'''
+"""
+
+from pyFTS.hyperparam import Evolutionary
+from pyFTS.data import SONDA
+
+data = np.array(SONDA.get_data('glo_avg'))
+data = data[~(np.isnan(data) | np.equal(data, 0.0))]
+dataset = data[:1000000]
+
+del(data)
+
+ret, statistics = Evolutionary.GeneticAlgorithm(dataset, ngen=30, npop=20, pcruz=.5,
+                                                pmut=.3, window_size=800000, collect_statistics=True,
+                                                parameters={'distributed': 'spark',
+                                                            'url': 'spark://192.168.0.106:7077'})
+
+import json
+
+print(ret)
+
+with open('statistics.txt', 'w') as file:
+    file.write(json.dumps(statistics))  # use `json.loads` to do the reverse
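
As the inline comment says, json.loads reverses the dump. Python's json module emits NaN literals by default and parses them back, so the nanmedian-based statistics (which may contain NaN) survive the round trip. A matching read-back sketch:

import json

with open('statistics.txt') as file:
    statistics = json.loads(file.read())   # NaN literals parse back to float('nan')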