pySpark support

This commit is contained in:
Petrônio Cândido 2018-12-17 21:49:48 -02:00
parent 6dcdd8f59e
commit 386f28c47d
8 changed files with 125 additions and 29 deletions

View File

@@ -403,6 +403,26 @@ class FTS(object):
         self.benchmark_only = model.benchmark_only
         self.indexer = model.indexer
 
+    def append_rule(self, flrg):
+        """
+        Append an FLRG rule to the model
+
+        :param flrg: the rule to append
+        :return:
+        """
+        if flrg.get_key() not in self.flrgs:
+            self.flrgs[flrg.get_key()] = flrg
+        else:
+            if isinstance(flrg.RHS, (list, set)):
+                for k in flrg.RHS:
+                    self.flrgs[flrg.get_key()].append_rhs(k)
+            elif isinstance(flrg.RHS, dict):
+                for key, value in flrg.RHS.items():
+                    self.flrgs[flrg.get_key()].append_rhs(key, count=value)
+            else:
+                self.flrgs[flrg.get_key()].append_rhs(flrg.RHS)
+
     def merge(self, model):
         """
         Merge the FLRG rules from another model
@@ -411,19 +431,8 @@ class FTS(object):
         :return:
         """
-        for key in model.flrgs.keys():
-            flrg = model.flrgs[key]
-            if flrg.get_key() not in self.flrgs:
-                self.flrgs[flrg.get_key()] = flrg
-            else:
-                if isinstance(flrg.RHS, (list, set)):
-                    for k in flrg.RHS:
-                        self.flrgs[flrg.get_key()].append_rhs(k)
-                elif isinstance(flrg.RHS, dict):
-                    for k in flrg.RHS.keys():
-                        self.flrgs[flrg.get_key()].append_rhs(flrg.RHS[k])
-                else:
-                    self.flrgs[flrg.get_key()].append_rhs(flrg.RHS)
+        for key, flrg in model.flrgs.items():
+            self.append_rule(flrg)
 
     def append_transformation(self, transformation):
         if transformation is not None:
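
Together, append_rule and the simplified merge make it possible to combine models that were trained independently on different chunks of a series, which is what the Spark integration below relies on. A minimal sketch, with dataset and model chosen for illustration (not part of this commit):

    from pyFTS.data import Enrollments
    from pyFTS.partitioners import Grid
    from pyFTS.models import hofts

    data = Enrollments.get_data()
    fs = Grid.GridPartitioner(data=data, npart=10)

    # train two models independently on the two halves of the series
    half = len(data) // 2
    m1 = hofts.WeightedHighOrderFTS(partitioner=fs, order=2)
    m1.fit(data[:half])
    m2 = hofts.WeightedHighOrderFTS(partitioner=fs, order=2)
    m2.fit(data[half:])

    # merge() replays every rule of m2 through m1.append_rule()
    m1.merge(m2)
    print(m1)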

View File

@@ -49,11 +49,12 @@ class WeightedHighOrderFLRG(flrg.FLRG):
         self.w = None
 
     def append_rhs(self, fset, **kwargs):
+        count = kwargs.get('count',1.0)
         if fset not in self.RHS:
-            self.RHS[fset] = 1.0
+            self.RHS[fset] = count
         else:
-            self.RHS[fset] += 1.0
-        self.count += 1.0
+            self.RHS[fset] += count
+        self.count += count
 
     def append_lhs(self, c):
         self.LHS.append(c)
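
The new count keyword makes appends additive: calling append_rhs with count=n is the same as n unit appends, so the dict-valued RHS of a partial model's rule can be replayed exactly via append_rhs(key, count=value). A minimal sketch (direct FLRG construction, for illustration only):

    from pyFTS.models import hofts

    a = hofts.WeightedHighOrderFLRG(order=2)
    a.append_lhs('A1')
    a.append_lhs('A2')

    a.append_rhs('A3')             # count defaults to 1.0
    a.append_rhs('A3', count=2.0)  # equivalent to two more unit appends

    print(a.RHS, a.count)          # {'A3': 3.0} 3.0

The same count keyword is added to the other weighted FLRG classes in the hunks below, so every model type can take part in the same merge protocol.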

View File

@@ -20,12 +20,13 @@ class ImprovedWeightedFLRG(flrg.FLRG):
         self.w = None
 
     def append_rhs(self, c, **kwargs):
+        count = kwargs.get('count', 1.0)
         if c not in self.RHS:
             self.RHS[c] = c
-            self.rhs_counts[c] = 1.0
+            self.rhs_counts[c] = count
         else:
-            self.rhs_counts[c] += 1.0
-        self.count += 1.0
+            self.rhs_counts[c] += count
+        self.count += count
 
     def weights(self):
         if self.w is None:

View File

@@ -28,13 +28,12 @@ class ProbabilisticWeightedFLRG(hofts.HighOrderFLRG):
         return sets[self.LHS[0]].membership(data)
 
     def append_rhs(self, c, **kwargs):
-        mv = kwargs.get('mv', 1.0)
-        self.frequency_count += mv
+        count = kwargs.get('count', 1.0)
+        self.frequency_count += count
         if c in self.RHS:
-            self.rhs_count[c] += mv
+            self.RHS[c] += count
         else:
-            self.RHS[c] = c
-            self.rhs_count[c] = mv
+            self.RHS[c] = count
 
     def lhs_conditional_probability(self, x, sets, norm, uod, nbins):
         pk = self.frequency_count / norm
@@ -173,7 +172,7 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
             mvs = []
             for set, mv in fuzzyfied:
-                self.flrgs[flrg.get_key()].append_rhs(set, mv=lhs_mv * mv)
+                self.flrgs[flrg.get_key()].append_rhs(set, count=lhs_mv * mv)
                 mvs.append(mv)
 
             tmp_fq = sum([lhs_mv*kk for kk in mvs if kk > 0])
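
In pwfts the old mv keyword is renamed to count, so ProbabilisticWeightedFLRG matches the uniform append_rhs(fset, count=...) signature that FTS.append_rule expects; the counts stay fractional, scaled by the memberships of the fuzzified sample. A rough sketch of the semantics (constructor usage is an assumption, membership values hypothetical):

    from pyFTS.models import pwfts

    flrg = pwfts.ProbabilisticWeightedFLRG(order=1)
    flrg.append_lhs('A1')

    # membership-weighted, fractional counts (hypothetical values)
    lhs_mv = 0.8
    for fset, mv in [('A2', 0.6), ('A3', 0.4)]:
        flrg.append_rhs(fset, count=lhs_mv * mv)

    print(flrg.frequency_count)  # 0.8*0.6 + 0.8*0.4 ≈ 0.8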

View File

@@ -22,8 +22,9 @@ class ExponentialyWeightedFLRG(flrg.FLRG):
         self.w = None
 
     def append_rhs(self, c, **kwargs):
+        count = kwargs.get('count', 1.0)
         self.RHS.append(c)
-        self.count = self.count + 1.0
+        self.count += count
 
     def weights(self):
         if self.w is None:

View File

@@ -20,8 +20,9 @@ class WeightedFLRG(flrg.FLRG):
         self.w = None
 
     def append_rhs(self, c, **kwargs):
+        count = kwargs.get('count', 1.0)
        self.RHS.append(c)
-        self.count = self.count + 1.0
+        self.count += count
 
     def weights(self, sets):
         if self.w is None:

View File

@@ -27,12 +27,14 @@ from pyFTS.models import pwfts
 partitioner = Grid.GridPartitioner(data=y, npart=35)
 
-model = pwfts.ProbabilisticWeightedFTS(partitioner=partitioner, order=2)
+model = pwfts.ProbabilisticWeightedFTS(partitioner=partitioner, order=2, lags=[3,4])
 model.fit(y[:800])
 
 from pyFTS.benchmarks import benchmarks as bchmk
 
-distributions = model.predict(y[800:820], steps_ahead=20, type='distribution')
+distributions = model.predict(y[800:820])
 
 print(distributions)
 '''

pyFTS/tests/spark.py (new file, 82 lines)
View File

@@ -0,0 +1,82 @@
import numpy as np
import pandas as pd

from pyFTS.data import Enrollments, TAIEX
from pyFTS.partitioners import Grid, Simple
from pyFTS.models import hofts

from pyspark import SparkConf
from pyspark import SparkContext

import os

# make sure pyspark tells workers to use python3 not 2 if both are installed
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'

conf = SparkConf()
conf.setMaster('spark://192.168.0.110:7077')
conf.setAppName('pyFTS')

data = TAIEX.get_data()

# the partitioner is built once on the driver and broadcast to the workers
fs = Grid.GridPartitioner(data=data, npart=50)


def fun(x):
    return (x, x % 2)


def get_fs():
    # rebuild a partitioner on the worker from the broadcast fuzzy sets
    fs_tmp = Simple.SimplePartitioner()

    for fset in part.value.keys():
        fz = part.value[fset]
        fs_tmp.append(fset, fz.mf, fz.parameters)

    return fs_tmp


def fuzzyfy(x):
    fs_tmp = get_fs()
    ret = []
    for k in x:
        ret.append(fs_tmp.fuzzyfy(k, mode='both'))
    return ret


def train(fuzzyfied):
    # train a local model on the data of a single partition and return its rules
    model = hofts.WeightedHighOrderFTS(partitioner=get_fs(), order=order.value)
    ndata = [k for k in fuzzyfied]
    model.train(ndata)
    return [(k, model.flrgs[k]) for k in model.flrgs]


with SparkContext(conf=conf) as sc:

    part = sc.broadcast(fs.sets)

    order = sc.broadcast(2)

    #ret = sc.parallelize(np.arange(0,100)).map(fun)

    #fuzzyfied = sc.parallelize(data).mapPartitions(fuzzyfy)

    flrgs = sc.parallelize(data).mapPartitions(train)

    # fold the per-partition rules into a single model on the driver
    model = hofts.WeightedHighOrderFTS(partitioner=fs, order=order.value)

    for k in flrgs.collect():
        model.append_rule(k[1])

    print(model)
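
The script is a map-reduce in miniature: the partitioner and order are broadcast, each Spark partition trains a throwaway WeightedHighOrderFTS and emits its (key, FLRG) pairs, and the driver merges the collected rules into one model with append_rule. Once collect() has run, the merged model should behave like a locally trained one; a hedged sketch of a follow-up forecast (hypothetical continuation inside the with block, standard pyFTS predict usage assumed):

    # continuing inside the SparkContext block above (hypothetical):
    forecasts = model.predict(data[:10])
    print(forecasts)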