distributed.spark optimizations and bugfixes

This commit is contained in:
Petrônio Cândido 2019-01-23 20:31:03 -02:00
parent 6373afe136
commit e91d44afd1
6 changed files with 364 additions and 146 deletions

View File

@ -8,7 +8,6 @@ import dill
import numpy as np
def plot_rules(model, size=[5, 5], axis=None, rules_by_axis=None, columns=1):
if axis is None and rules_by_axis is None:
rows = 1
@ -126,8 +125,8 @@ def show_and_save_image(fig, file, flag, lgd=None):
:param flag: if True the image will be saved
:param lgd: legend
"""
plt.show()
if flag:
plt.show()
if lgd is not None:
fig.savefig(file, additional_artists=lgd,bbox_inches='tight') #bbox_extra_artists=(lgd,), )
else:

View File

@ -163,9 +163,7 @@ class FTS(object):
elif distributed == 'spark':
from pyFTS.distributed import spark
nodes = kwargs.get("nodes", 'spark://192.168.0.110:7077')
app = kwargs.get("app", 'pyFTS')
ret = spark.distributed_predict(data=ndata, model=self, url=nodes, app=app)
ret = spark.distributed_predict(data=ndata, model=self, **kwargs)
if not self.is_multivariate:

View File

@ -16,7 +16,21 @@ SPARK_ADDR = 'spark://192.168.0.110:7077'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'
def create_spark_conf(**kwargs):
spark_executor_memory = kwargs.get("spark_executor_memory", "2g")
spark_driver_memory = kwargs.get("spark_driver_memory", "2g")
url = kwargs.get("url", SPARK_ADDR)
app = kwargs.get("app", 'pyFTS')
conf = SparkConf()
conf.setMaster(url)
conf.setAppName(app)
conf.set("spark.executor.memory", spark_executor_memory)
conf.set("spark.driver.memory", spark_driver_memory)
conf.set("spark.memory.offHeap.enabled",True)
conf.set("spark.memory.offHeap.size","16g")
return conf
def get_partitioner(shared_partitioner, type='common', variables=[]):
"""
@ -74,6 +88,16 @@ def get_variables(**parameters):
return (explanatory_variables, target_variable)
def create_univariate_model(**parameters):
if parameters['order'].value > 1:
model = parameters['method'].value(partitioner=get_partitioner(parameters['partitioner']),
order=parameters['order'].value, alpha_cut=parameters['alpha_cut'].value,
lags=parameters['lags'].value)
else:
model = parameters['method'].value(partitioner=get_partitioner(parameters['partitioner']),
alpha_cut=parameters['alpha_cut'].value)
return model
def slave_train_univariate(data, **parameters):
"""
@ -82,27 +106,32 @@ def slave_train_univariate(data, **parameters):
:return:
"""
if parameters['type'].value == 'common':
model = create_univariate_model(**parameters)
if parameters['order'].value > 1:
model = parameters['method'].value(partitioner=get_partitioner(parameters['partitioner']),
order=parameters['order'].value, alpha_cut=parameters['alpha_cut'].value,
lags=parameters['lags'].value)
else:
model = parameters['method'].value(partitioner=get_partitioner(parameters['partitioner']),
alpha_cut=parameters['alpha_cut'].value)
ndata = [k for k in data]
else:
pass
ndata = [k for k in data]
model.train(ndata)
return [(k, model.flrgs[k]) for k in model.flrgs.keys()]
def slave_forecast_univariate(data, **parameters):
"""
:param data:
:return:
"""
model = create_univariate_model(**parameters)
ndata = [k for k in data]
forecasts = model.predict(ndata)
return [(k, k) for k in forecasts]
def slave_train_multivariate(data, **parameters):
def create_multivariate_model(**parameters):
explanatory_variables, target_variable = get_variables(**parameters)
#vars = [(v.name, v.name) for v in explanatory_variables]
@ -129,8 +158,13 @@ def slave_train_multivariate(data, **parameters):
target_variable=target_variable,
alpha_cut=parameters['alpha_cut'].value)
return model
def slave_train_multivariate(data, **parameters):
model = create_multivariate_model(**parameters)
rows = [k for k in data]
ndata = pd.DataFrame.from_records(rows, columns=parameters['columns'].value)
@ -145,7 +179,68 @@ def slave_train_multivariate(data, **parameters):
return [(k, v) for k,v in model.flrgs.items()]
def distributed_train(model, data, url=SPARK_ADDR, app='pyFTS'):
def slave_forecast_multivariate(data, **parameters):
model = create_multivariate_model(**parameters)
rows = [k for k in data]
ndata = pd.DataFrame.from_records(rows, columns=parameters['columns'].value)
forecasts = model.predict(ndata)
return [(k, k) for k in forecasts]
def share_parameters(model, context, data):
parameters = {}
if not model.is_multivariate:
parameters['type'] = context.broadcast('common')
parameters['partitioner'] = context.broadcast(model.partitioner.sets)
parameters['alpha_cut'] = context.broadcast(model.alpha_cut)
parameters['order'] = context.broadcast(model.order)
parameters['method'] = context.broadcast(type(model))
parameters['lags'] = context.broadcast(model.lags)
parameters['max_lag'] = context.broadcast(model.max_lag)
else:
if model.is_clustered:
parameters['type'] = context.broadcast('clustered')
names = []
for name, fset in model.partitioner.sets.items():
names.append(name)
parameters['partitioner_{}'.format(name)] = context.broadcast([(k,v) for k,v in fset.sets.items()])
parameters['partitioner_names'] = context.broadcast(names)
else:
parameters['type'] = context.broadcast('multivariate')
names = []
for var in model.explanatory_variables:
#if var.data_type is None:
# raise Exception("It is mandatory to inform the data_type parameter for each variable when the training is distributed! ")
names.append(var.name)
parameters['{}_type'.format(var.name)] = context.broadcast(var.type)
#parameters['{}_data_type'.format(var.name)] = context.broadcast(var.data_type)
#parameters['{}_mask'.format(var.name)] = context.broadcast(var.mask)
parameters['{}_label'.format(var.name)] = context.broadcast(var.data_label)
parameters['{}_alpha'.format(var.name)] = context.broadcast(var.alpha_cut)
parameters['{}_partitioner'.format(var.name)] = context.broadcast(var.partitioner.sets)
parameters['{}_partitioner_type'.format(var.name)] = context.broadcast(var.partitioner.type)
parameters['variables'] = context.broadcast(names)
parameters['target'] = context.broadcast(model.target_variable.name)
parameters['columns'] = context.broadcast(data.columns.values)
parameters['alpha_cut'] = context.broadcast(model.alpha_cut)
parameters['order'] = context.broadcast(model.order)
parameters['method'] = context.broadcast(type(model))
parameters['lags'] = context.broadcast(model.lags)
parameters['max_lag'] = context.broadcast(model.max_lag)
return parameters
def distributed_train(model, data, **kwargs):
"""
@ -155,85 +250,34 @@ def distributed_train(model, data, url=SPARK_ADDR, app='pyFTS'):
:param app:
:return:
"""
conf = SparkConf()
conf.setMaster(url)
conf.setAppName(app)
conf.set("spark.executor.memory", "2g")
conf.set("spark.driver.memory", "2g")
conf.set("spark.memory.offHeap.enabled",True)
conf.set("spark.memory.offHeap.size","16g")
parameters = {}
num_batches = kwargs.get("num_batches", 4)
conf = create_spark_conf(**kwargs)
with SparkContext(conf=conf) as context:
nodes = context.defaultParallelism
parameters = share_parameters(model, context, data)
if not model.is_multivariate:
parameters['type'] = context.broadcast('common')
parameters['partitioner'] = context.broadcast(model.partitioner.sets)
parameters['alpha_cut'] = context.broadcast(model.alpha_cut)
parameters['order'] = context.broadcast(model.order)
parameters['method'] = context.broadcast(type(model))
parameters['lags'] = context.broadcast(model.lags)
parameters['max_lag'] = context.broadcast(model.max_lag)
func = lambda x: slave_train_univariate(x, **parameters)
flrgs = context.parallelize(data).repartition(nodes*4).mapPartitions(func)
flrgs = context.parallelize(data).repartition(nodes*num_batches).mapPartitions(func)
for k in flrgs.collect():
model.append_rule(k[1])
return model
else:
if model.is_clustered:
parameters['type'] = context.broadcast('clustered')
names = []
for name, fset in model.partitioner.sets.items():
names.append(name)
parameters['partitioner_{}'.format(name)] = context.broadcast([(k,v) for k,v in fset.sets.items()])
parameters['partitioner_names'] = context.broadcast(names)
else:
parameters['type'] = context.broadcast('multivariate')
names = []
for var in model.explanatory_variables:
#if var.data_type is None:
# raise Exception("It is mandatory to inform the data_type parameter for each variable when the training is distributed! ")
names.append(var.name)
parameters['{}_type'.format(var.name)] = context.broadcast(var.type)
#parameters['{}_data_type'.format(var.name)] = context.broadcast(var.data_type)
#parameters['{}_mask'.format(var.name)] = context.broadcast(var.mask)
parameters['{}_label'.format(var.name)] = context.broadcast(var.data_label)
parameters['{}_alpha'.format(var.name)] = context.broadcast(var.alpha_cut)
parameters['{}_partitioner'.format(var.name)] = context.broadcast(var.partitioner.sets)
parameters['{}_partitioner_type'.format(var.name)] = context.broadcast(var.partitioner.type)
parameters['variables'] = context.broadcast(names)
parameters['target'] = context.broadcast(model.target_variable.name)
parameters['columns'] = context.broadcast(data.columns.values)
data = data.to_dict(orient='records')
parameters['alpha_cut'] = context.broadcast(model.alpha_cut)
parameters['order'] = context.broadcast(model.order)
parameters['method'] = context.broadcast(type(model))
parameters['lags'] = context.broadcast(model.lags)
parameters['max_lag'] = context.broadcast(model.max_lag)
func = lambda x: slave_train_multivariate(x, **parameters)
flrgs = context.parallelize(data).mapPartitions(func)
for k in flrgs.collect():
print(k)
#for g in k:
# print(g)
#return
if parameters['type'].value == 'clustered':
if k[0] == 'counts':
for fset, count in k[1]:
@ -243,8 +287,46 @@ def distributed_train(model, data, url=SPARK_ADDR, app='pyFTS'):
else:
model.append_rule(k[1])
return model
return model
def distributed_predict(data, model, url=SPARK_ADDR, app='pyFTS'):
return None
def distributed_predict(data, model, **kwargs):
"""
:param model:
:param data:
:param url:
:param app:
:return:
"""
num_batches = kwargs.get("num_batches", 4)
conf = create_spark_conf(**kwargs)
ret = []
with SparkContext(conf=conf) as context:
nodes = context.defaultParallelism
parameters = share_parameters(model, context)
if not model.is_multivariate:
func = lambda x: slave_forecast_univariate(x, **parameters)
forecasts = context.parallelize(data).repartition(nodes * num_batches).mapPartitions(func)
else:
data = data.to_dict(orient='records')
func = lambda x: slave_forecast_multivariate(x, **parameters)
forecasts = context.parallelize(data).repartition(nodes * num_batches).mapPartitions(func)
for k in forecasts.collect():
ret.extend(k)
return ret

View File

@ -96,7 +96,7 @@ t1 = time()
Evolutionary.execute('SONDA', dataset,
ngen=20, mgen=5, npop=15, pcruz=.5, pmut=.3,
window_size=35000, train_rate=.6, increment_rate=1,
collect_statistics=True, experiments=1)
collect_statistics=True, experiments=5)
#distributed='dispy', nodes=['192.168.0.110','192.168.0.106','192.168.0.107'])
t2 = time()

View File

@ -1,20 +1,98 @@
import numpy as np
import pandas as pd
import matplotlib.pylab as plt
from pyFTS.data import TAIEX, Malaysia
from pyFTS.common import Transformations
import time
from pyFTS.benchmarks import Measures
from pyFTS.partitioners import Grid, Util as pUtil
from pyFTS.common import Transformations, Util
from pyFTS.models import pwfts
from pyFTS.models.multivariate import common, variable, mvfts, wmvfts
from pyFTS.data import Enrollments, TAIEX, SONDA
from pyFTS.partitioners import Grid, Simple
from pyFTS.common import Util
from pyspark import SparkConf
from pyspark import SparkContext
import os
# make sure pyspark tells workers to use python3 not 2 if both are installed
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'
#'''
data = SONDA.get_dataframe()
data = data[['datahora','glo_avg']]
data = data[~(np.isnan(data['glo_avg']) | np.equal(data['glo_avg'], 0.0))]
train = data.iloc[:1500000]
test = data.iloc[1500000:]
from pyFTS.models.multivariate import common, variable, wmvfts
from pyFTS.models.seasonal import partitioner as seasonal
from pyFTS.models.seasonal.common import DateTime
from pyFTS.partitioners import Grid
bc = Transformations.BoxCox(0)
tdiff = Transformations.Differential(1)
import matplotlib.pyplot as plt
from pyFTS.models.multivariate import common, variable, mvfts, cmvfts
'''
#fig, ax = plt.subplots(nrows=3, ncols=1, figsize=[15,5])
sp = {'seasonality': DateTime.day_of_year , 'names': ['Jan','Feb','Mar','Apr','May','Jun','Jul', 'Aug','Sep','Oct','Nov','Dec']}
vmonth = variable.Variable("Month", data_label="datahora", partitioner=seasonal.TimeGridPartitioner, npart=12, alpha_cut=.25,
data=train, partitioner_specific=sp)
#vmonth.partitioner.plot(ax[0])
sp = {'seasonality': DateTime.minute_of_day, 'names': [str(k) for k in range(0,24)]}
vhour = variable.Variable("Hour", data_label="datahora", partitioner=seasonal.TimeGridPartitioner, npart=24, alpha_cut=.2,
data=train, partitioner_specific=sp)
#vhour.partitioner.plot(ax[1])
vavg = variable.Variable("Radiation", data_label="glo_avg", alias='R',
partitioner=Grid.GridPartitioner, npart=35, alpha_cut=.3,
data=train)
#vavg.partitioner.plot(ax[2])
#plt.tight_layout()
#Util.show_and_save_image(fig, 'variables', True)
model = wmvfts.WeightedMVFTS(explanatory_variables=[vmonth,vhour,vavg], target_variable=vavg)
_s1 = time.time()
model.fit(train)
#model.fit(data, distributed='spark', url='spark://192.168.0.106:7077', num_batches=4)
_s2 = time.time()
print(_s2-_s1)
Util.persist_obj(model, 'sonda_wmvfts')
'''
model = Util.load_obj('sonda_wmvfts')
'''
from pyFTS.benchmarks import Measures
_s1 = time.time()
print(Measures.get_point_statistics(test, model))
_s2 = time.time()
print(_s2-_s1)
'''
print(len(model))
#
#model.fit(data, distributed='dispy', nodes=['192.168.0.110'])
'''
from pyFTS.models.multivariate import common, variable, mvfts, wmvfts, cmvfts, grid
from pyFTS.models.seasonal import partitioner as seasonal
from pyFTS.models.seasonal.common import DateTime
@ -22,70 +100,83 @@ dataset = pd.read_csv('/home/petronio/Downloads/kalang.csv', sep=',')
dataset['date'] = pd.to_datetime(dataset["date"], format='%Y-%m-%d %H:%M:%S')
train_uv = dataset['value'].values[:24505]
test_uv = dataset['value'].values[24505:]
train_mv = dataset.iloc[:24505]
test_mv = dataset.iloc[24505:]
sp = {'seasonality': DateTime.minute_of_day, 'names': [str(k)+'hs' for k in range(0,24)]}
vhour = variable.Variable("Hour", data_label="date", partitioner=seasonal.TimeGridPartitioner, npart=24,
data=train_mv, partitioner_specific=sp)
data=train_mv, partitioner_specific=sp, data_type=pd.datetime, mask='%Y-%m-%d %H:%M:%S')
vvalue = variable.Variable("Pollution", data_label="value", alias='value',
partitioner=Grid.GridPartitioner, npart=35,
partitioner=Grid.GridPartitioner, npart=35, data_type=np.float64,
data=train_mv)
parameters = [
{},{},
{'order':2, 'knn': 1},
{'order':2, 'knn': 2},
{'order':2, 'knn': 3},
]
#for ct, method in enumerate([, wmvfts.WeightedMVFTS,
# cmvfts.ClusteredMVFTS,cmvfts.ClusteredMVFTS,cmvfts.ClusteredMVFTS]):
model = mvfts.MVFTS()
model.append_variable(vhour)
model.append_variable(vvalue)
model.target_variable = vvalue
model.fit(train_mv)
print(model)
fs = grid.GridCluster(explanatory_variables=[vhour, vvalue], target_variable=vvalue)
#model = wmvfts.WeightedMVFTS(explanatory_variables=[vhour, vvalue], target_variable=vvalue)
model = cmvfts.ClusteredMVFTS(explanatory_variables=[vhour, vvalue], target_variable=vvalue,
partitioner=fs)
model.fit(train_mv, distributed='spark', url='spark://192.168.0.106:7077')
#'''
#print(model)
'''
from pyFTS.data import henon
df = henon.get_dataframe(iterations=1000)
def fun(x):
return (x, x % 2)
from pyFTS.models.multivariate import variable, cmvfts
vx = variable.Variable("x", data_label="x", partitioner=Grid.GridPartitioner, npart=15, data=df)
vy = variable.Variable("y", data_label="y", partitioner=Grid.GridPartitioner, npart=15, data=df)
def get_fs():
fs_tmp = Simple.SimplePartitioner()
model = cmvfts.ClusteredMVFTS(pre_fuzzyfy=False, knn=3, order=2, fts_method=pwfts.ProbabilisticWeightedFTS)
model.append_variable(vx)
model.append_variable(vy)
model.target_variable = vx
for fset in part.value.keys():
fz = part.value[fset]
fs_tmp.append(fset, fz.mf, fz.parameters)
model.fit(df.iloc[:800])
return fs_tmp
test = df.iloc[800:]
def fuzzyfy(x):
fs_tmp = get_fs()
ret = []
for k in x:
ret.append(fs_tmp.fuzzyfy(k, mode='both'))
return ret
def train(fuzzyfied):
model = hofts.WeightedHighOrderFTS(partitioner=get_fs(), order=order.value)
ndata = [k for k in fuzzyfied]
model.train(ndata)
return [(k, model.flrgs[k]) for k in model.flrgs]
with SparkContext(conf=conf) as sc:
part = sc.broadcast(fs.sets)
order = sc.broadcast(2)
#ret = sc.parallelize(np.arange(0,100)).map(fun)
#fuzzyfied = sc.parallelize(data).mapPartitions(fuzzyfy)
flrgs = sc.parallelize(data).mapPartitions(train)
model = hofts.WeightedHighOrderFTS(partitioner=fs, order=order.value)
for k in flrgs.collect():
model.append_rule(k[1])
print(model)
'''
forecasts = model.predict(test, type='multivariate')
fig, ax = plt.subplots(nrows=2, ncols=2, figsize=[15,7])
ax[0][0].plot(test['x'].values)
ax[0][0].plot(forecasts['x'].values)
ax[0][1].scatter(test['x'].values,test['y'].values)
ax[0][1].scatter(forecasts['x'].values,forecasts['y'].values)
ax[1][0].scatter(test['y'].values,test['x'].values)
ax[1][0].scatter(forecasts['y'].values,forecasts['x'].values)
ax[1][1].plot(test['y'].values)
ax[1][1].plot(forecasts['y'].values)
print(forecasts)
'''

View File

@ -4,7 +4,7 @@ import time
from pyFTS.data import Enrollments, TAIEX, SONDA
from pyFTS.partitioners import Grid, Simple
from pyFTS.models import hofts
from pyFTS.common import Util
from pyspark import SparkConf
from pyspark import SparkContext
@ -14,18 +14,66 @@ import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'
#'''
data = SONDA.get_data('glo_avg')
data = SONDA.get_dataframe()
fs = Grid.GridPartitioner(data=data, npart=50)
data = data[['datahora','glo_avg']]
data = data[~(np.isnan(data['glo_avg']) | np.equal(data['glo_avg'], 0.0))]
train = data.iloc[:1500000]
test = data.iloc[1500000:]
from pyFTS.models.multivariate import common, variable, wmvfts
from pyFTS.models.seasonal import partitioner as seasonal
from pyFTS.models.seasonal.common import DateTime
from pyFTS.partitioners import Grid
import matplotlib.pyplot as plt
#fig, ax = plt.subplots(nrows=3, ncols=1, figsize=[15,5])
sp = {'seasonality': DateTime.day_of_year , 'names': ['Jan','Feb','Mar','Apr','May','Jun','Jul', 'Aug','Sep','Oct','Nov','Dec']}
vmonth = variable.Variable("Month", data_label="datahora", partitioner=seasonal.TimeGridPartitioner, npart=12, alpha_cut=.25,
data=train, partitioner_specific=sp)
#vmonth.partitioner.plot(ax[0])
sp = {'seasonality': DateTime.minute_of_day, 'names': [str(k) for k in range(0,24)]}
vhour = variable.Variable("Hour", data_label="datahora", partitioner=seasonal.TimeGridPartitioner, npart=24, alpha_cut=.2,
data=train, partitioner_specific=sp)
#vhour.partitioner.plot(ax[1])
vavg = variable.Variable("Radiation", data_label="glo_avg", alias='R',
partitioner=Grid.GridPartitioner, npart=35, alpha_cut=.3,
data=train)
#vavg.partitioner.plot(ax[2])
#plt.tight_layout()
#Util.show_and_save_image(fig, 'variables', True)
model = wmvfts.WeightedMVFTS(explanatory_variables=[vmonth,vhour,vavg], target_variable=vavg)
model = hofts.WeightedHighOrderFTS(partitioner=fs, order=2)
_s1 = time.time()
model.fit(data, distributed='spark', url='spark://192.168.0.106:7077')
#model.fit(train)
model.fit(data, distributed='spark', url='spark://192.168.0.106:7077', num_batches=5)
_s2 = time.time()
print(_s2-_s1)
Util.persist_obj(model, 'sonda_wmvfts')
from pyFTS.benchmarks import Measures
#print(Measures.get_point_statistics(test, model))
#model.fit(data, distributed='dispy', nodes=['192.168.0.110'])
'''
@ -56,7 +104,7 @@ model = cmvfts.ClusteredMVFTS(explanatory_variables=[vhour, vvalue], target_vari
model.fit(train_mv, distributed='spark', url='spark://192.168.0.106:7077')
#'''
print(model)
#print(model)
'''
def fun(x):