import numpy as np
import pandas as pd
from pyFTS.data import Enrollments, TAIEX
from pyFTS.partitioners import Grid, Simple
from pyFTS.models.multivariate import partitioner as mv_partitioner
from pyFTS.models import hofts
from pyspark import SparkConf
from pyspark import SparkContext
import os
# Default Spark master URL; it can be overridden through the 'url' keyword argument of create_spark_conf()
SPARK_ADDR = 'spark://192.168.0.110:7077'

# Make sure PySpark tells the workers to use Python 3, not Python 2, when both are installed
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'
def create_spark_conf(**kwargs):
    """
    Build a SparkConf for the pyFTS distributed routines.

    :keyword url: the Spark master URL (default: SPARK_ADDR)
    :keyword app: the Spark application name (default: 'pyFTS')
    :keyword spark_executor_memory: the executor memory (default: '2g')
    :keyword spark_driver_memory: the driver memory (default: '2g')
    :return: the SparkConf instance
    """
spark_executor_memory = kwargs.get("spark_executor_memory", "2g")
spark_driver_memory = kwargs.get("spark_driver_memory", "2g")
url = kwargs.get("url", SPARK_ADDR)
app = kwargs.get("app", 'pyFTS')
conf = SparkConf()
conf.setMaster(url)
conf.setAppName(app)
conf.set("spark.executor.memory", spark_executor_memory)
conf.set("spark.driver.memory", spark_driver_memory)
conf.set("spark.memory.offHeap.enabled",True)
conf.set("spark.memory.offHeap.size","16g")
return conf
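# A minimal usage sketch (illustrative values; any reachable Spark master URL
# and memory sizes can be substituted):
#
#   conf = create_spark_conf(url='spark://localhost:7077', app='pyFTS-demo',
#                            spark_executor_memory='4g', spark_driver_memory='4g')
#
# The resulting conf is then passed to SparkContext(conf=conf), as done by
# distributed_train() and distributed_predict() below.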
def get_partitioner(shared_partitioner, type='common', variables=[]):
    """
    Rebuild a partitioner on the worker from a broadcast collection of fuzzy sets.

    :param shared_partitioner: a Spark broadcast variable holding the fuzzy sets of the original partitioner
    :param type: the partitioner type, either 'common' or 'multivariate'
    :param variables: the model variables (only meaningful for the 'multivariate' type)
    :return: a partitioner populated with the broadcast fuzzy sets
    """
    fs_tmp = Simple.SimplePartitioner()
    for fset in shared_partitioner.value.keys():
        fz = shared_partitioner.value[fset]
        if type == 'common':
            fs_tmp.append_complex(fz)
        elif type == 'multivariate':
            fs_tmp.append(fz)
    return fs_tmp
def get_clustered_partitioner(explanatory_variables, target_variable, **parameters):
    """
    Rebuild a clustered multivariate partitioner on the worker from the broadcast fuzzy sets.

    :param explanatory_variables: the explanatory variables of the model
    :param target_variable: the target variable of the model
    :return: a MultivariatePartitioner populated with the broadcast multivariate fuzzy sets
    """
from pyFTS.models.multivariate.common import MultivariateFuzzySet
fs_tmp = mv_partitioner.MultivariatePartitioner(explanatory_variables=explanatory_variables,
target_variable=target_variable)
for tmp in parameters['partitioner_names'].value:
fs = MultivariateFuzzySet(target_variable=target_variable)
for var, fset in parameters['partitioner_{}'.format(tmp)].value:
fs.append_set(var, fset)
fs_tmp.append(fs)
fs_tmp.build_index()
return fs_tmp
def get_variables(**parameters):
    """
    Rebuild the model variables on the worker from the broadcast parameters.

    :return: a tuple (explanatory_variables, target_variable)
    """
explanatory_variables = []
target_variable = None
    from pyFTS.models.multivariate import common, variable
    for name in parameters['variables'].value:
var = variable.Variable(name,
type=parameters['{}_type'.format(name)].value,
data_label=parameters['{}_label'.format(name)].value,
alpha_cut=parameters['{}_alpha'.format(name)].value,
#data_type=parameters['{}_data_type'.format(name)].value,
#mask=parameters['{}_mask'.format(name)].value,
)
var.partitioner = get_partitioner(parameters['{}_partitioner'.format(name)])
var.partitioner.type = parameters['{}_partitioner_type'.format(name)].value
explanatory_variables.append(var)
if var.name == parameters['target'].value:
target_variable = var
return (explanatory_variables, target_variable)
def create_univariate_model(**parameters):
    """
    Instantiate a univariate FTS model on the worker from the broadcast parameters.

    :return: the univariate FTS model
    """
if parameters['order'].value > 1:
model = parameters['method'].value(partitioner=get_partitioner(parameters['partitioner']),
order=parameters['order'].value, alpha_cut=parameters['alpha_cut'].value,
lags=parameters['lags'].value)
else:
model = parameters['method'].value(partitioner=get_partitioner(parameters['partitioner']),
alpha_cut=parameters['alpha_cut'].value)
return model
def slave_train_univariate(data, **parameters):
    """
    Train a univariate FTS model on a single partition of the data.

    :param data: an iterator over the rows of the data partition
    :return: a list of (key, FLRG) tuples with the rules learned from the partition
    """
model = create_univariate_model(**parameters)
ndata = [k for k in data]
model.train(ndata)
return [(k, model.flrgs[k]) for k in model.flrgs.keys()]
def slave_forecast_univariate(data, **parameters):
    """
    Forecast a single partition of the data with a univariate FTS model.

    :param data: an iterator over the rows of the data partition
    :return: a list of (forecast, forecast) tuples, one for each forecasted value
    """
model = create_univariate_model(**parameters)
ndata = [k for k in data]
forecasts = model.predict(ndata)
return [(k, k) for k in forecasts]
def create_multivariate_model(**parameters):
    """
    Instantiate a multivariate FTS model on the worker from the broadcast parameters.

    :return: the multivariate FTS model
    """
explanatory_variables, target_variable = get_variables(**parameters)
if parameters['type'].value == 'clustered':
fs = get_clustered_partitioner(explanatory_variables, target_variable, **parameters)
model = parameters['method'].value(explanatory_variables=explanatory_variables,
target_variable=target_variable,
partitioner=fs,
order=parameters['order'].value,
alpha_cut=parameters['alpha_cut'].value,
lags=parameters['lags'].value)
else:
if parameters['order'].value > 1:
model = parameters['method'].value(explanatory_variables=explanatory_variables,
target_variable=target_variable,
order=parameters['order'].value,
alpha_cut=parameters['alpha_cut'].value,
lags=parameters['lags'].value)
else:
model = parameters['method'].value(explanatory_variables=explanatory_variables,
target_variable=target_variable,
alpha_cut=parameters['alpha_cut'].value)
return model
def slave_train_multivariate(data, **parameters):
    """
    Train a multivariate FTS model on a single partition of the data.

    :param data: an iterator over the rows (dict records) of the data partition
    :return: the rules learned from the partition and, for clustered models, the fuzzy set counts
    """
model = create_multivariate_model(**parameters)
rows = [k for k in data]
ndata = pd.DataFrame.from_records(rows, columns=parameters['columns'].value)
model.train(ndata)
if parameters['type'].value == 'clustered':
counts = [(fset, count) for fset,count in model.partitioner.count.items()]
flrgs = [(k, v) for k,v in model.flrgs.items()]
return [('counts', counts), ('flrgs', flrgs)]
else:
return [(k, v) for k,v in model.flrgs.items()]
def slave_forecast_multivariate(data, **parameters):
    """
    Forecast a single partition of the data with a multivariate FTS model.

    :param data: an iterator over the rows (dict records) of the data partition
    :return: a list of (forecast, forecast) tuples, one for each forecasted value
    """
model = create_multivariate_model(**parameters)
rows = [k for k in data]
ndata = pd.DataFrame.from_records(rows, columns=parameters['columns'].value)
forecasts = model.predict(ndata)
return [(k, k) for k in forecasts]
def share_parameters(model, context, data):
    """
    Broadcast the model hyperparameters and fuzzy sets to the Spark workers.

    :param model: the FTS model to replicate on the workers
    :param context: the SparkContext
    :param data: the dataset, used to broadcast the column names of multivariate models
    :return: a dict of Spark broadcast variables
    """
parameters = {}
if not model.is_multivariate:
parameters['type'] = context.broadcast('common')
parameters['partitioner'] = context.broadcast(model.partitioner.sets)
parameters['alpha_cut'] = context.broadcast(model.alpha_cut)
parameters['order'] = context.broadcast(model.order)
parameters['method'] = context.broadcast(type(model))
parameters['lags'] = context.broadcast(model.lags)
parameters['max_lag'] = context.broadcast(model.max_lag)
else:
if model.is_clustered:
parameters['type'] = context.broadcast('clustered')
names = []
for name, fset in model.partitioner.sets.items():
names.append(name)
parameters['partitioner_{}'.format(name)] = context.broadcast([(k,v) for k,v in fset.sets.items()])
parameters['partitioner_names'] = context.broadcast(names)
else:
parameters['type'] = context.broadcast('multivariate')
names = []
for var in model.explanatory_variables:
#if var.data_type is None:
# raise Exception("It is mandatory to inform the data_type parameter for each variable when the training is distributed! ")
names.append(var.name)
parameters['{}_type'.format(var.name)] = context.broadcast(var.type)
#parameters['{}_data_type'.format(var.name)] = context.broadcast(var.data_type)
#parameters['{}_mask'.format(var.name)] = context.broadcast(var.mask)
parameters['{}_label'.format(var.name)] = context.broadcast(var.data_label)
parameters['{}_alpha'.format(var.name)] = context.broadcast(var.alpha_cut)
parameters['{}_partitioner'.format(var.name)] = context.broadcast(var.partitioner.sets)
parameters['{}_partitioner_type'.format(var.name)] = context.broadcast(var.partitioner.type)
parameters['variables'] = context.broadcast(names)
parameters['target'] = context.broadcast(model.target_variable.name)
parameters['columns'] = context.broadcast(data.columns.values)
parameters['alpha_cut'] = context.broadcast(model.alpha_cut)
parameters['order'] = context.broadcast(model.order)
parameters['method'] = context.broadcast(type(model))
parameters['lags'] = context.broadcast(model.lags)
parameters['max_lag'] = context.broadcast(model.max_lag)
return parameters
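# Each entry of the dict returned by share_parameters() is a pyspark Broadcast
# object, so the slave_* functions above read the shared values through the
# '.value' attribute, e.g. (illustrative):
#
#   order = parameters['order'].value
#   method = parameters['method'].value  # the model class, e.g. hofts.HighOrderFTS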
def distributed_train(model, data, **kwargs):
    """
    Train an FTS model, distributing the training data among the nodes of a Spark cluster.

    :param model: the FTS model to train
    :param data: the training data
    :keyword url: the Spark master URL (default: SPARK_ADDR)
    :keyword app: the Spark application name (default: 'pyFTS')
    :keyword num_batches: the number of data batches per unit of parallelism (default: 4)
    :return: the trained model
    """
num_batches = kwargs.get("num_batches", 4)
conf = create_spark_conf(**kwargs)
with SparkContext(conf=conf) as context:
nodes = context.defaultParallelism
parameters = share_parameters(model, context, data)
if not model.is_multivariate:
func = lambda x: slave_train_univariate(x, **parameters)
flrgs = context.parallelize(data).repartition(nodes*num_batches).mapPartitions(func)
for k in flrgs.collect():
model.append_rule(k[1])
else:
data = data.to_dict(orient='records')
func = lambda x: slave_train_multivariate(x, **parameters)
flrgs = context.parallelize(data).mapPartitions(func)
for k in flrgs.collect():
if parameters['type'].value == 'clustered':
if k[0] == 'counts':
for fset, count in k[1]:
model.partitioner.count[fset] = count
elif k[0] == 'flrgs':
model.append_rule(k[1])
else:
model.append_rule(k[1])
return model
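# A minimal end-to-end training sketch, assuming a live Spark master reachable
# at SPARK_ADDR (the dataset, partitioner and model choices are illustrative):
#
#   train = Enrollments.get_data()
#   fs = Grid.GridPartitioner(data=train, npart=10)
#   model = hofts.HighOrderFTS(partitioner=fs, order=2)
#   model = distributed_train(model, train, url=SPARK_ADDR, num_batches=4)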
def distributed_predict(data, model, **kwargs):
    """
    Forecast the data, distributing the workload among the nodes of a Spark cluster.

    :param data: the input data
    :param model: the trained FTS model
    :keyword url: the Spark master URL (default: SPARK_ADDR)
    :keyword app: the Spark application name (default: 'pyFTS')
    :keyword num_batches: the number of data batches per unit of parallelism (default: 4)
    :return: a list with the forecasted values
    """
num_batches = kwargs.get("num_batches", 4)
conf = create_spark_conf(**kwargs)
ret = []
with SparkContext(conf=conf) as context:
nodes = context.defaultParallelism
        parameters = share_parameters(model, context, data)
if not model.is_multivariate:
func = lambda x: slave_forecast_univariate(x, **parameters)
forecasts = context.parallelize(data).repartition(nodes * num_batches).mapPartitions(func)
else:
data = data.to_dict(orient='records')
func = lambda x: slave_forecast_multivariate(x, **parameters)
forecasts = context.parallelize(data).repartition(nodes * num_batches).mapPartitions(func)
for k in forecasts.collect():
ret.extend(k)
return ret
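# Forecasting sketch, continuing the distributed_train() example above
# (reusing the training data as input is illustrative only):
#
#   forecasts = distributed_predict(train, model, url=SPARK_ADDR, num_batches=4)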