"""
Non Stationary Fuzzy Sets
GARIBALDI, Jonathan M.; JAROSZEWSKI, Marcin; MUSIKASUWAN, Salang. Nonstationary fuzzy sets.
IEEE Transactions on Fuzzy Systems, v. 16, n. 4, p. 1072-1086, 2008.
"""
import numpy as np
from pyFTS import *
from pyFTS.common import FuzzySet as FS, Membership, FLR
from pyFTS.partitioners import partitioner
from pyFTS.models.nonstationary import perturbation
[docs]class FuzzySet(FS.FuzzySet):
"""
Non Stationary Fuzzy Sets
"""
def __init__(self, name, mf, parameters, **kwargs):
"""
Constructor
"""
super(FuzzySet, self).__init__(name=name, mf=mf, parameters=parameters, centroid=None, alpha=1.0, **kwargs)
self.location = kwargs.get("location", None)
"""Pertubation function that affects the location of the membership function"""
self.location_params = kwargs.get("location_params", None)
"""Parameters for location pertubation function"""
self.location_roots = kwargs.get("location_roots", 0)
self.width = kwargs.get("width", None)
"""Pertubation function that affects the width of the membership function"""
self.width_params = kwargs.get("width_params", None)
"""Parameters for width pertubation function"""
self.width_roots = kwargs.get("width_roots", 0)
self.noise = kwargs.get("noise", None)
"""Pertubation function that adds noise on the membership function"""
self.noise_params = kwargs.get("noise_params", None)
"""Parameters for noise pertubation function"""
self.perturbated_parameters = {}
self.type = 'nonstationary'
if self.location is not None and not isinstance(self.location, (list, set)):
self.location = [self.location]
self.location_params = [self.location_params]
self.location_roots = [self.location_roots]
if self.width is not None and not isinstance(self.width, (list, set)):
self.width = [self.width]
self.width_params = [self.width_params]
self.width_roots = [self.width_roots]
[docs] def membership(self, x, t):
"""
Calculate the membership value of a given input
:param x: input value
:param t: time displacement or perturbation parameters
:return: membership value of x at this fuzzy set
"""
self.perturbate_parameters(t)
tmp = self.mf(x, self.perturbated_parameters[str(t)])
if self.noise is not None:
tmp += self.noise(t, self.noise_params)
return tmp
[docs] def perturbate_parameters(self, t):
if str(t) not in self.perturbated_parameters:
param = self.parameters
if isinstance(t, (list, set)):
param = self.perform_location(t[0], param)
param = self.perform_width(t[1], param)
else:
param = self.perform_location(t, param)
param = self.perform_width(t, param)
self.perturbated_parameters[str(t)] = param
[docs] def get_midpoint(self, t):
self.perturbate_parameters(t)
param = self.perturbated_parameters[str(t)]
if self.mf == Membership.gaussmf:
return param[0]
elif self.mf == Membership.sigmf:
return param[1]
elif self.mf == Membership.trimf:
return param[1]
elif self.mf == Membership.trapmf:
return (param[2] - param[1]) / 2
else:
return param
[docs] def get_lower(self, t):
self.perturbate_parameters(t)
param = self.perturbated_parameters[str(t)]
if self.mf == Membership.gaussmf:
return param[0] - 3*param[1]
elif self.mf == Membership.sigmf:
return param[0] - param[1]
elif self.mf == Membership.trimf:
return param[0]
elif self.mf == Membership.trapmf:
return param[0]
else:
return param
[docs] def get_upper(self, t):
self.perturbate_parameters(t)
param = self.perturbated_parameters[str(t)]
if self.mf == Membership.gaussmf:
return param[0] + 3*param[1]
elif self.mf == Membership.sigmf:
return param[0] + param[1]
elif self.mf == Membership.trimf:
return param[2]
elif self.mf == Membership.trapmf:
return param[3]
else:
return param
def __str__(self):
tmp = ""
if self.location is not None:
tmp += "Location: "
for ct, f in enumerate(self.location):
tmp += str(f.__name__) + "(" + str(["{0:.2f}".format(p) for p in self.location_params[ct]]) + ") "
if self.width is not None:
tmp += "Width: "
for ct, f in enumerate(self.width):
tmp += str(f.__name__) + "(" + str(["{0:.2f}".format(p) for p in self.width_params[ct]]) + ") "
tmp = "(" + str(["{0:.2f}".format(p) for p in self.parameters]) + ") " + tmp
return self.name + ": " + str(self.mf.__name__) + tmp
[docs]def fuzzify(inst, t, fuzzySets):
"""
Calculate the membership values for a data point given nonstationary fuzzy sets
:param inst: data points
:param t: time displacement of the instance
:param fuzzySets: list of fuzzy sets
:return: array of membership values
"""
ret = []
if not isinstance(inst, list):
inst = [inst]
for t, i in enumerate(inst):
mv = np.array([fs.membership(i, t) for fs in fuzzySets])
ret.append(mv)
return ret
[docs]def fuzzySeries(data, fuzzySets, ordered_sets, window_size=1, method='fuzzy', const_t= None):
fts = []
for t, i in enumerate(data):
tdisp = window_index(t, window_size) if const_t is None else const_t
mv = np.array([fuzzySets[fs].membership(i, tdisp) for fs in ordered_sets])
if len(mv) == 0:
sets = [check_bounds(i, fuzzySets, tdisp)]
else:
if method == 'fuzzy':
ix = np.ravel(np.argwhere(mv > 0.0))
elif method == 'maximum':
mx = max(mv)
ix = np.ravel(np.argwhere(mv == mx))
sets = [fuzzySets[ordered_sets[i]] for i in ix]
fts.append(sets)
return fts
[docs]def window_index(t, window_size):
if isinstance(t, (list, set)):
return t
return t - (t % window_size)
[docs]def check_bounds(data, partitioner, t):
if data < partitioner.lower_set().get_lower(t):
return partitioner.lower_set()
elif data > partitioner.upper_set().get_upper(t):
return partitioner.upper_set()
[docs]def check_bounds_index(data, partitioner, t):
if data < partitioner.lower_set().get_lower(t):
return 0
elif data > partitioner.upper_set().get_upper(t):
return len(partitioner.sets) -1