Source code for pyFTS.benchmarks.knn
#!/usr/bin/python
# -*- coding: utf8 -*-
import numpy as np
from statsmodels.tsa.tsatools import lagmat
from pyFTS.common import fts
from pyFTS.probabilistic import ProbabilityDistribution
class KNearestNeighbors(fts.FTS):
    """
    K-Nearest Neighbors benchmark forecaster.

    The training series is stored verbatim.  To forecast, the most recent
    ``order`` observations are compared (squared Euclidean distance) with
    every window of ``order`` consecutive training values; the values that
    immediately follow the ``k`` closest windows are the forecasts, from
    which a probability distribution is built.

    Keyword arguments read by this class (all others go to ``fts.FTS``):
        alpha: stored on the instance; default 0.05.
        k: number of neighbours returned by :meth:`knn`; default 30.
    """

    def __init__(self, **kwargs):
        super(KNearestNeighbors, self).__init__(**kwargs)
        self.name = "kNN"
        self.shortname = "kNN"
        self.detail = "K-Nearest Neighbors"
        # Capability flags -- presumably consumed by the fts.FTS base class.
        # NOTE(review): this class itself only defines forecast_distribution;
        # confirm point/interval forecasting is provided elsewhere.
        self.is_high_order = True
        self.has_point_forecasting = True
        self.has_interval_forecasting = True
        self.has_probability_forecasting = True
        self.benchmark_only = True
        self.min_order = 1
        self.alpha = kwargs.get("alpha", 0.05)
        self.lag = None
        # Number of nearest neighbours used for each forecast.
        self.k = kwargs.get("k", 30)
        self.uod = None

    def train(self, data, **kwargs):
        """Memorise the training series (kNN has no fitting step).

        :param data: one-dimensional sequence of observations.
        :param kwargs: accepted for interface compatibility; unused here.
        """
        self.data = np.array(data)

    def knn(self, sample):
        """Return the successors of the ``k`` training windows nearest to *sample*.

        :param sample: the ``self.order`` most recent observations, oldest
            first (a scalar is accepted when ``self.order == 1``).
        :return: array with at most ``self.k`` training values, ordered from
            the nearest window to the farthest; empty when the training
            series is shorter than ``self.order``.
        """
        order = self.order
        n_windows = len(self.data) - order + 1
        if n_windows <= 0:
            # Not enough training data to form a single window.
            return self.data[:0]

        query = np.atleast_1d(sample)

        # Squared Euclidean distance between the query and every training
        # window, accumulated lag by lag.  Window ``s`` covers
        # data[s : s + order] and is compared with the query in the same
        # chronological order (oldest value against oldest value).
        dist = np.zeros(n_windows)
        for lag in range(order):
            dist += (self.data[lag:lag + n_windows] - query[lag]) ** 2

        # The forecast for window ``s`` is the value right after it, i.e.
        # index ``s + order``.
        ix = np.argsort(dist) + order

        # The last window has no successor; clipping maps it onto the final
        # training value instead of indexing past the end.
        nearest = np.clip(ix[:self.k], 0, len(self.data) - 1)
        return self.data[nearest]

    def forecast_distribution(self, data, **kwargs):
        """Build one probability distribution per sliding window of *data*.

        For every ``k`` in ``[self.order, len(data))`` the window
        ``data[k - self.order:k]`` is sent to :meth:`knn` and the returned
        neighbours become the sample of a ``ProbabilityDistribution``.

        :param data: sequence of observations to forecast from.
        :param kwargs: ``smooth`` selects the distribution type (default
            ``"KDE"``); all keyword arguments are also forwarded to the
            ``ProbabilityDistribution`` constructor.
        :return: list with ``max(len(data) - self.order, 0)`` distributions.
        """
        smooth = kwargs.get("smooth", "KDE")
        uod = self.get_UoD()

        ret = []
        for k in range(self.order, len(data)):
            forecasts = self.knn(data[k - self.order:k])
            ret.append(ProbabilityDistribution.ProbabilityDistribution(
                smooth, uod=uod, data=forecasts, name="", **kwargs))
        return ret