Improvements to pwfts to support GranularFTS; improvements to mvfts methods, fuzzy sets and partitioners to support interval and probabilistic forecasting

Petrônio Cândido 2019-06-21 11:32:56 -03:00
parent 354a3131c9
commit 1237f3c2e3
11 changed files with 300 additions and 91 deletions

View File

@ -321,7 +321,7 @@ class FTS(object):
if 'partitioner' in kwargs:
self.partitioner = kwargs.pop('partitioner')
if not self.is_wrapper and not self.benchmark_only:
if not self.is_multivariate and not self.is_wrapper and not self.benchmark_only:
if self.partitioner is None:
raise Exception("Fuzzy sets were not provided for the model. Use 'partitioner' parameter. ")

View File

@ -34,12 +34,13 @@ class ClusteredMVFTS(mvfts.MVFTS):
self.name = "Clustered Multivariate FTS"
self.pre_fuzzyfy = kwargs.get('pre_fuzzyfy', True)
self.fuzzyfy_mode = kwargs.get('fuzzyfy_mode', 'sets')
def fuzzyfy(self,data):
ndata = []
for index, row in data.iterrows():
data_point = self.format_data(row)
ndata.append(self.partitioner.fuzzyfy(data_point, mode='sets'))
ndata.append(self.partitioner.fuzzyfy(data_point, mode=self.fuzzyfy_mode))
return ndata
@ -71,6 +72,50 @@ class ClusteredMVFTS(mvfts.MVFTS):
return self.model.forecast(ndata, fuzzyfied=pre_fuzz, **kwargs)
def forecast_interval(self, data, **kwargs):
if not self.model.has_interval_forecasting:
raise Exception("The internal method does not support interval forecasting!")
data = self.check_data(data)
pre_fuzz = kwargs.get('pre_fuzzyfy', self.pre_fuzzyfy)
return self.model.forecast_interval(data, fuzzyfied=pre_fuzz, **kwargs)
def forecast_ahead_interval(self, data, steps, **kwargs):
if not self.model.has_interval_forecasting:
raise Exception("The internal method does not support interval forecasting!")
data = self.check_data(data)
pre_fuzz = kwargs.get('pre_fuzzyfy', self.pre_fuzzyfy)
return self.model.forecast_ahead_interval(data, steps, fuzzyfied=pre_fuzz, **kwargs)
def forecast_distribution(self, data, **kwargs):
if not self.model.has_probability_forecasting:
raise Exception("The internal method does not support probabilistic forecasting!")
data = self.check_data(data)
pre_fuzz = kwargs.get('pre_fuzzyfy', self.pre_fuzzyfy)
return self.model.forecast_distribution(data, fuzzyfied=pre_fuzz, **kwargs)
def forecast_ahead_distribution(self, data, steps, **kwargs):
if not self.model.has_probability_forecasting:
raise Exception("The internal method does not support probabilistic forecasting!")
data = self.check_data(data)
pre_fuzz = kwargs.get('pre_fuzzyfy', self.pre_fuzzyfy)
return self.model.forecast_ahead_distribution(data, steps, fuzzyfied=pre_fuzz, **kwargs)
def forecast_multivariate(self, data, **kwargs):
ndata = self.check_data(data)
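With these delegating methods in place, interval and distribution forecasts flow through the usual predict() facade. A minimal usage sketch, assuming model is an already fitted ClusteredMVFTS subclass (e.g. GranularWMVFTS) whose internal fts_method supports intervals and distributions, such as pwfts.ProbabilisticWeightedFTS, and test_mv is a DataFrame like the one in the test script at the end of this commit:

# Sketch only: 'model' and 'test_mv' are assumed, as described above.
point = model.predict(test_mv, type='point')        # dispatches to forecast()
itvl = model.predict(test_mv, type='interval')      # dispatches to forecast_interval()
dist = model.predict(test_mv, type='distribution')  # dispatches to forecast_distribution()

# The *_ahead variants delegate the same way:
itvl10 = model.predict(test_mv, type='interval', steps_ahead=10)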

View File

@ -27,18 +27,26 @@ class MultivariateFuzzySet(Composite.FuzzySet):
if variable == self.target_variable.name:
self.centroid = set.centroid
self.upper = set.upper
self.lower = set.lower
self.name += set.name
def set_target_variable(self, variable):
self.target_variable = variable
self.centroid = self.sets[variable.name].centroid
self.upper = self.sets[variable.name].upper
self.lower = self.sets[variable.name].lower
def membership(self, x):
mv = []
if isinstance(x, (dict, pd.DataFrame)):
for var in self.sets.keys():
data = x[var]
mv.append(self.sets[var].membership(data))
else:
mv = [self.sets[self.target_variable.name].membership(x)]
return np.nanmin(mv)
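The rewritten membership() accepts a dict (or a DataFrame row) with one value per variable and aggregates the per-variable memberships with a minimum t-norm, while a bare scalar falls back to the target variable's set alone. An illustrative sketch with invented membership values, assuming mvfset is a MultivariateFuzzySet joining a Temperature set and a Load set:

# Hypothetical values, for illustration only:
#   Temperature set membership at 27.5 -> 0.8
#   Load set membership at 140.0       -> 0.6
x = {'Temperature': 27.5, 'Load': 140.0}
mvfset.membership(x)      # np.nanmin([0.8, 0.6]) == 0.6
mvfset.membership(140.0)  # scalar: only the target variable's set is evaluated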

View File

@ -10,18 +10,19 @@ class GranularWMVFTS(cmvfts.ClusteredMVFTS):
def __init__(self, **kwargs):
super(GranularWMVFTS, self).__init__(**kwargs)
self.fts_method = hofts.WeightedHighOrderFTS
self.fts_method = kwargs.get('fts_method', hofts.WeightedHighOrderFTS)
self.model = None
"""The most recent trained model"""
self.knn = kwargs.get('knn', 2)
self.order = kwargs.get("order", 2)
self.shortname = "GranularWMVFTS"
self.name = "Granular Weighted Multivariate FTS"
self.mode = kwargs.get('mode','sets')
def train(self, data, **kwargs):
self.partitioner = grid.IncrementalGridCluster(
explanatory_variables=self.explanatory_variables,
target_variable=self.target_variable,
neighbors=self.knn)
super(GranularWMVFTS, self).train(data,**kwargs)
super(GranularWMVFTS, self).train(data, mode=self.mode, **kwargs)
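A minimal end-to-end sketch of the new fts_method and fuzzyfy_mode parameters, mirroring the test script at the bottom of this commit; the synthetic data, column names and partition counts are illustrative only:

import numpy as np
import pandas as pd
from pyFTS.partitioners import Grid
from pyFTS.models import pwfts
from pyFTS.models.multivariate import variable, granular

df = pd.DataFrame({'temperature': 25 + 5 * np.random.rand(1000),
                   'load': 150 + 30 * np.random.rand(1000)})

vtemp = variable.Variable("Temperature", data_label="temperature",
                          partitioner=Grid.GridPartitioner, npart=5, data=df)
vload = variable.Variable("Load", data_label="load",
                          partitioner=Grid.GridPartitioner, npart=5, data=df)

# fts_method and fuzzyfy_mode are the new knobs introduced in this commit:
model = granular.GranularWMVFTS(explanatory_variables=[vtemp, vload],
                                target_variable=vload,
                                fts_method=pwfts.ProbabilisticWeightedFTS,
                                fuzzyfy_mode='both', order=1, knn=1)
model.fit(df)  # the IncrementalGridCluster is built here, with mode=self.mode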

View File

@ -31,6 +31,34 @@ class GridCluster(partitioner.MultivariatePartitioner):
self.build_index()
def defuzzyfy(self, values, mode='both'):
if not isinstance(values, list):
values = [values]
ret = []
for val in values:
if mode == 'both':
num = []
den = []
for fset, mv in val:
num.append(self.sets[fset].centroid * mv)
den.append(mv)
ret.append(np.sum(num) / np.sum(den))
elif mode == 'sets':
num = np.mean([self.sets[fset].centroid for fset in val])
ret.append(num)
elif mode == 'vector':
num = []
den = []
for ix, mv in enumerate(val):
num.append(self.sets[self.ordered_sets[ix]].centroid * mv)
den.append(mv)
ret.append(np.sum(num) / np.sum(den))
else:
raise Exception('Unknown defuzzyfication mode')
return ret
class IncrementalGridCluster(partitioner.MultivariatePartitioner):
"""
@ -67,6 +95,7 @@ class IncrementalGridCluster(partitioner.MultivariatePartitioner):
for key in fsets:
mvfset = self.sets[key]
ret.append((key, mvfset.membership(data)))
return ret
def incremental_search(self, data, **kwargs):
@ -77,21 +106,30 @@ class IncrementalGridCluster(partitioner.MultivariatePartitioner):
ret = []
for var in self.explanatory_variables:
ac = alpha_cut if alpha_cut > 0. else var.alpha_cut
fsets[var.name] = var.partitioner.fuzzyfy(data[var.name], mode='sets', alpha_cut=ac)
fsets[var.name] = var.partitioner.fuzzyfy(data[var.name], mode=mode, alpha_cut=ac)
fset = [val for key, val in fsets.items()]
fsets_by_var = [fs for var, fs in fsets.items()]
for p in product(*fset):
for p in product(*fsets_by_var):
if mode == 'both':
path = [fset for fset, mv in p]
mvs = [mv for fset, mv in p]
key = ''.join(path)
elif mode == 'sets':
key = ''.join(p)
path = p
if key not in self.sets:
mvfset = MultivariateFuzzySet(target_variable=self.target_variable)
for ct, fs in enumerate(p):
for ct, fs in enumerate(path):
mvfset.append_set(self.explanatory_variables[ct].name,
self.explanatory_variables[ct].partitioner[fs])
mvfset.name = key
self.sets[key] = mvfset
ret.append(key)
if mode == 'sets':
ret.append(key)
elif mode == 'both':
ret.append((key, np.nanmin(mvs)))
return ret
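An isolated, runnable sketch of the path construction above: one fuzzyfication result per variable (mode='both' yields (set, membership) pairs), combined via the cartesian product, with the composite membership taken as the minimum over the path. Set names and memberships are invented:

from itertools import product
import numpy as np

fsets_by_var = [[('TemperatureA2', 0.8)],
                [('LoadB1', 0.6), ('LoadB2', 0.4)]]
for p in product(*fsets_by_var):
    path = [fset for fset, mv in p]
    mvs = [mv for fset, mv in p]
    print(''.join(path), np.nanmin(mvs))
# TemperatureA2LoadB1 0.6
# TemperatureA2LoadB2 0.4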

View File

@ -302,7 +302,6 @@ class MVFTS(fts.FTS):
return ret[-steps]
def clone_parameters(self, model):
super(MVFTS, self).clone_parameters(model)

View File

@ -26,6 +26,11 @@ class MultivariatePartitioner(partitioner.Partitioner):
self.count = {}
data = kwargs.get('data', None)
self.build(data)
self.uod = {}
self.min = self.target_variable.partitioner.min
self.max = self.target_variable.partitioner.max
def format_data(self, data):
ndata = {}
@ -88,8 +93,11 @@ class MultivariatePartitioner(partitioner.Partitioner):
return fuzzyfy_instance_clustered(data, self, **kwargs)
def change_target_variable(self, variable):
self.target_variable = variable
for fset in self.sets.values():
fset.set_target_variable(variable)
self.min = variable.partitioner.min
self.max = variable.partitioner.max
def build_index(self):

View File

@ -41,6 +41,13 @@ class ProbabilisticWeightedFLRG(hofts.HighOrderFLRG):
return tmp
def lhs_conditional_probability_fuzzyfied(self, lhs_mv, sets, norm, uod, nbins):
pk = self.frequency_count / norm
tmp = pk * (lhs_mv / self.partition_function(sets, uod, nbins=nbins))
return tmp
def rhs_unconditional_probability(self, c):
return self.RHS[c] / self.frequency_count
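Reading the new lhs_conditional_probability_fuzzyfied in isolation: pk = frequency_count / norm is the rule's empirical probability, which is then scaled by the externally supplied LHS membership divided by the partition function over the universe of discourse. A toy numeric sketch, all values invented:

pk, lhs_mv, Z = 0.25, 0.6, 1.8   # empirical prob., LHS membership, partition function
print(pk * (lhs_mv / Z))         # 0.25 * (0.6 / 1.8) ~= 0.0833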
@ -114,14 +121,54 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
def train(self, data, **kwargs):
self.configure_lags(**kwargs)
parameters = kwargs.get('parameters','fuzzy')
if parameters == 'monotonic':
tmpdata = self.partitioner.fuzzyfy(data, mode='sets', method='maximum')
flrs = FLR.generate_recurrent_flrs(tmpdata)
self.generate_flrg(flrs)
if not kwargs.get('fuzzyfied',False):
self.generate_flrg2(data)
else:
self.generate_flrg(data)
self.generate_flrg_fuzzyfied(data)
def generate_flrg2(self, data):
fuzz = []
l = len(data)
for k in np.arange(0, l):
fuzz.append(self.partitioner.fuzzyfy(data[k], mode='both', method='fuzzy',
alpha_cut=self.alpha_cut))
self.generate_flrg_fuzzyfied(fuzz)
def generate_flrg_fuzzyfied(self, data):
l = len(data)
for k in np.arange(self.max_lag, l):
sample = data[k - self.max_lag: k]
set_sample = []
for instance in sample:
set_sample.append([fset for fset, mv in instance])
flrgs = self.generate_lhs_flrg_fuzzyfied(set_sample)
for flrg in flrgs:
if flrg.get_key() not in self.flrgs:
self.flrgs[flrg.get_key()] = flrg
lhs_mv = self.pwflrg_lhs_memberhip_fuzzyfied(flrg, sample)
mvs = []
inst = data[k]
for fset, mv in inst:
self.flrgs[flrg.get_key()].append_rhs(fset, count=lhs_mv * mv)
mvs.append(mv)
tmp_fq = sum([lhs_mv * kk for kk in mvs if kk > 0])
self.global_frequency_count += tmp_fq
def pwflrg_lhs_memberhip_fuzzyfied(self, flrg, sample):
vals = []
for ct, fuzz in enumerate(sample):
vals.append([mv for fset, mv in fuzz if fset == flrg.LHS[ct]])
return np.nanprod(vals)
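The shape of the data these methods consume, and how the LHS membership is formed, in a small runnable sketch (fuzzy set names and memberships are invented): each time step is a list of (set, membership) pairs, and pwflrg_lhs_memberhip_fuzzyfied takes the product of the memberships matching the rule's LHS across the lags.

import numpy as np

fuzz = [[('A1', 0.9), ('A2', 0.1)],   # t = 0
        [('A2', 0.7), ('A3', 0.3)]]   # t = 1

lhs = ['A1', 'A2']  # a rule's left-hand side over two lags
vals = [[mv for fset, mv in step if fset == lhs[ct]]
        for ct, step in enumerate(fuzz)]
print(np.nanprod(vals))  # 0.9 * 0.7 = 0.63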
def generate_lhs_flrg(self, sample, explain=False):
nsample = [self.partitioner.fuzzyfy(k, mode="sets", alpha_cut=self.alpha_cut)
@ -206,6 +253,11 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
pb = self.flrg_lhs_unconditional_probability(flrg)
return mv * pb
def flrg_lhs_conditional_probability_fuzzyfied(self, x, flrg):
mv = self.pwflrg_lhs_memberhip_fuzzyfied(flrg, x)
pb = self.flrg_lhs_unconditional_probability(flrg)
return mv * pb
def get_midpoint(self, flrg):
if flrg.get_key() in self.flrgs:
tmp = self.flrgs[flrg.get_key()]
@ -273,11 +325,16 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
def point_heuristic(self, sample, **kwargs):
explain = kwargs.get('explain', False)
fuzzyfied = kwargs.get('fuzzyfied', False)
if explain:
print("Fuzzyfication \n")
if not fuzzyfied:
flrgs = self.generate_lhs_flrg(sample, explain)
else:
fsets = self.get_sets_from_both_fuzzyfication(sample)
flrgs = self.generate_lhs_flrg_fuzzyfied(fsets, explain)
mp = []
norms = []
@ -286,16 +343,17 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
print("Rules:\n")
for flrg in flrgs:
if not fuzzyfied:
norm = self.flrg_lhs_conditional_probability(sample, flrg)
else:
norm = self.flrg_lhs_conditional_probability_fuzzyfied(sample, flrg)
if norm == 0:
norm = self.flrg_lhs_unconditional_probability(flrg)
if explain:
print("\t {} \t Midpoint: {}\t Norm: {}\n".format(str(self.flrgs[flrg.get_key()]),
self.get_midpoint(flrg), norm))
mp.append(norm * self.get_midpoint(flrg))
norms.append(norm)
@ -307,10 +365,13 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
print("Deffuzyfied value: {} \n".format(final))
return final
def get_sets_from_both_fuzzyfication(self, sample):
return [[k for k, v in inst] for inst in sample]
def point_expected_value(self, sample, **kwargs):
explain = kwargs.get('explain', False)
dist = self.forecast_distribution(sample)[0]
dist = self.forecast_distribution(sample, **kwargs)[0]
final = dist.expected_value()
return final
@ -329,28 +390,37 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
sample = ndata[k - (self.max_lag - 1): k + 1]
if method == 'heuristic':
ret.append(self.interval_heuristic(sample))
ret.append(self.interval_heuristic(sample, **kwargs))
elif method == 'quantile':
ret.append(self.interval_quantile(sample, alpha))
ret.append(self.interval_quantile(sample, alpha, **kwargs))
else:
raise ValueError("Unknown interval forecasting method!")
return ret
def interval_quantile(self, ndata, alpha):
dist = self.forecast_distribution(ndata)
def interval_quantile(self, ndata, alpha, **kwargs):
dist = self.forecast_distribution(ndata, **kwargs)
itvl = dist[0].quantile([alpha, 1.0 - alpha])
return itvl
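Both interval methods now thread **kwargs through, so the fuzzyfied flag reaches them from the predict() facade. A usage sketch, assuming model is a fitted ProbabilisticWeightedFTS and test is a compatible series:

heur = model.predict(test, type='interval', method='heuristic')
quant = model.predict(test, type='interval', method='quantile', alpha=0.05)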
def interval_heuristic(self, sample):
def interval_heuristic(self, sample, **kwargs):
fuzzyfied = kwargs.get('fuzzyfied', False)
if not fuzzyfied:
flrgs = self.generate_lhs_flrg(sample)
else:
fsets = self.get_sets_from_both_fuzzyfication(sample)
flrgs = self.generate_lhs_flrg_fuzzyfied(fsets)
up = []
lo = []
norms = []
for flrg in flrgs:
if not fuzzyfied:
norm = self.flrg_lhs_conditional_probability(sample, flrg)
else:
norm = self.flrg_lhs_conditional_probability_fuzzyfied(sample, flrg)
if norm == 0:
norm = self.flrg_lhs_unconditional_probability(flrg)
up.append(norm * self.get_upper(flrg))
@ -370,6 +440,8 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
smooth = kwargs.get("smooth", "none")
fuzzyfied = kwargs.get('fuzzyfied', False)
l = len(ndata)
uod = self.get_UoD()
@ -385,7 +457,11 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
for k in np.arange(self.max_lag - 1, l):
sample = ndata[k - (self.max_lag - 1): k + 1]
if not fuzzyfied:
flrgs = self.generate_lhs_flrg(sample)
else:
fsets = self.get_sets_from_both_fuzzyfication(sample)
flrgs = self.generate_lhs_flrg_fuzzyfied(fsets)
if 'type' in kwargs:
kwargs.pop('type')
@ -398,8 +474,14 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
for s in flrgs:
if s.get_key() in self.flrgs:
flrg = self.flrgs[s.get_key()]
pk = flrg.lhs_conditional_probability(sample, self.partitioner.sets, self.global_frequency_count, uod, nbins)
wi = flrg.rhs_conditional_probability(bin, self.partitioner.sets, uod, nbins)
if not fuzzyfied:
pk = flrg.lhs_conditional_probability(sample, self.partitioner.sets, self.global_frequency_count, uod, nbins)
else:
lhs_mv = self.pwflrg_lhs_memberhip_fuzzyfied(flrg, sample)
pk = flrg.lhs_conditional_probability_fuzzyfied(lhs_mv, self.partitioner.sets,
self.global_frequency_count, uod, nbins)
num.append(wi * pk)
den.append(pk)
else:
@ -422,13 +504,15 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
l = len(data)
fuzzyfied = kwargs.get('fuzzyfied', False)
start = kwargs.get('start_at', 0)
ret = data[start: start+self.max_lag].tolist()
for k in np.arange(self.max_lag, steps+self.max_lag):
if self.__check_point_bounds(ret[-1]) :
if self.__check_point_bounds(ret[-1]) and not fuzzyfied:
ret.append(ret[-1])
else:
mp = self.forecast(ret[k - self.max_lag: k], **kwargs)
@ -448,11 +532,19 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
start = kwargs.get('start_at', 0)
fuzzyfied = kwargs.get('fuzzyfied', False)
sample = data[start: start + self.max_lag]
if not fuzzyfied:
ret = [[k, k] for k in sample]
else:
ret = []
for k in sample:
kv = self.partitioner.defuzzyfy(k, mode='both')
ret.append([kv,kv])
ret.append(self.forecast_interval(sample)[0])
ret.append(self.forecast_interval(sample, **kwargs)[0])
for k in np.arange(self.max_lag+1, steps+self.max_lag):
@ -492,7 +584,7 @@ class ProbabilisticWeightedFTS(ifts.IntervalFTS):
tmp.set(dat, 1.0)
ret.append(tmp)
dist = self.forecast_distribution(sample, bins=_bins)[0]
dist = self.forecast_distribution(sample, bins=_bins, **kwargs)[0]
ret.append(dist)

View File

@ -181,6 +181,34 @@ class Partitioner(object):
sets = [(self.ordered_sets[i], mv[i]) for i in ix]
return sets
def defuzzyfy(self, values, mode='both'):
if not isinstance(values, list):
values = [values]
ret = []
for val in values:
if mode == 'both':
num = []
den = []
for fset, mv in val:
num.append( self.sets[fset].centroid * mv )
den.append(mv)
ret.append(np.sum(num)/np.sum(den))
elif mode == 'sets':
num = np.mean([self.sets[fset].centroid for fset in val ])
ret.append(num)
elif mode == 'vector':
num = []
den = []
for ix, mv in enumerate(val):
num.append(self.sets[self.ordered_sets[ix]].centroid * mv)
den.append(mv)
ret.append(np.sum(num) / np.sum(den))
else:
raise Exception('Unknown defuzzyfication mode')
return ret
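A worked sketch of defuzzyfy's three modes, for a hypothetical partitioner whose sets A1 and A2 have centroids 10.0 and 20.0:

import numpy as np

centroids = {'A1': 10.0, 'A2': 20.0}

# mode='both': (set, membership) pairs -> membership-weighted mean of centroids
val = [('A1', 0.75), ('A2', 0.25)]
print(np.sum([centroids[f] * mv for f, mv in val]) /
      np.sum([mv for f, mv in val]))   # 12.5

# mode='sets': plain set names -> unweighted mean of centroids: 15.0
# mode='vector': a membership vector whose positions map through
#                self.ordered_sets, then the same weighted mean as 'both'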
def check_bounds(self, data):
"""
Check if the input data is outside the known Universe of Discourse and, if it is, round it to the closest

View File

@ -31,7 +31,7 @@ datasets['Malaysia.load'] = malaysia["load"].values
windows = [600000, 600000, 10000, 10000]
cpus = 3
cpus = 7
for ct, (dataset_name, dataset) in enumerate(datasets.items()):
bchmk.train_test_time(dataset, windowsize=windows[ct], train=0.9, inc=.5,
@ -40,6 +40,6 @@ for ct, (dataset_name, dataset) in enumerate(datasets.items()):
partitions=50,
steps=cpus,
num_batches=cpus,
distributed='dispy', nodes=['192.168.0.110'], #, '192.168.0.107','192.168.0.106'],
distributed='dispy', nodes=['192.168.0.110', '192.168.0.107','192.168.0.106'],
file="experiments.db", dataset=dataset_name,
tag="speedup")

View File

@ -9,70 +9,60 @@ from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from pyFTS.common import Util
from pyFTS.data import TAIEX
taiex = TAIEX.get_data()
train = taiex[:3000]
test = taiex[3000:3200]
from pyFTS.common import Transformations
tdiff = Transformations.Differential(1)
from pyFTS.benchmarks import benchmarks as bchmk, Measures
from pyFTS.models import pwfts,hofts,ifts
from pyFTS.models.multivariate import granular, grid
from pyFTS.partitioners import Grid, Util as pUtil
fs = Grid.GridPartitioner(data=train, npart=30) #, transformation=tdiff)
from pyFTS.models.multivariate import common, variable, mvfts
from pyFTS.models.seasonal import partitioner as seasonal
from pyFTS.models.seasonal.common import DateTime
from pyFTS.common import Membership
model1 = hofts.HighOrderFTS(partitioner=fs, lags=[1,2])#lags=[0,1])
model1.shortname = "1"
model2 = pwfts.ProbabilisticWeightedFTS(partitioner=fs, lags=[1,2])
#model2.append_transformation(tdiff)
model2.shortname = "2"
#model = pwfts.ProbabilisticWeightedFTS(partitioner=fs, order=2)# lags=[1,2])
from pyFTS.data import SONDA, Malaysia
model1.fit(train)
model2.fit(train)
df = Malaysia.get_dataframe()
df['time'] = pd.to_datetime(df["time"], format='%m/%d/%y %I:%M %p')
#print(model1)
train_mv = df.iloc[:8000]
test_mv = df.iloc[8000:10000]
#print(model2)
sp = {'seasonality': DateTime.minute_of_day, 'names': [str(k)+'hs' for k in range(0,24)]}
for model in [model1, model2]:
#forecasts = model.predict(test)
print(model.shortname)
print(Measures.get_point_statistics(test, model))
vhour = variable.Variable("Hour", data_label="time", partitioner=seasonal.TimeGridPartitioner, npart=24,
data=train_mv, partitioner_specific=sp, alpha_cut=.3)
vtemp = variable.Variable("Temperature", data_label="temperature", alias='temp',
partitioner=Grid.GridPartitioner, npart=5, func=Membership.gaussmf,
data=train_mv, alpha_cut=.3)
vload = variable.Variable("Load", data_label="load", alias='load',
partitioner=Grid.GridPartitioner, npart=5, func=Membership.gaussmf,
data=train_mv, alpha_cut=.3)
#handles, labels = ax.get_legend_handles_labels()
#ax.legend(handles, labels, loc=2, bbox_to_anchor=(1, 1))
order = 1
knn = 1
#print(Measures.get_point_statistics(test,model))
model = granular.GranularWMVFTS(explanatory_variables=[vhour, vtemp, vload], target_variable=vload,
fts_method=pwfts.ProbabilisticWeightedFTS, fuzzyfy_mode='both',
order=order, knn=knn)
model.fit(train_mv)
print(model)
print(model.predict(test_mv.iloc[:10], type='point'))
print(model.predict(test_mv.iloc[:10], type='interval'))
print(model.predict(test_mv.iloc[:10], type='distribution'))
'''
bchmk.sliding_window_benchmarks(train,1000,0.8,
methods=[pwfts.ProbabilisticWeightedFTS], #,ifts.IntervalFTS],
orders=[1,2,3],
partitions=[10])
from pyFTS.data import Enrollments
train = Enrollments.get_data()
fs = Grid.GridPartitioner(data=train, npart=10) #, transformation=tdiff)
model = pwfts.ProbabilisticWeightedFTS(partitioner=fs, order=2)
model.fit(train)
print(model)
print(model.predict(train))
'''
'''
from pyFTS.common import FLR,FuzzySet,Membership,SortedCollection
taiex_fs1 = Grid.GridPartitioner(data=train, npart=30)
taiex_fs2 = Grid.GridPartitioner(data=train, npart=10, transformation=tdiff)
#pUtil.plot_partitioners(train, [taiex_fs1,taiex_fs2], tam=[15,7])
from pyFTS.common import fts,tree
from pyFTS.models import hofts, pwfts
pfts1_taiex = pwfts.ProbabilisticWeightedFTS("1", partitioner=taiex_fs1)
#pfts1_taiex.appendTransformation(diff)
pfts1_taiex.fit(train, save_model=True, file_path='pwfts')
pfts1_taiex.shortname = "1st Order"
print(pfts1_taiex)
'''