diff --git a/pyFTS/common/fts.py b/pyFTS/common/fts.py
index ad6f039..070ce50 100644
--- a/pyFTS/common/fts.py
+++ b/pyFTS/common/fts.py
@@ -403,6 +403,26 @@ class FTS(object):
         self.benchmark_only = model.benchmark_only
         self.indexer = model.indexer
 
+    def append_rule(self, flrg):
+        """
+        Append FLRG rule to the model
+
+        :param flrg: rule
+        :return:
+        """
+
+        if flrg.get_key() not in self.flrgs:
+            self.flrgs[flrg.get_key()] = flrg
+        else:
+            if isinstance(flrg.RHS, (list, set)):
+                for k in flrg.RHS:
+                    self.flrgs[flrg.get_key()].append_rhs(k)
+            elif isinstance(flrg.RHS, dict):
+                for key, value in flrg.RHS.items():
+                    self.flrgs[flrg.get_key()].append_rhs(key, count=value)
+            else:
+                self.flrgs[flrg.get_key()].append_rhs(flrg.RHS)
+
     def merge(self, model):
         """
         Merge the FLRG rules from other model
@@ -411,19 +431,8 @@ class FTS(object):
         :param model:
         :return:
         """
-        for key in model.flrgs.keys():
-            flrg = model.flrgs[key]
-            if flrg.get_key() not in self.flrgs:
-                self.flrgs[flrg.get_key()] = flrg
-            else:
-                if isinstance(flrg.RHS, (list, set)):
-                    for k in flrg.RHS:
-                        self.flrgs[flrg.get_key()].append_rhs(k)
-                elif isinstance(flrg.RHS, dict):
-                    for k in flrg.RHS.keys():
-                        self.flrgs[flrg.get_key()].append_rhs(flrg.RHS[k])
-                else:
-                    self.flrgs[flrg.get_key()].append_rhs(flrg.RHS)
+        for key, flrg in model.flrgs.items():
+            self.append_rule(flrg)
 
     def append_transformation(self, transformation):
         if transformation is not None:
diff --git a/pyFTS/models/hofts.py b/pyFTS/models/hofts.py
index 33a6223..2b70bc6 100644
--- a/pyFTS/models/hofts.py
+++ b/pyFTS/models/hofts.py
@@ -49,11 +49,12 @@ class WeightedHighOrderFLRG(flrg.FLRG):
         self.w = None
 
     def append_rhs(self, fset, **kwargs):
+        count = kwargs.get('count',1.0)
         if fset not in self.RHS:
-            self.RHS[fset] = 1.0
+            self.RHS[fset] = count
         else:
-            self.RHS[fset] += 1.0
-        self.count += 1.0
+            self.RHS[fset] += count
+        self.count += count
 
     def append_lhs(self, c):
         self.LHS.append(c)
diff --git a/pyFTS/models/ismailefendi.py b/pyFTS/models/ismailefendi.py
index ff0f0d5..195af58 100644
--- a/pyFTS/models/ismailefendi.py
+++ b/pyFTS/models/ismailefendi.py
@@ -20,12 +20,13 @@ class ImprovedWeightedFLRG(flrg.FLRG):
         self.w = None
 
     def append_rhs(self, c, **kwargs):
+        count = kwargs.get('count', 1.0)
         if c not in self.RHS:
             self.RHS[c] = c
-            self.rhs_counts[c] = 1.0
+            self.rhs_counts[c] = count
         else:
-            self.rhs_counts[c] += 1.0
-        self.count += 1.0
+            self.rhs_counts[c] += count
+        self.count += count
 
     def weights(self):
         if self.w is None:
diff --git a/pyFTS/models/pwfts.py b/pyFTS/models/pwfts.py
index 48d9b21..105dda3 100644
--- a/pyFTS/models/pwfts.py
+++ b/pyFTS/models/pwfts.py
@@ -28,13 +28,12 @@ class ProbabilisticWeightedFLRG(hofts.HighOrderFLRG):
         return sets[self.LHS[0]].membership(data)
 
     def append_rhs(self, c, **kwargs):
-        mv = kwargs.get('mv', 1.0)
-        self.frequency_count += mv
+        count = kwargs.get('count', 1.0)
+        self.frequency_count += count
         if c in self.RHS:
-            self.rhs_count[c] += mv
+            self.RHS[c] += count
         else:
-            self.RHS[c] = c
-            self.rhs_count[c] = mv
+            self.RHS[c] = count
 
     def lhs_conditional_probability(self, x, sets, norm, uod, nbins):
         pk = self.frequency_count / norm
@@ -173,7 +172,7 @@
 
             mvs = []
             for set, mv in fuzzyfied:
-                self.flrgs[flrg.get_key()].append_rhs(set, mv=lhs_mv * mv)
+                self.flrgs[flrg.get_key()].append_rhs(set, count=lhs_mv * mv)
                 mvs.append(mv)
 
             tmp_fq = sum([lhs_mv*kk for kk in mvs if kk > 0])
diff --git a/pyFTS/models/sadaei.py b/pyFTS/models/sadaei.py
index 6bac656..8d5cc31 100644
--- a/pyFTS/models/sadaei.py
+++ b/pyFTS/models/sadaei.py
@@ -22,8 +22,9 @@ class ExponentialyWeightedFLRG(flrg.FLRG):
         self.w = None
 
     def append_rhs(self, c, **kwargs):
+        count = kwargs.get('count', 1.0)
         self.RHS.append(c)
-        self.count = self.count + 1.0
+        self.count += count
 
     def weights(self):
         if self.w is None:
diff --git a/pyFTS/models/yu.py b/pyFTS/models/yu.py
index 01187d3..b2bd7fe 100644
--- a/pyFTS/models/yu.py
+++ b/pyFTS/models/yu.py
@@ -20,8 +20,9 @@ class WeightedFLRG(flrg.FLRG):
         self.w = None
 
     def append_rhs(self, c, **kwargs):
+        count = kwargs.get('count', 1.0)
         self.RHS.append(c)
-        self.count = self.count + 1.0
+        self.count += count
 
     def weights(self, sets):
         if self.w is None:
diff --git a/pyFTS/tests/general.py b/pyFTS/tests/general.py
index 94de7af..8ef7d8b 100644
--- a/pyFTS/tests/general.py
+++ b/pyFTS/tests/general.py
@@ -27,12 +27,14 @@
 from pyFTS.models import pwfts
 
 partitioner = Grid.GridPartitioner(data=y, npart=35)
 
-model = pwfts.ProbabilisticWeightedFTS(partitioner=partitioner, order=2)
+model = pwfts.ProbabilisticWeightedFTS(partitioner=partitioner, order=2, lags=[3,4])
 
 model.fit(y[:800])
 
 from pyFTS.benchmarks import benchmarks as bchmk
 
-distributions = model.predict(y[800:820], steps_ahead=20, type='distribution')
+distributions = model.predict(y[800:820])
+
+print(distributions)
 '''
diff --git a/pyFTS/tests/spark.py b/pyFTS/tests/spark.py
new file mode 100644
index 0000000..3029d38
--- /dev/null
+++ b/pyFTS/tests/spark.py
@@ -0,0 +1,82 @@
+import numpy as np
+import pandas as pd
+
+from pyFTS.data import Enrollments, TAIEX
+from pyFTS.partitioners import Grid, Simple
+from pyFTS.models import hofts
+
+from pyspark import SparkConf
+from pyspark import SparkContext
+
+import os
+# make sure pyspark tells workers to use python3 not 2 if both are installed
+os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
+os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'
+
+conf = SparkConf()
+conf.setMaster('spark://192.168.0.110:7077')
+conf.setAppName('pyFTS')
+
+data = TAIEX.get_data()
+
+fs = Grid.GridPartitioner(data=data, npart=50)
+
+
+def fun(x):
+    return (x, x % 2)
+
+
+def get_fs():
+    fs_tmp = Simple.SimplePartitioner()
+
+    for fset in part.value.keys():
+        fz = part.value[fset]
+        fs_tmp.append(fset, fz.mf, fz.parameters)
+
+    return fs_tmp
+
+def fuzzyfy(x):
+
+    fs_tmp = get_fs()
+
+    ret = []
+
+    for k in x:
+        ret.append(fs_tmp.fuzzyfy(k, mode='both'))
+
+    return ret
+
+
+def train(fuzzyfied):
+    model = hofts.WeightedHighOrderFTS(partitioner=get_fs(), order=order.value)
+
+    ndata = [k for k in fuzzyfied]
+
+    model.train(ndata)
+
+    return [(k, model.flrgs[k]) for k in model.flrgs]
+
+
+with SparkContext(conf=conf) as sc:
+
+    part = sc.broadcast(fs.sets)
+
+    order = sc.broadcast(2)
+
+    #ret = sc.parallelize(np.arange(0,100)).map(fun)
+
+    #fuzzyfied = sc.parallelize(data).mapPartitions(fuzzyfy)
+
+    flrgs = sc.parallelize(data).mapPartitions(train)
+
+    model = hofts.WeightedHighOrderFTS(partitioner=fs, order=order.value)
+
+    for k in flrgs.collect():
+        model.append_rule(k[1])
+
+    print(model)
+
+
+
+
+