From 6fd161468b778237d2c7ed7cc5fbde95c97d500a Mon Sep 17 00:00:00 2001
From: Petrônio Cândido
Date: Mon, 18 Mar 2019 18:22:03 -0300
Subject: [PATCH] Mixture Distributions; Bugfixes on distributed.dispy and
 benchmarks; Improvements on hyperparam

---
 pyFTS/benchmarks/benchmarks.py   |   8 +-
 pyFTS/distributed/dispy.py       |   3 +-
 pyFTS/hyperparam/Evolutionary.py | 195 ++++++++++++++++++-------------
 pyFTS/hyperparam/GridSearch.py   |  74 +++++++-----
 pyFTS/probabilistic/Mixture.py   |  30 +++++
 pyFTS/tests/hyperparam.py        |  55 ++++++---
 6 files changed, 231 insertions(+), 134 deletions(-)
 create mode 100644 pyFTS/probabilistic/Mixture.py

diff --git a/pyFTS/benchmarks/benchmarks.py b/pyFTS/benchmarks/benchmarks.py
index e43a9a6..b7f9c1b 100644
--- a/pyFTS/benchmarks/benchmarks.py
+++ b/pyFTS/benchmarks/benchmarks.py
@@ -218,10 +218,10 @@ def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs):
             raise ValueError("Type parameter has a unkown value!")

     if distributed:
-        import dispy, dispy.httpd
+        import pyFTS.distributed.dispy as dispy

         nodes = kwargs.get("nodes", ['127.0.0.1'])
-        cluster, http_server = cUtil.start_dispy_cluster(experiment_method, nodes)
+        cluster, http_server = dispy.start_dispy_cluster(experiment_method, nodes)

         jobs = []
@@ -306,7 +306,7 @@ def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs):
             if progress:
                 progressbar.update(1)
             job()
-            if job.status == dispy.DispyJob.Finished and job is not None:
+            if job.status == dispy.dispy.DispyJob.Finished and job is not None:
                 tmp = job.result
                 synthesis_method(dataset, tag, tmp, conn)
             else:
@@ -317,7 +317,7 @@ def sliding_window_benchmarks(data, windowsize, train=0.8, **kwargs):
         cluster.wait()  # wait for all jobs to finish

-        cUtil.stop_dispy_cluster(cluster, http_server)
+        dispy.stop_dispy_cluster(cluster, http_server)

     conn.close()

diff --git a/pyFTS/distributed/dispy.py b/pyFTS/distributed/dispy.py
index 24dbc26..f9c0eab 100644
--- a/pyFTS/distributed/dispy.py
+++ b/pyFTS/distributed/dispy.py
@@ -1,5 +1,6 @@
-import dispy, dispy.httpd, logging
+import dispy as dispy, dispy.httpd, logging
 from pyFTS.common import Util
+import numpy as np


 def start_dispy_cluster(method, nodes):

diff --git a/pyFTS/hyperparam/Evolutionary.py b/pyFTS/hyperparam/Evolutionary.py
index 46fb54b..7120c9d 100644
--- a/pyFTS/hyperparam/Evolutionary.py
+++ b/pyFTS/hyperparam/Evolutionary.py
@@ -13,24 +13,26 @@ from pyFTS.partitioners import Grid, Entropy  # , Huarng
 from pyFTS.models import hofts
 from pyFTS.common import Membership
 from pyFTS.hyperparam import Util as hUtil
-from pyFTS.distributed import dispy
+from pyFTS.distributed import dispy as dUtil
+import dispy

+__measures = ['f1', 'f2', 'rmse', 'size']

 #
-def genotype(mf, npart, partitioner, order, alpha, lags, len_lags, rmse):
+def genotype(mf, npart, partitioner, order, alpha, lags, f1, f2):
     '''
     Create the individual genotype

     :param mf: membership function
     :param npart: number of partitions
     :param partitioner: partitioner method
     :param order: model order
     :param alpha: alpha-cut
     :param lags: array with lag indexes
-    :param len_lags: parsimony fitness value
-    :param rmse: accuracy fitness value
+    :param f1: accuracy fitness value
+    :param f2: parsimony fitness value
     :return: the genotype, a dictionary with all hyperparameters
     '''
     ind = dict(mf=mf, npart=npart, partitioner=partitioner, order=order,
-               alpha=alpha, lags=lags, len_lags=len_lags, rmse=rmse)
+               alpha=alpha, lags=lags, f1=f1, f2=f2)
     return ind
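With this change an individual's fitness is no longer the pair (len_lags, rmse) but the set of
dictionary keys listed in __measures: f1 (accuracy), f2 (parsimony), plus the raw rmse and size
values. A minimal sketch of the genotype life cycle under the new scheme — the numeric values
shown are illustrative only:

    from pyFTS.hyperparam import Evolutionary

    # fitness keys start as None and are filled in later by evaluate()
    ind = Evolutionary.genotype(mf=1, npart=10, partitioner=1, order=2,
                                alpha=.05, lags=[1, 2], f1=None, f2=None)

    # after evaluation each individual carries every key in __measures,
    # e.g. {'f1': 1.3, 'f2': 820.0, 'rmse': 1.1, 'size': 35.0}  (illustrative)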
@@ -122,17 +123,19 @@ def evaluate(dataset, individual, **kwargs):
     '''
     from pyFTS.common import Util
     from pyFTS.benchmarks import Measures
+    from pyFTS.hyperparam.Evolutionary import phenotype, __measures
+    import numpy as np

     window_size = kwargs.get('window_size', 800)
     train_rate = kwargs.get('train_rate', .8)
     increment_rate = kwargs.get('increment_rate', .2)
     parameters = kwargs.get('parameters',{})

-    if individual['rmse'] is not None and individual['len_lags'] is not None:
-        return individual['len_lags'], individual['rmse']
+    if individual['f1'] is not None and individual['f2'] is not None:
+        return { key: individual[key] for key in __measures }

     try:
-        results = []
+        errors = []
         lengths = []

         for count, train, test in Util.sliding_window(dataset, window_size, train=train_rate, inc=increment_rate):
@@ -144,22 +147,25 @@ def evaluate(dataset, individual, **kwargs):
             forecasts = model.predict(test)

-            rmse = Measures.rmse(test[model.max_lag:], forecasts[:-1]) #.get_point_statistics(test, model)
+            rmse = Measures.rmse(test[model.max_lag:], forecasts) #.get_point_statistics(test, model)
             lengths.append(len(model))

-            results.append(rmse)
+            errors.append(rmse)

-        _lags = sum(model.lags) * 100
+        _lags = sum(model.lags) * 100

-        rmse = np.nansum([.6 * np.nanmean(results), .4 * np.nanstd(results)])
-        len_lags = np.nansum([.4 * np.nanmean(lengths), .6 * _lags])
+        _rmse = np.nanmean(errors)
+        _len = np.nanmean(lengths)
+
+        f1 = np.nansum([.6 * _rmse, .4 * np.nanstd(errors)])
+        f2 = np.nansum([.4 * _len, .6 * _lags])

         #print("EVALUATION {}".format(individual))
-        return len_lags, rmse
+        return {'f1': f1, 'f2': f2, 'rmse': _rmse, 'size': _len }

     except Exception as ex:
-        print("EVALUATION EXCEPTION!", str(ex), str(individual))
-        return np.inf, np.inf
+        #print("EVALUATION EXCEPTION!", str(ex), str(individual))
+        return {'f1': np.inf, 'f2': np.inf, 'rmse': np.inf, 'size': np.inf }


 def tournament(population, objective):
@@ -191,10 +197,10 @@ def double_tournament(population):
     :return:
     '''

-    ancestor1 = tournament(population, 'rmse')
-    ancestor2 = tournament(population, 'rmse')
+    ancestor1 = tournament(population, 'f1')
+    ancestor2 = tournament(population, 'f1')

-    selected = tournament([ancestor1, ancestor2], 'len_lags')
+    selected = tournament([ancestor1, ancestor2], 'f2')

     return selected
@@ -242,7 +248,7 @@ def crossover(parents):
     r1 = random.randint(0, n)
     r2 = random.randint(0, n)

-    if parents[r1]['rmse'] < parents[r2]['rmse']:
+    if parents[r1]['f1'] < parents[r2]['f1']:
         best = parents[r1]
         worst = parents[r2]
     else:
@@ -315,8 +321,8 @@ def mutation(individual, pmut):
     # Call the mutation_lags function
     individual['lags'] = mutation_lags( individual['lags'],  individual['order'])

-    individual['rmse'] = None
-    individual['len_lags'] = None
+    individual['f1'] = None
+    individual['f2'] = None

     return individual
@@ -329,12 +335,14 @@ def elitism(population, new_population):
     :param new_population:
     :return:
     '''
-    population = sorted(population, key=itemgetter('rmse'))
+    population = sorted(population, key=itemgetter('f1'))
     best = population[0]

-    new_population = sorted(new_population, key=itemgetter('rmse'))
-    if new_population[0]["rmse"] > best["rmse"]:
+    new_population = sorted(new_population, key=itemgetter('f1'))
+    if new_population[0]["f1"] > best["f1"]:
         new_population.insert(0,best)
+    elif new_population[0]["f1"] == best["f1"] and new_population[0]["f2"] > best["f2"]:
+        new_population.insert(0, best)

     return new_population
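Under the generic measure keys, selection optimizes the two objectives in sequence:
double_tournament runs two accuracy (f1) tournaments and lets parsimony (f2) break the tie,
while elitism re-inserts the previous best individual whenever the new generation is worse on
f1, or tied on f1 and worse on f2. A condensed sketch of the selection scheme, assuming each
individual is a dict carrying its fitness values:

    import random

    def tournament(population, objective):
        # sample two individuals and keep the one with the smaller objective value
        n = len(population) - 1
        r1, r2 = random.randint(0, n), random.randint(0, n)
        return population[r1] if population[r1][objective] < population[r2][objective] \
            else population[r2]

    def double_tournament(population):
        # two f1 (accuracy) tournaments, then an f2 (parsimony) playoff between the winners
        return tournament([tournament(population, 'f1'),
                           tournament(population, 'f1')], 'f2')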
@@ -363,6 +371,10 @@ def GeneticAlgorithm(dataset, **kwargs):
     npop = kwargs.get('npop',20)
     pcruz = kwargs.get('pcruz',.5)
     pmut = kwargs.get('pmut',.3)
+    distributed = kwargs.get('distributed', False)
+
+    if distributed == 'dispy':
+        cluster = kwargs.pop('cluster', None)

     collect_statistics = kwargs.get('collect_statistics', False)
@@ -376,8 +388,25 @@ def GeneticAlgorithm(dataset, **kwargs):
     best = population[1]

     print("Evaluating initial population {}".format(time.time()))
-    for individual in population:
-        individual['len_lags'], individual['rmse'] = evaluate(dataset, individual, **kwargs)
+    if not distributed:
+        for individual in population:
+            ret = evaluate(dataset, individual, **kwargs)
+            for key in __measures:
+                individual[key] = ret[key]
+    elif distributed=='dispy':
+        jobs = []
+        for ct, individual in enumerate(population):
+            job = cluster.submit(dataset, individual, **kwargs)
+            job.id = ct
+            jobs.append(job)
+        for job in jobs:
+            result = job()
+            if job.status == dispy.DispyJob.Finished and result is not None:
+                for key in __measures:
+                    population[job.id][key] = result[key]
+            else:
+                print(job.exception)
+                print(job.stdout)

     for i in range(ngen):
         print("GENERATION {} {}".format(i, time.time()))
@@ -400,16 +429,41 @@ def GeneticAlgorithm(dataset, **kwargs):
             new_population[ct] = mutation(individual, pmut)

         # Evaluation
-        _f1 = _f2 = []
-        for individual in new_population:
-            f1, f2 = evaluate(dataset, individual, **kwargs)
-            individual['len_lags'], individual['rmse'] = f1, f2
-            if collect_statistics:
-                _f1.append(f1)
-                _f2.append(f2)
+        if collect_statistics:
+            stats = {}
+            for key in __measures:
+                stats[key] = []
+
+        if not distributed:
+            for individual in new_population:
+                ret = evaluate(dataset, individual, **kwargs)
+                for key in __measures:
+                    individual[key] = ret[key]
+                    if collect_statistics: stats[key].append(ret[key])
+
+        elif distributed == 'dispy':
+            jobs = []
+
+            for ct, individual in enumerate(new_population):
+                job = cluster.submit(dataset, individual, **kwargs)
+                job.id = ct
+                jobs.append(job)
+            for job in jobs:
+                print('job id {}'.format(job.id))
+                result = job()
+                if job.status == dispy.DispyJob.Finished and result is not None:
+                    for key in __measures:
+                        new_population[job.id][key] = result[key]
+                        if collect_statistics: stats[key].append(result[key])
+                else:
+                    print(job.exception)
+                    print(job.stdout)

         if collect_statistics:
-            generation_statistics['population'] = {'f1': np.nanmedian(_f1), 'f2': np.nanmedian(_f2)}
+            mean_stats = {key: np.nanmedian(stats[key]) for key in __measures }
+
+            generation_statistics['population'] = mean_stats

         # Elitism
         population = elitism(population, new_population)
@@ -423,11 +477,11 @@ def GeneticAlgorithm(dataset, **kwargs):
         best = population[0]

         if collect_statistics:
-            generation_statistics['best'] = {'f1': best["len_lags"], 'f2': best["rmse"]}
+            generation_statistics['best'] = {key: best[key] for key in __measures }

             statistics.append(generation_statistics)

-        if last_best['rmse'] <= best['rmse'] and last_best['len_lags'] <= best['len_lags']:
+        if last_best['f1'] <= best['f1'] and last_best['f2'] <= best['f2']:
             no_improvement_count += 1
             print("WITHOUT IMPROVEMENT {}".format(no_improvement_count))
             pmut += .05
@@ -444,35 +498,15 @@ def GeneticAlgorithm(dataset, **kwargs):

     return best, statistics


-def cluster_method(dataset, **kwargs):
-    from pyFTS.hyperparam.Evolutionary import GeneticAlgorithm
-
-    inicio = time.time()
-    ret, statistics = GeneticAlgorithm(dataset, **kwargs)
-    fim = time.time()
-    ret['time'] = fim - inicio
-    ret['size'] = ret['len_lags']
-    return ret, statistics
-
-
-def process_jobs(jobs, datasetname, conn):
-    for job in jobs:
-        result,statistics = job()
-        if job.status == dispy.DispyJob.Finished and result is not None:
-            print("Processing result of {}".format(result))
-
-            log_result(conn, datasetname, result)
-
-            persist_statistics(statistics)
-
-        else:
-            print(job.exception)
-            print(job.stdout)
+def process_experiment(result, datasetname, conn):
+    log_result(conn, datasetname, result['individual'])
+    persist_statistics(result['statistics'])
+    return result['individual']


 def persist_statistics(statistics):
     import json
-    with open('statistics.txt', 'w') as file:
+    with open('statistics{}.txt'.format(time.time()), 'w') as file:
         file.write(json.dumps(statistics))
@@ -491,33 +525,29 @@ def log_result(conn, datasetname, result):
 def execute(datasetname, dataset, **kwargs):
     conn = hUtil.open_hyperparam_db('hyperparam.db')

-    distributed = kwargs.get('distributed', False)
-
     experiments = kwargs.get('experiments', 30)

-    if not distributed:
-        ret = []
-        for i in range(experiments):
-            result, statistics = cluster_method(dataset, **kwargs)
-            log_result(conn, datasetname, result)
-            persist_statistics(statistics)
-            ret.append(result)
+    distributed = kwargs.get('distributed', False)

-        return result
-
-    elif distributed=='dispy':
+    if distributed == 'dispy':
         nodes = kwargs.get('nodes', ['127.0.0.1'])
+        cluster, http_server = dUtil.start_dispy_cluster(evaluate, nodes=nodes)
+        kwargs['cluster'] = cluster

-        cluster, http_server = dispy.start_dispy_cluster(cluster_method, nodes=nodes)
+    ret = []
+    for i in range(experiments):
+        print("Experiment {}".format(i))
+        start = time.time()
+        ret, statistics = GeneticAlgorithm(dataset, **kwargs)
+        end = time.time()
+        ret['time'] = end - start
+        experiment = {'individual': ret, 'statistics': statistics}

-        jobs = []
+        ret = process_experiment(experiment, datasetname, conn)

-        for i in range(experiments):
-            print("Experiment {}".format(i))
-            job = cluster.submit(dataset, **kwargs)
-            jobs.append(job)
+    if distributed == 'dispy':
+        dUtil.stop_dispy_cluster(cluster, http_server)

-        process_jobs(jobs, datasetname, conn)
+    return ret

-        dispy.stop_dispy_cluster(cluster, http_server)
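After this refactor execute() always runs the genetic loop locally and, when
distributed='dispy', only the per-individual evaluate() calls are shipped to the cluster. A
usage sketch of the new entry point, assuming dataset is a numeric series and the node address
is a placeholder:

    from pyFTS.hyperparam import Evolutionary

    best = Evolutionary.execute('MyDataset', dataset,
                                ngen=30, npop=20, pcruz=.5, pmut=.3,
                                window_size=800, train_rate=.8, increment_rate=.2,
                                experiments=1,
                                distributed='dispy', nodes=['127.0.0.1'])

    print(best)  # the winning genotype, with its f1/f2/rmse/size measures and runtime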
diff --git a/pyFTS/hyperparam/GridSearch.py b/pyFTS/hyperparam/GridSearch.py
index 9922721..4b26880 100644
--- a/pyFTS/hyperparam/GridSearch.py
+++ b/pyFTS/hyperparam/GridSearch.py
@@ -4,8 +4,9 @@ from pyFTS.models import hofts
 from pyFTS.partitioners import Grid, Entropy
 from pyFTS.benchmarks import Measures
 from pyFTS.hyperparam import Util as hUtil
-import numpy as np
+from pyFTS.distributed import dispy as dUtil
 import dispy
+import numpy as np

 from itertools import product
@@ -20,11 +21,12 @@ def dict_individual(mf, partitioner, partitions, order, lags, alpha_cut):
     }


-def cluster_method(individual, train, test):
+def cluster_method(individual, dataset, **kwargs):
     from pyFTS.common import Util, Membership
     from pyFTS.models import hofts
     from pyFTS.partitioners import Grid, Entropy
     from pyFTS.benchmarks import Measures
+    import numpy as np

     if individual['mf'] == 1:
         mf = Membership.trimf
@@ -35,42 +37,58 @@ def cluster_method(individual, dataset, **kwargs):
     else:
         mf = Membership.trimf

-    if individual['partitioner'] == 1:
-        partitioner = Grid.GridPartitioner(data=train, npart=individual['npart'], func=mf)
-    elif individual['partitioner'] == 2:
-        npart = individual['npart'] if individual['npart'] > 10 else 10
-        partitioner = Entropy.EntropyPartitioner(data=train, npart=npart, func=mf)
+    window_size = kwargs.get('window_size', 800)
+    train_rate = kwargs.get('train_rate', .8)
+    increment_rate = kwargs.get('increment_rate', .2)
+    parameters = kwargs.get('parameters', {})

+    errors = []
+    sizes = []

-    model = hofts.WeightedHighOrderFTS(partitioner=partitioner,
-                                       lags=individual['lags'],
-                                       alpha_cut=individual['alpha'],
-                                       order=individual['order'])
+    for count, train, test in Util.sliding_window(dataset, window_size, train=train_rate, inc=increment_rate):

-    model.fit(train)
+        if individual['partitioner'] == 1:
+            partitioner = Grid.GridPartitioner(data=train, npart=individual['npart'], func=mf)
+        elif individual['partitioner'] == 2:
+            npart = individual['npart'] if individual['npart'] > 10 else 10
+            partitioner = Entropy.EntropyPartitioner(data=train, npart=npart, func=mf)

-    rmse, mape, u = Measures.get_point_statistics(test, model)
+        model = hofts.WeightedHighOrderFTS(partitioner=partitioner,
+                                           lags=individual['lags'],
+                                           alpha_cut=individual['alpha'],
+                                           order=individual['order'])
+        model.fit(train)

-    size = len(model)
+        forecasts = model.predict(test)

-    return individual, rmse, size, mape, u
+        #rmse, mape, u = Measures.get_point_statistics(test, model)
+        rmse = Measures.rmse(test[model.max_lag:], forecasts)
+
+        size = len(model)
+
+        errors.append(rmse)
+        sizes.append(size)
+
+    return {'parameters': individual, 'rmse': np.nanmean(errors), 'size': np.nanmean(sizes)}


 def process_jobs(jobs, datasetname, conn):
-    for job in jobs:
-        result, rmse, size, mape, u = job()
+    for ct, job in enumerate(jobs):
+        print("Processing job {}".format(ct))
+        result = job()
         if job.status == dispy.DispyJob.Finished and result is not None:
             print("Processing result of {}".format(result))

-            metrics = {'rmse': rmse, 'size': size, 'mape': mape, 'u': u }
+            metrics = {'rmse': result['rmse'], 'size': result['size']}

             for metric in metrics.keys():
-                record = (datasetname, 'GridSearch', 'WHOFTS', None, result['mf'],
-                          result['order'], result['partitioner'], result['npart'],
-                          result['alpha'], str(result['lags']), metric, metrics[metric])
-
-                print(record)
+                param = result['parameters']
+
+                record = (datasetname, 'GridSearch', 'WHOFTS', None, param['mf'],
+                          param['order'], param['partitioner'], param['npart'],
+                          param['alpha'], str(param['lags']), metric, metrics[metric])

                 hUtil.insert_hyperparam(record, conn)
@@ -79,7 +97,7 @@ def process_jobs(jobs, datasetname, conn):
             print(job.stdout)


-def execute(hyperparams, datasetname, train, test, **kwargs):
+def execute(hyperparams, datasetname, dataset, **kwargs):

     nodes = kwargs.get('nodes',['127.0.0.1'])
@@ -105,7 +123,7 @@ def execute(hyperparams, datasetname, dataset, **kwargs):

     print("Evaluation values: \n {}".format(hp_values))

-    cluster, http_server = Util.start_dispy_cluster(cluster_method, nodes=nodes)
+    cluster, http_server = dUtil.start_dispy_cluster(cluster_method, nodes=nodes)
     conn = hUtil.open_hyperparam_db('hyperparam.db')

     for instance in product(*hp_values):
@@ -133,12 +151,12 @@ def execute(hyperparams, datasetname, dataset, **kwargs):
             else:
                 individuals.append(dict_individual(mf, partitioner, partitions, order, _lags, alpha_cut))

-            if count > 50:
+            if count > 10:
                 jobs = []

                 for ind in individuals:
                     print("Testing individual {}".format(ind))
-                    job = cluster.submit(ind, train, test)
+                    job = cluster.submit(ind, dataset, **kwargs)
                     jobs.append(job)

                 process_jobs(jobs, datasetname, conn)
@@ -147,4 +165,4 @@ def execute(hyperparams, datasetname, dataset, **kwargs):

             individuals = []

-    Util.stop_dispy_cluster(cluster, http_server)
+    dUtil.stop_dispy_cluster(cluster, http_server)
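cluster_method now scores a candidate over every Util.sliding_window split of the whole series
instead of a single train/test pair, so the stored rmse and size are means across windows. The
function can be smoke-tested locally without a dispy cluster — a sketch with illustrative
hyperparameters, assuming dataset is any numeric series:

    from pyFTS.hyperparam import GridSearch

    res = GridSearch.cluster_method({'mf': 1, 'partitioner': 1, 'npart': 10,
                                     'lags': [1], 'alpha': 0.0, 'order': 1},
                                    dataset,
                                    window_size=800, train_rate=.8, increment_rate=.2)

    print(res)  # {'parameters': {...}, 'rmse': <mean RMSE>, 'size': <mean model size>}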
diff --git a/pyFTS/probabilistic/Mixture.py b/pyFTS/probabilistic/Mixture.py
new file mode 100644
index 0000000..4173e00
--- /dev/null
+++ b/pyFTS/probabilistic/Mixture.py
@@ -0,0 +1,30 @@
+import numpy as np
+import pandas as pd
+import matplotlib.pyplot as plt
+from pyFTS.common import FuzzySet,SortedCollection,tree
+from pyFTS.probabilistic import ProbabilityDistribution
+
+
+class Mixture(ProbabilityDistribution.ProbabilityDistribution):
+    """
+    A probability distribution composed of the weighted sum of other distributions
+    """
+    def __init__(self, type="mixture", **kwargs):
+        self.type = type
+        self.models = []
+        self.weights = []
+
+    def append_model(self, model, weight):
+        self.models.append(model)
+        self.weights.append(weight)
+
+    def density(self, values):
+        if not isinstance(values, (list, np.ndarray)):
+            values = [values]
+
+        # the mixture density is the weighted sum of the component densities
+        probs = np.zeros(len(values))
+        for ct, m in enumerate(self.models):
+            probs += self.weights[ct] * np.array(m.density(values))
+
+        return probs
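The Mixture class composes distributions that were fitted elsewhere. A sketch of the intended
use, where d1 and d2 stand for any previously built ProbabilityDistribution objects
(hypothetical variables) and the weights are assumed to sum to one:

    from pyFTS.probabilistic import Mixture

    mix = Mixture.Mixture()
    mix.append_model(d1, .7)
    mix.append_model(d2, .3)

    # density() accepts a scalar or a list and returns the weighted component densities
    print(mix.density([0.5, 1.0]))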
diff --git a/pyFTS/tests/hyperparam.py b/pyFTS/tests/hyperparam.py
index b660d48..bb0b5db 100644
--- a/pyFTS/tests/hyperparam.py
+++ b/pyFTS/tests/hyperparam.py
@@ -1,27 +1,29 @@
 import numpy as np
+import pandas as pd
 from pyFTS.hyperparam import GridSearch, Evolutionary


 def get_dataset():
-    #from pyFTS.data import SONDA
-    from pyFTS.data import Malaysia
+    from pyFTS.data import SONDA
+    #from pyFTS.data import Malaysia

-    #data = SONDA.get_data('temperature')[:1000]
-    data = Malaysia.get_data('temperature')[:1000]
+    #data = SONDA.get_data('temperature')[:3000]
+    data = pd.read_csv('https://query.data.world/s/6xfb5useuotbbgpsnm5b2l3wzhvw2i', sep=';')
+    #data = Malaysia.get_data('temperature')[:1000]
+
+    return 'SONDA.glo_avg', data['glo_avg'].values #train, test
+    #return 'Malaysia.temperature', data #train, test

-    #return 'SONDA.glo_avg', data #train, test
-    return 'Malaysia.temperature', data #train, test

-"""
 hyperparams = {
-    'order':[1, 2, 3],
+    'order':[3],
     'partitions': np.arange(10,100,3),
-    'partitioner': [1,2],
-    'mf': [1, 2, 3, 4],
-    'lags': np.arange(1,35,2),
+    'partitioner': [1],
+    'mf': [1], #, 2, 3, 4],
+    'lags': np.arange(2, 7, 1),
     'alpha': np.arange(.0, .5, .05)
 }
-
+'''
 hyperparams = {
     'order':[3],  #[1, 2],
     'partitions': np.arange(10,100,10),
@@ -30,16 +32,28 @@ hyperparams = {
     'lags': np.arange(1, 10),
     'alpha': [.0, .3, .5]
 }
-
+'''
 nodes = ['192.168.0.106', '192.168.0.110', '192.168.0.107']

 datsetname, dataset = get_dataset()

-#GridSearch.execute(hyperparams, ds, train, test, nodes=nodes)
+#GridSearch.execute(hyperparams, datsetname, dataset, nodes=nodes,
+#                   window_size=10000, train_rate=.9, increment_rate=1,)
+
+ret = Evolutionary.execute(datsetname, dataset,
+                           ngen=30, npop=20, pcruz=.5, pmut=.3,
+                           window_size=10000, train_rate=.9, increment_rate=1,
+                           experiments=1,
+                           distributed='dispy', nodes=nodes)
+
+#res = GridSearch.cluster_method({'mf':1, 'partitioner': 1, 'npart': 10, 'lags':[1], 'alpha': 0.0, 'order': 1},
+#                                dataset, window_size = 10000, train_rate = .9, increment_rate = 1)
+
+#print(res)

 #Evolutionary.cluster_method(dataset, 70, 20, .8, .3, 1)
-
+"""
 from pyFTS.models import hofts
 from pyFTS.partitioners import Grid
 from pyFTS.benchmarks import Measures
@@ -63,11 +77,11 @@ ret = Evolutionary.execute(datsetname, dataset,
                            parameters={'distributed': 'spark', 'url': 'spark://192.168.0.106:7077'})

 print(ret)
-"""

 from pyFTS.hyperparam import Evolutionary
-"""

 from pyFTS.data import SONDA

 data = np.array(SONDA.get_data('glo_avg'))
@@ -78,7 +92,7 @@ dataset = data[:1000000]

 del(data)

-"""

 import pandas as pd

 df = pd.read_csv('https://query.data.world/s/i7eb73c4rluf2luasppsyxaurx5ol7', sep=';')
@@ -93,6 +107,9 @@ from time import time

 t1 = time()

+
+
+
 Evolutionary.execute('SONDA', dataset,
                      ngen=20, mgen=5, npop=15, pcruz=.5, pmut=.3,
                      window_size=35000, train_rate=.6, increment_rate=1,
@@ -102,4 +119,4 @@
 t2 = time()

 print(t2 - t1)
-
+"""
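persist_statistics() in Evolutionary.py now writes each experiment's per-generation statistics
to a timestamped JSON file (statistics<timestamp>.txt). Assuming the run was started with
collect_statistics=True, a sketch for loading one of these files back — the filename below is
illustrative:

    import json

    with open('statistics1552943000.0.txt') as file:
        statistics = json.load(file)

    # one entry per generation: the population medians and the best individual's measures
    for generation in statistics:
        print(generation['population'], generation['best'])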