Skip to content

ADLStream.data.MovingWindowStreamGenerator

Moving window stream generator.

This class performs a moving-window preprocessing method for time series forecasting problems.

Parameters:

Name Type Description Default
stream inherits ADLStream.data.stream.BaseStream

Stream source to be feed to the ADLStream framework.

required
past_history int

The width (number of time steps) of the input window (x).

required
forecasting_horizon int

The width (number of time steps) of the label window (y).

required
shift int >=1

The time offset between input and label windows. Defaults to 1.

1
input_idx int or list

The index/indices of the input feature/s. If None, every feature is considered as input feature. Defaults to None.

None
target_idx int or list

The index/indices of the target feature/s. If None, every feature is considered as target feature. Defaults to None.

None
Source code in ADLStream/data/moving_window_generator.py
class MovingWindowStreamGenerator(BaseStreamGenerator):
    """Moving window stream generator.

    This class performs a moving-window preprocessing method for time series forecasting
    problems.

    Arguments:
        stream (inherits ADLStream.data.stream.BaseStream):
            Stream source to be feed to the ADLStream framework.
        past_history (int): The width (number of time steps) of the input window (`x`).
        forecasting_horizon (int):
            The width (number of time steps) of the label window (`y`).
        shift (int >=1, optional): The time offset between input and label windows.
            Defaults to 1.
        input_idx (int or list, optional): The index/indices of the input feature/s.
            If None, every feature is considered as input feature. Defaults to None.
        target_idx (int or list, optional): The index/indices of the target feature/s.
            If None, every feature is considered as target feature. Defaults to None.
    """

    def __init__(
        self,
        stream,
        past_history,
        forecasting_horizon,
        shift=1,
        input_idx=None,
        target_idx=None,
        **kwargs
    ):
        super().__init__(stream, **kwargs)
        self.past_history = past_history
        self.forecasting_horizon = forecasting_horizon
        self.shift = shift
        self.input_idx = input_idx
        self.target_idx = target_idx

        self.x_window = []
        self.y_window = []

    def _select_features(self, message, idx):
        res = None
        if isinstance(idx, int):
            res = [message[idx]]
        elif isinstance(idx, list):
            res = [message[i] for i in idx]
        else:
            res = message
        return res

    def _get_x(self, message):
        return self._select_features(message, self.input_idx)

    def _get_y(self, message):
        return self._select_features(message, self.target_idx)

    def preprocess(self, message):
        x, y = None, None
        self.x_window.append(self._get_x(message))
        if self.num_messages >= self.past_history + self.shift:
            self.y_window.append(self._get_y(message))

        if len(self.x_window) > self.past_history:
            self.x_window.pop(0)
            x = self.x_window

        if len(self.y_window) > self.forecasting_horizon:
            self.y_window.pop(0)
            y = self.y_window

        return x, y

preprocess(self, message)

The function that contains the logic to transform a stream message into model imput and target data (x ,y).

Both output, x or y, can be None what means it should not be added to the context.

The target data y can be delayed. Although we are sending x and y at the same time, it does not mean that y is the corresponding target value of x. However, input data and target data should be in order: y_i is the target value of x_i. So the first target data sent (y_0) corresponds with the first input sent (x_0).

Parameters:

Name Type Description Default
message list

message received from the stream

required

Exceptions:

Type Description
NotImplementedError

This is an abstract method which should be implemented.

Returns:

Type Description
x (list)

instance of model's input data. y (list): instance of model's target data.

Source code in ADLStream/data/moving_window_generator.py
def preprocess(self, message):
    x, y = None, None
    self.x_window.append(self._get_x(message))
    if self.num_messages >= self.past_history + self.shift:
        self.y_window.append(self._get_y(message))

    if len(self.x_window) > self.past_history:
        self.x_window.pop(0)
        x = self.x_window

    if len(self.y_window) > self.forecasting_horizon:
        self.y_window.pop(0)
        y = self.y_window

    return x, y