diff --git a/pyFTS/common/Util.py b/pyFTS/common/Util.py index 71ead6a..5bfce62 100644 --- a/pyFTS/common/Util.py +++ b/pyFTS/common/Util.py @@ -8,7 +8,6 @@ import dill import numpy as np - def plot_rules(model, size=[5, 5], axis=None, rules_by_axis=None, columns=1): if axis is None and rules_by_axis is None: rows = 1 @@ -126,8 +125,8 @@ def show_and_save_image(fig, file, flag, lgd=None): :param flag: if True the image will be saved :param lgd: legend """ + plt.show() if flag: - plt.show() if lgd is not None: fig.savefig(file, additional_artists=lgd,bbox_inches='tight') #bbox_extra_artists=(lgd,), ) else: diff --git a/pyFTS/common/fts.py b/pyFTS/common/fts.py index 2af341d..4cc4852 100644 --- a/pyFTS/common/fts.py +++ b/pyFTS/common/fts.py @@ -163,9 +163,7 @@ class FTS(object): elif distributed == 'spark': from pyFTS.distributed import spark - nodes = kwargs.get("nodes", 'spark://192.168.0.110:7077') - app = kwargs.get("app", 'pyFTS') - ret = spark.distributed_predict(data=ndata, model=self, url=nodes, app=app) + ret = spark.distributed_predict(data=ndata, model=self, **kwargs) if not self.is_multivariate: diff --git a/pyFTS/distributed/spark.py b/pyFTS/distributed/spark.py index c822574..8a8c935 100644 --- a/pyFTS/distributed/spark.py +++ b/pyFTS/distributed/spark.py @@ -16,7 +16,21 @@ SPARK_ADDR = 'spark://192.168.0.110:7077' os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3' +def create_spark_conf(**kwargs): + spark_executor_memory = kwargs.get("spark_executor_memory", "2g") + spark_driver_memory = kwargs.get("spark_driver_memory", "2g") + url = kwargs.get("url", SPARK_ADDR) + app = kwargs.get("app", 'pyFTS') + conf = SparkConf() + conf.setMaster(url) + conf.setAppName(app) + conf.set("spark.executor.memory", spark_executor_memory) + conf.set("spark.driver.memory", spark_driver_memory) + conf.set("spark.memory.offHeap.enabled",True) + conf.set("spark.memory.offHeap.size","16g") + + return conf def get_partitioner(shared_partitioner, type='common', variables=[]): """ @@ -74,6 +88,16 @@ def get_variables(**parameters): return (explanatory_variables, target_variable) +def create_univariate_model(**parameters): + if parameters['order'].value > 1: + model = parameters['method'].value(partitioner=get_partitioner(parameters['partitioner']), + order=parameters['order'].value, alpha_cut=parameters['alpha_cut'].value, + lags=parameters['lags'].value) + else: + model = parameters['method'].value(partitioner=get_partitioner(parameters['partitioner']), + alpha_cut=parameters['alpha_cut'].value) + + return model def slave_train_univariate(data, **parameters): """ @@ -82,27 +106,32 @@ def slave_train_univariate(data, **parameters): :return: """ - if parameters['type'].value == 'common': + model = create_univariate_model(**parameters) - if parameters['order'].value > 1: - model = parameters['method'].value(partitioner=get_partitioner(parameters['partitioner']), - order=parameters['order'].value, alpha_cut=parameters['alpha_cut'].value, - lags=parameters['lags'].value) - else: - model = parameters['method'].value(partitioner=get_partitioner(parameters['partitioner']), - alpha_cut=parameters['alpha_cut'].value) - - ndata = [k for k in data] - - else: - pass + ndata = [k for k in data] model.train(ndata) return [(k, model.flrgs[k]) for k in model.flrgs.keys()] + + +def slave_forecast_univariate(data, **parameters): + """ + + :param data: + :return: + """ + + model = create_univariate_model(**parameters) + + ndata = [k for k in data] + + forecasts = model.predict(ndata) + + return [(k, k) for k in forecasts] -def slave_train_multivariate(data, **parameters): +def create_multivariate_model(**parameters): explanatory_variables, target_variable = get_variables(**parameters) #vars = [(v.name, v.name) for v in explanatory_variables] @@ -129,8 +158,13 @@ def slave_train_multivariate(data, **parameters): target_variable=target_variable, alpha_cut=parameters['alpha_cut'].value) + return model +def slave_train_multivariate(data, **parameters): + + model = create_multivariate_model(**parameters) + rows = [k for k in data] ndata = pd.DataFrame.from_records(rows, columns=parameters['columns'].value) @@ -145,7 +179,68 @@ def slave_train_multivariate(data, **parameters): return [(k, v) for k,v in model.flrgs.items()] -def distributed_train(model, data, url=SPARK_ADDR, app='pyFTS'): +def slave_forecast_multivariate(data, **parameters): + + model = create_multivariate_model(**parameters) + + rows = [k for k in data] + ndata = pd.DataFrame.from_records(rows, columns=parameters['columns'].value) + + forecasts = model.predict(ndata) + + return [(k, k) for k in forecasts] + + +def share_parameters(model, context, data): + parameters = {} + if not model.is_multivariate: + parameters['type'] = context.broadcast('common') + parameters['partitioner'] = context.broadcast(model.partitioner.sets) + parameters['alpha_cut'] = context.broadcast(model.alpha_cut) + parameters['order'] = context.broadcast(model.order) + parameters['method'] = context.broadcast(type(model)) + parameters['lags'] = context.broadcast(model.lags) + parameters['max_lag'] = context.broadcast(model.max_lag) + else: + if model.is_clustered: + parameters['type'] = context.broadcast('clustered') + names = [] + for name, fset in model.partitioner.sets.items(): + names.append(name) + parameters['partitioner_{}'.format(name)] = context.broadcast([(k,v) for k,v in fset.sets.items()]) + + parameters['partitioner_names'] = context.broadcast(names) + + else: + parameters['type'] = context.broadcast('multivariate') + names = [] + for var in model.explanatory_variables: + #if var.data_type is None: + # raise Exception("It is mandatory to inform the data_type parameter for each variable when the training is distributed! ") + names.append(var.name) + parameters['{}_type'.format(var.name)] = context.broadcast(var.type) + #parameters['{}_data_type'.format(var.name)] = context.broadcast(var.data_type) + #parameters['{}_mask'.format(var.name)] = context.broadcast(var.mask) + parameters['{}_label'.format(var.name)] = context.broadcast(var.data_label) + parameters['{}_alpha'.format(var.name)] = context.broadcast(var.alpha_cut) + parameters['{}_partitioner'.format(var.name)] = context.broadcast(var.partitioner.sets) + parameters['{}_partitioner_type'.format(var.name)] = context.broadcast(var.partitioner.type) + + parameters['variables'] = context.broadcast(names) + parameters['target'] = context.broadcast(model.target_variable.name) + + parameters['columns'] = context.broadcast(data.columns.values) + + parameters['alpha_cut'] = context.broadcast(model.alpha_cut) + parameters['order'] = context.broadcast(model.order) + parameters['method'] = context.broadcast(type(model)) + parameters['lags'] = context.broadcast(model.lags) + parameters['max_lag'] = context.broadcast(model.max_lag) + + return parameters + + +def distributed_train(model, data, **kwargs): """ @@ -155,85 +250,34 @@ def distributed_train(model, data, url=SPARK_ADDR, app='pyFTS'): :param app: :return: """ - - conf = SparkConf() - conf.setMaster(url) - conf.setAppName(app) - conf.set("spark.executor.memory", "2g") - conf.set("spark.driver.memory", "2g") - conf.set("spark.memory.offHeap.enabled",True) - conf.set("spark.memory.offHeap.size","16g") - parameters = {} + + num_batches = kwargs.get("num_batches", 4) + + conf = create_spark_conf(**kwargs) with SparkContext(conf=conf) as context: nodes = context.defaultParallelism + + parameters = share_parameters(model, context, data) if not model.is_multivariate: - parameters['type'] = context.broadcast('common') - parameters['partitioner'] = context.broadcast(model.partitioner.sets) - parameters['alpha_cut'] = context.broadcast(model.alpha_cut) - parameters['order'] = context.broadcast(model.order) - parameters['method'] = context.broadcast(type(model)) - parameters['lags'] = context.broadcast(model.lags) - parameters['max_lag'] = context.broadcast(model.max_lag) - func = lambda x: slave_train_univariate(x, **parameters) - flrgs = context.parallelize(data).repartition(nodes*4).mapPartitions(func) + flrgs = context.parallelize(data).repartition(nodes*num_batches).mapPartitions(func) for k in flrgs.collect(): model.append_rule(k[1]) - return model else: - if model.is_clustered: - parameters['type'] = context.broadcast('clustered') - names = [] - for name, fset in model.partitioner.sets.items(): - names.append(name) - parameters['partitioner_{}'.format(name)] = context.broadcast([(k,v) for k,v in fset.sets.items()]) - - parameters['partitioner_names'] = context.broadcast(names) - - else: - parameters['type'] = context.broadcast('multivariate') - names = [] - for var in model.explanatory_variables: - #if var.data_type is None: - # raise Exception("It is mandatory to inform the data_type parameter for each variable when the training is distributed! ") - names.append(var.name) - parameters['{}_type'.format(var.name)] = context.broadcast(var.type) - #parameters['{}_data_type'.format(var.name)] = context.broadcast(var.data_type) - #parameters['{}_mask'.format(var.name)] = context.broadcast(var.mask) - parameters['{}_label'.format(var.name)] = context.broadcast(var.data_label) - parameters['{}_alpha'.format(var.name)] = context.broadcast(var.alpha_cut) - parameters['{}_partitioner'.format(var.name)] = context.broadcast(var.partitioner.sets) - parameters['{}_partitioner_type'.format(var.name)] = context.broadcast(var.partitioner.type) - - parameters['variables'] = context.broadcast(names) - parameters['target'] = context.broadcast(model.target_variable.name) - - parameters['columns'] = context.broadcast(data.columns.values) - + data = data.to_dict(orient='records') - parameters['alpha_cut'] = context.broadcast(model.alpha_cut) - parameters['order'] = context.broadcast(model.order) - parameters['method'] = context.broadcast(type(model)) - parameters['lags'] = context.broadcast(model.lags) - parameters['max_lag'] = context.broadcast(model.max_lag) - func = lambda x: slave_train_multivariate(x, **parameters) flrgs = context.parallelize(data).mapPartitions(func) for k in flrgs.collect(): - print(k) - #for g in k: - # print(g) - - #return if parameters['type'].value == 'clustered': if k[0] == 'counts': for fset, count in k[1]: @@ -243,8 +287,46 @@ def distributed_train(model, data, url=SPARK_ADDR, app='pyFTS'): else: model.append_rule(k[1]) - return model + return model -def distributed_predict(data, model, url=SPARK_ADDR, app='pyFTS'): - return None +def distributed_predict(data, model, **kwargs): + """ + + + :param model: + :param data: + :param url: + :param app: + :return: + """ + + num_batches = kwargs.get("num_batches", 4) + + conf = create_spark_conf(**kwargs) + + ret = [] + + with SparkContext(conf=conf) as context: + + nodes = context.defaultParallelism + + parameters = share_parameters(model, context) + + if not model.is_multivariate: + func = lambda x: slave_forecast_univariate(x, **parameters) + + forecasts = context.parallelize(data).repartition(nodes * num_batches).mapPartitions(func) + + else: + + data = data.to_dict(orient='records') + + func = lambda x: slave_forecast_multivariate(x, **parameters) + + forecasts = context.parallelize(data).repartition(nodes * num_batches).mapPartitions(func) + + for k in forecasts.collect(): + ret.extend(k) + + return ret diff --git a/pyFTS/tests/hyperparam.py b/pyFTS/tests/hyperparam.py index b9aaa11..b660d48 100644 --- a/pyFTS/tests/hyperparam.py +++ b/pyFTS/tests/hyperparam.py @@ -96,7 +96,7 @@ t1 = time() Evolutionary.execute('SONDA', dataset, ngen=20, mgen=5, npop=15, pcruz=.5, pmut=.3, window_size=35000, train_rate=.6, increment_rate=1, - collect_statistics=True, experiments=1) + collect_statistics=True, experiments=5) #distributed='dispy', nodes=['192.168.0.110','192.168.0.106','192.168.0.107']) t2 = time() diff --git a/pyFTS/tests/multivariate.py b/pyFTS/tests/multivariate.py index b1f98cf..58dbe54 100644 --- a/pyFTS/tests/multivariate.py +++ b/pyFTS/tests/multivariate.py @@ -1,20 +1,98 @@ +import numpy as np import pandas as pd -import matplotlib.pylab as plt -from pyFTS.data import TAIEX, Malaysia -from pyFTS.common import Transformations +import time -from pyFTS.benchmarks import Measures -from pyFTS.partitioners import Grid, Util as pUtil -from pyFTS.common import Transformations, Util -from pyFTS.models import pwfts -from pyFTS.models.multivariate import common, variable, mvfts, wmvfts +from pyFTS.data import Enrollments, TAIEX, SONDA +from pyFTS.partitioners import Grid, Simple +from pyFTS.common import Util + +from pyspark import SparkConf +from pyspark import SparkContext + +import os +# make sure pyspark tells workers to use python3 not 2 if both are installed +os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3' +os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3' +#''' +data = SONDA.get_dataframe() + +data = data[['datahora','glo_avg']] + +data = data[~(np.isnan(data['glo_avg']) | np.equal(data['glo_avg'], 0.0))] + +train = data.iloc[:1500000] +test = data.iloc[1500000:] + +from pyFTS.models.multivariate import common, variable, wmvfts from pyFTS.models.seasonal import partitioner as seasonal from pyFTS.models.seasonal.common import DateTime +from pyFTS.partitioners import Grid -bc = Transformations.BoxCox(0) -tdiff = Transformations.Differential(1) +import matplotlib.pyplot as plt -from pyFTS.models.multivariate import common, variable, mvfts, cmvfts +''' +#fig, ax = plt.subplots(nrows=3, ncols=1, figsize=[15,5]) + + +sp = {'seasonality': DateTime.day_of_year , 'names': ['Jan','Feb','Mar','Apr','May','Jun','Jul', 'Aug','Sep','Oct','Nov','Dec']} + +vmonth = variable.Variable("Month", data_label="datahora", partitioner=seasonal.TimeGridPartitioner, npart=12, alpha_cut=.25, + data=train, partitioner_specific=sp) + +#vmonth.partitioner.plot(ax[0]) + +sp = {'seasonality': DateTime.minute_of_day, 'names': [str(k) for k in range(0,24)]} + +vhour = variable.Variable("Hour", data_label="datahora", partitioner=seasonal.TimeGridPartitioner, npart=24, alpha_cut=.2, + data=train, partitioner_specific=sp) + +#vhour.partitioner.plot(ax[1]) + + +vavg = variable.Variable("Radiation", data_label="glo_avg", alias='R', + partitioner=Grid.GridPartitioner, npart=35, alpha_cut=.3, + data=train) + +#vavg.partitioner.plot(ax[2]) + +#plt.tight_layout() + +#Util.show_and_save_image(fig, 'variables', True) + +model = wmvfts.WeightedMVFTS(explanatory_variables=[vmonth,vhour,vavg], target_variable=vavg) + + +_s1 = time.time() +model.fit(train) +#model.fit(data, distributed='spark', url='spark://192.168.0.106:7077', num_batches=4) +_s2 = time.time() + +print(_s2-_s1) + +Util.persist_obj(model, 'sonda_wmvfts') +''' + +model = Util.load_obj('sonda_wmvfts') + +''' +from pyFTS.benchmarks import Measures + +_s1 = time.time() +print(Measures.get_point_statistics(test, model)) +_s2 = time.time() + +print(_s2-_s1) +''' + +print(len(model)) + + +# + +#model.fit(data, distributed='dispy', nodes=['192.168.0.110']) +''' + +from pyFTS.models.multivariate import common, variable, mvfts, wmvfts, cmvfts, grid from pyFTS.models.seasonal import partitioner as seasonal from pyFTS.models.seasonal.common import DateTime @@ -22,70 +100,83 @@ dataset = pd.read_csv('/home/petronio/Downloads/kalang.csv', sep=',') dataset['date'] = pd.to_datetime(dataset["date"], format='%Y-%m-%d %H:%M:%S') -train_uv = dataset['value'].values[:24505] -test_uv = dataset['value'].values[24505:] - train_mv = dataset.iloc[:24505] test_mv = dataset.iloc[24505:] sp = {'seasonality': DateTime.minute_of_day, 'names': [str(k)+'hs' for k in range(0,24)]} vhour = variable.Variable("Hour", data_label="date", partitioner=seasonal.TimeGridPartitioner, npart=24, - data=train_mv, partitioner_specific=sp) + data=train_mv, partitioner_specific=sp, data_type=pd.datetime, mask='%Y-%m-%d %H:%M:%S') vvalue = variable.Variable("Pollution", data_label="value", alias='value', - partitioner=Grid.GridPartitioner, npart=35, + partitioner=Grid.GridPartitioner, npart=35, data_type=np.float64, data=train_mv) -parameters = [ - {},{}, - {'order':2, 'knn': 1}, - {'order':2, 'knn': 2}, - {'order':2, 'knn': 3}, -] - -#for ct, method in enumerate([, wmvfts.WeightedMVFTS, -# cmvfts.ClusteredMVFTS,cmvfts.ClusteredMVFTS,cmvfts.ClusteredMVFTS]): -model = mvfts.MVFTS() - -model.append_variable(vhour) -model.append_variable(vvalue) -model.target_variable = vvalue -model.fit(train_mv) - -print(model) - +fs = grid.GridCluster(explanatory_variables=[vhour, vvalue], target_variable=vvalue) +#model = wmvfts.WeightedMVFTS(explanatory_variables=[vhour, vvalue], target_variable=vvalue) +model = cmvfts.ClusteredMVFTS(explanatory_variables=[vhour, vvalue], target_variable=vvalue, + partitioner=fs) +model.fit(train_mv, distributed='spark', url='spark://192.168.0.106:7077') +#''' +#print(model) ''' -from pyFTS.data import henon -df = henon.get_dataframe(iterations=1000) +def fun(x): + return (x, x % 2) -from pyFTS.models.multivariate import variable, cmvfts -vx = variable.Variable("x", data_label="x", partitioner=Grid.GridPartitioner, npart=15, data=df) -vy = variable.Variable("y", data_label="y", partitioner=Grid.GridPartitioner, npart=15, data=df) +def get_fs(): + fs_tmp = Simple.SimplePartitioner() -model = cmvfts.ClusteredMVFTS(pre_fuzzyfy=False, knn=3, order=2, fts_method=pwfts.ProbabilisticWeightedFTS) -model.append_variable(vx) -model.append_variable(vy) -model.target_variable = vx + for fset in part.value.keys(): + fz = part.value[fset] + fs_tmp.append(fset, fz.mf, fz.parameters) -model.fit(df.iloc[:800]) + return fs_tmp -test = df.iloc[800:] +def fuzzyfy(x): + + fs_tmp = get_fs() + + ret = [] + + for k in x: + ret.append(fs_tmp.fuzzyfy(k, mode='both')) + + return ret + + +def train(fuzzyfied): + model = hofts.WeightedHighOrderFTS(partitioner=get_fs(), order=order.value) + + ndata = [k for k in fuzzyfied] + + model.train(ndata) + + return [(k, model.flrgs[k]) for k in model.flrgs] + + +with SparkContext(conf=conf) as sc: + + part = sc.broadcast(fs.sets) + + order = sc.broadcast(2) + + #ret = sc.parallelize(np.arange(0,100)).map(fun) + + #fuzzyfied = sc.parallelize(data).mapPartitions(fuzzyfy) + + flrgs = sc.parallelize(data).mapPartitions(train) + + model = hofts.WeightedHighOrderFTS(partitioner=fs, order=order.value) + + for k in flrgs.collect(): + model.append_rule(k[1]) + + print(model) + +''' -forecasts = model.predict(test, type='multivariate') -fig, ax = plt.subplots(nrows=2, ncols=2, figsize=[15,7]) -ax[0][0].plot(test['x'].values) -ax[0][0].plot(forecasts['x'].values) -ax[0][1].scatter(test['x'].values,test['y'].values) -ax[0][1].scatter(forecasts['x'].values,forecasts['y'].values) -ax[1][0].scatter(test['y'].values,test['x'].values) -ax[1][0].scatter(forecasts['y'].values,forecasts['x'].values) -ax[1][1].plot(test['y'].values) -ax[1][1].plot(forecasts['y'].values) -print(forecasts) -''' \ No newline at end of file diff --git a/pyFTS/tests/spark.py b/pyFTS/tests/spark.py index b0ab098..525099e 100644 --- a/pyFTS/tests/spark.py +++ b/pyFTS/tests/spark.py @@ -4,7 +4,7 @@ import time from pyFTS.data import Enrollments, TAIEX, SONDA from pyFTS.partitioners import Grid, Simple -from pyFTS.models import hofts +from pyFTS.common import Util from pyspark import SparkConf from pyspark import SparkContext @@ -14,18 +14,66 @@ import os os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3' #''' -data = SONDA.get_data('glo_avg') +data = SONDA.get_dataframe() -fs = Grid.GridPartitioner(data=data, npart=50) +data = data[['datahora','glo_avg']] + +data = data[~(np.isnan(data['glo_avg']) | np.equal(data['glo_avg'], 0.0))] + +train = data.iloc[:1500000] +test = data.iloc[1500000:] + +from pyFTS.models.multivariate import common, variable, wmvfts +from pyFTS.models.seasonal import partitioner as seasonal +from pyFTS.models.seasonal.common import DateTime +from pyFTS.partitioners import Grid + +import matplotlib.pyplot as plt + +#fig, ax = plt.subplots(nrows=3, ncols=1, figsize=[15,5]) + + +sp = {'seasonality': DateTime.day_of_year , 'names': ['Jan','Feb','Mar','Apr','May','Jun','Jul', 'Aug','Sep','Oct','Nov','Dec']} + +vmonth = variable.Variable("Month", data_label="datahora", partitioner=seasonal.TimeGridPartitioner, npart=12, alpha_cut=.25, + data=train, partitioner_specific=sp) + +#vmonth.partitioner.plot(ax[0]) + +sp = {'seasonality': DateTime.minute_of_day, 'names': [str(k) for k in range(0,24)]} + +vhour = variable.Variable("Hour", data_label="datahora", partitioner=seasonal.TimeGridPartitioner, npart=24, alpha_cut=.2, + data=train, partitioner_specific=sp) + +#vhour.partitioner.plot(ax[1]) + + +vavg = variable.Variable("Radiation", data_label="glo_avg", alias='R', + partitioner=Grid.GridPartitioner, npart=35, alpha_cut=.3, + data=train) + +#vavg.partitioner.plot(ax[2]) + +#plt.tight_layout() + +#Util.show_and_save_image(fig, 'variables', True) + +model = wmvfts.WeightedMVFTS(explanatory_variables=[vmonth,vhour,vavg], target_variable=vavg) -model = hofts.WeightedHighOrderFTS(partitioner=fs, order=2) _s1 = time.time() -model.fit(data, distributed='spark', url='spark://192.168.0.106:7077') +#model.fit(train) +model.fit(data, distributed='spark', url='spark://192.168.0.106:7077', num_batches=5) _s2 = time.time() print(_s2-_s1) +Util.persist_obj(model, 'sonda_wmvfts') + +from pyFTS.benchmarks import Measures + +#print(Measures.get_point_statistics(test, model)) + #model.fit(data, distributed='dispy', nodes=['192.168.0.110']) ''' @@ -56,7 +104,7 @@ model = cmvfts.ClusteredMVFTS(explanatory_variables=[vhour, vvalue], target_vari model.fit(train_mv, distributed='spark', url='spark://192.168.0.106:7077') #''' -print(model) +#print(model) ''' def fun(x):