Minor bugfixes and features for speed up measuring in distributed methods

This commit is contained in:
Petrônio Cândido 2019-12-23 23:58:01 -03:00
parent 2d5414f01f
commit ed9c07deae
5 changed files with 260 additions and 65 deletions

View File

@ -1748,3 +1748,58 @@ def train_test_time(data, windowsize, train=0.8, **kwargs):
conn.close()
def distributed_model_train_test_time(models, data, windowsize, train=0.8, **kwargs):
"""
Assess the train and test times for a given list of configured models and save the results on a database.
:param models: A list of FTS models already configured, but not yet trained,
:param data: time series data, including train and test data
:param windowsize: Train/test data windows
:param train: Percent of data window that will be used to train the models
:param kwargs:
:return:
"""
import time
tag = __pop('tag', None, kwargs)
num_batches = kwargs.get('num_batches', 1)
dataset = __pop('dataset', None, kwargs)
file = kwargs.get('file', "benchmarks.db")
inc = __pop("inc", 0.5, kwargs)
conn = bUtil.open_benchmark_db(file)
for ct, train, test in cUtil.sliding_window(data, windowsize, train, inc=inc, **kwargs):
for id, model in enumerate(models):
print(dataset, model, ct)
model.fit(train, **kwargs)
for time in model.__dict__['training_time']:
job = {
'steps': num_batches, 'method': 'train', 'time': time,
'model': model.shortname, 'transformation': None,
'order': model.order, 'partitioner': None,
'partitions': None, 'size': len(model)
}
data = bUtil.process_common_data2(dataset, tag, 'train', job)
common_process_time_jobs(conn, data, job)
model.predict(train, **kwargs)
for time in model.__dict__['forecasting_time']:
job = {
'steps': num_batches, 'method': 'test', 'time': time,
'model': model.shortname, 'transformation': None,
'order': model.order, 'partitioner': None,
'partitions': None, 'size': len(model)
}
data = bUtil.process_common_data2(dataset, tag, 'test', job)
common_process_time_jobs(conn, data, job)
conn.close()

View File

@ -97,7 +97,6 @@ class FTS(object):
ndata = np.clip(ndata, self.original_min, self.original_max)
return ndata
def predict(self, data, **kwargs):
"""
Forecast using trained model
@ -117,6 +116,9 @@ class FTS(object):
:return: a numpy array with the forecasted data
"""
import copy
kw = copy.deepcopy(kwargs)
if self.is_multivariate:
ndata = data
@ -125,38 +127,38 @@ class FTS(object):
ndata = self.clip_uod(ndata)
if 'distributed' in kwargs:
distributed = kwargs.pop('distributed')
if 'distributed' in kw:
distributed = kw.pop('distributed')
else:
distributed = False
if 'type' in kwargs:
type = kwargs.pop("type")
if 'type' in kw:
type = kw.pop("type")
else:
type = 'point'
if distributed is None or distributed == False:
steps_ahead = kwargs.get("steps_ahead", None)
steps_ahead = kw.get("steps_ahead", None)
if steps_ahead == None or steps_ahead == 1:
if type == 'point':
ret = self.forecast(ndata, **kwargs)
ret = self.forecast(ndata, **kw)
elif type == 'interval':
ret = self.forecast_interval(ndata, **kwargs)
ret = self.forecast_interval(ndata, **kw)
elif type == 'distribution':
ret = self.forecast_distribution(ndata, **kwargs)
ret = self.forecast_distribution(ndata, **kw)
elif type == 'multivariate':
ret = self.forecast_multivariate(ndata, **kwargs)
ret = self.forecast_multivariate(ndata, **kw)
elif steps_ahead > 1:
if type == 'point':
ret = self.forecast_ahead(ndata, steps_ahead, **kwargs)
ret = self.forecast_ahead(ndata, steps_ahead, **kw)
elif type == 'interval':
ret = self.forecast_ahead_interval(ndata, steps_ahead, **kwargs)
ret = self.forecast_ahead_interval(ndata, steps_ahead, **kw)
elif type == 'distribution':
ret = self.forecast_ahead_distribution(ndata, steps_ahead, **kwargs)
ret = self.forecast_ahead_distribution(ndata, steps_ahead, **kw)
elif type == 'multivariate':
ret = self.forecast_ahead_multivariate(ndata, steps_ahead, **kwargs)
ret = self.forecast_ahead_multivariate(ndata, steps_ahead, **kw)
if not ['point', 'interval', 'distribution', 'multivariate'].__contains__(type):
raise ValueError('The argument \'type\' has an unknown value.')
@ -166,20 +168,22 @@ class FTS(object):
if distributed == 'dispy':
from pyFTS.distributed import dispy
nodes = kwargs.get("nodes", ['127.0.0.1'])
num_batches = kwargs.get('num_batches', 10)
nodes = kw.pop("nodes", ['127.0.0.1'])
num_batches = kw.pop('num_batches', 10)
ret = dispy.distributed_predict(self, kwargs, nodes, ndata, num_batches)
ret = dispy.distributed_predict(self, kw, nodes, ndata, num_batches, **kw)
elif distributed == 'spark':
from pyFTS.distributed import spark
ret = spark.distributed_predict(data=ndata, model=self, **kwargs)
ret = spark.distributed_predict(data=ndata, model=self, **kw)
if not self.is_multivariate:
kwargs['type'] = type
ret = self.apply_inverse_transformations(ret, params=[data[self.max_lag - 1:]], **kwargs)
kw['type'] = type
ret = self.apply_inverse_transformations(ret, params=[data[self.max_lag - 1:]], **kw)
if 'statistics' in kw:
kwargs['statistics'] = kw['statistics']
return ret
@ -312,7 +316,9 @@ class FTS(object):
"""
import datetime
import datetime, copy
kw = copy.deepcopy(kwargs)
if self.is_multivariate:
data = ndata
@ -322,29 +328,27 @@ class FTS(object):
self.original_min = np.nanmin(data)
self.original_max = np.nanmax(data)
if 'partitioner' in kwargs:
self.partitioner = kwargs.pop('partitioner')
if 'partitioner' in kw:
self.partitioner = kw.pop('partitioner')
if not self.is_multivariate and not self.is_wrapper and not self.benchmark_only:
if self.partitioner is None:
raise Exception("Fuzzy sets were not provided for the model. Use 'partitioner' parameter. ")
if 'order' in kwargs:
self.order = kwargs.pop('order')
if 'order' in kw:
self.order = kw.pop('order')
dump = kwargs.get('dump', None)
dump = kw.get('dump', None)
num_batches = kwargs.get('num_batches', None)
num_batches = kw.pop('num_batches', None)
save = kwargs.get('save_model', False) # save model on disk
save = kw.get('save_model', False) # save model on disk
batch_save = kwargs.get('batch_save', False) #save model between batches
batch_save = kw.get('batch_save', False) #save model between batches
file_path = kwargs.get('file_path', None)
file_path = kw.get('file_path', None)
distributed = kwargs.get('distributed', False)
batch_save_interval = kwargs.get('batch_save_interval', 10)
distributed = kw.pop('distributed', False)
if distributed is not None and distributed:
if num_batches is None:
@ -352,14 +356,13 @@ class FTS(object):
if distributed == 'dispy':
from pyFTS.distributed import dispy
nodes = kwargs.get('nodes', False)
nodes = kw.pop('nodes', False)
train_method = kwargs.get('train_method', dispy.simple_model_train)
dispy.distributed_train(self, train_method, nodes, type(self), data, num_batches, {},
batch_save=batch_save, file_path=file_path,
batch_save_interval=batch_save_interval)
**kw)
elif distributed == 'spark':
from pyFTS.distributed import spark
url = kwargs.get('url', 'spark://192.168.0.110:7077')
url = kwargs.get('url', 'spark://127.0.0.1:7077')
app = kwargs.get('app', 'pyFTS')
spark.distributed_train(self, data, url=url, app=app)
@ -388,7 +391,7 @@ class FTS(object):
else:
mdata = data[ct - self.order : ct + batch_size]
self.train(mdata, **kwargs)
self.train(mdata, **kw)
if batch_save:
Util.persist_obj(self,file_path)
@ -399,7 +402,7 @@ class FTS(object):
bcount += 1
else:
self.train(data, **kwargs)
self.train(data, **kw)
if dump == 'time':
print("[{0: %H:%M:%S}] Finish training".format(datetime.datetime.now()))
@ -407,6 +410,10 @@ class FTS(object):
if save:
Util.persist_obj(self, file_path)
if 'statistics' in kw:
kwargs['statistics'] = kw['statistics']
print(kwargs['statistics'])
def clone_parameters(self, model):
"""

View File

@ -27,7 +27,7 @@ def stop_dispy_cluster(cluster, http_server):
:param http_server:
:return:
"""
cluster.wait() # wait for all jobs to finish
#cluster.wait() # wait for all jobs to finish
cluster.print_status()
@ -44,6 +44,7 @@ def get_number_of_cpus(cluster):
def simple_model_train(model, data, parameters):
import time
"""
Cluster function that receives a FTS instance 'model' and train using the 'data' and 'parameters'
@ -52,7 +53,10 @@ def simple_model_train(model, data, parameters):
:param parameters: parameters for the training process
:return: the trained model
"""
_start = time.time()
model.train(data, **parameters)
_end = time.time()
model.__dict__['training_time'] = _end - _start
return model
@ -96,6 +100,9 @@ def distributed_train(model, train_method, nodes, fts_method, data, num_batches=
tmp = job()
if job.status == dispy.DispyJob.Finished and tmp is not None:
model.merge(tmp)
if 'training_time' not in model.__dict__:
model.__dict__['training_time'] = []
model.__dict__['training_time'].append(tmp.__dict__['training_time'])
if batch_save and (job.id % batch_save_interval) == 0:
Util.persist_obj(model, file_path)
@ -113,13 +120,15 @@ def distributed_train(model, train_method, nodes, fts_method, data, num_batches=
return model
def simple_model_predict(model, data, parameters):
return model.predict(data, **parameters)
import time
_start = time.time()
forecasts = model.predict(data, **parameters)
_stop = time.time()
return forecasts, _stop - _start
def distributed_predict(model, parameters, nodes, data, num_batches):
def distributed_predict(model, parameters, nodes, data, num_batches, **kwargs):
import dispy, dispy.httpd
cluster, http_server = start_dispy_cluster(simple_model_predict, nodes)
@ -146,9 +155,14 @@ def distributed_predict(model, parameters, nodes, data, num_batches):
tmp = job()
if job.status == dispy.DispyJob.Finished and tmp is not None:
if job.id < batch_size:
ret.extend(tmp[:-1])
ret.extend(tmp[0][:-1])
else:
ret.extend(tmp)
ret.extend(tmp[0])
if 'forecasting_time' not in model.__dict__:
model.__dict__['forecasting_time'] = []
model.__dict__['forecasting_time'].append(tmp[1])
else:
print(job.exception)
print(job.stdout)

View File

@ -16,7 +16,14 @@ SPARK_ADDR = 'spark://192.168.0.110:7077'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'
def create_spark_conf(**kwargs):
"""
Configure the Spark master node
:param kwargs:
:return:
"""
spark_executor_memory = kwargs.get("spark_executor_memory", "2g")
spark_driver_memory = kwargs.get("spark_driver_memory", "2g")
url = kwargs.get("url", SPARK_ADDR)
@ -32,11 +39,15 @@ def create_spark_conf(**kwargs):
return conf
def get_partitioner(shared_partitioner, type='common', variables=[]):
"""
Return the UoD partitioner from the 'shared_partitioner' fuzzy sets
:param part:
:return:
:param shared_partitioner: the shared variable with the fuzzy sets
:param type: the type of the partitioner
:param variables: in case of a Multivariate FTS, the list of variables
:return: Partitioner object
"""
if type=='common':
fs_tmp = Simple.SimplePartitioner()
@ -52,6 +63,14 @@ def get_partitioner(shared_partitioner, type='common', variables=[]):
def get_clustered_partitioner(explanatory_variables, target_variable, **parameters):
"""
Return the UoD partitioner from the 'shared_partitioner' fuzzy sets, special case for
clustered multivariate FTS.
:param explanatory_variables: the list with the names of the explanatory variables
:param target_variable: the name of the target variable
:return: Partitioner object
"""
from pyFTS.models.multivariate.common import MultivariateFuzzySet
fs_tmp = mv_partitioner.MultivariatePartitioner(explanatory_variables=explanatory_variables,
target_variable=target_variable)
@ -67,6 +86,12 @@ def get_clustered_partitioner(explanatory_variables, target_variable, **paramete
def get_variables(**parameters):
"""
From the dictionary of parameters, return a tuple with the list of explanatory and target variables
:param parameters: dictionary of parameters
:return: a tuple with the list of explanatory and target variables
"""
explanatory_variables = []
target_variable = None
for name in parameters['variables'].value:
@ -88,7 +113,14 @@ def get_variables(**parameters):
return (explanatory_variables, target_variable)
def create_univariate_model(**parameters):
"""
From the dictionary of parameters, create an univariate FTS model
:param parameters: dictionary of parameters
:return: univariate FTS model
"""
if parameters['order'].value > 1:
model = parameters['method'].value(partitioner=get_partitioner(parameters['partitioner']),
order=parameters['order'].value, alpha_cut=parameters['alpha_cut'].value,
@ -99,11 +131,14 @@ def create_univariate_model(**parameters):
return model
def slave_train_univariate(data, **parameters):
"""
Receive train data, train an univariate FTS model and return the learned rules
:param data:
:return:
:param data: train data
:param parameters: dictionary of parameters
:return: Key/value list of the learned rules
"""
model = create_univariate_model(**parameters)
@ -117,9 +152,11 @@ def slave_train_univariate(data, **parameters):
def slave_forecast_univariate(data, **parameters):
"""
Receive test data, create an univariate FTS model from the parameters and return the forecasted values
:param data:
:return:
:param data: test data
:param parameters: dictionary of parameters
:return: forecasted values from the data input
"""
model = create_univariate_model(**parameters)
@ -132,6 +169,13 @@ def slave_forecast_univariate(data, **parameters):
def create_multivariate_model(**parameters):
"""
From the dictionary of parameters, create a multivariate FTS model
:param parameters: dictionary of parameters
:return: multivariate FTS model
"""
explanatory_variables, target_variable = get_variables(**parameters)
#vars = [(v.name, v.name) for v in explanatory_variables]
@ -162,6 +206,13 @@ def create_multivariate_model(**parameters):
def slave_train_multivariate(data, **parameters):
"""
Receive train data, train a multivariate FTS model and return the learned rules
:param data: train data
:param parameters: dictionary of parameters
:return: Key/value list of the learned rules
"""
model = create_multivariate_model(**parameters)
@ -180,6 +231,13 @@ def slave_train_multivariate(data, **parameters):
def slave_forecast_multivariate(data, **parameters):
"""
Receive test data, create a multivariate FTS model from the parameters and return the forecasted values
:param data: test data
:param parameters: dictionary of parameters
:return: forecasted values from the data input
"""
model = create_multivariate_model(**parameters)
@ -192,6 +250,14 @@ def slave_forecast_multivariate(data, **parameters):
def share_parameters(model, context, data):
"""
Create a shared variable with a dictionary of the model parameters and hyperparameters
:param model: the FTS model to extract the parameters and hyperparameters
:param context: Spark context
:param data: dataset
:return: the shared variable with the dictionary of parameters
"""
parameters = {}
if not model.is_multivariate:
parameters['type'] = context.broadcast('common')
@ -242,13 +308,17 @@ def share_parameters(model, context, data):
def distributed_train(model, data, **kwargs):
"""
The main method for distributed training of FTS models using Spark clusters.
It takes an empty model and the train data, connect with the Spark cluster, proceed the
distributed training and return the learned model.
:param model:
:param data:
:param url:
:param app:
:return:
:param model: An empty (non-trained) FTS model
:param data: train data
:param url: URL of the Spark master node
:param app: Application name
:return: trained model
"""
num_batches = kwargs.get("num_batches", 4)
@ -292,13 +362,18 @@ def distributed_train(model, data, **kwargs):
def distributed_predict(data, model, **kwargs):
"""
The main method for distributed forecasting with FTS models using Spark clusters.
It takes a trained FTS model and the test data, connect with the Spark cluster,
proceed the distributed forecasting and return the merged forecasted values.
:param model:
:param data:
:param url:
:param model: an FTS trained model
:param data: test data
:param url: URL of the Spark master
:param app:
:return:
:return: forecasted values
"""
num_batches = kwargs.get("num_batches", 4)

View File

@ -1,6 +1,7 @@
from pyFTS.partitioners import Grid
from pyFTS.models import chen
from pyFTS.benchmarks import Measures
from pyFTS.common import Membership
from pyFTS.common import Util as cUtil, fts
import pandas as pd
import numpy as np
@ -8,12 +9,56 @@ import os
from pyFTS.common import Transformations
from copy import deepcopy
from pyFTS.models import pwfts
from pyFTS.models.multivariate import common, variable, mvfts, wmvfts
from pyFTS.benchmarks import benchmarks as bchmk, Measures
from pyFTS.models.seasonal import partitioner as seasonal
from pyFTS.models.seasonal.common import DateTime
import time
from pyFTS.data import SONDA, Malaysia
from pyFTS.data import Malaysia, SONDA
sonda = SONDA.get_dataframe()[['datahora','glo_avg']].iloc[:600000]
sonda['data'] = pd.to_datetime(sonda["datahora"], format='%Y-%m-%d %H:%M:%S')
sonda = sonda.drop(sonda.index[np.where(sonda["glo_avg"] <= 0.01)])
sonda = sonda.dropna()
print(sonda)
sp = {'seasonality': DateTime.day_of_year , 'names': ['Jan','Fev','Mar','Abr','Mai','Jun','Jul', 'Ago','Set','Out','Nov','Dez']}
vmonth = variable.Variable("Month", data_label="data", partitioner=seasonal.TimeGridPartitioner, npart=12,
data=sonda, partitioner_specific=sp)
sp = {'seasonality': DateTime.minute_of_day, 'names': [str(k)+'hs' for k in range(0,24)]}
vhour = variable.Variable("Hour", data_label="data", partitioner=seasonal.TimeGridPartitioner, npart=24,
data=sonda, partitioner_specific=sp)
vavg = variable.Variable("Radiation", data_label="glo_avg", alias='rad',
partitioner=Grid.GridPartitioner, npart=35,
data=sonda)
model = wmvfts.WeightedMVFTS(explanatory_variables=[vhour, vhour, vavg], target_variable=vavg)
bchmk.distributed_model_train_test_time([model], sonda, 600000, 0.8, inc=1,
num_batches=7, distributed='dispy',nodes=['192.168.0.106','192.168.0.110'],
file='deho.db', tag='speedup', dataset='SONDA')
#model.fit(train_mv, num_batches=4, distributed='dispy',nodes=['192.168.0.106'])
#model.predict(test_mv, num_batches=4, distributed='dispy', nodes=['192.168.0.106'])
#print(model.__dict__['training_time'])
#print(model.__dict__['forecasting_time'])
'''
datasets = {}
sonda = SONDA.get_dataframe()[['datahora','glo_avg','ws_10m']]
@ -31,8 +76,6 @@ datasets['Malaysia.load'] = malaysia["load"].values
windows = [600000, 600000, 10000, 10000]
cpus = 7
for ct, (dataset_name, dataset) in enumerate(datasets.items()):
bchmk.train_test_time(dataset, windowsize=windows[ct], train=0.9, inc=.5,
methods=[pwfts.ProbabilisticWeightedFTS],
@ -43,3 +86,4 @@ for ct, (dataset_name, dataset) in enumerate(datasets.items()):
distributed='dispy', nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'],
file="experiments.db", dataset=dataset_name,
tag="speedup")
'''