diff --git a/pyFTS/common/Util.py b/pyFTS/common/Util.py
index e24afb6..b73b0b2 100644
--- a/pyFTS/common/Util.py
+++ b/pyFTS/common/Util.py
@@ -207,128 +207,3 @@ def load_env(file):
-def start_dispy_cluster(method, nodes):
-    import dispy, dispy.httpd, logging
-
-    cluster = dispy.JobCluster(method, nodes=nodes, loglevel=logging.DEBUG, ping_interval=1000)
-
-    http_server = dispy.httpd.DispyHTTPServer(cluster)
-
-    return cluster, http_server
-
-
-
-def stop_dispy_cluster(cluster, http_server):
-    cluster.wait()  # wait for all jobs to finish
-
-    cluster.print_status()
-
-    http_server.shutdown()  # this waits until browser gets all updates
-    cluster.close()
-
-
-
-def simple_model_train(model, data, parameters):
-    model.train(data, **parameters)
-    return model
-
-
-def distributed_train(model, train_method, nodes, fts_method, data, num_batches=10,
-                      train_parameters={}, **kwargs):
-    import dispy, dispy.httpd, datetime
-
-    batch_save = kwargs.get('batch_save', False)  # save model between batches
-
-    batch_save_interval = kwargs.get('batch_save_interval', 1)
-
-    file_path = kwargs.get('file_path', None)
-
-    cluster, http_server = start_dispy_cluster(train_method, nodes)
-
-    print("[{0: %H:%M:%S}] Distrituted Train Started".format(datetime.datetime.now()))
-
-    jobs = []
-    n = len(data)
-    batch_size = int(n / num_batches)
-    bcount = 1
-    for ct in range(model.order, n, batch_size):
-        if model.is_multivariate:
-            ndata = data.iloc[ct - model.order:ct + batch_size]
-        else:
-            ndata = data[ct - model.order: ct + batch_size]
-
-        tmp_model = fts_method(str(bcount))
-
-        tmp_model.clone_parameters(model)
-
-        job = cluster.submit(tmp_model, ndata, train_parameters)
-        job.id = bcount  # associate an ID to identify jobs (if needed later)
-        jobs.append(job)
-
-        bcount += 1
-
-    for job in jobs:
-        print("[{0: %H:%M:%S}] Processing batch ".format(datetime.datetime.now()) + str(job.id))
-        tmp = job()
-        if job.status == dispy.DispyJob.Finished and tmp is not None:
-            model.merge(tmp)
-
-            if batch_save and (job.id % batch_save_interval) == 0:
-                persist_obj(model, file_path)
-
-        else:
-            print(job.exception)
-            print(job.stdout)
-
-        print("[{0: %H:%M:%S}] Finished batch ".format(datetime.datetime.now()) + str(job.id))
-
-    print("[{0: %H:%M:%S}] Distrituted Train Finished".format(datetime.datetime.now()))
-
-    stop_dispy_cluster(cluster, http_server)
-
-    return model
-
-
-
-def simple_model_predict(model, data, parameters):
-    return model.predict(data, **parameters)
-
-
-
-def distributed_predict(model, parameters, nodes, data, num_batches):
-    import dispy, dispy.httpd
-
-    cluster, http_server = start_dispy_cluster(simple_model_predict, nodes)
-
-    jobs = []
-    n = len(data)
-    batch_size = int(n / num_batches)
-    bcount = 1
-    for ct in range(model.order, n, batch_size):
-        if model.is_multivariate:
-            ndata = data.iloc[ct - model.order:ct + batch_size]
-        else:
-            ndata = data[ct - model.order: ct + batch_size]
-
-        job = cluster.submit(model, ndata, parameters)
-        job.id = bcount  # associate an ID to identify jobs (if needed later)
-        jobs.append(job)
-
-        bcount += 1
-
-    ret = []
-
-    for job in jobs:
-        tmp = job()
-        if job.status == dispy.DispyJob.Finished and tmp is not None:
-            if job.id < batch_size:
-                ret.extend(tmp[:-1])
-            else:
-                ret.extend(tmp)
-        else:
-            print(job.exception)
-            print(job.stdout)
-
-    stop_dispy_cluster(cluster, http_server)
-
-    return ret
diff --git a/pyFTS/common/fts.py b/pyFTS/common/fts.py
index 070ce50..d527a4b 100644
--- a/pyFTS/common/fts.py
+++ b/pyFTS/common/fts.py
@@ -147,10 +147,21 @@ class FTS(object):
         else:
-            nodes = kwargs.get("nodes", ['127.0.0.1'])
-            num_batches = kwargs.get('num_batches', 10)
+            if distributed == 'dispy':
+                from pyFTS.distributed import dispy
+
+                nodes = kwargs.get("nodes", ['127.0.0.1'])
+                num_batches = kwargs.get('num_batches', 10)
+
+                ret = dispy.distributed_predict(self, kwargs, nodes, ndata, num_batches)
+
+            elif distributed == 'spark':
+                from pyFTS.distributed import spark
+
+                nodes = kwargs.get("nodes", 'spark://192.168.0.110:7077')
+                app = kwargs.get("app", 'pyFTS')
+                ret = spark.distributed_predict(data=ndata, model=self, url=nodes, app=app)
-            ret = Util.distributed_predict(self, kwargs, nodes, ndata, num_batches)
 
         if not self.is_multivariate:
             kwargs['type'] = type
@@ -323,12 +334,20 @@ class FTS(object):
         batch_save_interval = kwargs.get('batch_save_interval', 10)
 
-        if distributed:
-            nodes = kwargs.get('nodes', False)
-            train_method = kwargs.get('train_method', Util.simple_model_train)
-            Util.distributed_train(self, train_method, nodes, type(self), data, num_batches, {},
-                                   batch_save=batch_save, file_path=file_path,
-                                   batch_save_interval=batch_save_interval)
+        if distributed is not None:
+
+            if distributed == 'dispy':
+                from pyFTS.distributed import dispy
+                nodes = kwargs.get('nodes', False)
+                train_method = kwargs.get('train_method', dispy.simple_model_train)
+                dispy.distributed_train(self, train_method, nodes, type(self), data, num_batches, {},
+                                        batch_save=batch_save, file_path=file_path,
+                                        batch_save_interval=batch_save_interval)
+            elif distributed == 'spark':
+                from pyFTS.distributed import spark
+                url = kwargs.get('nodes', 'spark://192.168.0.110:7077')
+                spark.distributed_train(self, data, self.partitioner,
+                                        url=url, app=kwargs.get('app', 'pyFTS'))
         else:
             if dump == 'time':
diff --git a/pyFTS/distributed/__init__.py b/pyFTS/distributed/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pyFTS/distributed/dispy.py b/pyFTS/distributed/dispy.py
new file mode 100644
index 0000000..91366b2
--- /dev/null
+++ b/pyFTS/distributed/dispy.py
@@ -0,0 +1,127 @@
+import dispy, dispy.httpd, logging
+from pyFTS.common import Util
+
+
+def start_dispy_cluster(method, nodes):
+
+    cluster = dispy.JobCluster(method, nodes=nodes, loglevel=logging.DEBUG, ping_interval=1000)
+
+    http_server = dispy.httpd.DispyHTTPServer(cluster)
+
+    return cluster, http_server
+
+
+def stop_dispy_cluster(cluster, http_server):
+    cluster.wait()  # wait for all jobs to finish
+
+    cluster.print_status()
+
+    http_server.shutdown()  # this waits until browser gets all updates
+    cluster.close()
+
+
+
+def simple_model_train(model, data, parameters):
+    model.train(data, **parameters)
+    return model
+
+
+def distributed_train(model, train_method, nodes, fts_method, data, num_batches=10,
+                      train_parameters={}, **kwargs):
+    import datetime
+
+    batch_save = kwargs.get('batch_save', False)  # save model between batches
+
+    batch_save_interval = kwargs.get('batch_save_interval', 1)
+
+    file_path = kwargs.get('file_path', None)
+
+    cluster, http_server = start_dispy_cluster(train_method, nodes)
+
+    print("[{0: %H:%M:%S}] Distributed Train Started".format(datetime.datetime.now()))
+
+    jobs = []
+    n = len(data)
+    batch_size = int(n / num_batches)
+    bcount = 1
+    for ct in range(model.order, n, batch_size):
+        if model.is_multivariate:
+            ndata = data.iloc[ct - model.order:ct + batch_size]
+        else:
+            ndata = data[ct - model.order: ct + batch_size]
+
+        tmp_model = fts_method(str(bcount))
+
+        tmp_model.clone_parameters(model)
+
+        job = cluster.submit(tmp_model, ndata, train_parameters)
+        job.id = bcount  # associate an ID to identify jobs (if needed later)
+        jobs.append(job)
+
+        bcount += 1
+
+    for job in jobs:
+        print("[{0: %H:%M:%S}] Processing batch ".format(datetime.datetime.now()) + str(job.id))
+        tmp = job()
+        if job.status == dispy.DispyJob.Finished and tmp is not None:
+            model.merge(tmp)
+
+            if batch_save and (job.id % batch_save_interval) == 0:
+                Util.persist_obj(model, file_path)
+
+        else:
+            print(job.exception)
+            print(job.stdout)
+
+        print("[{0: %H:%M:%S}] Finished batch ".format(datetime.datetime.now()) + str(job.id))
+
+    print("[{0: %H:%M:%S}] Distributed Train Finished".format(datetime.datetime.now()))
+
+    stop_dispy_cluster(cluster, http_server)
+
+    return model
+
+
+def simple_model_predict(model, data, parameters):
+    return model.predict(data, **parameters)
+
+
+def distributed_predict(model, parameters, nodes, data, num_batches):
+
+    cluster, http_server = start_dispy_cluster(simple_model_predict, nodes)
+
+    jobs = []
+    n = len(data)
+    batch_size = int(n / num_batches)
+    bcount = 1
+    for ct in range(model.order, n, batch_size):
+        if model.is_multivariate:
+            ndata = data.iloc[ct - model.order:ct + batch_size]
+        else:
+            ndata = data[ct - model.order: ct + batch_size]
+
+        job = cluster.submit(model, ndata, parameters)
+        job.id = bcount  # associate an ID to identify jobs (if needed later)
+        jobs.append(job)
+
+        bcount += 1
+
+    ret = []
+
+    for job in jobs:
+        tmp = job()
+        if job.status == dispy.DispyJob.Finished and tmp is not None:
+            if job.id < batch_size:
+                ret.extend(tmp[:-1])
+            else:
+                ret.extend(tmp)
+        else:
+            print(job.exception)
+            print(job.stdout)
+
+    stop_dispy_cluster(cluster, http_server)
+
+    return ret
diff --git a/pyFTS/distributed/spark.py b/pyFTS/distributed/spark.py
new file mode 100644
index 0000000..b7c0865
--- /dev/null
+++ b/pyFTS/distributed/spark.py
@@ -0,0 +1,83 @@
+import numpy as np
+import pandas as pd
+
+from pyFTS.data import Enrollments, TAIEX
+from pyFTS.partitioners import Grid, Simple
+from pyFTS.models import hofts
+
+from pyspark import SparkConf
+from pyspark import SparkContext
+
+import os
+# make sure pyspark tells workers to use python3, not 2, if both are installed
+os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
+os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'
+
+conf = None
+
+
+def get_conf(url, app):
+    """
+    Build (or reuse) the SparkConf pointing to the given cluster
+
+    :param url: URL of the Spark master node
+    :param app: Spark application name
+    :return: a SparkConf instance
+    """
+    global conf
+
+    if conf is None:
+        conf = SparkConf()
+        conf.setMaster(url)
+        conf.setAppName(app)
+
+    return conf
+
+
+def get_partitioner(shared_partitioner):
+    """
+    Rebuild a partitioner on the worker from the broadcast fuzzy sets
+
+    :param shared_partitioner: Spark broadcast variable with the partitioner fuzzy sets
+    :return: a Simple.SimplePartitioner containing the same fuzzy sets
+    """
+    fs_tmp = Simple.SimplePartitioner()
+
+    for fset in shared_partitioner.value.keys():
+        fz = shared_partitioner.value[fset]
+        fs_tmp.append(fset, fz.mf, fz.parameters)
+
+    return fs_tmp
+
+
+def slave_train(data):
+    """
+    Train a local FTS model on one partition of the data
+
+    :param data: iterator over the records of this partition
+    :return: list of (key, FLRG) tuples learned from this partition
+    """
+    model = shared_method.value(partitioner=get_partitioner(shared_partitioner),
+                                order=shared_order.value)
+
+    ndata = [k for k in data]
+
+    model.train(ndata)
+
+    return [(k, model.flrgs[k]) for k in model.flrgs]
+
+
+def distributed_train(model, data, partitioner, url='spark://192.168.0.110:7077', app='pyFTS'):
+    # slave_train reads these module-level broadcast variables on the workers
+    global shared_partitioner, shared_order, shared_method
+
+    with SparkContext(conf=get_conf(url=url, app=app)) as context:
+        shared_partitioner = context.broadcast(partitioner.sets)
+        shared_order = context.broadcast(model.order)
+        shared_method = context.broadcast(type(model))
+
+        flrgs = context.parallelize(data).mapPartitions(slave_train)
+
+        # the partial rules are merged into a single WeightedHighOrderFTS on the driver
+        model = hofts.WeightedHighOrderFTS(partitioner=partitioner, order=shared_order.value)
+
+        for k in flrgs.collect():
+            model.append_rule(k[1])
+
+    return model
+
+
+def distributed_predict(data, model, url='spark://192.168.0.110:7077', app='pyFTS'):
+    return None
diff --git a/setup.py b/setup.py
index 7fd00a7..a582278 100644
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@ setup(
     packages=['pyFTS', 'pyFTS.benchmarks', 'pyFTS.common', 'pyFTS.data', 'pyFTS.models.ensemble',
               'pyFTS.models', 'pyFTS.models.seasonal', 'pyFTS.partitioners', 'pyFTS.probabilistic',
               'pyFTS.tests', 'pyFTS.models.nonstationary', 'pyFTS.models.multivariate',
-              'pyFTS.models.incremental', 'pyFTS.hyperparam'],
+              'pyFTS.models.incremental', 'pyFTS.hyperparam', 'pyFTS.distributed'],
     version='1.4',
     description='Fuzzy Time Series for Python',
     author='Petronio Candido L. e Silva',
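
Usage note (not part of the patch): a minimal sketch of how the new `distributed` keyword introduced by the fts.py changes above is expected to be called. The dataset, partitioner, model class and parameter values (TAIEX, GridPartitioner with 35 partitions, WeightedHighOrderFTS of order 2) are illustrative choices, not mandated by this change; the keyword arguments `nodes`, `num_batches` and `app` are the ones read by `fit`/`predict` in this diff, and the Spark URL is the default that appears in the patch.

# hypothetical usage sketch, assuming a running dispy node and/or Spark master
from pyFTS.data import TAIEX
from pyFTS.partitioners import Grid
from pyFTS.models import hofts

data = TAIEX.get_data()
train, test = data[:3000], data[3000:3200]

partitioner = Grid.GridPartitioner(data=train, npart=35)
model = hofts.WeightedHighOrderFTS(partitioner=partitioner, order=2)

# dispy backend: the data is split into batches, trained on the listed nodes
# and the partial models are merged back into `model`
model.fit(train, distributed='dispy', nodes=['127.0.0.1'], num_batches=10)
forecasts = model.predict(test, distributed='dispy', nodes=['127.0.0.1'], num_batches=10)

# spark backend: rule induction runs on the Spark workers via distributed_train
model.fit(train, distributed='spark', nodes='spark://192.168.0.110:7077', app='pyFTS')

Note that in this commit spark.distributed_predict is still a stub that returns None, so only the dispy backend is usable end to end for forecasting.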