"""
This module contains the statistical forecasting initiation and management.
The base class for all forecastings is :py:class:`StatisticalForecast` ,
to generate forecasts one of the following subclasses has to be used
.. autosummary::
~StatisticalForecast
~DSHWForecast
~DayTypeForecast
"""
import os
import time
import numpy as np
from sys import platform as _platform
from datetime import date, datetime, timedelta
import multiprocessing
from multiprocessing.process import Process
import cPickle as pickle
import logging
from django.utils.timezone import utc
from server.settings import BASE_DIR, CYTHON_SUPPORT
""""try to import compiled holtwinters (double seasonal) extension by building it.
if this fails, the standard holtwinters is used. """
fast_hw = False
if (CYTHON_SUPPORT):
try:
from server.forecasting.statistical.build_extension import build_holtwinters_extension
try:
t0 = time.time()
build_holtwinters_extension() #compile and link
#if function takes less than 8 seconds, the module was probably already built before
fresh_build = time.time() - t0 > 8
from server.forecasting.statistical.holtwinters_fast import double_seasonal, multiplicative
fast_hw = True
if fresh_build:
print "cython extension built and imported"
except Exception as e:
print "error while building. ", e
print "check ecoControl.log"
except Exception as e:
print "cython probably not installed", e
if not fast_hw:
if (CYTHON_SUPPORT):
print "falling back to python holt-winters"
from server.forecasting.statistical.holt_winters import double_seasonal, multiplicative, additive
else:
#!TODO: additive is not accelerated (yet)
from server.forecasting.statistical.holt_winters import additive
logger = logging.getLogger('simulation')
[docs]class StatisticalForecast:
"""
This is the abstract class for statistical forecasting. Statistical forecasts use exponential smoothing methods
to forecast timeseries into the future. These methods rely on the right parameters to make a realistic forecast.
The parameters can be passed to the function, if they are omitted they are found by minimizing the MSE.
Forecastings are calculated after contruction of class and then whenever needed,
which is set by :attr:`forecast_update_interval`.
:param list input_data: list of consecutive values sampled in ``samples_per_hour``
:param datetime start: start date of input data
:param int samples_per_hour: Number of samples per hour in input_data
:param boolean try_cache: Read and Save forecasts to a cache on the file device to avoid unneccesary recomputation, is ``True`` by default.
:param \*\*kwargs: Any instance variable can be overwritten with a keyword from here, f.e. *input_weeks = 14*. Will be set before any data is processed. Only use, if you know what your doing.
:ivar int forecast_update_interval: The time in seconds for how long forecasts stay valid. When this interval is passed, a new forecast is calculated.
:ivar int input_weeks: only forecast with this many weeks of data, default are 12 weeks.
:ivar int output_weeks: the forecast will cover this many weeks of future data,default are 8 weeks.
:ivar [[],] demands: the processed demands, depending on subclass this contains one or more series
"""
def __init__(self, env, input_data, samples_per_hour=1, start=None,
try_cache=True, **kwargs):
self.calculated_parameters = []
self.samples_per_hour = samples_per_hour
self.env = env
self.try_cache = try_cache
self.forecast_update_interval = 24 * 60 * 60
#
self.input_weeks = 4
self.input_hours = self.input_weeks*24*7*self.samples_per_hour
self.output_weeks = 4
data_length = timedelta(hours=len(input_data) / samples_per_hour)
#only use days
data_length = timedelta(days=data_length.days)
if start == None:
start = datetime.utcfromtimestamp(self.env.now).replace(tzinfo=utc) - data_length
#always set to start of day
start = start.replace(hour=0,minute=0)
# ending time of input data
self.time_series_end = datetime.utcfromtimestamp(self.env.now).replace(tzinfo=utc)
# overwrite standard instance variables by passing in keyword arguments
for key in vars(self).keys():
if key in kwargs:
setattr(self,key,kwargs[key])
if len(input_data) * samples_per_hour < self.input_hours:
raise Exception("not enough values to forecast upon, please use another date or add more inputdata")
self.demands = self.process_inputdata(input_data[-self.input_hours:] ,samples_per_hour,start)
#forecast all demands.. might take long
self.forecasted_demands = self.forecast_demands()
[docs] def get_forecast_at(self, timestamp):
""" Return the forecast at the (unix) timestamp.
Raise a :py:exc:`IndexError` if there is no forecast for the timestamp."""
try:
return self.forecast_at(timestamp)[0]
except IndexError as e:
msg = " .. tried to access at a timepoint, for which no forecast is existant"
logger.error(str(e) + msg)
raise IndexError(str(e) + msg)
[docs] def forecast_demands(self):
"""Forecast and return the :attr:`demands`. The forecasting method depends on the subclass.
"""
raise NotImplementedError("This is an abstract class, please use a subclass")
[docs] def append_values(self, data, start_date=None):
""" Pushes in new values and cuts off values at the beginning to keep input length.
Will update the forecasts if needed.
"""
raise NotImplementedError("This is an abstract class, please use a subclass")
@classmethod
[docs] def make_hourly(cls, data, samples_per_hour):
""" aggregates data series to 1hourly data.
:param list data: the series
:param int samples_per_hour: number of samples per hour contained in `data`
:returns: list of 1hourly data
"""
def avg(hour):
sum = 0
for i in range(samples_per_hour):
sum += data[hour * samples_per_hour + i]
return sum / samples_per_hour
hours = len(data) / samples_per_hour
return [avg(i) for i in range(hours)]
@classmethod
[docs] def MASE(cls, training_series, testing_series, prediction_series):
"""
Computes the MEAN-ABSOLUTE SCALED ERROR forecast error for univariate time series prediction.
See `"Another look at measures of forecast accuracy" <http://robjhyndman.com/papers/another-look-at-measures-of-forecast-accuracy/>`_, Rob J Hyndman
:param list training_series: the series used to train the model
:param list testing_series: the test series to predict
:param list prediction_series: the prediction of testing_series (same size as testing_series)
"""
training_series = np.array(training_series)
testing_series = np.array(testing_series)
prediction_series = np.array(prediction_series)
n = training_series.shape[0]
d = np.abs(np.diff(training_series)).sum() / (n - 1)
errors = np.abs(testing_series - prediction_series)
return errors.mean() / d
[docs] def update_if_needed(self):
"""``Internal Method``
Update the forecasts, if `env.now` is more than `forecast_update_interval` seconds ahead from last demand measurement"""
now = datetime.utcfromtimestamp(self.env.now).replace(tzinfo=utc)
delta = (now - self.time_series_end).total_seconds()
if delta > self.forecast_update_interval:
self.forecasted_demands = self.forecast_demands()
self.time_series_end = datetime.utcfromtimestamp(self.env.now).replace(tzinfo=utc)
[docs] def read_from_cache(self):
""" ``Internal Method``
Return a cached result. If *try_cache* = ``True``, this will try to read from cache/cached_forecasts.cache.
The results are only returned, if the timestamp of the forecast creation is not older than 24h.
Else or if no cached file is available, ``None`` is returned.
:returns: ``list`` or `None`
"""
if self.try_cache:
try:
cached = pickle.load(open( os.path.join(BASE_DIR,"cache", "cached_forecasts.cache"), "rb" ))
diff_time = datetime.utcfromtimestamp(cached["date"]).replace(tzinfo=utc) - self.time_series_end
if diff_time.total_seconds() < 24 * 60 * 60: #24 hours epsilon
forecasted_demands = cached["forecasts"]
self.calculated_parameters = cached["parameters"]
logger.info("read forecasts from cache")
return forecasted_demands
except IOError as e:
logger.info(str(e) + " .. creating new cache file")
return None
class DSHWForecast(StatisticalForecast):
    """This forecast uses the double seasonal exponential smoothing method.
    It often delivers better results than the :class:`DayTypeForecast`."""

    def forecast_at(self, timestamp):
        """`Internal Method`. Return a one-element list with the forecast value at the
        (unix) timestamp, indexed by hours elapsed since the end of the input series."""
        date = datetime.utcfromtimestamp(timestamp).replace(tzinfo=utc)
        delta = (date - self.time_series_end).total_seconds()
        return [self.forecasted_demands[int(delta / 3600 * self.samples_per_hour)]]

    def forecast_demands(self, verbose=False):
        """See :meth:`StatisticalForecast.forecast_demands`.

        :param boolean verbose: if ``True``, log the automatically determined
            smoothing parameters.
        """
        logger.debug("forecasting demands with double seasonal HW..")
        cached = StatisticalForecast.read_from_cache(self)
        if cached is not None:
            return cached

        demand = self.demands[0]  # dshw demands only contains one dataset
        sph = self.samples_per_hour
        fc = self.output_weeks * 24 * 7 * self.samples_per_hour  # forecast_length
        # parameters are found automatically by double_seasonal when passed as None
        alpha = beta = gamma = delta = autocorr = None
        t = time.time()
        forecast_values, (alpha, beta, gamma, delta, autocorrelation), in_sample = double_seasonal(
            demand, m=int(24 * sph), m2=int(24 * 7 * sph),
            forecast=int(fc), alpha=alpha, beta=beta, gamma=gamma, delta=delta,
            autocorrelation=autocorr)
        if verbose:
            # NOTE(review): this compares the in-sample fit against the out-of-sample
            # forecast — presumably meant as a rough quality indicator; confirm intent.
            mse = ((in_sample - forecast_values) ** 2).mean(axis=None)
            calculated_parameters = {
                "alpha": alpha, "beta": beta, "gamma": gamma, "delta": delta,
                "autocorrelation": autocorrelation, "mse": mse}
            logger.info("use auto HW %s", calculated_parameters)
        logger.debug("doubleseasonal completed in:" + str(time.time() - t) + " s")
        return forecast_values

    def append_values(self, data, start_date=None):
        """See :meth:`StatisticalForecast.append_values`"""
        demand = self.demands[0]
        demand += data
        if len(demand) > self.input_hours:
            # only keep number of input_weeks
            start_index = len(demand) - self.input_hours
            self.demands[0] = demand[start_index:]
        self.update_if_needed()
class DayTypeForecast(StatisticalForecast):
    """This forecast splits the demands into 7 weekdays and minimizes and forecasts
    each with the holt-winters multiplicative method."""

    @classmethod
    def split_weekdata(cls, data, samples_per_hour, start_date=None):
        """Split the data into 7 lists, one for each weekday.

        :param list data: consecutive samples
        :param int samples_per_hour: number of samples per hour contained in `data`
        :param datetime start_date: the weekday of the first day of input data. Default = 0
        :returns: [mon_series, tue_series, wen_series, ..]
        """
        if start_date is not None:
            weekday = start_date.weekday()
        else:
            weekday = 0
        split_array = [[] for _ in range(7)]
        for index, element in enumerate(data):
            # NOTE(review): the advance also fires at index 0, so the first day's
            # samples land on (weekday + 1) — looks like an off-by-one, kept to
            # preserve existing behaviour; confirm intent.
            if index % (24 * samples_per_hour) == 0:
                weekday = (weekday + 1) % 7
            split_array[weekday].append(element)
        return split_array

    def forecast_multiplicative(self, demand, index, result_dict, verbose=False):
        """`Internal Method`. Forecast one series and store the result into result_dict."""
        # seasonality length -- one day
        m = 24 * self.samples_per_hour
        fc = self.output_weeks * 24 * self.samples_per_hour  # forecast_length
        logger.debug("find holt winter parameters for day: " + str(index))
        # alpha, beta, gamma are determined automatically by holt-winters
        # (cost-expensive, so only done once)
        forecast_values, (alpha, beta, gamma), in_sample = multiplicative(demand, m, fc)
        calculated_parameters = {
            "alpha": alpha, "beta": beta, "gamma": gamma, "mse": in_sample}
        if verbose:
            logger.info("use auto HW %s", calculated_parameters)
        result_dict[index] = (forecast_values, calculated_parameters, index)

    def forecast_demands(self):
        """See :meth:`StatisticalForecast.forecast_demands`.

        This method uses processes to speed up the calculation.
        This drives the cpu to full load for a short time."""
        cached = StatisticalForecast.read_from_cache(self)
        if cached is not None:
            return cached

        split_results = [[] for _ in range(7)]
        # multi-processed forecasting.
        # WARNING, history of hangups:
        # v.1 pool.map hung due to a multiprocessing/django incompatibility
        # v.2 a multiprocessing.Queue hung without warning when holding too many
        #     elements (12 weeks 1-hourly worked, 20 did not)
        # v.3 (current) a shared multiprocessing Manager dict circumvents the
        #     queue hangup problems on windows
        mgr = multiprocessing.Manager()
        dict_threadsafe = mgr.dict()

        logger.debug("forecasting demands with daytype strategy..")
        # call class as Functor because class methods are not pickleable
        jobs = [Process(target=self, args=(demand, index, dict_threadsafe))
                for index, demand in enumerate(self.demands)]
        for job in jobs:
            job.start()
        for job in jobs:
            job.join()

        for index in dict_threadsafe.keys():
            split_results[index] = dict_threadsafe[index]

        forecasted_demands = []
        self.calculated_parameters = []
        for fc_tuple in split_results:
            forecasted_demands.append(list(fc_tuple[0]))
            self.calculated_parameters.append(fc_tuple[1])

        # cache forecasts; context manager guarantees the file handle is closed
        cache_path = os.path.join(BASE_DIR, "cache", "cached_forecasts.cache")
        with open(cache_path, "wb") as cache_file:
            pickle.dump({"forecasts": forecasted_demands,
                         "parameters": self.calculated_parameters,
                         "date": self.env.now},
                        cache_file)
        logger.debug("forecasting completed")
        return forecasted_demands

    def forecast_at(self, timestamp):
        """`Internal Method`. Return the forecast at the timestamp.
        The timestamp's weekday is used to get the right forecasting.

        :returns: (forecast value, index of week, index of hour)
        """
        date = datetime.utcfromtimestamp(timestamp).replace(tzinfo=utc)
        delta = (date - self.time_series_end).total_seconds()
        arr_index = int((delta / (60.0 * 60.0)) * self.samples_per_hour)
        # NOTE(review): the week/hour indexing below assumes samples_per_hour == 1
        # (divisors 7*24 and 24 are not scaled by samples_per_hour) — confirm.
        week_index = int(arr_index / (7 * 24))
        hour_index = arr_index % 24
        return (self.forecasted_demands[date.weekday()][week_index * 24 + hour_index],
                week_index, hour_index)

    def append_values(self, data, start_date=None):
        """See :meth:`StatisticalForecast.append_values`"""
        new_demands = DayTypeForecast.split_weekdata(data, self.samples_per_hour, start_date)
        for index, demand in enumerate(new_demands):
            self.demands[index] += demand
            # floor division keeps an int length bound (Py2 '/' floored anyway)
            if len(self.demands[index]) > self.input_hours // 7:
                # only keep number of input_weeks
                start_index = len(self.demands[index]) - self.input_hours // 7
                self.demands[index] = self.demands[index][start_index:]
        self.update_if_needed()

    # callable class: used as the picklable Process target in forecast_demands
    def __call__(self, demand, index, result_dict):
        self.forecast_multiplicative(demand, index, result_dict)