import numpy as np
import pandas as pd
from pyFTS.common import FuzzySet, FLR, fts, flrg
from pyFTS.models import hofts
from pyFTS.models.multivariate import mvfts, grid, common
from types import LambdaType
class ClusteredMVFTS(mvfts.MVFTS):
    """
    Meta model for high order, clustered multivariate FTS.

    Training and forecasting are delegated to a monovariate high order FTS (fts_method)
    built on top of the multivariate fuzzy sets produced by a clustered partitioner.
    A usage sketch is given at the end of this module.
    """
    def __init__(self, **kwargs):
        super(ClusteredMVFTS, self).__init__(**kwargs)
        self.fts_method = kwargs.get('fts_method', hofts.WeightedHighOrderFTS)
        """The FTS method to be called when a new model is built"""
        self.fts_params = kwargs.get('fts_params', {})
        """The FTS method-specific parameters"""
        self.model = None
        """The most recently trained model"""
        self.knn = kwargs.get('knn', 2)
        self.is_high_order = True
        self.is_clustered = True
        self.order = kwargs.get("order", 2)
        """The number of lags used by the internal high order model"""
        self.lags = kwargs.get("lags", None)
        self.alpha_cut = kwargs.get('alpha_cut', 0.0)
        """Minimum membership degree considered during fuzzification"""
        self.shortname = "ClusteredMVFTS"
        self.name = "Clustered Multivariate FTS"
        self.pre_fuzzyfy = kwargs.get('pre_fuzzyfy', True)
        """If True, the input data is fuzzified before being passed to the internal model"""
        self.fuzzyfy_mode = kwargs.get('fuzzyfy_mode', 'sets')
        """Fuzzification mode passed to the partitioner"""

    def fuzzyfy(self, data):
        """Fuzzify each multivariate data point using the clustered partitioner."""
        ndata = []
        for index, row in data.iterrows() if isinstance(data, pd.DataFrame) else enumerate(data):
            data_point = self.format_data(row)
            ndata.append(self.partitioner.fuzzyfy(data_point, mode=self.fuzzyfy_mode))

        return ndata

    def train(self, data, **kwargs):
        self.fts_params['order'] = self.order

        self.model = self.fts_method(partitioner=self.partitioner, **self.fts_params)

        ndata = self.apply_transformations(data)
        ndata = self.check_data(ndata)

        self.model.train(ndata, fuzzyfied=self.pre_fuzzyfy)

        self.partitioner.prune()

    def check_data(self, data):
        if self.pre_fuzzyfy:
            ndata = self.fuzzyfy(data)
        else:
            ndata = [self.format_data(k) for k in data.to_dict('records')]

        return ndata

    def forecast(self, data, **kwargs):
        ndata1 = self.apply_transformations(data)
        ndata = self.check_data(ndata1)

        pre_fuzz = kwargs.get('pre_fuzzyfy', self.pre_fuzzyfy)

        ret = self.model.forecast(ndata, fuzzyfied=pre_fuzz, **kwargs)

        ret = self.target_variable.apply_inverse_transformations(ret,
                                                                 params=data[self.target_variable.data_label].values)

        return ret

    def forecast_interval(self, data, **kwargs):
        if not self.model.has_interval_forecasting:
            raise Exception("The internal method does not support interval forecasting!")

        data = self.check_data(data)

        pre_fuzz = kwargs.get('pre_fuzzyfy', self.pre_fuzzyfy)

        return self.model.forecast_interval(data, fuzzyfied=pre_fuzz, **kwargs)

    def forecast_distribution(self, data, **kwargs):
        if not self.model.has_probability_forecasting:
            raise Exception("The internal method does not support probabilistic forecasting!")

        data = self.check_data(data)

        pre_fuzz = kwargs.get('pre_fuzzyfy', self.pre_fuzzyfy)

        return self.model.forecast_distribution(data, fuzzyfied=pre_fuzz, **kwargs)

    def forecast_ahead_distribution(self, data, steps, **kwargs):
        generators = kwargs.get('generators', None)

        if generators is None:
            raise Exception("You must provide the parameter 'generators'! It is a dict whose keys "
                            "are the DataFrame column names (except the target_variable) and whose values are "
                            "either lambda functions that receive the current value of the variable and "
                            "return its next value, or trained FTS models that forecast the next values "
                            "from the current ones.")

        ndata = self.apply_transformations(data)

        start = kwargs.get('start_at', 0)

        ret = []
        sample = ndata.iloc[start: start + self.max_lag]
        for k in np.arange(0, steps):
            tmp = self.forecast_distribution(sample.iloc[-self.max_lag:], **kwargs)[0]

            ret.append(tmp)

            new_data_point = {}

            for data_label in generators.keys():
                if data_label != self.target_variable.data_label:
                    if isinstance(generators[data_label], LambdaType):
                        last_data_point = sample.iloc[-1]
                        new_data_point[data_label] = generators[data_label](last_data_point[data_label])
                    elif isinstance(generators[data_label], fts.FTS):
                        gen_model = generators[data_label]
                        last_data_point = sample.iloc[-gen_model.order:]

                        if not gen_model.is_multivariate:
                            last_data_point = last_data_point[data_label].values

                        new_data_point[data_label] = gen_model.forecast(last_data_point)[0]

            new_data_point[self.target_variable.data_label] = tmp.expected_value()

            # DataFrame.append was removed in pandas 2.0; concatenate the new row instead
            sample = pd.concat([sample, pd.DataFrame([new_data_point])], ignore_index=True)

        return ret[-steps:]

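    # Illustrative 'generators' argument for forecast_ahead_distribution (a sketch: the column
    # name 'humidity' and the variable 'test_df' are assumptions). Values may also be trained
    # fts.FTS models instead of lambdas:
    #
    #   generators = {'humidity': lambda x: x}   # naive persistence for the exogenous variable
    #   distributions = model.forecast_ahead_distribution(test_df, steps=10, generators=generators)
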
    def forecast_multivariate(self, data, **kwargs):
        ndata = self.check_data(data)

        generators = kwargs.get('generators', {})

        already_processed_cols = []

        ret = {}

        ret[self.target_variable.data_label] = self.model.forecast(ndata, fuzzyfied=self.pre_fuzzyfy, **kwargs)

        for var in self.explanatory_variables:
            if var.data_label not in already_processed_cols:
                if var.data_label in generators:
                    if isinstance(generators[var.data_label], LambdaType):
                        fx = generators[var.data_label]
                        if len(data[var.data_label].values) > self.order:
                            ret[var.data_label] = [fx(k) for k in data[var.data_label].values[self.order:]]
                        else:
                            ret[var.data_label] = [fx(data[var.data_label].values[-1])]
                    elif isinstance(generators[var.data_label], fts.FTS):
                        model = generators[var.data_label]
                        if not model.is_multivariate:
                            ret[var.data_label] = model.forecast(data[var.data_label].values)
                        else:
                            ret[var.data_label] = model.forecast(data)
                elif self.target_variable.name != var.name:
                    self.target_variable = var
                    self.partitioner.change_target_variable(var)
                    self.model.partitioner = self.partitioner
                    self.model.reset_calculated_values()
                    ret[var.data_label] = self.model.forecast(ndata, fuzzyfied=self.pre_fuzzyfy, **kwargs)

                already_processed_cols.append(var.data_label)

        return pd.DataFrame(ret, columns=ret.keys())

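    # Illustrative call (column names are assumptions; 'temperature' is the target variable):
    #
    #   df_hat = model.forecast_multivariate(test_df)
    #   df_hat['temperature']   # forecasts for the target variable
    #   df_hat['humidity']      # forecasts produced for an explanatory variable
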
    def forecast_ahead_multivariate(self, data, steps, **kwargs):
        ndata = self.apply_transformations(data)

        start = kwargs.get('start_at', 0)

        ret = ndata.iloc[start:self.order + start]

        for k in np.arange(0, steps):
            sample = ret.iloc[k:self.order + k]
            tmp = self.forecast_multivariate(sample, **kwargs)
            # DataFrame.append was removed in pandas 2.0; concatenate the new rows instead
            ret = pd.concat([ret, tmp], ignore_index=True)

        return ret

    def __str__(self):
        """String representation of the model"""
        return str(self.model)

    def __len__(self):
        """
        The length (number of rules) of the model

        :return: number of rules
        """
        return len(self.model)
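

# Usage sketch: building, fitting and forecasting with ClusteredMVFTS, following the
# pyFTS multivariate API. The dataset file, the column names 'temperature' and
# 'humidity', and the partitioning choices below are assumptions for illustration only.
if __name__ == '__main__':
    from pyFTS.models.multivariate import variable
    from pyFTS.partitioners import Grid

    df = pd.read_csv('data.csv')                   # hypothetical dataset
    train, test = df.iloc[:800], df.iloc[800:]

    # One Variable per column, each with its own monovariate partitioner
    vtemp = variable.Variable("Temperature", data_label="temperature",
                              partitioner=Grid.GridPartitioner, npart=15, data=train)
    vhum = variable.Variable("Humidity", data_label="humidity",
                             partitioner=Grid.GridPartitioner, npart=15, data=train)

    # Clustered multivariate partitioner combining the individual variables
    fs = grid.GridCluster(explanatory_variables=[vtemp, vhum], target_variable=vtemp)

    model = ClusteredMVFTS(explanatory_variables=[vtemp, vhum], target_variable=vtemp,
                           partitioner=fs, fts_method=hofts.WeightedHighOrderFTS,
                           order=2, knn=3)
    model.fit(train)
    forecasts = model.predict(test)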