ADLStream.data.MovingWindowStreamGenerator
Moving window stream generator.
This class performs a moving-window preprocessing method for time series forecasting problems.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream | inherits ADLStream.data.stream.BaseStream | Stream source to be fed to the ADLStream framework. | required |
past_history | int | The width (number of time steps) of the input window (`x`). | required |
forecasting_horizon | int | The width (number of time steps) of the label window (`y`). | required |
shift | int >=1 | The time offset between input and label windows. | 1 |
input_idx | int or list | The index or indices of the input features. If None, every feature is used as an input feature. | None |
target_idx | int or list | The index or indices of the target features. If None, every feature is used as a target feature. | None |
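The way `input_idx` and `target_idx` pick features out of a message can be illustrated with a small stand-alone function. This is a minimal sketch, not part of ADLStream: `select_features` is a hypothetical name, and the message is assumed to be a plain Python list with one value per feature.

```python
def select_features(message, idx=None):
    """Hypothetical helper: pick features from a message by index.

    An int selects a single feature (still returned as a list), a list
    selects several features in the given order, and None keeps them all.
    """
    if isinstance(idx, int):
        return [message[idx]]
    if isinstance(idx, list):
        return [message[i] for i in idx]
    return message


message = [10.0, 20.0, 30.0]  # one time step with three features

only_first = select_features(message, 0)           # single index
first_and_last = select_features(message, [0, 2])  # list of indices
everything = select_features(message, None)        # no selection

print(only_first)      # [10.0]
print(first_and_last)  # [10.0, 30.0]
print(everything)      # [10.0, 20.0, 30.0]
```

Note that a single integer index still yields a one-element list, so every time step in a window has the same list shape regardless of how the indices were given.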
Source code in ADLStream/data/moving_window_generator.py
class MovingWindowStreamGenerator(BaseStreamGenerator):
    """Moving window stream generator.

    This class performs a moving-window preprocessing method for time series forecasting
    problems.

    Arguments:
        stream (inherits ADLStream.data.stream.BaseStream):
            Stream source to be feed to the ADLStream framework.
        past_history (int): The width (number of time steps) of the input window (`x`).
        forecasting_horizon (int):
            The width (number of time steps) of the label window (`y`).
        shift (int >=1, optional): The time offset between input and label windows.
            Defaults to 1.
        input_idx (int or list, optional): The index/indices of the input feature/s.
            If None, every feature is considered as input feature. Defaults to None.
        target_idx (int or list, optional): The index/indices of the target feature/s.
            If None, every feature is considered as target feature. Defaults to None.
    """

    def __init__(
        self,
        stream,
        past_history,
        forecasting_horizon,
        shift=1,
        input_idx=None,
        target_idx=None,
        **kwargs
    ):
        super().__init__(stream, **kwargs)
        self.past_history = past_history
        self.forecasting_horizon = forecasting_horizon
        self.shift = shift
        self.input_idx = input_idx
        self.target_idx = target_idx

        self.x_window = []
        self.y_window = []

    def _select_features(self, message, idx):
        res = None
        if isinstance(idx, int):
            res = [message[idx]]
        elif isinstance(idx, list):
            res = [message[i] for i in idx]
        else:
            res = message
        return res

    def _get_x(self, message):
        return self._select_features(message, self.input_idx)

    def _get_y(self, message):
        return self._select_features(message, self.target_idx)

    def preprocess(self, message):
        x, y = None, None
        self.x_window.append(self._get_x(message))
        if self.num_messages >= self.past_history + self.shift:
            self.y_window.append(self._get_y(message))

        if len(self.x_window) > self.past_history:
            self.x_window.pop(0)
            x = self.x_window

        if len(self.y_window) > self.forecasting_horizon:
            self.y_window.pop(0)
            y = self.y_window

        return x, y
preprocess(self, message)
Transforms a stream message into model input and target data `(x, y)`.
Either output, `x` or `y`, can be None, which means it should not be added to the context.
The target data `y` can be delayed. Although `x` and `y` are returned at the same time, `y` is not necessarily the target value of that `x`. However, input data and target data must be emitted in order: `y_i` is the target value of `x_i`, so the first target sent (`y_0`) corresponds to the first input sent (`x_0`).
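The delay between inputs and targets is easier to see on a toy stream. The sketch below is a stand-alone imitation of the windowing logic, not the library class: `WindowSketch` is a hypothetical name, feature selection is omitted, returned windows are copied for clarity, and it assumes the message counter is incremented before each `preprocess` call (the base class that maintains `num_messages` is not shown on this page).

```python
class WindowSketch:
    """Hypothetical stand-in that imitates the moving-window logic."""

    def __init__(self, past_history, forecasting_horizon, shift=1):
        self.past_history = past_history
        self.forecasting_horizon = forecasting_horizon
        self.shift = shift
        self.num_messages = 0  # assumption: counted before preprocessing
        self.x_window = []
        self.y_window = []

    def preprocess(self, message):
        self.num_messages += 1
        x, y = None, None
        self.x_window.append(message)
        # Targets start accumulating only once an input window plus the
        # shift has been seen.
        if self.num_messages >= self.past_history + self.shift:
            self.y_window.append(message)
        if len(self.x_window) > self.past_history:
            self.x_window.pop(0)
            x = list(self.x_window)  # copy so stored windows stay fixed
        if len(self.y_window) > self.forecasting_horizon:
            self.y_window.pop(0)
            y = list(self.y_window)
        return x, y


sketch = WindowSketch(past_history=3, forecasting_horizon=2, shift=1)
xs, ys = [], []
for t in range(8):  # messages [0], [1], ..., [7], one feature each
    x, y = sketch.preprocess([t])
    if x is not None:  # None means "nothing to add yet"
        xs.append(x)
    if y is not None:
        ys.append(y)

# Pair inputs and targets by emission order: y_i belongs to x_i.
for i, (x_i, y_i) in enumerate(zip(xs, ys)):
    print(i, x_i, y_i)
# 0 [[1], [2], [3]] [[4], [5]]
# 1 [[2], [3], [4]] [[5], [6]]
# 2 [[3], [4], [5]] [[6], [7]]
```

In this run the first `x` is emitted at the fourth message and the first `y` only at the sixth, so the two returned on the same call never belong together. Matching them by emission order recovers the intended pairs. The last two inputs have no target yet because the stream ended before their label windows were complete.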
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | list | Message received from the stream. | required |
Exceptions:
Type | Description |
---|---|
NotImplementedError | This is an abstract method which should be implemented. |
Returns:
Type | Description |
---|---|
x (list) | Instance of model's input data. |
y (list) | Instance of model's target data. |
Source code in ADLStream/data/moving_window_generator.py
def preprocess(self, message):
    x, y = None, None
    self.x_window.append(self._get_x(message))
    if self.num_messages >= self.past_history + self.shift:
        self.y_window.append(self._get_y(message))

    if len(self.x_window) > self.past_history:
        self.x_window.pop(0)
        x = self.x_window

    if len(self.y_window) > self.forecasting_horizon:
        self.y_window.pop(0)
        y = self.y_window

    return x, y