Mixture Distributions; Bugfixes on distributed.dispy and benchmarks; Improvements on hyperparam

This commit is contained in:
Petrônio Cândido 2019-03-18 18:22:03 -03:00
parent 8a01256185
commit 6fd161468b
6 changed files with 227 additions and 134 deletions

View File

@ -218,10 +218,10 @@ def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs):
raise ValueError("Type parameter has a unkown value!")
if distributed:
import dispy, dispy.httpd
import pyFTS.distributed.dispy as dispy
nodes = kwargs.get("nodes", ['127.0.0.1'])
cluster, http_server = cUtil.start_dispy_cluster(experiment_method, nodes)
cluster, http_server = dispy.start_dispy_cluster(experiment_method, nodes)
jobs = []
@ -306,7 +306,7 @@ def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs):
if progress:
progressbar.update(1)
job()
if job.status == dispy.DispyJob.Finished and job is not None:
if job.status == dispy.dispy.DispyJob.Finished and job is not None:
tmp = job.result
synthesis_method(dataset, tag, tmp, conn)
else:
@ -317,7 +317,7 @@ def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs):
cluster.wait() # wait for all jobs to finish
cUtil.stop_dispy_cluster(cluster, http_server)
dispy.stop_dispy_cluster(cluster, http_server)
conn.close()

View File

@ -1,5 +1,6 @@
import dispy, dispy.httpd, logging
import dispy as dispy, dispy.httpd, logging
from pyFTS.common import Util
import numpy as np
def start_dispy_cluster(method, nodes):

View File

@ -13,11 +13,12 @@ from pyFTS.partitioners import Grid, Entropy # , Huarng
from pyFTS.models import hofts
from pyFTS.common import Membership
from pyFTS.hyperparam import Util as hUtil
from pyFTS.distributed import dispy
from pyFTS.distributed import dispy as dUtil
__measures = ['f1', 'f2', 'rmse', 'size']
#
def genotype(mf, npart, partitioner, order, alpha, lags, len_lags, rmse):
def genotype(mf, npart, partitioner, order, alpha, lags, f1, f2):
'''
Create the individual genotype
@ -27,12 +28,12 @@ def genotype(mf, npart, partitioner, order, alpha, lags, len_lags, rmse):
:param order: model order
:param alpha: alpha-cut
:param lags: array with lag indexes
:param len_lags: parsimony fitness value
:param rmse: accuracy fitness value
:param f1: accuracy fitness value
:param f2: parsimony fitness value
:return: the genotype, a dictionary with all hyperparameters
'''
ind = dict(mf=mf, npart=npart, partitioner=partitioner, order=order,
alpha=alpha, lags=lags, len_lags=len_lags, rmse=rmse)
alpha=alpha, lags=lags, f1=f1, f2=f2)
return ind
@ -122,17 +123,19 @@ def evaluate(dataset, individual, **kwargs):
'''
from pyFTS.common import Util
from pyFTS.benchmarks import Measures
from pyFTS.hyperparam.Evolutionary import phenotype, __measures
import numpy as np
window_size = kwargs.get('window_size', 800)
train_rate = kwargs.get('train_rate', .8)
increment_rate = kwargs.get('increment_rate', .2)
parameters = kwargs.get('parameters',{})
if individual['rmse'] is not None and individual['len_lags'] is not None:
return individual['len_lags'], individual['rmse']
if individual['f1'] is not None and individual['f2'] is not None:
return { key: individual[key] for key in __measures }
try:
results = []
errors = []
lengths = []
for count, train, test in Util.sliding_window(dataset, window_size, train=train_rate, inc=increment_rate):
@ -144,22 +147,25 @@ def evaluate(dataset, individual, **kwargs):
forecasts = model.predict(test)
rmse = Measures.rmse(test[model.max_lag:], forecasts[:-1]) #.get_point_statistics(test, model)
rmse = Measures.rmse(test[model.max_lag:], forecasts) #.get_point_statistics(test, model)
lengths.append(len(model))
results.append(rmse)
errors.append(rmse)
_lags = sum(model.lags) * 100
_lags = sum(model.lags) * 100
rmse = np.nansum([.6 * np.nanmean(results), .4 * np.nanstd(results)])
len_lags = np.nansum([.4 * np.nanmean(lengths), .6 * _lags])
_rmse = np.nanmean(errors)
_len = np.nanmean(lengths)
f1 = np.nansum([.6 * _rmse, .4 * np.nanstd(errors)])
f2 = np.nansum([.4 * _len, .6 * _lags])
#print("EVALUATION {}".format(individual))
return len_lags, rmse
return {'f1': f1, 'f2': f2, 'rmse': _rmse, 'size': _len }
except Exception as ex:
print("EVALUATION EXCEPTION!", str(ex), str(individual))
return np.inf, np.inf
#print("EVALUATION EXCEPTION!", str(ex), str(individual))
return {'f1': np.inf, 'f2': np.inf, 'rmse': np.inf, 'size': np.inf }
def tournament(population, objective):
@ -191,10 +197,10 @@ def double_tournament(population):
:return:
'''
ancestor1 = tournament(population, 'rmse')
ancestor2 = tournament(population, 'rmse')
ancestor1 = tournament(population, 'f1')
ancestor2 = tournament(population, 'f1')
selected = tournament([ancestor1, ancestor2], 'len_lags')
selected = tournament([ancestor1, ancestor2], 'f2')
return selected
@ -242,7 +248,7 @@ def crossover(parents):
r1 = random.randint(0, n)
r2 = random.randint(0, n)
if parents[r1]['rmse'] < parents[r2]['rmse']:
if parents[r1]['f1'] < parents[r2]['f1']:
best = parents[r1]
worst = parents[r2]
else:
@ -315,8 +321,8 @@ def mutation(individual, pmut):
# Chama a função mutation_lags
individual['lags'] = mutation_lags( individual['lags'], individual['order'])
individual['rmse'] = None
individual['len_lags'] = None
individual['f1'] = None
individual['f2'] = None
return individual
@ -329,12 +335,14 @@ def elitism(population, new_population):
:param new_population:
:return:
'''
population = sorted(population, key=itemgetter('rmse'))
population = sorted(population, key=itemgetter('f1'))
best = population[0]
new_population = sorted(new_population, key=itemgetter('rmse'))
if new_population[0]["rmse"] > best["rmse"]:
new_population = sorted(new_population, key=itemgetter('f1'))
if new_population[0]["f1"] > best["f1"]:
new_population.insert(0,best)
elif new_population[0]["f1"] == best["f1"] and new_population[0]["f2"] > best["f2"]:
new_population.insert(0, best)
return new_population
@ -363,6 +371,10 @@ def GeneticAlgorithm(dataset, **kwargs):
npop = kwargs.get('npop',20)
pcruz = kwargs.get('pcruz',.5)
pmut = kwargs.get('pmut',.3)
distributed = kwargs.get('distributed', False)
if distributed == 'dispy':
cluster = kwargs.pop('cluster', None)
collect_statistics = kwargs.get('collect_statistics', False)
@ -376,8 +388,25 @@ def GeneticAlgorithm(dataset, **kwargs):
best = population[1]
print("Evaluating initial population {}".format(time.time()))
for individual in population:
individual['len_lags'], individual['rmse'] = evaluate(dataset, individual, **kwargs)
if not distributed:
for individual in population:
ret = evaluate(dataset, individual, **kwargs)
for key in __measures:
individual[key] = ret[key]
elif distributed=='dispy':
jobs = []
for ct, individual in enumerate(population):
job = cluster.submit(dataset, individual, **kwargs)
job.id = ct
jobs.append(job)
for job in jobs:
result = job()
if job.status == dispy.DispyJob.Finished and result is not None:
for key in __measures:
population[job.id][key] = result[key]
else:
print(job.exception)
print(job.stdout)
for i in range(ngen):
print("GENERATION {} {}".format(i, time.time()))
@ -400,16 +429,41 @@ def GeneticAlgorithm(dataset, **kwargs):
new_population[ct] = mutation(individual, pmut)
# Evaluation
_f1 = _f2 = []
for individual in new_population:
f1, f2 = evaluate(dataset, individual, **kwargs)
individual['len_lags'], individual['rmse'] = f1, f2
if collect_statistics:
_f1.append(f1)
_f2.append(f2)
if collect_statistics:
stats = {}
for key in __measures:
stats[key] = []
if not distributed:
for individual in new_population:
ret = evaluate(dataset, individual, **kwargs)
for key in __measures:
individual[key] = ret[key]
if collect_statistics: stats[key].append(ret[key])
elif distributed == 'dispy':
jobs = []
for ct, individual in enumerate(new_population):
job = cluster.submit(dataset, individual, **kwargs)
job.id = ct
jobs.append(job)
for job in jobs:
print('job id {}'.format(job.id))
result = job()
if job.status == dispy.DispyJob.Finished and result is not None:
for key in __measures:
new_population[job.id][key] = result[key]
if collect_statistics: stats[key].append(ret[key])
else:
print(job.exception)
print(job.stdout)
if collect_statistics:
generation_statistics['population'] = {'f1': np.nanmedian(_f1), 'f2': np.nanmedian(_f2)}
mean_stats = {key: np.nanmedian(stats[key]) for key in __measures }
generation_statistics['population'] = mean_stats
# Elitism
population = elitism(population, new_population)
@ -423,11 +477,11 @@ def GeneticAlgorithm(dataset, **kwargs):
best = population[0]
if collect_statistics:
generation_statistics['best'] = {'f1': best["len_lags"], 'f2': best["rmse"]}
generation_statistics['best'] = {key: best[key] for key in __measures }
statistics.append(generation_statistics)
if last_best['rmse'] <= best['rmse'] and last_best['len_lags'] <= best['len_lags']:
if last_best['f1'] <= best['f1'] and last_best['f2'] <= best['f2']:
no_improvement_count += 1
print("WITHOUT IMPROVEMENT {}".format(no_improvement_count))
pmut += .05
@ -444,35 +498,15 @@ def GeneticAlgorithm(dataset, **kwargs):
return best, statistics
def cluster_method(dataset, **kwargs):
from pyFTS.hyperparam.Evolutionary import GeneticAlgorithm
inicio = time.time()
ret, statistics = GeneticAlgorithm(dataset, **kwargs)
fim = time.time()
ret['time'] = fim - inicio
ret['size'] = ret['len_lags']
return ret, statistics
def process_jobs(jobs, datasetname, conn):
for job in jobs:
result,statistics = job()
if job.status == dispy.DispyJob.Finished and result is not None:
print("Processing result of {}".format(result))
log_result(conn, datasetname, result)
persist_statistics(statistics)
else:
print(job.exception)
print(job.stdout)
def process_experiment(result, datasetname, conn):
log_result(conn, datasetname, result['individual'])
persist_statistics(result['statistics'])
return result['individual']
def persist_statistics(statistics):
import json
with open('statistics.txt', 'w') as file:
with open('statistics{}.txt'.format(time.time()), 'w') as file:
file.write(json.dumps(statistics))
@ -491,33 +525,29 @@ def log_result(conn, datasetname, result):
def execute(datasetname, dataset, **kwargs):
conn = hUtil.open_hyperparam_db('hyperparam.db')
distributed = kwargs.get('distributed', False)
experiments = kwargs.get('experiments', 30)
if not distributed:
ret = []
for i in range(experiments):
result, statistics = cluster_method(dataset, **kwargs)
log_result(conn, datasetname, result)
persist_statistics(statistics)
ret.append(result)
distributed = kwargs.get('distributed', False)
return result
elif distributed=='dispy':
if distributed == 'dispy':
nodes = kwargs.get('nodes', ['127.0.0.1'])
cluster, http_server = dUtil.start_dispy_cluster(evaluate, nodes=nodes)
kwargs['cluster'] = cluster
cluster, http_server = dispy.start_dispy_cluster(cluster_method, nodes=nodes)
ret = []
for i in range(experiments):
print("Experiment {}".format(i))
start = time.time()
ret, statistics = GeneticAlgorithm(dataset, **kwargs)
end = time.time()
ret['time'] = end - start
experiment = {'individual': ret, 'statistics': statistics}
jobs = []
ret = process_experiment(experiment, datasetname, conn)
for i in range(experiments):
print("Experiment {}".format(i))
job = cluster.submit(dataset, **kwargs)
jobs.append(job)
if distributed == 'dispy':
dUtil.stop_dispy_cluster(cluster, http_server)
process_jobs(jobs, datasetname, conn)
return ret
dispy.stop_dispy_cluster(cluster, http_server)

View File

@ -4,8 +4,9 @@ from pyFTS.models import hofts
from pyFTS.partitioners import Grid, Entropy
from pyFTS.benchmarks import Measures
from pyFTS.hyperparam import Util as hUtil
import numpy as np
from pyFTS.distributed import dispy as dUtil
import dispy
import numpy as np
from itertools import product
@ -20,11 +21,12 @@ def dict_individual(mf, partitioner, partitions, order, lags, alpha_cut):
}
def cluster_method(individual, train, test):
def cluster_method(individual, dataset, **kwargs):
from pyFTS.common import Util, Membership
from pyFTS.models import hofts
from pyFTS.partitioners import Grid, Entropy
from pyFTS.benchmarks import Measures
import numpy as np
if individual['mf'] == 1:
mf = Membership.trimf
@ -35,42 +37,58 @@ def cluster_method(individual, train, test):
else:
mf = Membership.trimf
if individual['partitioner'] == 1:
partitioner = Grid.GridPartitioner(data=train, npart=individual['npart'], func=mf)
elif individual['partitioner'] == 2:
npart = individual['npart'] if individual['npart'] > 10 else 10
partitioner = Entropy.EntropyPartitioner(data=train, npart=npart, func=mf)
window_size = kwargs.get('window_size', 800)
train_rate = kwargs.get('train_rate', .8)
increment_rate = kwargs.get('increment_rate', .2)
parameters = kwargs.get('parameters', {})
errors = []
sizes = []
model = hofts.WeightedHighOrderFTS(partitioner=partitioner,
lags=individual['lags'],
alpha_cut=individual['alpha'],
order=individual['order'])
for count, train, test in Util.sliding_window(dataset, window_size, train=train_rate, inc=increment_rate):
model.fit(train)
if individual['partitioner'] == 1:
partitioner = Grid.GridPartitioner(data=train, npart=individual['npart'], func=mf)
elif individual['partitioner'] == 2:
npart = individual['npart'] if individual['npart'] > 10 else 10
partitioner = Entropy.EntropyPartitioner(data=train, npart=npart, func=mf)
rmse, mape, u = Measures.get_point_statistics(test, model)
model = hofts.WeightedHighOrderFTS(partitioner=partitioner,
lags=individual['lags'],
alpha_cut=individual['alpha'],
order=individual['order'])
model.fit(train)
size = len(model)
forecasts = model.predict(test)
return individual, rmse, size, mape, u
#rmse, mape, u = Measures.get_point_statistics(test, model)
rmse = Measures.rmse(test[model.max_lag:], forecasts)
size = len(model)
errors.append(rmse)
sizes.append(size)
return {'parameters': individual, 'rmse': np.nanmean(errors), 'size': np.nanmean(size)}
def process_jobs(jobs, datasetname, conn):
for job in jobs:
result, rmse, size, mape, u = job()
for ct, job in enumerate(jobs):
print("Processing job {}".format(ct))
result = job()
if job.status == dispy.DispyJob.Finished and result is not None:
print("Processing result of {}".format(result))
metrics = {'rmse': rmse, 'size': size, 'mape': mape, 'u': u }
metrics = {'rmse': result['rmse'], 'size': result['size']}
for metric in metrics.keys():
record = (datasetname, 'GridSearch', 'WHOFTS', None, result['mf'],
result['order'], result['partitioner'], result['npart'],
result['alpha'], str(result['lags']), metric, metrics[metric])
print(record)
param = result['parameters']
record = (datasetname, 'GridSearch', 'WHOFTS', None, param['mf'],
param['order'], param['partitioner'], param['npart'],
param['alpha'], str(param['lags']), metric, metrics[metric])
hUtil.insert_hyperparam(record, conn)
@ -79,7 +97,7 @@ def process_jobs(jobs, datasetname, conn):
print(job.stdout)
def execute(hyperparams, datasetname, train, test, **kwargs):
def execute(hyperparams, datasetname, dataset, **kwargs):
nodes = kwargs.get('nodes',['127.0.0.1'])
@ -105,7 +123,7 @@ def execute(hyperparams, datasetname, train, test, **kwargs):
print("Evaluation values: \n {}".format(hp_values))
cluster, http_server = Util.start_dispy_cluster(cluster_method, nodes=nodes)
cluster, http_server = dUtil.start_dispy_cluster(cluster_method, nodes=nodes)
conn = hUtil.open_hyperparam_db('hyperparam.db')
for instance in product(*hp_values):
@ -133,12 +151,12 @@ def execute(hyperparams, datasetname, train, test, **kwargs):
else:
individuals.append(dict_individual(mf, partitioner, partitions, order, _lags, alpha_cut))
if count > 50:
if count > 10:
jobs = []
for ind in individuals:
print("Testing individual {}".format(ind))
job = cluster.submit(ind, train, test)
job = cluster.submit(ind, dataset, **kwargs)
jobs.append(job)
process_jobs(jobs, datasetname, conn)
@ -147,4 +165,4 @@ def execute(hyperparams, datasetname, train, test, **kwargs):
individuals = []
Util.stop_dispy_cluster(cluster, http_server)
dUtil.stop_dispy_cluster(cluster, http_server)

View File

@ -0,0 +1,27 @@
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyFTS.common import FuzzySet,SortedCollection,tree
from pyFTS.probabilistic import ProbabilityDistribution
class Mixture(ProbabilityDistribution.ProbabilityDistribution):
"""
"""
def __init__(self, type="mixture", **kwargs):
self.models = []
self.weights = []
def append_model(self,model, weight):
self.models.append(model)
self.weights.append(weight)
def density(self, values):
if not isinstance(values, list):
values = [values]
for ct, m in enumerate(self.models):
probs = [m.density(values) ]

View File

@ -1,27 +1,29 @@
import numpy as np
import pandas as pd
from pyFTS.hyperparam import GridSearch, Evolutionary
def get_dataset():
#from pyFTS.data import SONDA
from pyFTS.data import Malaysia
from pyFTS.data import SONDA
#from pyFTS.data import Malaysia
#data = SONDA.get_data('temperature')[:1000]
data = Malaysia.get_data('temperature')[:1000]
#data = SONDA.get_data('temperature')[:3000]
data = pd.read_csv('https://query.data.world/s/6xfb5useuotbbgpsnm5b2l3wzhvw2i', sep=';')
#data = Malaysia.get_data('temperature')[:1000]
return 'SONDA.glo_avg', data['glo_avg'].values #train, test
#return 'Malaysia.temperature', data #train, test
#return 'SONDA.glo_avg', data #train, test
return 'Malaysia.temperature', data #train, test
"""
hyperparams = {
'order':[1, 2, 3],
'order':[3],
'partitions': np.arange(10,100,3),
'partitioner': [1,2],
'mf': [1, 2, 3, 4],
'lags': np.arange(1,35,2),
'partitioner': [1],
'mf': [1], #, 2, 3, 4],
'lags': np.arange(2, 7, 1),
'alpha': np.arange(.0, .5, .05)
}
'''
hyperparams = {
'order':[3], #[1, 2],
'partitions': np.arange(10,100,10),
@ -30,16 +32,28 @@ hyperparams = {
'lags': np.arange(1, 10),
'alpha': [.0, .3, .5]
}
'''
nodes = ['192.168.0.106', '192.168.0.110', '192.168.0.107']
datsetname, dataset = get_dataset()
#GridSearch.execute(hyperparams, ds, train, test, nodes=nodes)
#GridSearch.execute(hyperparams, datsetname, dataset, nodes=nodes,
# window_size=10000, train_rate=.9, increment_rate=1,)
ret = Evolutionary.execute(datsetname, dataset,
ngen=30, npop=20, pcruz=.5, pmut=.3,
window_size=10000, train_rate=.9, increment_rate=1,
experiments=1,
distributed='dispy', nodes=nodes)
#res = GridSearch.cluster_method({'mf':1, 'partitioner': 1, 'npart': 10, 'lags':[1], 'alpha': 0.0, 'order': 1},
# dataset, window_size = 10000, train_rate = .9, increment_rate = 1)
#print(res)
#Evolutionary.cluster_method(dataset, 70, 20, .8, .3, 1)
"""
from pyFTS.models import hofts
from pyFTS.partitioners import Grid
from pyFTS.benchmarks import Measures
@ -63,11 +77,11 @@ ret = Evolutionary.execute(datsetname, dataset,
parameters={'distributed': 'spark', 'url': 'spark://192.168.0.106:7077'})
print(ret)
"""
from pyFTS.hyperparam import Evolutionary
"""
from pyFTS.data import SONDA
data = np.array(SONDA.get_data('glo_avg'))
@ -78,7 +92,7 @@ dataset = data[:1000000]
del(data)
"""
import pandas as pd
df = pd.read_csv('https://query.data.world/s/i7eb73c4rluf2luasppsyxaurx5ol7', sep=';')
@ -93,6 +107,9 @@ from time import time
t1 = time()
Evolutionary.execute('SONDA', dataset,
ngen=20, mgen=5, npop=15, pcruz=.5, pmut=.3,
window_size=35000, train_rate=.6, increment_rate=1,
@ -102,4 +119,4 @@ Evolutionary.execute('SONDA', dataset,
t2 = time()
print(t2 - t1)
"""