# -*- coding: utf8 -*-
"""
pyFTS module for common benchmark metrics
"""
import time
import numpy as np
import pandas as pd
from pyFTS.common import FuzzySet, SortedCollection
from pyFTS.probabilistic import ProbabilityDistribution
[docs]def acf(data, k):
"""
Autocorrelation function estimative
:param data:
:param k:
:return:
"""
mu = np.mean(data)
sigma = np.var(data)
n = len(data)
s = 0
for t in np.arange(0, n - k):
s += (data[t] - mu) * (data[t + k] - mu)
return 1 / ((n - k) * sigma) * s
[docs]def rmse(targets, forecasts):
"""
Root Mean Squared Error
:param targets:
:param forecasts:
:return:
"""
if isinstance(targets, list):
targets = np.array(targets)
if isinstance(forecasts, list):
forecasts = np.array(forecasts)
return np.sqrt(np.nanmean((targets - forecasts) ** 2))
[docs]def rmse_interval(targets, forecasts):
"""
Root Mean Squared Error
:param targets:
:param forecasts:
:return:
"""
fmean = [np.mean(i) for i in forecasts]
return np.sqrt(np.nanmean((fmean - targets) ** 2))
[docs]def mape(targets, forecasts):
"""
Mean Average Percentual Error
:param targets:
:param forecasts:
:return:
"""
if isinstance(targets, list):
targets = np.array(targets)
if isinstance(forecasts, list):
forecasts = np.array(forecasts)
return np.mean(np.abs((targets - forecasts) / targets)) * 100
[docs]def smape(targets, forecasts, type=2):
"""
Symmetric Mean Average Percentual Error
:param targets:
:param forecasts:
:param type:
:return:
"""
if isinstance(targets, list):
targets = np.array(targets)
if isinstance(forecasts, list):
forecasts = np.array(forecasts)
if type == 1:
return np.mean(np.abs(forecasts - targets) / ((forecasts + targets) / 2))
elif type == 2:
return np.mean(np.abs(forecasts - targets) / (abs(forecasts) + abs(targets))) * 100
else:
return sum(np.abs(forecasts - targets)) / sum(forecasts + targets)
[docs]def mape_interval(targets, forecasts):
fmean = [np.mean(i) for i in forecasts]
return np.mean(abs(fmean - targets) / fmean) * 100
[docs]def UStatistic(targets, forecasts):
"""
Theil's U Statistic
:param targets:
:param forecasts:
:return:
"""
l = len(targets)
if isinstance(targets, list):
targets = np.array(targets)
if isinstance(forecasts, list):
forecasts = np.array(forecasts)
naive = []
y = []
for k in np.arange(0, l - 1):
y.append((forecasts[k] - targets[k]) ** 2)
naive.append((targets[k + 1] - targets[k]) ** 2)
return np.sqrt(sum(y) / sum(naive))
[docs]def TheilsInequality(targets, forecasts):
"""
Theil’s Inequality Coefficient
:param targets:
:param forecasts:
:return:
"""
res = targets - forecasts
t = len(res)
us = np.sqrt(sum([u ** 2 for u in res]))
ys = np.sqrt(sum([y ** 2 for y in targets]))
fs = np.sqrt(sum([f ** 2 for f in forecasts]))
return us / (ys + fs)
[docs]def BoxPierceStatistic(data, h):
"""
Q Statistic for Box-Pierce test
:param data:
:param h:
:return:
"""
n = len(data)
s = 0
for k in np.arange(1, h + 1):
r = acf(data, k)
s += r ** 2
return n * s
[docs]def BoxLjungStatistic(data, h):
"""
Q Statistic for Ljung–Box test
:param data:
:param h:
:return:
"""
n = len(data)
s = 0
for k in np.arange(1, h + 1):
r = acf(data, k)
s += r ** 2 / (n - k)
return n * (n - 2) * s
[docs]def sharpness(forecasts):
"""Sharpness - Mean size of the intervals"""
tmp = [i[1] - i[0] for i in forecasts]
return np.mean(tmp)
[docs]def resolution(forecasts):
"""Resolution - Standard deviation of the intervals"""
shp = sharpness(forecasts)
tmp = [abs((i[1] - i[0]) - shp) for i in forecasts]
return np.mean(tmp)
[docs]def coverage(targets, forecasts):
"""Percent of target values that fall inside forecasted interval"""
preds = []
for i in np.arange(0, len(forecasts)):
if targets[i] >= forecasts[i][0] and targets[i] <= forecasts[i][1]:
preds.append(1)
else:
preds.append(0)
return np.mean(preds)
[docs]def pinball(tau, target, forecast):
"""
Pinball loss function. Measure the distance of forecast to the tau-quantile of the target
:param tau: quantile value in the range (0,1)
:param target:
:param forecast:
:return: float, distance of forecast to the tau-quantile of the target
"""
if target >= forecast:
return (target - forecast) * tau
else:
return (forecast - target) * (1 - tau)
[docs]def pinball_mean(tau, targets, forecasts):
"""
Mean pinball loss value of the forecast for a given tau-quantile of the targets
:param tau: quantile value in the range (0,1)
:param targets: list of target values
:param forecasts: list of prediction intervals
:return: float, the pinball loss mean for tau quantile
"""
if tau <= 0.5:
preds = [pinball(tau, targets[i], forecasts[i][0]) for i in np.arange(0, len(forecasts))]
else:
preds = [pinball(tau, targets[i], forecasts[i][1]) for i in np.arange(0, len(forecasts))]
return np.nanmean(preds)
[docs]def winkler_score(tau, target, forecast):
'''R. L. Winkler, A Decision-Theoretic Approach to Interval Estimation, J. Am. Stat. Assoc. 67 (337) (1972) 187–191. doi:10.2307/2284720. '''
delta = forecast[1] - forecast[0]
if forecast[0] < target and target < forecast[1]:
return delta
elif forecast[0] > target:
return delta + 2 * (forecast[0] - target) / tau
elif forecast[1] < target:
return delta + 2 * (target - forecast[1]) / tau
[docs]def winkler_mean(tau, targets, forecasts):
"""
Mean Winkler score value of the forecast for a given tau-quantile of the targets
:param tau: quantile value in the range (0,1)
:param targets: list of target values
:param forecasts: list of prediction intervals
:return: float, the Winkler score mean for tau quantile
"""
preds = [winkler_score(tau, targets[i], forecasts[i]) for i in np.arange(0, len(forecasts))]
return np.nanmean(preds)
[docs]def brier_score(targets, densities):
'''Brier (1950). "Verification of Forecasts Expressed in Terms of Probability". Monthly Weather Review. 78: 1–3. '''
ret = []
for ct, d in enumerate(densities):
try:
v = d.bin_index.find_ge(targets[ct])
score = sum([d.distribution[k] ** 2 for k in d.bins if k != v])
score += (d.distribution[v] - 1) ** 2
ret.append(score)
except ValueError as ex:
ret.append(sum([d.distribution[k] ** 2 for k in d.bins]))
return sum(ret) / len(ret)
[docs]def pmf_to_cdf(density):
ret = []
for row in density.index:
tmp = []
prev = 0
for col in density.columns:
prev += density[col][row] if not np.isnan(density[col][row]) else 0
tmp.append(prev)
ret.append(tmp)
df = pd.DataFrame(ret, columns=density.columns)
return df
[docs]def heavyside(bin, target):
return 1 if bin >= target else 0
[docs]def heavyside_cdf(bins, targets):
ret = []
for t in targets:
result = [1 if b >= t else 0 for b in bins]
ret.append(result)
df = pd.DataFrame(ret, columns=bins)
return df
[docs]def crps(targets, densities):
'''
Continuous Ranked Probability Score
:param targets: a list with the target values
:param densities: a list with pyFTS.probabil objectsistic.ProbabilityDistribution
:return: float
'''
_crps = float(0.0)
if isinstance(densities, ProbabilityDistribution.ProbabilityDistribution):
densities = [densities]
l = len(densities[0].bins)
n = len(densities)
for ct, df in enumerate(densities):
_crps += sum([(df.cumulative(bin) - (1 if bin >= targets[ct] else 0)) ** 2 for bin in df.bins])
return _crps / float(l * n)
[docs]def get_point_statistics(data, model, **kwargs):
'''
Condensate all measures for point forecasters
:param data: test data
:param model: FTS model with point forecasting capability
:param kwargs:
:return: a list with the RMSE, SMAPE and U Statistic
'''
steps_ahead = kwargs.get('steps_ahead', 1)
kwargs['type'] = 'point'
indexer = kwargs.get('indexer', None)
if indexer is not None:
ndata = np.array(indexer.get_data(data))
elif model.is_multivariate:
if not isinstance(data, pd.DataFrame):
raise ValueError("Multivariate data must be a Pandas DataFrame!")
ndata = data
else:
ndata = np.array(data)
ret = list()
if steps_ahead == 1:
forecasts = model.predict(ndata, **kwargs)
if model.is_multivariate and model.has_seasonality:
ndata = model.indexer.get_data(ndata)
elif model.is_multivariate:
ndata = ndata[model.target_variable.data_label].values
if not isinstance(forecasts, (list, np.ndarray)):
forecasts = [forecasts]
if len(forecasts) != len(ndata) - model.max_lag:
forecasts = np.array(forecasts[:-1])
else:
forecasts = np.array(forecasts)
ret.append(np.round(rmse(ndata[model.max_lag:], forecasts), 2))
ret.append(np.round(mape(ndata[model.max_lag:], forecasts), 2))
ret.append(np.round(UStatistic(ndata[model.max_lag:], forecasts), 2))
else:
steps_ahead_sampler = kwargs.get('steps_ahead_sampler', 1)
nforecasts = []
for k in np.arange(model.order, len(ndata) - steps_ahead, steps_ahead_sampler):
sample = ndata[k - model.order: k]
tmp = model.predict(sample, **kwargs)
nforecasts.append(tmp[-1])
start = model.max_lag + steps_ahead - 1
ret.append(np.round(rmse(ndata[start:-1:steps_ahead_sampler], nforecasts), 2))
ret.append(np.round(mape(ndata[start:-1:steps_ahead_sampler], nforecasts), 2))
ret.append(np.round(UStatistic(ndata[start:-1:steps_ahead_sampler], nforecasts), 2))
return ret
[docs]def get_interval_statistics(data, model, **kwargs):
"""
Condensate all measures for point interval forecasters
:param data: test data
:param model: FTS model with interval forecasting capability
:param kwargs:
:return: a list with the sharpness, resolution, coverage, .05 pinball mean,
.25 pinball mean, .75 pinball mean and .95 pinball mean.
"""
steps_ahead = kwargs.get('steps_ahead', 1)
kwargs['type'] = 'interval'
ret = list()
if steps_ahead == 1:
forecasts = model.predict(data, **kwargs)
ret.append(round(sharpness(forecasts), 2))
ret.append(round(resolution(forecasts), 2))
ret.append(round(coverage(data[model.order:], forecasts[:-1]), 2))
ret.append(round(pinball_mean(0.05, data[model.max_lag:], forecasts[:-1]), 2))
ret.append(round(pinball_mean(0.25, data[model.max_lag:], forecasts[:-1]), 2))
ret.append(round(pinball_mean(0.75, data[model.max_lag:], forecasts[:-1]), 2))
ret.append(round(pinball_mean(0.95, data[model.max_lag:], forecasts[:-1]), 2))
ret.append(round(winkler_mean(0.05, data[model.max_lag:], forecasts[:-1]), 2))
ret.append(round(winkler_mean(0.25, data[model.max_lag:], forecasts[:-1]), 2))
else:
forecasts = []
for k in np.arange(model.order, len(data) - steps_ahead):
sample = data[k - model.order: k]
tmp = model.predict(sample, **kwargs)
forecasts.append(tmp[-1])
start = model.max_lag + steps_ahead - 1
ret.append(round(sharpness(forecasts), 2))
ret.append(round(resolution(forecasts), 2))
ret.append(round(coverage(data[model.max_lag:], forecasts), 2))
ret.append(round(pinball_mean(0.05, data[start:], forecasts), 2))
ret.append(round(pinball_mean(0.25, data[start:], forecasts), 2))
ret.append(round(pinball_mean(0.75, data[start:], forecasts), 2))
ret.append(round(pinball_mean(0.95, data[start:], forecasts), 2))
ret.append(round(winkler_mean(0.05, data[start:], forecasts), 2))
ret.append(round(winkler_mean(0.25, data[start:], forecasts), 2))
return ret
[docs]def get_distribution_statistics(data, model, **kwargs):
"""
Get CRPS statistic and time for a forecasting model
:param data: test data
:param model: FTS model with probabilistic forecasting capability
:param kwargs:
:return: a list with the CRPS and execution time
"""
steps_ahead = kwargs.get('steps_ahead', 1)
kwargs['type'] = 'distribution'
ret = list()
if steps_ahead == 1:
_s1 = time.time()
forecasts = model.predict(data, **kwargs)
_e1 = time.time()
ret.append(round(crps(data[model.max_lag:], forecasts[:-1]), 3))
ret.append(round(_e1 - _s1, 3))
ret.append(round(brier_score(data[model.max_lag:], forecasts[:-1]), 3))
else:
skip = kwargs.get('steps_ahead_sampler', 1)
forecasts = []
_s1 = time.time()
for k in np.arange(model.max_lag, len(data) - steps_ahead, skip):
sample = data[k - model.max_lag: k]
tmp = model.predict(sample, **kwargs)
forecasts.append(tmp[-1])
_e1 = time.time()
start = model.max_lag + steps_ahead
ret.append(round(crps(data[start:-1:skip], forecasts), 3))
ret.append(round(_e1 - _s1, 3))
ret.append(round(brier_score(data[start:-1:skip], forecasts), 3))
return ret