diff --git a/pyFTS/common/fts.py b/pyFTS/common/fts.py
index 59bc6b7..9895468 100644
--- a/pyFTS/common/fts.py
+++ b/pyFTS/common/fts.py
@@ -548,7 +548,7 @@ class FTS(object):
                 params = [ None for k in self.transformations]
 
             for c, t in enumerate(self.transformations, start=0):
-                ndata = t.apply(ndata,params[c])
+                ndata = t.apply(ndata, params[c])
 
         return ndata
 
diff --git a/pyFTS/common/transformations/som.py b/pyFTS/common/transformations/som.py
new file mode 100644
index 0000000..a5aee39
--- /dev/null
+++ b/pyFTS/common/transformations/som.py
@@ -0,0 +1,153 @@
+"""
+Kohonen Self Organizing Maps for Fuzzy Time Series
+"""
+import pandas as pd
+import SimpSOM as sps
+from pyFTS.models.multivariate import wmvfts
+from typing import Tuple
+from pyFTS.common.Transformations import Transformation
+from typing import List
+
+
+class SOMTransformation(Transformation):
+    """
+    Replace the exogenous columns of a dataset by their projection on a
+    Kohonen Self Organizing Map built with SimpSOM.
+    """
+
+    def __init__(self,
+                 grid_dimension: Tuple,
+                 **kwargs):
+        """
+        Args:
+            grid_dimension (Tuple): pair of values passed as the first two
+                arguments of sps.somNet.
+            **kwargs:
+                loadFile (str): passed as loadFile to sps.somNet. default: None
+                PBC (bool): passed as PBC to sps.somNet. default: True
+        """
+        # the original code never ran the Transformation initialiser
+        super().__init__()
+
+        # SOM attributes
+        self.load_file = kwargs.get('loadFile')
+        self.net: sps.somNet = None
+        # NaN-free rows (numpy array) given to the last train() call
+        self.data = None
+        self.grid_dimension: Tuple = grid_dimension
+        self.pbc = kwargs.get('PBC', True)
+
+        # debug attributes
+        self.name = 'Kohonen Self Organizing Maps FTS'
+        self.shortname = 'SOM-FTS'
+
+    def apply(self,
+              data: pd.DataFrame,
+              endogen_variable=None,
+              names: Tuple[str] = ('x', 'y'),
+              param=None,
+              **kwargs):
+        """
+        Transform a M-dimensional dataset into a dataset holding the SOM
+        projection columns plus the endogen variable.
+
+        If endogen_variable is None, or is not a column of data, the last
+        column is used as the endogen variable. An untrained transformation
+        is trained on the remaining columns before projecting them.
+
+        NOTE: FTS calls t.apply(ndata, params[c]), so there the second
+        positional argument is received as endogen_variable.
+
+        Args:
+            data (pd.DataFrame): M-dimensional dataset
+            endogen_variable (str): column of dataset
+            names (Tuple): names for new columns created by SOM Transformation.
+            param: not used by this transformation
+            **kwargs: params of SOM's train process
+                percentage_train (float): fraction of the dataset that will be
+                    used to train the SOM network. default: 0.7
+                leaning_rate (float): learning rate of SOM network. default: 0.01
+                epochs (int): epochs of SOM network. default: 10000
+
+        Returns:
+            pd.DataFrame: the columns listed in names followed by the endogen
+            variable, one row per row of data.
+        """
+        if endogen_variable is None or endogen_variable not in data.columns:
+            endogen_variable = data.columns[-1]
+        cols = [col for col in data.columns if col != endogen_variable]
+
+        if self.net is None:
+            self.train(data=data[cols], **kwargs)
+
+        new_data = pd.DataFrame(self.net.project(data[cols].values),
+                                columns=names)
+        new_data[endogen_variable] = data[endogen_variable].values
+        return new_data
+
+    def __repr__(self):
+        status = "is trained" if self.net is not None else "not trained"
+        return f'{self.name}-{status}'
+
+    def __str__(self):
+        return self.name
+
+    def __del__(self):
+        # "del self.net" raised AttributeError when the object was collected
+        # before __init__ had created the attribute
+        self.net = None
+
+    def train(self,
+              data: pd.DataFrame,
+              percentage_train: float = .7,
+              leaning_rate: float = 0.01,
+              epochs: int = 10000):
+        """
+        Train the SOM network with the first rows of data.
+
+        Rows holding NaN are dropped and the remaining rows are kept in
+        self.data as a numpy array.
+
+        Args:
+            data (pd.DataFrame): training dataset
+            percentage_train (float): fraction of the NaN-free rows, taken
+                from the top, used for training. default: 0.7
+            leaning_rate (float): start learning rate of the network (the
+                misspelt name is kept for backward compatibility). default: 0.01
+            epochs (int): training epochs. default: 10000
+
+        Raises:
+            ValueError: when no row is left to train the network.
+        """
+        self.data = data.dropna().values
+        limit = int(round(len(self.data) * percentage_train))
+        if limit < 1:
+            raise ValueError(
+                f'no rows left to train the SOM: {len(self.data)} valid rows, '
+                f'percentage_train={percentage_train}')
+        x, y = self.grid_dimension
+        self.net = sps.somNet(x, y, self.data[:limit],
+                              PBC=self.pbc, loadFile=self.load_file)
+        self.net.train(startLearnRate=leaning_rate,
+                       epochs=epochs)
+
+    def save_net(self,
+                 filename: str = "SomNet trained"):
+        """
+        Save the network with sps.somNet.save and keep filename as the
+        loadFile handed to the networks created by later train() calls.
+        """
+        self.net.save(filename)
+        self.load_file = filename
+
+    def show_grid(self,
+                  graph_type: str = 'nodes_graph',
+                  **kwargs):
+        """
+        Call net.nodes_graph (with colnum=kwargs['colnum'], default 0) when
+        graph_type is 'nodes_graph', net.diff_graph otherwise.
+        """
+        if graph_type == 'nodes_graph':
+            self.net.nodes_graph(colnum=kwargs.get('colnum', 0))
+        else:
+            self.net.diff_graph()
diff --git a/pyFTS/partitioners/som.py b/pyFTS/partitioners/som.py
new file mode 100644
index 0000000..059ea11
--- /dev/null
+++ b/pyFTS/partitioners/som.py
@@ -0,0 +1,97 @@
+"""
+Kohonen Self Organizing Maps for Fuzzy Time Series
+"""
+import pandas as pd
+import SimpSOM as sps
+from pyFTS.models.multivariate import wmvfts
+from typing import Tuple
+
+
+class SOMPartitioner:
+    """
+    Holder of a SimpSOM network trained on the first rows of a dataset.
+    """
+
+    def __init__(self,
+                 grid_dimension: Tuple,
+                 **kwargs):
+        """
+        Args:
+            grid_dimension (Tuple): pair of values passed as the first two
+                arguments of sps.somNet.
+            **kwargs:
+                PBC (bool): passed as PBC to sps.somNet. default: True
+        """
+        # SOM attributes
+        self.net: sps.somNet = None
+        self.data: pd.DataFrame = None
+        self.grid_dimension: Tuple = grid_dimension
+        self.pbc = kwargs.get('PBC', True)
+
+        # debug attributes
+        self.name = 'Kohonen Self Organizing Maps FTS'
+        self.shortname = 'SOM-FTS'
+
+    def __repr__(self):
+        # self.is_trained was never defined; the network itself tells whether
+        # train() has already run
+        status = "is trained" if self.net is not None else "not trained"
+        return f'{self.name}-{status}'
+
+    def __str__(self):
+        return self.name
+
+    def __del__(self):
+        # "del self.net" raised AttributeError when the object was collected
+        # before __init__ had created the attribute
+        self.net = None
+
+    def train(self,
+              data: pd.DataFrame,
+              percentage_train: float = .7,
+              leaning_rate: float = 0.01,
+              epochs: int = 10000):
+        """
+        Train the SOM network with the first rows of data.
+
+        Args:
+            data (pd.DataFrame): training dataset, kept in self.data
+            percentage_train (float): fraction of the NaN-free rows, taken
+                from the top, used for training. default: 0.7
+            leaning_rate (float): start learning rate of the network (the
+                misspelt name is kept for backward compatibility). default: 0.01
+            epochs (int): training epochs. default: 10000
+
+        Raises:
+            ValueError: when no row is left to train the network.
+        """
+        self.data = data
+        rows = data.dropna().values
+        # a slice bound must be an int: len(...) * percentage_train is a float
+        limit = int(round(len(rows) * percentage_train))
+        if limit < 1:
+            raise ValueError(
+                f'no rows left to train the SOM: {len(rows)} valid rows, '
+                f'percentage_train={percentage_train}')
+        x, y = self.grid_dimension
+        # PBC by keyword: the fourth positional argument of somNet is loadFile
+        self.net = sps.somNet(x, y, rows[:limit], PBC=self.pbc)
+        self.net.train(startLearnRate=leaning_rate,
+                       epochs=epochs)
+
+    def save_net(self,
+                 filename: str = "SomNet trained"):
+        """Save the network with sps.somNet.save."""
+        self.net.save(filename)
+
+    def show_grid(self,
+                  graph_type: str = 'nodes_graph',
+                  **kwargs):
+        """
+        Call net.nodes_graph (with colnum=kwargs['colnum'], default 0) when
+        graph_type is 'nodes_graph', net.diff_graph otherwise.
+        """
+        if graph_type == 'nodes_graph':
+            self.net.nodes_graph(colnum=kwargs.get('colnum', 0))
+        else:
+            self.net.diff_graph()
diff --git a/pyFTS/tests/test_SOMTransformation.py b/pyFTS/tests/test_SOMTransformation.py
new file mode 100644
index 0000000..a9ec8f6
--- /dev/null
+++ b/pyFTS/tests/test_SOMTransformation.py
@@ -0,0 +1,82 @@
+import unittest
+from pyFTS.common.transformations.som import SOMTransformation
+import pandas as pd
+import os
+import numpy as np
+
+
+class MyTestCase(unittest.TestCase):
+    def test_apply_without_column_names(self):
+        data = [
+            [1, 1, 1, 1, 1],
+            [1, 1, 1, 1, 1],
+            [1, 1, 1, 1, 1],
+            [1, 1, 1, 1, 1],
+        ]
+        som = self.som_transformer_trained()
+        transformed = som.apply(data=pd.DataFrame(data))
+
+        # identical input rows must end up as identical output rows
+        self.assertEqual(1, len(np.unique(transformed.values, axis=0)))
+        self.assertEqual(len(data), len(transformed))
+        self.assertEqual(3, transformed.values.shape[1])
+
+    def test_apply_with_column_names(self):
+        df = self.simple_dataset()
+        df.columns = ['a', 'b', 'c', 'd', 'e']
+        som = SOMTransformation(grid_dimension=(2, 2))
+        # epochs is forwarded to train(); the default (10000) is needlessly slow
+        result = som.apply(df, endogen_variable='a', epochs=100)
+        result.dropna(inplace=True)
+        self.assertEqual(5, len(result))
+        self.assertEqual(['x', 'y', 'a'], list(result.columns))
+
+    def test_save_net(self):
+        som_transformer = self.som_transformer_trained()
+
+        filename = 'test_net.npy'
+        som_transformer.save_net(filename)
+        is_in_files = os.path.isfile(filename)
+        if is_in_files:
+            os.remove(filename)
+
+        self.assertTrue(is_in_files)
+
+    def test_train_with_invalid_values_should_remove_nan_row(self):
+        data = [
+            [1, 1, float('nan'), 1, 1],
+            [1, 1, 1, 1, 0],
+            [1, 1, 1, 0, 0],
+            [float('nan'), 1, 0, 0, 0],
+            [1, 0, 0, 0, 0],
+        ]
+        df = pd.DataFrame(data)
+        som = SOMTransformation(grid_dimension=(2, 2))
+        som.train(data=df, epochs=100)
+
+        self.assertEqual(3, len(som.data))
+        self.assertEqual(5, len(df.columns))
+
+    @staticmethod
+    def simple_dataset():
+        data = [
+            [1, 1, 1, 1, 1],
+            [1, 1, 1, 1, 0],
+            [1, 1, 1, 0, 0],
+            [1, 1, 0, 0, 0],
+            [1, 0, 0, 0, 0],
+        ]
+        df = pd.DataFrame(data)
+        return df
+
+    def som_transformer_trained(self):
+        # apply() projects every column but the endogen one (the last column
+        # by default), so the network is trained on those columns only
+        data = self.simple_dataset().iloc[:, :-1]
+        som_transformer = SOMTransformation(grid_dimension=(2, 2))
+        som_transformer.train(data=data, epochs=100)
+        return som_transformer
+
+
+if __name__ == '__main__':
+    unittest.main()