# -*- coding: utf8 -*-
"""
pyFTS module for common benchmark metrics
"""
import time
import numpy as np
import pandas as pd
from pyFTS.common import FuzzySet, SortedCollection
from pyFTS.probabilistic import ProbabilityDistribution

def acf(data, k):
    """
    Autocorrelation function estimate

    :param data: time series data
    :param k: lag
    :return: float, the autocorrelation of the series at lag k
    """
    mu = np.nanmean(data)
    sigma = np.var(data)
    n = len(data)
    s = 0
    for t in np.arange(0, n - k):
        s += (data[t] - mu) * (data[t + k] - mu)

    return 1 / ((n - k) * sigma) * s

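# Illustrative usage (not part of the original module): at lag 0 the estimate is
# exactly 1.0, and a strongly trending series keeps a positive autocorrelation at
# small lags.
#
# >>> series = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
# >>> acf(series, 0)   # 1.0
# >>> acf(series, 1)   # positive, since neighboring values move together
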
def rmse(targets, forecasts, order=0, offset=0):
    """
    Root Mean Squared Error

    :param targets: array of targets
    :param forecasts: array of forecasts
    :param order: model order
    :param offset: forecast offset related to target
    :return: float
    """
    if isinstance(targets, list):
        targets = np.array(targets)
    if isinstance(forecasts, list):
        forecasts = np.array(forecasts)
    if offset == 0:
        return np.sqrt(np.nanmean((targets[order:] - forecasts[:]) ** 2))
    else:
        return np.sqrt(np.nanmean((targets[order + offset:] - forecasts[:-offset]) ** 2))

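# Illustrative usage (not from the original module): for a model of order 1 the
# first target has no matching forecast, so `order=1` aligns targets[1:] with the
# forecasts before computing the error.
#
# >>> y    = [10.0, 12.0, 13.0, 15.0]
# >>> yhat = [11.0, 13.5, 14.0]       # forecasts for y[1], y[2], y[3]
# >>> rmse(y, yhat, order=1)          # sqrt(((12-11)**2 + (13-13.5)**2 + (15-14)**2) / 3)
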
def nmrse(targets, forecasts, order=0, offset=0):
    """ Normalized Root Mean Squared Error """
    # Normalize by the range of the targets rather than the forecasts, since the
    # forecast range can explode to infinity (e.g. when the model predicts a line).
    return rmse(targets, forecasts, order, offset) / (np.max(targets) - np.min(targets))

def rmse_interval(targets, forecasts):
    """
    Root Mean Squared Error between the targets and the midpoints of the forecast intervals

    :param targets: array of target values
    :param forecasts: array of prediction intervals
    :return: float
    """
    fmean = np.array([np.mean(i) for i in forecasts])
    return np.sqrt(np.nanmean((fmean - targets) ** 2))

def mape(targets, forecasts):
    """
    Mean Absolute Percentage Error

    :param targets: array of target values
    :param forecasts: array of forecast values
    :return: float
    """
    if isinstance(targets, list):
        targets = np.array(targets)
    if isinstance(forecasts, list):
        forecasts = np.array(forecasts)

    return np.nanmean(np.abs(np.divide(np.subtract(targets, forecasts), targets))) * 100

def smape(targets, forecasts, type=2):
    """
    Symmetric Mean Absolute Percentage Error

    :param targets: array of target values
    :param forecasts: array of forecast values
    :param type: sMAPE variant (1, 2 or 3)
    :return: float
    """
    if isinstance(targets, list):
        targets = np.array(targets)
    if isinstance(forecasts, list):
        forecasts = np.array(forecasts)

    if type == 1:
        return np.nanmean(np.abs(forecasts - targets) / ((forecasts + targets) / 2))
    elif type == 2:
        return np.nanmean(np.abs(forecasts - targets) / (np.abs(forecasts) + np.abs(targets))) * 100
    else:
        return np.nansum(np.abs(forecasts - targets)) / np.nansum(forecasts + targets)

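# Illustrative usage (not from the original module): the three variants differ in
# the normalization term and in whether the result is scaled to a percentage.
#
# >>> y    = [100.0, 200.0, 300.0]
# >>> yhat = [110.0, 190.0, 330.0]
# >>> smape(y, yhat, type=1)   # mean of |e| / ((f + t) / 2), as a fraction
# >>> smape(y, yhat, type=2)   # mean of |e| / (|f| + |t|), scaled by 100
# >>> smape(y, yhat, type=3)   # sum of |e| over sum of (f + t)
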
def mape_interval(targets, forecasts):
    """
    Mean Absolute Percentage Error between the targets and the midpoints of the forecast intervals

    :param targets: array of target values
    :param forecasts: array of prediction intervals
    :return: float
    """
    fmean = np.array([np.mean(i) for i in forecasts])
    return np.mean(np.abs(fmean - targets) / fmean) * 100

def UStatistic(targets, forecasts):
    """
    Theil's U Statistic

    :param targets: array of target values
    :param forecasts: array of forecast values
    :return: float
    """
    if not isinstance(forecasts, (list, np.ndarray)):
        forecasts = np.array([forecasts])
    else:
        forecasts = np.array(forecasts)
    if not isinstance(targets, (list, np.ndarray)):
        targets = np.array([targets])
    else:
        targets = np.array(targets)

    l = forecasts.size
    l = 2 if l == 1 else l

    naive = []
    y = []
    for k in np.arange(0, l - 1):
        y.append(np.subtract(forecasts[k], targets[k]) ** 2)
        naive.append(np.subtract(targets[k + 1], targets[k]) ** 2)
    return np.sqrt(np.divide(np.nansum(y), np.nansum(naive)))

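# Illustrative usage (not from the original module): U < 1 means the forecasts beat
# the naive "tomorrow equals today" forecast, U = 1 ties it and U > 1 is worse.
#
# >>> y    = [10.0, 11.0, 12.5, 13.0]
# >>> yhat = [10.2, 11.3, 12.4, 13.1]
# >>> UStatistic(y, yhat)   # well below 1 for these forecasts
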
def TheilsInequality(targets, forecasts):
    """
    Theil's Inequality Coefficient

    :param targets: array of target values
    :param forecasts: array of forecast values
    :return: float
    """
    res = targets - forecasts
    us = np.sqrt(np.nansum([u ** 2 for u in res]))
    ys = np.sqrt(np.nansum([y ** 2 for y in targets]))
    fs = np.sqrt(np.nansum([f ** 2 for f in forecasts]))
    return us / (ys + fs)

def sharpness(forecasts):
    """Sharpness - mean width of the forecast intervals"""
    tmp = [i[1] - i[0] for i in forecasts]
    return np.mean(tmp)

def resolution(forecasts):
    """Resolution - mean absolute deviation of the interval widths around the mean width (sharpness)"""
    shp = sharpness(forecasts)
    tmp = [abs((i[1] - i[0]) - shp) for i in forecasts]
    return np.mean(tmp)

def coverage(targets, forecasts):
    """Fraction of the target values that fall inside the forecast intervals"""
    preds = []
    for i in np.arange(0, len(forecasts)):
        if forecasts[i][0] <= targets[i] <= forecasts[i][1]:
            preds.append(1)
        else:
            preds.append(0)
    return np.nanmean(preds)

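# Illustrative usage (not from the original module): interval forecasts are pairs
# [lower, upper], so sharpness, resolution and coverage can be read side by side.
#
# >>> y = [10.0, 12.0, 14.0]
# >>> intervals = [[9.0, 11.0], [11.0, 13.5], [15.0, 16.0]]
# >>> sharpness(intervals)    # mean interval width
# >>> resolution(intervals)   # mean absolute deviation of the widths
# >>> coverage(y, intervals)  # fraction of targets inside their interval (2/3 here)
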
def pinball(tau, target, forecast):
    """
    Pinball loss function. Measures the distance of the forecast to the tau-quantile of the target.

    :param tau: quantile value in the range (0,1)
    :param target: target value
    :param forecast: forecast value
    :return: float, distance of forecast to the tau-quantile of the target
    """
    if target >= forecast:
        return np.subtract(target, forecast) * tau
    else:
        return np.subtract(forecast, target) * (1 - tau)

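# Illustrative usage (not from the original module): the loss is asymmetric, so for
# a low quantile (tau = 0.05) over-forecasting is penalized much more heavily than
# under-forecasting.
#
# >>> pinball(0.05, 10.0, 8.0)    # under-forecast: (10 - 8) * 0.05  = 0.1
# >>> pinball(0.05, 10.0, 12.0)   # over-forecast:  (12 - 10) * 0.95 = 1.9
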
def pinball_mean(tau, targets, forecasts):
    """
    Mean pinball loss value of the forecast for a given tau-quantile of the targets

    :param tau: quantile value in the range (0,1)
    :param targets: list of target values
    :param forecasts: list of prediction intervals
    :return: float, the pinball loss mean for tau quantile
    """
    if tau <= 0.5:
        preds = [pinball(tau, targets[i], forecasts[i][0]) for i in np.arange(0, len(forecasts))]
    else:
        preds = [pinball(tau, targets[i], forecasts[i][1]) for i in np.arange(0, len(forecasts))]
    return np.nanmean(preds)

def winkler_score(tau, target, forecast):
    """
    R. L. Winkler, A Decision-Theoretic Approach to Interval Estimation, J. Am. Stat. Assoc. 67 (337) (1972) 187–191. doi:10.2307/2284720.

    :param tau: quantile value in the range (0,1)
    :param target: target value
    :param forecast: prediction interval [lower, upper]
    :return: float
    """
    delta = forecast[1] - forecast[0]
    if forecast[0] <= target <= forecast[1]:
        return delta
    elif forecast[0] > target:
        return delta + (2 * (forecast[0] - target)) / tau
    elif forecast[1] < target:
        return delta + (2 * (target - forecast[1])) / tau

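# Illustrative usage (not from the original module): the score equals the interval
# width when the target is covered, and adds a penalty proportional to the miss
# distance (scaled by 2/tau) when it is not.
#
# >>> winkler_score(0.05, 10.0, [9.0, 11.0])   # covered: width = 2.0
# >>> winkler_score(0.05, 12.0, [9.0, 11.0])   # missed:  2.0 + 2 * (12 - 11) / 0.05
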
def winkler_mean(tau, targets, forecasts):
    """
    Mean Winkler score value of the forecast for a given tau-quantile of the targets

    :param tau: quantile value in the range (0,1)
    :param targets: list of target values
    :param forecasts: list of prediction intervals
    :return: float, the Winkler score mean for tau quantile
    """
    preds = [winkler_score(tau, targets[i], forecasts[i]) for i in np.arange(0, len(forecasts))]

    return np.nanmean(preds)

def brier_score(targets, densities):
    """
    Brier Score for probabilistic forecasts.

    Brier (1950). "Verification of Forecasts Expressed in Terms of Probability". Monthly Weather Review. 78: 1–3.

    :param targets: a list with the target values
    :param densities: a list with pyFTS.probabilistic.ProbabilityDistribution objects
    :return: float
    """
    if not isinstance(densities, list):
        densities = [densities]
        targets = [targets]

    ret = []

    for ct, d in enumerate(densities):
        try:
            v = d.bin_index.find_le(targets[ct])

            score = np.nansum([d.density(k) ** 2 for k in d.bins if k != v])
            score += (d.density(v) - 1) ** 2
            ret.append(score)
        except ValueError:
            ret.append(np.nansum([d.density(k) ** 2 for k in d.bins]))
    return np.nansum(ret) / len(ret)

def logarithm_score(targets, densities):
    """
    Logarithm Score for probabilistic forecasts.

    Good IJ (1952). "Rational Decisions." Journal of the Royal Statistical Society B, 14(1), 107–114. URL https://www.jstor.org/stable/2984087.

    :param targets: a list with the target values
    :param densities: a list with pyFTS.probabilistic.ProbabilityDistribution objects
    :return: float
    """
    _ls = float(0.0)

    if isinstance(densities, ProbabilityDistribution.ProbabilityDistribution):
        densities = [densities]
        targets = [targets]

    n = len(densities)
    for ct, df in enumerate(densities):
        _ls += -np.log(df.density(targets[ct]))

    return _ls / n

def crps(targets, densities):
    """
    Continuous Ranked Probability Score

    :param targets: a list with the target values
    :param densities: a list with pyFTS.probabilistic.ProbabilityDistribution objects
    :return: float
    """
    _crps = float(0.0)
    if isinstance(densities, ProbabilityDistribution.ProbabilityDistribution):
        densities = [densities]
        targets = [targets]

    n = len(densities)

    if n == 0:
        return np.nan

    for ct, df in enumerate(densities):
        _crps += np.nansum([(df.cumulative(bin) - (1 if bin >= targets[ct] else 0)) ** 2 for bin in df.bins])

    return _crps / n

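# Illustrative usage (not from the original module): the probabilistic scores expect
# one pyFTS.probabilistic.ProbabilityDistribution object per target value, typically
# produced by model.predict(..., type='distribution'). A hedged sketch, assuming a
# trained probabilistic FTS model `model` and a test series `test`:
#
# >>> dists = model.predict(test, type='distribution')
# >>> crps(test[model.max_lag:], dists[:-1])
# >>> brier_score(test[model.max_lag:], dists[:-1])
# >>> logarithm_score(test[model.max_lag:], dists[:-1])
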
def get_point_statistics(data, model, **kwargs):
    """
    Consolidate all measures for point forecasters

    :param data: test data
    :param model: FTS model with point forecasting capability
    :param kwargs:
    :return: a list with the RMSE, MAPE and U Statistic
    """
    steps_ahead = kwargs.get('steps_ahead', 1)
    kwargs['type'] = 'point'

    indexer = kwargs.get('indexer', None)

    if indexer is not None:
        ndata = np.array(indexer.get_data(data))
    elif model.is_multivariate:
        if not isinstance(data, pd.DataFrame):
            raise ValueError("Multivariate data must be a Pandas DataFrame!")
        ndata = data
    else:
        ndata = np.array(data)

    ret = list()

    if steps_ahead == 1:
        forecasts = model.predict(ndata, **kwargs)

        if model.is_multivariate and model.has_seasonality:
            ndata = model.indexer.get_data(ndata)
        elif model.is_multivariate:
            ndata = ndata[model.target_variable.data_label].values

        if not isinstance(forecasts, (list, np.ndarray)):
            forecasts = [forecasts]

        if len(forecasts) != len(ndata) - model.max_lag:
            forecasts = np.array(forecasts[:-1])
        else:
            forecasts = np.array(forecasts)

        ret.append(np.round(rmse(ndata[model.max_lag:], forecasts), 2))
        ret.append(np.round(mape(ndata[model.max_lag:], forecasts), 2))
        ret.append(np.round(UStatistic(ndata[model.max_lag:], forecasts), 2))
    else:
        steps_ahead_sampler = kwargs.get('steps_ahead_sampler', 1)
        nforecasts = []
        for k in np.arange(model.order, len(ndata) - steps_ahead, steps_ahead_sampler):
            sample = ndata[k - model.order: k]
            tmp = model.predict(sample, **kwargs)
            nforecasts.append(tmp[-1])

        start = model.max_lag + steps_ahead - 1
        ret.append(np.round(rmse(ndata[start:-1:steps_ahead_sampler], nforecasts), 2))
        ret.append(np.round(mape(ndata[start:-1:steps_ahead_sampler], nforecasts), 2))
        ret.append(np.round(UStatistic(ndata[start:-1:steps_ahead_sampler], nforecasts), 2))

    return ret

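# Illustrative usage (not from the original module), assuming a trained point
# forecasting FTS model `model` and a test series `test`:
#
# >>> rmse_val, mape_val, u_val = get_point_statistics(test, model)
# >>> rmse_val, mape_val, u_val = get_point_statistics(test, model, steps_ahead=5)
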
def get_point_ahead_statistics(data, forecasts, **kwargs):
    """
    Consolidate all measures for multi step ahead point forecasts

    :param data: test data
    :param forecasts: point forecasts for each step ahead
    :param kwargs:
    :return: a dictionary, indexed by the forecasting horizon, with the RMSE, MAPE and U Statistic for each step
    """
    l = len(forecasts)

    if len(data) != l:
        raise Exception("Data and forecasts have different lengths!")

    lags = {}

    for lag in range(l):
        ret = {}
        datum = data[lag]
        forecast = forecasts[lag]
        ret['steps'] = lag
        ret['method'] = ''
        ret['rmse'] = rmse(datum, forecast)
        ret['mape'] = mape(datum, forecast)
        sample = data[lag - 1: lag + 1] if lag > 0 else [datum, datum]
        ret['u'] = UStatistic(sample, forecast)
        lags[lag] = ret

    return lags

def get_interval_statistics(data, model, **kwargs):
    """
    Consolidate all measures for interval forecasters

    :param data: test data
    :param model: FTS model with interval forecasting capability
    :param kwargs:
    :return: a list with the sharpness, resolution, coverage, the .05, .25, .75 and .95
             pinball means, and the .05 and .25 Winkler means
    """
    steps_ahead = kwargs.get('steps_ahead', 1)
    kwargs['type'] = 'interval'

    ret = list()

    if steps_ahead == 1:
        forecasts = model.predict(data, **kwargs)
        ret.append(round(sharpness(forecasts), 2))
        ret.append(round(resolution(forecasts), 2))

        if model.is_multivariate:
            data = data[model.target_variable.data_label].values

        ret.append(round(coverage(data[model.max_lag:], forecasts[:-1]), 2))
        ret.append(round(pinball_mean(0.05, data[model.max_lag:], forecasts[:-1]), 2))
        ret.append(round(pinball_mean(0.25, data[model.max_lag:], forecasts[:-1]), 2))
        ret.append(round(pinball_mean(0.75, data[model.max_lag:], forecasts[:-1]), 2))
        ret.append(round(pinball_mean(0.95, data[model.max_lag:], forecasts[:-1]), 2))
        ret.append(round(winkler_mean(0.05, data[model.max_lag:], forecasts[:-1]), 2))
        ret.append(round(winkler_mean(0.25, data[model.max_lag:], forecasts[:-1]), 2))
    else:
        forecasts = []
        for k in np.arange(model.order, len(data) - steps_ahead):
            sample = data[k - model.order: k]
            tmp = model.predict(sample, **kwargs)
            forecasts.append(tmp[-1])

        start = model.max_lag + steps_ahead - 1
        ret.append(round(sharpness(forecasts), 2))
        ret.append(round(resolution(forecasts), 2))

        if model.is_multivariate:
            data = data[model.target_variable.data_label].values

        ret.append(round(coverage(data[model.max_lag:], forecasts), 2))
        ret.append(round(pinball_mean(0.05, data[start:], forecasts), 2))
        ret.append(round(pinball_mean(0.25, data[start:], forecasts), 2))
        ret.append(round(pinball_mean(0.75, data[start:], forecasts), 2))
        ret.append(round(pinball_mean(0.95, data[start:], forecasts), 2))
        ret.append(round(winkler_mean(0.05, data[start:], forecasts), 2))
        ret.append(round(winkler_mean(0.25, data[start:], forecasts), 2))

    return ret

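# Illustrative usage (not from the original module), assuming a trained interval
# forecasting FTS model `model` and a test series `test`:
#
# >>> stats = get_interval_statistics(test, model)
# >>> sharp, res, cov = stats[0], stats[1], stats[2]   # remaining entries are pinball and Winkler means
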
def get_interval_ahead_statistics(data, intervals, **kwargs):
    """
    Consolidate all measures for multi step ahead interval forecasts

    :param data: test data
    :param intervals: predicted intervals for each datapoint
    :param kwargs:
    :return: a dictionary, indexed by the forecasting horizon, with the sharpness, coverage,
             the .05, .25, .75 and .95 pinball scores and the .05 and .25 Winkler scores for each step
    """
    l = len(intervals)

    if len(data) != l:
        raise Exception("Data and intervals have different lengths!")

    lags = {}

    for lag in range(l):
        ret = {}
        datum = data[lag]
        interval = intervals[lag]
        ret['steps'] = lag
        ret['method'] = ''
        ret['sharpness'] = round(interval[1] - interval[0], 2)
        ret['coverage'] = 1 if interval[0] <= datum <= interval[1] else 0
        ret['pinball05'] = round(pinball(0.05, datum, interval[0]), 2)
        ret['pinball25'] = round(pinball(0.25, datum, interval[0]), 2)
        ret['pinball75'] = round(pinball(0.75, datum, interval[1]), 2)
        ret['pinball95'] = round(pinball(0.95, datum, interval[1]), 2)
        ret['winkler05'] = round(winkler_score(0.05, datum, interval), 2)
        ret['winkler25'] = round(winkler_score(0.25, datum, interval), 2)
        lags[lag] = ret

    return lags

def get_distribution_statistics(data, model, **kwargs):
    """
    Get the CRPS and Brier statistics and the forecasting time for a probabilistic forecasting model

    :param data: test data
    :param model: FTS model with probabilistic forecasting capability
    :param kwargs:
    :return: a list with the CRPS, the execution time and the Brier score
    """
    steps_ahead = kwargs.get('steps_ahead', 1)
    kwargs['type'] = 'distribution'

    ret = list()

    if steps_ahead == 1:
        _s1 = time.time()
        forecasts = model.predict(data, **kwargs)
        _e1 = time.time()

        if model.is_multivariate:
            data = data[model.target_variable.data_label].values

        ret.append(round(crps(data[model.max_lag:], forecasts[:-1]), 3))
        ret.append(round(_e1 - _s1, 3))
        ret.append(round(brier_score(data[model.max_lag:], forecasts[:-1]), 3))
    else:
        skip = kwargs.get('steps_ahead_sampler', 1)
        forecasts = []
        _s1 = time.time()
        for k in np.arange(model.max_lag, len(data) - steps_ahead, skip):
            sample = data[k - model.max_lag: k]
            tmp = model.predict(sample, **kwargs)
            forecasts.append(tmp[-1])
        _e1 = time.time()

        start = model.max_lag + steps_ahead

        if model.is_multivariate:
            data = data[model.target_variable.data_label].values

        ret.append(round(crps(data[start:-1:skip], forecasts), 3))
        ret.append(round(_e1 - _s1, 3))
        ret.append(round(brier_score(data[start:-1:skip], forecasts), 3))
    return ret

def get_distribution_ahead_statistics(data, distributions):
    """
    Get the CRPS, Brier and logarithm scores for multi step ahead probabilistic forecasts

    :param data: test data
    :param distributions: predicted distributions for each datapoint
    :return: a dictionary, indexed by the forecasting horizon, with the CRPS, Brier
             and logarithm scores for each step
    """
    l = len(distributions)

    if len(data) != l:
        raise Exception("Data and distributions have different lengths!")

    lags = {}

    for lag in range(l):
        ret = {}
        datum = data[lag]
        dist = distributions[lag]
        ret['steps'] = lag
        ret['method'] = ''
        ret['crps'] = round(crps(datum, dist), 3)
        ret['brier'] = round(brier_score(datum, dist), 3)
        ret['log'] = round(logarithm_score(datum, dist), 3)
        lags[lag] = ret

    return lags

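# Illustrative usage (not from the original module), assuming `model` is a trained
# probabilistic FTS model, `test` is the test series and `horizon` is the number of
# steps ahead:
#
# >>> dists = model.predict(test[:model.max_lag], type='distribution', steps_ahead=horizon)
# >>> stats = get_distribution_ahead_statistics(test[model.max_lag:model.max_lag + horizon], dists)
# >>> stats[0]['crps']   # scores for the first forecasting horizon
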