Source code for pyFTS.benchmarks.knn

#!/usr/bin/python
# -*- coding: utf8 -*-

import numpy as np
from statsmodels.tsa.tsatools import lagmat
from pyFTS.common import fts
from pyFTS.probabilistic import ProbabilityDistribution
from sklearn.neighbors import KDTree
from itertools import product
from pyFTS.models.ensemble.ensemble import sampler

[docs]class KNearestNeighbors(fts.FTS): """ A façade for sklearn.neighbors """ def __init__(self, **kwargs): super(KNearestNeighbors, self).__init__(**kwargs) self.name = "kNN" self.shortname = "kNN" self.detail = "K-Nearest Neighbors" self.uod_clip = False self.is_high_order = True self.has_point_forecasting = True self.has_interval_forecasting = True self.has_probability_forecasting = True self.benchmark_only = True self.min_order = 1 self.alpha = kwargs.get("alpha", 0.05) self.max_lag = self.order self.lag = None self.k = kwargs.get("k", 30) self.uod = None self.kdtree = None self.values = None def _prepare_x(self, data): l = len(data) X = [] if l == self.order: l += 1 for t in np.arange(self.order, l): X.append([data[t - k - 1] for k in np.arange(self.order)]) return X def _prepare_xy(self, data): l = len(data) X = [] Y = [] for t in np.arange(self.order, l): X.append([data[t - k - 1] for k in np.arange(self.order)]) Y.append(data[t]) return (X,Y)
[docs] def train(self, data, **kwargs): X,Y = self._prepare_xy(data) self.kdtree = KDTree(np.array(X)) self.values = Y self.shortname = "kNN({})-{}".format(self.order, self.alpha)
[docs] def knn(self, sample): X = self._prepare_x(sample) _, ix = self.kdtree.query(np.array(X), self.k) return [self.values[k] for k in ix.flatten() ]
[docs] def forecast(self, data, **kwargs): l = len(data) ret = [] for k in np.arange(self.order, l+(1 if self.order == l else 0)): sample = data[k-self.order : k] forecasts = self.knn(sample) ret.append(np.nanmean(forecasts)) return ret
[docs] def forecast_interval(self, data, **kwargs): alpha = kwargs.get('alpha',self.alpha) ret = [] for k in np.arange(self.order, len(data)): sample = data[k-self.order : k] forecasts = self.knn(sample) i = np.percentile(forecasts, [alpha*100, (1-alpha)*100]).tolist() ret.append(i) return ret
[docs] def forecast_ahead_interval(self, data, steps, **kwargs): alpha = kwargs.get('alpha', self.alpha) ret = [] start = kwargs.get('start', self.order) sample = [[k] for k in data[start - self.order: start]] for k in np.arange(self.order, steps + self.order): forecasts = [] lags = [sample[k - i - 1] for i in np.arange(0, self.order)] # Trace the possible paths for path in product(*lags): forecasts.extend(self.knn(path)) sample.append(sampler(forecasts, np.arange(.1, 1, 0.1), bounds=True)) interval = np.percentile(forecasts, [alpha*100, (1-alpha)*100]).tolist() ret.append(interval) return ret
[docs] def forecast_distribution(self, data, **kwargs): ret = [] smooth = kwargs.get("smooth", "histogram") uod = self.get_UoD() for k in np.arange(self.order, len(data)): sample = data[k-self.order : k] forecasts = self.knn(sample) dist = ProbabilityDistribution.ProbabilityDistribution(smooth, uod=uod, data=forecasts, name="", **kwargs) ret.append(dist) return ret
[docs] def forecast_ahead_distribution(self, data, steps, **kwargs): smooth = kwargs.get("smooth", "histogram") ret = [] start = kwargs.get('start', self.order) uod = self.get_UoD() sample = [[k] for k in data[start - self.order: start]] for k in np.arange(self.order, steps + self.order): forecasts = [] lags = [sample[k - i - 1] for i in np.arange(0, self.order)] # Trace the possible paths for path in product(*lags): forecasts.extend(self.knn(path)) dist = ProbabilityDistribution.ProbabilityDistribution(smooth, uod=uod, data=forecasts, name="", **kwargs) ret.append(dist) sample.append(sampler(forecasts, np.arange(.1, 1, 0.1), bounds=True)) return ret