Bugfixes in pwfts, distributed and composite

Petrônio Cândido 2018-12-27 18:46:55 -02:00
parent e6247b0779
commit 87686e5ff0
8 changed files with 119 additions and 61 deletions

View File

@@ -28,16 +28,17 @@ class FuzzySet(object):
         self.Z = None
         """Partition function in respect to the membership function"""
-        if self.mf == Membership.gaussmf:
-            self.lower = parameters[0] - parameters[1]*3
-            self.upper = parameters[0] + parameters[1]*3
-        elif self.mf == Membership.sigmf:
-            k = (parameters[1] / (2 * parameters[0]))
-            self.lower = parameters[1] - k
-            self.upper = parameters[1] + k
-        else:
-            self.lower = min(parameters)
-            self.upper = max(parameters)
+        if parameters is not None:
+            if self.mf == Membership.gaussmf:
+                self.lower = parameters[0] - parameters[1]*3
+                self.upper = parameters[0] + parameters[1]*3
+            elif self.mf == Membership.sigmf:
+                k = (parameters[1] / (2 * parameters[0]))
+                self.lower = parameters[1] - k
+                self.upper = parameters[1] + k
+            else:
+                self.lower = min(parameters)
+                self.upper = max(parameters)
         self.metadata = {}
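The guard above lets a FuzzySet be constructed with parameters=None (for example when a partitioner fills the set in later) without min()/max() raising a TypeError; in that case the lower/upper computation is simply skipped. A minimal sketch of the two paths, assuming the constructor signature (name, mf, parameters, centroid) used elsewhere in pyFTS:

    # illustration only, not part of the commit
    from pyFTS.common import FuzzySet, Membership

    a = FuzzySet.FuzzySet('A0', Membership.trimf, [0, 5, 10], 5)
    print(a.lower, a.upper)   # 0 10 -- bounds derived from the parameters

    b = FuzzySet.FuzzySet('B0', Membership.trimf, None, None)
    # no TypeError anymore; b simply has no lower/upper bounds until parameters are set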

View File

@@ -322,7 +322,7 @@ class FTS(object):
         dump = kwargs.get('dump', None)
-        num_batches = kwargs.get('num_batches', None)
+        num_batches = kwargs.get('num_batches', 1)
         save = kwargs.get('save_model', False) # save model on disk
@@ -334,7 +334,7 @@ class FTS(object):
         batch_save_interval = kwargs.get('batch_save_interval', 10)
-        if distributed is not None:
+        if distributed is not None and distributed:
             if distributed == 'dispy':
                 from pyFTS.distributed import dispy
@@ -345,9 +345,10 @@
                                         batch_save_interval=batch_save_interval)
             elif distributed == 'spark':
                 from pyFTS.distributed import spark
+                url = kwargs.get('url', 'spark://192.168.0.110:7077')
+                app = kwargs.get('app', 'pyFTS')
-                spark.distributed_train(self, data, self.partitioner,
-                                        url='spark://192.168.0.110:7077', app='pyFTS')
+                spark.distributed_train(self, data, url=url, app=app)
         else:
             if dump == 'time':
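With this change the Spark master URL and application name come from fit() keyword arguments instead of being hard-coded in fts.py, and passing distributed=False is now treated the same as omitting the argument. A usage sketch mirroring the test script further down in this commit (the URL is just the default shown above):

    from pyFTS.data import TAIEX
    from pyFTS.partitioners import Grid
    from pyFTS.models import hofts

    data = TAIEX.get_data()
    fs = Grid.GridPartitioner(data=data, npart=50)
    model = hofts.WeightedHighOrderFTS(partitioner=fs, order=2)

    # url and app are forwarded to spark.distributed_train()
    model.fit(data, distributed='spark', url='spark://192.168.0.110:7077', app='pyFTS')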

View File

@@ -20,7 +20,6 @@ def stop_dispy_cluster(cluster, http_server):
     cluster.close()
 def simple_model_train(model, data, parameters):
     model.train(data, **parameters)
     return model
@@ -50,7 +49,7 @@ def distributed_train(model, train_method, nodes, fts_method, data, num_batches=
         else:
             ndata = data[ct - model.order: ct + batch_size]
-        tmp_model = fts_method(str(bcount))
+        tmp_model = fts_method()
         tmp_model.clone_parameters(model)
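The per-batch dispy worker no longer passes the batch counter to the model constructor; it builds a bare instance of the model class (fts_method is the class object itself) and copies the configuration over with clone_parameters. A minimal sketch of that handshake, with an illustrative model class and dataset:

    from pyFTS.data import Enrollments
    from pyFTS.partitioners import Grid
    from pyFTS.models import hofts

    data = Enrollments.get_data()
    model = hofts.WeightedHighOrderFTS(partitioner=Grid.GridPartitioner(data=data, npart=10), order=2)

    tmp_model = hofts.WeightedHighOrderFTS()   # fts_method() -- no positional label argument anymore
    tmp_model.clone_parameters(model)          # copies order, partitioner, lags, etc.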

View File

@@ -13,23 +13,6 @@ import os
 os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'
-conf = None
-def get_conf(url, app):
-    """
-    :param url:
-    :param app:
-    :return:
-    """
-    if conf is None:
-        conf = SparkConf()
-        conf.setMaster(url)
-        conf.setAppName(app)
-    return conf
 def get_partitioner(shared_partitioner):
@@ -47,7 +30,7 @@ def get_partitioner(shared_partitioner):
     return fs_tmp
-def slave_train(data):
+def slave_train(data, shared_method, shared_partitioner, shared_order):
     """
     :param data:
@@ -64,13 +47,29 @@ def slave_train(data):
     return [(k, model.flrgs[k]) for k in model.flrgs]
-def distributed_train(model, data, partitioner, url='spark://192.168.0.110:7077', app='pyFTS'):
-    with SparkContext(conf=get_conf(url=url, app=app)) as context:
-        shared_partitioner = context.broadcast(partitioner.sets)
+def distributed_train(model, data, url='spark://192.168.0.110:7077', app='pyFTS'):
+    """
+    :param model:
+    :param data:
+    :param url:
+    :param app:
+    :return:
+    """
+    conf = SparkConf()
+    conf.setMaster(url)
+    conf.setAppName(app)
+    with SparkContext(conf=conf) as context:
+        shared_partitioner = context.broadcast(model.partitioner.sets)
         shared_order = context.broadcast(model.order)
+        shared_method = context.broadcast(type(model))
+        func = lambda x: slave_train(x, shared_method, shared_partitioner, shared_order)
-        flrgs = context.parallelize(data).mapPartitions(slave_train)
-        model = hofts.WeightedHighOrderFTS(partitioner=partitioner, order=shared_order.value)
+        flrgs = context.parallelize(data).mapPartitions(func)
         for k in flrgs.collect():
             model.append_rule(k[1])
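Two problems are addressed here. First, the old get_conf() assigned conf inside the function without a global declaration, so the `if conf is None:` check would raise UnboundLocalError; the SparkConf is now built locally in distributed_train. Second, the trained model class is no longer hard-wired to hofts.WeightedHighOrderFTS: the class, the partitioner sets and the order are shipped to the executors as broadcast variables captured by the lambda that wraps slave_train, so the workers no longer depend on driver-side module globals that do not exist in the executor processes. A generic, illustrative sketch of that broadcast-plus-closure pattern (not pyFTS code; the local master URL is an assumption):

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster('local[2]').setAppName('broadcast-closure-demo')
    with SparkContext(conf=conf) as sc:
        factor = sc.broadcast(10)                               # driver state shipped to the executors
        func = lambda part: [x * factor.value for x in part]    # broadcast captured by the closure
        print(sc.parallelize(range(8), 2).mapPartitions(func).collect())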

View File

@@ -16,7 +16,6 @@ class ProbabilisticWeightedFLRG(hofts.HighOrderFLRG):
     def __init__(self, order):
         super(ProbabilisticWeightedFLRG, self).__init__(order)
         self.RHS = {}
-        self.rhs_count = {}
         self.frequency_count = 0.0
         self.Z = None
@@ -43,11 +42,11 @@ class ProbabilisticWeightedFLRG(hofts.HighOrderFLRG):
         return tmp
     def rhs_unconditional_probability(self, c):
-        return self.rhs_count[c] / self.frequency_count
+        return self.RHS[c] / self.frequency_count
     def rhs_conditional_probability(self, x, sets, uod, nbins):
         total = 0.0
-        for rhs in self.RHS:
+        for rhs in self.RHS.keys():
             set = sets[rhs]
             wi = self.rhs_unconditional_probability(rhs)
             mv = set.membership(x) / set.partition_function(uod, nbins=nbins)
@@ -68,28 +67,30 @@ class ProbabilisticWeightedFLRG(hofts.HighOrderFLRG):
         '''Return the expectation of the PWFLRG, the weighted sum'''
         if self.midpoint is None:
             self.midpoint = np.sum(np.array([self.rhs_unconditional_probability(s) * sets[s].centroid
-                                             for s in self.RHS]))
+                                             for s in self.RHS.keys()]))
         return self.midpoint
     def get_upper(self, sets):
         if self.upper is None:
-            self.upper = np.sum(np.array([self.rhs_unconditional_probability(s) * sets[s].upper for s in self.RHS]))
+            self.upper = np.sum(np.array([self.rhs_unconditional_probability(s) * sets[s].upper
+                                          for s in self.RHS.keys()]))
         return self.upper
     def get_lower(self, sets):
         if self.lower is None:
-            self.lower = np.sum(np.array([self.rhs_unconditional_probability(s) * sets[s].lower for s in self.RHS]))
+            self.lower = np.sum(np.array([self.rhs_unconditional_probability(s) * sets[s].lower
+                                          for s in self.RHS.keys()]))
         return self.lower
     def __str__(self):
         tmp2 = ""
-        for c in sorted(self.RHS):
+        for c in sorted(self.RHS.keys()):
             if len(tmp2) > 0:
                 tmp2 = tmp2 + ", "
-            tmp2 = tmp2 + "(" + str(round(self.rhs_count[c] / self.frequency_count, 3)) + ")" + c
+            tmp2 = tmp2 + "(" + str(round(self.RHS[c] / self.frequency_count, 3)) + ")" + c
         return self.get_key() + " -> " + tmp2
@@ -530,7 +531,7 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
     def __str__(self):
         tmp = self.name + ":\n"
-        for r in sorted(self.flrgs):
+        for r in sorted(self.flrgs.keys()):
             p = round(self.flrgs[r].frequency_count / self.global_frequency_count, 3)
             tmp = tmp + "(" + str(p) + ") " + str(self.flrgs[r]) + "\n"
         return tmp
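With rhs_count removed, RHS now maps each right-hand-side fuzzy set directly to its frequency count, so rhs_unconditional_probability(c) is just RHS[c] / frequency_count and the string representation reads the same dictionary. A small worked example with made-up counts:

    # illustrative numbers only
    rhs = {'A2': 3.0, 'A3': 1.0}            # what self.RHS holds after training
    frequency_count = sum(rhs.values())     # 4.0
    probs = {c: rhs[c] / frequency_count for c in rhs.keys()}
    print(probs)                            # {'A2': 0.75, 'A3': 0.25}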

View File

@@ -25,20 +25,24 @@ from pyFTS.data import TAIEX, SP500, NASDAQ, Malaysia, Enrollments
 from pyFTS.partitioners import Grid
 from pyFTS.models import pwfts, tsaur
-train = TAIEX.get_data()[:1000]
-test = TAIEX.get_data()[1000:1200]
-partitioner = Grid.GridPartitioner(data=train, npart=35)
-#model = pwfts.ProbabilisticWeightedFTS(partitioner=partitioner) #, order=2, lags=[3,4])
-model = tsaur.MarkovWeightedFTS(partitioner=partitioner)
-model.fit(train)
+dataset = pd.read_csv('/home/petronio/Downloads/kalang.csv', sep=',')
+dataset['date'] = pd.to_datetime(dataset["date"], format='%Y-%m-%d %H:%M:%S')
+train_uv = dataset['value'].values[:24505]
+test_uv = dataset['value'].values[24505:]
+partitioner = Grid.GridPartitioner(data=train_uv, npart=35)
+model = pwfts.ProbabilisticWeightedFTS(partitioner=partitioner) #, order=2, lags=[3,4])
+#model = tsaur.MarkovWeightedFTS(partitioner=partitioner)
+model.fit(train_uv)
 from pyFTS.benchmarks import benchmarks as bchmk
 print(model)
-print(model.forecast(test))
+print(model.forecast(test_uv))
 #distributions = model.predict(y[800:820])

View File

@@ -18,6 +18,53 @@ from pyFTS.models.multivariate import common, variable, mvfts, cmvfts
 from pyFTS.models.seasonal import partitioner as seasonal
 from pyFTS.models.seasonal.common import DateTime
+dataset = pd.read_csv('/home/petronio/Downloads/kalang.csv', sep=',')
+dataset['date'] = pd.to_datetime(dataset["date"], format='%Y-%m-%d %H:%M:%S')
+train_uv = dataset['value'].values[:24505]
+test_uv = dataset['value'].values[24505:]
+train_mv = dataset.iloc[:24505]
+test_mv = dataset.iloc[24505:]
+print(train_mv)
+sp = {'seasonality': DateTime.minute_of_day, 'names': [str(k)+'hs' for k in range(0,24)]}
+vhour = variable.Variable("Hour", data_label="date", partitioner=seasonal.TimeGridPartitioner, npart=24,
+                          data=train_mv, partitioner_specific=sp)
+vvalue = variable.Variable("Pollution", data_label="value", alias='value',
+                           partitioner=Grid.GridPartitioner, npart=35,
+                           data=train_mv)
+parameters = [
+    {},{},
+    {'order':2, 'knn': 1},
+    {'order':2, 'knn': 2},
+    {'order':2, 'knn': 3},
+]
+for ct, method in enumerate([mvfts.MVFTS, wmvfts.WeightedMVFTS,
+                             cmvfts.ClusteredMVFTS, cmvfts.ClusteredMVFTS, cmvfts.ClusteredMVFTS]):
+    print(method)
+    model = method(**parameters[ct])
+    model.shortname += str(ct)
+    model.append_variable(vhour)
+    model.append_variable(vvalue)
+    model.target_variable = vvalue
+    model.fit(train_mv)
+    Util.persist_obj(model, model.shortname)
+    forecasts = model.predict(test_mv.iloc[:100])
+    print(model)
+'''
 from pyFTS.data import henon
 df = henon.get_dataframe(iterations=1000)
@@ -47,4 +94,5 @@ ax[1][0].scatter(forecasts['y'].values,forecasts['x'].values)
 ax[1][1].plot(test['y'].values)
 ax[1][1].plot(forecasts['y'].values)
 print(forecasts)
+'''

View File

@@ -13,15 +13,20 @@ import os
 os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'
-conf = SparkConf()
-conf.setMaster('spark://192.168.0.110:7077')
-conf.setAppName('pyFTS')
+data = TAIEX.get_data()
+fs = Grid.GridPartitioner(data=data, npart=50)
+model = hofts.WeightedHighOrderFTS(partitioner=fs, order=2)
+model.fit(data, distributed='spark', url='spark://192.168.0.110:7077')
+#model.fit(data, distributed='dispy', nodes=['192.168.0.110'])
+print(model)
 '''
 def fun(x):
     return (x, x % 2)
@@ -76,7 +81,7 @@ with SparkContext(conf=conf) as sc:
     print(model)
 '''