Skip to content

MovingWindowStreamGenerator

ADLStream.data.MovingWindowStreamGenerator

Moving window stream generator.

This class performs a moving-window preprocessing method for time series forecasting problems.

The main logic can be found in the preprocess(self, message) function.

Parameters:

Name Type Description Default
stream inherits ADLStream.data.stream.BaseStream

Stream source to be fed to the ADLStream framework.

required
past_history int

The width (number of time steps) of the input window (x).

required
forecasting_horizon int

The width (number of time steps) of the label window (y).

required
shift int >=1

The time offset between input and label windows. Defaults to 1.

1
input_idx int or list

The index/indices of the input feature/s. If None, every feature is considered as input feature. Defaults to None.

None
target_idx int or list

The index/indices of the target feature/s. If None, every feature is considered as target feature. Defaults to None.

None
Source code in ADLStream/data/moving_window_generator.py
class MovingWindowStreamGenerator(BaseStreamGenerator):
    """Moving window stream generator.

    This class performs a moving-window preprocessing method for time series forecasting
    problems.

    The main logic can be found in the `preprocess(self, message)` function.

    Arguments:
        stream (inherits ADLStream.data.stream.BaseStream):
            Stream source to be fed to the ADLStream framework.
        past_history (int): The width (number of time steps) of the input window (`x`).
        forecasting_horizon (int):
            The width (number of time steps) of the label window (`y`).
        shift (int >=1, optional): The time offset between input and label windows.
            Defaults to 1.
        input_idx (int or list, optional): The index/indices of the input feature/s.
            If None, every feature is considered as input feature. Defaults to None.
        target_idx (int or list, optional): The index/indices of the target feature/s.
            If None, every feature is considered as target feature. Defaults to None.
    """

    def __init__(
        self,
        stream: "BaseStream",
        past_history: int,
        forecasting_horizon: int,
        shift: int = 1,
        input_idx: Optional[Union[int, list]] = None,
        target_idx: Optional[Union[int, list]] = None,
        **kwargs
    ) -> None:
        super().__init__(stream, **kwargs)
        self.past_history = past_history
        self.forecasting_horizon = forecasting_horizon
        self.shift = shift
        self.input_idx = input_idx
        self.target_idx = target_idx

        # Rolling buffers holding the most recent input / label time steps.
        self.x_window = []
        self.y_window = []

    def _select_features(
        self, message: List[float], idx: Optional[Union[int, List[int]]]
    ) -> List[float]:
        """Gets features indicated by `idx` from the `message`.

        Args:
            message (List[float]): stream message.
            idx (Optional[Union[int,List[int]]]): features indices.

        Returns:
            List[float]: selected features from the stream message. When `idx` is
                neither an int nor a list (e.g. None) the message itself is returned.
        """
        if isinstance(idx, int):
            return [message[idx]]
        if isinstance(idx, list):
            return [message[i] for i in idx]
        return message

    def _get_x(self, message: List[float]) -> List[float]:
        """Get input features from the message.

        Args:
            message (List[float]): stream message.

        Returns:
            List[float]: Input instance (`x`).
        """
        return self._select_features(message, self.input_idx)

    def _get_y(self, message: List[float]) -> List[float]:
        """Get output features from the message.

        Args:
            message (List[float]): stream message.

        Returns:
            List[float]: Expected output (`y`).
        """
        return self._select_features(message, self.target_idx)

    def preprocess(
        self, message: List[float]
    ) -> Tuple[Optional[List[List[float]]], Optional[List[List[float]]]]:
        """Apply the moving window to the stream data for time series forecasting
        problems.

        The message is appended to the input buffer; it is appended to the label
        buffer only once `num_messages` has reached `past_history + shift`. A
        window is returned only on calls where the corresponding buffer had to
        drop its oldest element to stay at its configured width.

        Args:
            message (List[float]): stream message.

        Returns:
            Tuple[Optional[List[List[float]]],Optional[List[List[float]]]]:
                (`x`, `y`) input and output instances. Each one is None when its
                window is not available for this message. The returned lists are
                snapshots, so later calls do not modify them.
        """
        x, y = None, None
        self.x_window.append(self._get_x(message))
        if self.num_messages >= self.past_history + self.shift:
            self.y_window.append(self._get_y(message))

        # NOTE(review): a window is emitted only after it has overflowed, so the
        # very first full window is never returned - confirm this is intended.
        if len(self.x_window) > self.past_history:
            self.x_window.pop(0)
            # Return a copy: the internal buffer is mutated on every call and
            # must not alias instances that were already handed out.
            x = list(self.x_window)

        if len(self.y_window) > self.forecasting_horizon:
            self.y_window.pop(0)
            y = list(self.y_window)

        return x, y

num_messages: int inherited property writable

Return number of messages processed from the stream.

Returns:

Type Description
int

number of messages.

next(self, context) inherited

Get the next message from the stream.

Parameters:

Name Type Description Default
context ADLStreamContext

ADLStream shared object. Used for logging.

required

Returns:

Type Description
List[float]

new message.

Source code in ADLStream/data/moving_window_generator.py
def next(self, context: "ADLStreamContext") -> List[float]:
    """Fetch one message from the underlying stream.

    On success the message counter is increased by one. If the stream is
    exhausted (`StopIteration`) or any other error is raised while fetching,
    the event is logged through `context`, the context time-out is set and
    None is returned.

    Args:
        context (ADLStreamContext): ADLStream shared object. Used for logging.

    Returns:
        List[float]: new message, or None if no message could be obtained.
    """
    fetched = None
    try:
        self._check_number_instances()
        fetched = self.stream.next()
        self.num_messages = self.num_messages + 1
    except StopIteration:
        # Normal end of the stream: report it and signal the framework to stop.
        context.log("INFO", "GENERATOR-PROCESS - Stream has finished")
        context.set_time_out()
    except Exception as err:
        # Any other failure also stops the generator, but is logged as an error.
        error_text = "GENERATOR-PROCESS - Error getting messages from stream {}".format(
            str(err)
        )
        context.log("ERROR", error_text)
        context.set_time_out()
    return fetched

preprocess(self, message)

Apply the moving window to the stream data for time series forecasting problems.

Parameters:

Name Type Description Default
message List[float]

stream message.

required

Returns:

Type Description
Tuple[Optional[List[float]],Optional[List[float]]]

(x, y) input and output instances.

Source code in ADLStream/data/moving_window_generator.py
def preprocess(
    self, message: List[float]
) -> Tuple[Optional[List[List[float]]], Optional[List[List[float]]]]:
    """Apply the moving window to the stream data for time series forecasting
    problems.

    The message is appended to the input buffer; it is appended to the label
    buffer only once `num_messages` has reached `past_history + shift`. A
    window is returned only on calls where the corresponding buffer had to
    drop its oldest element to stay at its configured width.

    Args:
        message (List[float]): stream message.

    Returns:
        Tuple[Optional[List[List[float]]],Optional[List[List[float]]]]:
            (`x`, `y`) input and output instances. Each one is None when its
            window is not available for this message. The returned lists are
            snapshots, so later calls do not modify them.
    """
    x, y = None, None
    self.x_window.append(self._get_x(message))
    if self.num_messages >= self.past_history + self.shift:
        self.y_window.append(self._get_y(message))

    # NOTE(review): a window is emitted only after it has overflowed, so the
    # very first full window is never returned - confirm this is intended.
    if len(self.x_window) > self.past_history:
        self.x_window.pop(0)
        # Return a copy: the internal buffer is mutated on every call and
        # must not alias instances that were already handed out.
        x = list(self.x_window)

    if len(self.y_window) > self.forecasting_horizon:
        self.y_window.pop(0)
        y = list(self.y_window)

    return x, y

run(self, context) inherited

The function that sends data to the ADLStream framework.

It gets messages from the stream, preprocesses them and sends to the specific ADLStream context.

Parameters:

Name Type Description Default
context ADLStream.ADLStreamContext

context where to send the stream data

required
Source code in ADLStream/data/moving_window_generator.py
def run(self, context: "ADLStreamContext"):
    """Feed the ADLStream framework with preprocessed stream data.

    Starts the stream and then repeatedly pulls a message, preprocesses it and,
    whenever at least one of the resulting `x` / `y` instances is available,
    applies the extra preprocessing steps to `x` and adds the pair to the
    context. The loop ends as soon as `next` returns None.

    Args:
        context (ADLStream.ADLStreamContext): context where to send the stream data
    """
    self.stream.start()
    while True:
        message = self.next(context)
        if message is None:
            # `next` yields None when no further message could be obtained.
            break

        x, y = self.preprocess(message)
        if x is None and y is None:
            # Nothing to emit for this message yet.
            continue

        context.add(self._perform_preprocessing_steps(x), y)