Distributed module

Petrônio Cândido 2018-12-24 10:37:23 -02:00
parent 88e788cdca
commit e6247b0779
6 changed files with 239 additions and 135 deletions

View File

@@ -207,128 +207,3 @@ def load_env(file):

def start_dispy_cluster(method, nodes):
    import dispy, dispy.httpd, logging

    cluster = dispy.JobCluster(method, nodes=nodes, loglevel=logging.DEBUG, ping_interval=1000)
    http_server = dispy.httpd.DispyHTTPServer(cluster)

    return cluster, http_server


def stop_dispy_cluster(cluster, http_server):
    cluster.wait()  # wait for all jobs to finish
    cluster.print_status()

    http_server.shutdown()  # this waits until browser gets all updates
    cluster.close()


def simple_model_train(model, data, parameters):
    model.train(data, **parameters)
    return model


def distributed_train(model, train_method, nodes, fts_method, data, num_batches=10,
                      train_parameters={}, **kwargs):
    import dispy, dispy.httpd, datetime

    batch_save = kwargs.get('batch_save', False)  # save model between batches
    batch_save_interval = kwargs.get('batch_save_interval', 1)
    file_path = kwargs.get('file_path', None)

    cluster, http_server = start_dispy_cluster(train_method, nodes)

    print("[{0: %H:%M:%S}] Distributed Train Started".format(datetime.datetime.now()))

    jobs = []
    n = len(data)
    batch_size = int(n / num_batches)
    bcount = 1
    for ct in range(model.order, n, batch_size):
        if model.is_multivariate:
            ndata = data.iloc[ct - model.order:ct + batch_size]
        else:
            ndata = data[ct - model.order: ct + batch_size]

        tmp_model = fts_method(str(bcount))
        tmp_model.clone_parameters(model)

        job = cluster.submit(tmp_model, ndata, train_parameters)
        job.id = bcount  # associate an ID to identify jobs (if needed later)
        jobs.append(job)

        bcount += 1

    for job in jobs:
        print("[{0: %H:%M:%S}] Processing batch ".format(datetime.datetime.now()) + str(job.id))
        tmp = job()
        if job.status == dispy.DispyJob.Finished and tmp is not None:
            model.merge(tmp)

            if batch_save and (job.id % batch_save_interval) == 0:
                persist_obj(model, file_path)
        else:
            print(job.exception)
            print(job.stdout)

        print("[{0: %H:%M:%S}] Finished batch ".format(datetime.datetime.now()) + str(job.id))

    print("[{0: %H:%M:%S}] Distributed Train Finished".format(datetime.datetime.now()))

    stop_dispy_cluster(cluster, http_server)

    return model


def simple_model_predict(model, data, parameters):
    return model.predict(data, **parameters)


def distributed_predict(model, parameters, nodes, data, num_batches):
    import dispy, dispy.httpd

    cluster, http_server = start_dispy_cluster(simple_model_predict, nodes)

    jobs = []
    n = len(data)
    batch_size = int(n / num_batches)
    bcount = 1
    for ct in range(model.order, n, batch_size):
        if model.is_multivariate:
            ndata = data.iloc[ct - model.order:ct + batch_size]
        else:
            ndata = data[ct - model.order: ct + batch_size]

        job = cluster.submit(model, ndata, parameters)
        job.id = bcount  # associate an ID to identify jobs (if needed later)
        jobs.append(job)

        bcount += 1

    ret = []
    for job in jobs:
        tmp = job()
        if job.status == dispy.DispyJob.Finished and tmp is not None:
            if job.id < batch_size:
                ret.extend(tmp[:-1])
            else:
                ret.extend(tmp)
        else:
            print(job.exception)
            print(job.stdout)

    stop_dispy_cluster(cluster, http_server)

    return ret
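
These helpers are removed here and reappear in the new pyFTS.distributed.dispy module added later in this commit, so callers only have to switch import paths. A minimal sketch of that change, assuming an already-built pyFTS model and its training data (the old Util entry point is taken from the calls replaced in the FTS class below):

# before this commit: the dispy helpers lived beside the common utilities
# from pyFTS.common import Util
# Util.distributed_train(model, Util.simple_model_train, ['127.0.0.1'], type(model), data)

# after this commit: the same helpers are imported from the dedicated module
from pyFTS.distributed import dispy
dispy.distributed_train(model, dispy.simple_model_train, ['127.0.0.1'], type(model), data)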

View File

@@ -147,10 +147,21 @@ class FTS(object):
         else:
-            nodes = kwargs.get("nodes", ['127.0.0.1'])
-            num_batches = kwargs.get('num_batches', 10)
+            if distributed == 'dispy':
+                from pyFTS.distributed import dispy
+                nodes = kwargs.get("nodes", ['127.0.0.1'])
+                num_batches = kwargs.get('num_batches', 10)
+                ret = dispy.distributed_predict(self, kwargs, nodes, ndata, num_batches)
+            elif distributed == 'spark':
+                from pyFTS.distributed import spark
+                nodes = kwargs.get("nodes", 'spark://192.168.0.110:7077')
+                app = kwargs.get("app", 'pyFTS')
+                ret = spark.distributed_predict(data=ndata, model=self, url=nodes, app=app)
-            ret = Util.distributed_predict(self, kwargs, nodes, ndata, num_batches)

         if not self.is_multivariate:
             kwargs['type'] = type

@@ -323,12 +334,20 @@ class FTS(object):
         batch_save_interval = kwargs.get('batch_save_interval', 10)

-        if distributed:
-            nodes = kwargs.get('nodes', False)
-            train_method = kwargs.get('train_method', Util.simple_model_train)
-            Util.distributed_train(self, train_method, nodes, type(self), data, num_batches, {},
-                                   batch_save=batch_save, file_path=file_path,
-                                   batch_save_interval=batch_save_interval)
+        if distributed is not None:
+            if distributed == 'dispy':
+                from pyFTS.distributed import dispy
+                nodes = kwargs.get('nodes', False)
+                train_method = kwargs.get('train_method', dispy.simple_model_train)
+                dispy.distributed_train(self, train_method, nodes, type(self), data, num_batches, {},
+                                        batch_save=batch_save, file_path=file_path,
+                                        batch_save_interval=batch_save_interval)
+            elif distributed == 'spark':
+                from pyFTS.distributed import spark
+                spark.distributed_train(self, data, self.partitioner,
+                                        url='spark://192.168.0.110:7077', app='pyFTS')
         else:
             if dump == 'time':
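
A hedged usage sketch of the dispatch above: only the distributed, nodes, num_batches and app keyword arguments come from this diff; everything else (Enrollments data, a Grid partitioner, a high-order model) is ordinary pyFTS usage assumed here for illustration.

from pyFTS.data import Enrollments
from pyFTS.partitioners import Grid
from pyFTS.models import hofts

data = Enrollments.get_data()
part = Grid.GridPartitioner(data=data, npart=10)
model = hofts.HighOrderFTS(partitioner=part, order=2)

# dispy backend: batches are trained/forecast on the listed worker nodes
model.fit(data, distributed='dispy', nodes=['127.0.0.1'], num_batches=10)
forecasts = model.predict(data, distributed='dispy', nodes=['127.0.0.1'], num_batches=10)

# spark backend: predict() reads the master URL from 'nodes', while fit()
# currently hard-codes it (see the hunk above)
# forecasts = model.predict(data, distributed='spark',
#                           nodes='spark://192.168.0.110:7077', app='pyFTS')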

View File

pyFTS/distributed/dispy.py  (new file, 127 additions)

@@ -0,0 +1,127 @@
import dispy, dispy.httpd, logging

from pyFTS.common import Util


def start_dispy_cluster(method, nodes):
    cluster = dispy.JobCluster(method, nodes=nodes, loglevel=logging.DEBUG, ping_interval=1000)
    http_server = dispy.httpd.DispyHTTPServer(cluster)

    return cluster, http_server


def stop_dispy_cluster(cluster, http_server):
    cluster.wait()  # wait for all jobs to finish
    cluster.print_status()

    http_server.shutdown()  # this waits until browser gets all updates
    cluster.close()


def simple_model_train(model, data, parameters):
    model.train(data, **parameters)
    return model


def distributed_train(model, train_method, nodes, fts_method, data, num_batches=10,
                      train_parameters={}, **kwargs):
    import dispy, dispy.httpd, datetime

    batch_save = kwargs.get('batch_save', False)  # save model between batches
    batch_save_interval = kwargs.get('batch_save_interval', 1)
    file_path = kwargs.get('file_path', None)

    cluster, http_server = start_dispy_cluster(train_method, nodes)

    print("[{0: %H:%M:%S}] Distributed Train Started".format(datetime.datetime.now()))

    jobs = []
    n = len(data)
    batch_size = int(n / num_batches)
    bcount = 1
    for ct in range(model.order, n, batch_size):
        if model.is_multivariate:
            ndata = data.iloc[ct - model.order:ct + batch_size]
        else:
            ndata = data[ct - model.order: ct + batch_size]

        tmp_model = fts_method(str(bcount))
        tmp_model.clone_parameters(model)

        job = cluster.submit(tmp_model, ndata, train_parameters)
        job.id = bcount  # associate an ID to identify jobs (if needed later)
        jobs.append(job)

        bcount += 1

    for job in jobs:
        print("[{0: %H:%M:%S}] Processing batch ".format(datetime.datetime.now()) + str(job.id))
        tmp = job()
        if job.status == dispy.DispyJob.Finished and tmp is not None:
            model.merge(tmp)

            if batch_save and (job.id % batch_save_interval) == 0:
                Util.persist_obj(model, file_path)
        else:
            print(job.exception)
            print(job.stdout)

        print("[{0: %H:%M:%S}] Finished batch ".format(datetime.datetime.now()) + str(job.id))

    print("[{0: %H:%M:%S}] Distributed Train Finished".format(datetime.datetime.now()))

    stop_dispy_cluster(cluster, http_server)

    return model


def simple_model_predict(model, data, parameters):
    return model.predict(data, **parameters)


def distributed_predict(model, parameters, nodes, data, num_batches):
    import dispy, dispy.httpd

    cluster, http_server = start_dispy_cluster(simple_model_predict, nodes)

    jobs = []
    n = len(data)
    batch_size = int(n / num_batches)
    bcount = 1
    for ct in range(model.order, n, batch_size):
        if model.is_multivariate:
            ndata = data.iloc[ct - model.order:ct + batch_size]
        else:
            ndata = data[ct - model.order: ct + batch_size]

        job = cluster.submit(model, ndata, parameters)
        job.id = bcount  # associate an ID to identify jobs (if needed later)
        jobs.append(job)

        bcount += 1

    ret = []
    for job in jobs:
        tmp = job()
        if job.status == dispy.DispyJob.Finished and tmp is not None:
            if job.id < batch_size:
                ret.extend(tmp[:-1])
            else:
                ret.extend(tmp)
        else:
            print(job.exception)
            print(job.stdout)

    stop_dispy_cluster(cluster, http_server)

    return ret
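
To make the batch slicing in distributed_train and distributed_predict concrete: each window starts model.order points before its batch, so consecutive batches overlap by exactly model.order observations and every batch carries the lags it needs. A self-contained illustration of that index arithmetic (the numbers are arbitrary):

# windows produced by: for ct in range(order, n, batch_size): data[ct - order: ct + batch_size]
n = 100           # series length (illustrative)
num_batches = 10
order = 2         # model.order
batch_size = int(n / num_batches)

windows = [(ct - order, min(ct + batch_size, n)) for ct in range(order, n, batch_size)]
print(windows[:3])  # [(0, 12), (10, 22), (20, 32)] -> neighbouring batches share 2 points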

View File

@@ -0,0 +1,83 @@
import numpy as np
import pandas as pd

from pyFTS.data import Enrollments, TAIEX
from pyFTS.partitioners import Grid, Simple
from pyFTS.models import hofts

from pyspark import SparkConf
from pyspark import SparkContext

import os
# make sure pyspark tells workers to use python3 not 2 if both are installed
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'

conf = None


def get_conf(url, app):
    """
    Build (or reuse) the SparkConf for this module.

    :param url: Spark master URL
    :param app: Spark application name
    :return: SparkConf instance
    """
    global conf
    if conf is None:
        conf = SparkConf()
        conf.setMaster(url)
        conf.setAppName(app)

    return conf


def get_partitioner(shared_partitioner):
    """
    Rebuild a partitioner on the worker from the broadcast fuzzy sets.

    :param shared_partitioner: broadcast variable holding the partitioner's fuzzy sets
    :return: SimplePartitioner containing the same fuzzy sets
    """
    fs_tmp = Simple.SimplePartitioner()

    for fset in shared_partitioner.value.keys():
        fz = shared_partitioner.value[fset]
        fs_tmp.append(fset, fz.mf, fz.parameters)

    return fs_tmp


def slave_train(data):
    """
    Train a local model on one data partition.

    :param data: iterator over the rows of this partition
    :return: list of (key, FLRG) tuples learned from this partition
    """
    # shared_method, shared_partitioner and shared_order are expected to be
    # module-level broadcast variables visible on the workers
    model = shared_method.value(partitioner=get_partitioner(shared_partitioner),
                                order=shared_order.value)

    ndata = [k for k in data]

    model.train(ndata)

    return [(k, model.flrgs[k]) for k in model.flrgs]


def distributed_train(model, data, partitioner, url='spark://192.168.0.110:7077', app='pyFTS'):
    with SparkContext(conf=get_conf(url=url, app=app)) as context:
        shared_partitioner = context.broadcast(partitioner.sets)

        flrgs = context.parallelize(data).mapPartitions(slave_train)

        model = hofts.WeightedHighOrderFTS(partitioner=partitioner, order=shared_order.value)

        for k in flrgs.collect():
            model.append_rule(k[1])

    return model


def distributed_predict(data, model, url='spark://192.168.0.110:7077', app='pyFTS'):
    return None
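
Note that slave_train reads shared_method, shared_order and shared_partitioner as module-level names, but distributed_train above only binds the broadcast partitioner to a local variable, so the workers would not see these values yet. A hedged alternative sketch that passes the broadcast variables through a closure instead of module globals; everything in it is an assumption layered on this commit's logic, not code from the repository:

# Hypothetical variant -- NOT part of this commit.  It hands the broadcast
# variables to the workers through a closure, the usual PySpark pattern.
from pyFTS.partitioners import Simple
from pyFTS.models import hofts
from pyspark import SparkConf, SparkContext

def spark_train(data, partitioner, fts_method=hofts.WeightedHighOrderFTS, order=2,
                url='spark://192.168.0.110:7077', app='pyFTS'):
    with SparkContext(conf=SparkConf().setMaster(url).setAppName(app)) as context:
        bc_sets = context.broadcast(partitioner.sets)
        bc_order = context.broadcast(order)

        def train_partition(rows):
            # rebuild the partitioner on the worker from the broadcast fuzzy sets
            fs = Simple.SimplePartitioner()
            for name, fset in bc_sets.value.items():
                fs.append(name, fset.mf, fset.parameters)
            local = fts_method(partitioner=fs, order=bc_order.value)
            local.train([k for k in rows])
            return [(k, local.flrgs[k]) for k in local.flrgs]

        merged = fts_method(partitioner=partitioner, order=order)
        for _, flrg in context.parallelize(data).mapPartitions(train_partition).collect():
            merged.append_rule(flrg)

    return merged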

View File

@@ -5,7 +5,7 @@ setup(
       packages=['pyFTS', 'pyFTS.benchmarks', 'pyFTS.common', 'pyFTS.data', 'pyFTS.models.ensemble',
                 'pyFTS.models', 'pyFTS.models.seasonal', 'pyFTS.partitioners', 'pyFTS.probabilistic',
                 'pyFTS.tests', 'pyFTS.models.nonstationary', 'pyFTS.models.multivariate',
-                'pyFTS.models.incremental', 'pyFTS.hyperparam'],
+                'pyFTS.models.incremental', 'pyFTS.hyperparam', 'pyFTS.distributed'],
       version='1.4',
       description='Fuzzy Time Series for Python',
       author='Petronio Candido L. e Silva',