MovingWindowStreamGenerator
ADLStream.data.MovingWindowStreamGenerator
Moving window stream generator.
This class performs a moving-window preprocessing method for time series forecasting problems.
The main logic can be found in the prepocess(self, message)
function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
inherits ADLStream.data.stream.BaseStream |
Stream source to be feed to the ADLStream framework. |
required |
past_history |
int |
The width (number of time steps) of the input window ( |
required |
forecasting_horizon |
int |
The width (number of time steps) of the label window ( |
required |
shift |
int >=1 |
The time offset between input and label windows. Defaults to 1. |
1 |
input_idx |
int or list |
The index/indices of the input feature/s. If None, every feature is considered as input feature. Defaults to None. |
None |
target_idx |
int or list |
The index/indices of the target feature/s. If None, every feature is considered as target feature. Defaults to None. |
None |
Source code in ADLStream/data/moving_window_generator.py
class MovingWindowStreamGenerator(BaseStreamGenerator):
"""Moving window stream generator.
This class performs a moving-window preprocessing method for time series forecasting
problems.
The main logic can be found in the `prepocess(self, message)` function.
Arguments:
stream (inherits ADLStream.data.stream.BaseStream):
Stream source to be feed to the ADLStream framework.
past_history (int): The width (number of time steps) of the input window (`x`).
forecasting_horizon (int):
The width (number of time steps) of the label window (`y`).
shift (int >=1, optional): The time offset between input and label windows.
Defaults to 1.
input_idx (int or list, optional): The index/indices of the input feature/s.
If None, every feature is considered as input feature. Defaults to None.
target_idx (int or list, optional): The index/indices of the target feature/s.
If None, every feature is considered as target feature. Defaults to None.
"""
def __init__(
self,
stream: "BaseStream",
past_history: int,
forecasting_horizon: int,
shift: int = 1,
input_idx: Optional[Union[int, list]] = None,
target_idx: Optional[Union[int, list]] = None,
**kwargs
) -> None:
super().__init__(stream, **kwargs)
self.past_history = past_history
self.forecasting_horizon = forecasting_horizon
self.shift = shift
self.input_idx = input_idx
self.target_idx = target_idx
self.x_window = []
self.y_window = []
def _select_features(
self, message: List[float], idx: Optional[Union[int, List[int]]]
) -> List[float]:
"""Gets features indicated by `idx` from the `message`.
Args:
message (List[float]): stream message.
idx (Optional[Union[int,List[int]]]): features indices.
Returns:
List[float]: selected features from the stream message.
"""
res = None
if isinstance(idx, int):
res = [message[idx]]
elif isinstance(idx, list):
res = [message[i] for i in idx]
else:
res = message
return res
def _get_x(self, message: List[float]) -> List[float]:
"""Get input features from the message.
Args:
message (List[float]): stream message.
Returns:
List[float]: Input instance (`x`).
"""
return self._select_features(message, self.input_idx)
def _get_y(self, message: List[float]) -> List[float]:
"""Get output features from the message.
Args:
message (List[float]): stream message.
Returns:
List[float]: Expected output (`y`).
"""
return self._select_features(message, self.target_idx)
def preprocess(self, message: List[float]) -> Tuple[List[float], List[float]]:
"""Apply the moving window to the stream data for time series forecasting
problems.
Args:
message (List[float]): stream message.
Returns:
Tuple[Optional[List[float]],Optional[List[float]]]: (`x`, `y`) input and output instances.
"""
x, y = None, None
self.x_window.append(self._get_x(message))
if self.num_messages >= self.past_history + self.shift:
self.y_window.append(self._get_y(message))
if len(self.x_window) > self.past_history:
self.x_window.pop(0)
x = self.x_window
if len(self.y_window) > self.forecasting_horizon:
self.y_window.pop(0)
y = self.y_window
return x, y
num_messages: int
inherited
property
writable
¶
Return number of messages processed from the stream.
Returns:
Type | Description |
---|---|
int |
number of messages. |
next(self, context)
inherited
¶
Get the next message from the stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
ADLStreamContext |
ADLStream shared object. Used for logging. |
required |
Returns:
Type | Description |
---|---|
List[float] |
new message. |
Source code in ADLStream/data/moving_window_generator.py
def next(self, context: "ADLStreamContext") -> List[float]:
"""Get the next message from the stream.
Args:
context (ADLStreamContext): ADLStream shared object. Used for logging.
Returns:
List[float]: new message.
"""
message = None
try:
self._check_number_instances()
message = self.stream.next()
self.num_messages += 1
except StopIteration:
context.log("INFO", "GENERATOR-PROCESS - Stream has finished")
context.set_time_out()
except Exception as e:
context.log(
"ERROR",
"GENERATOR-PROCESS - Error getting messages from stream {}".format(
str(e)
),
)
context.set_time_out()
return message
preprocess(self, message)
¶
Apply the moving window to the stream data for time series forecasting problems.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
List[float] |
stream message. |
required |
Returns:
Type | Description |
---|---|
Tuple[Optional[List[float]],Optional[List[float]]] |
( |
Source code in ADLStream/data/moving_window_generator.py
def preprocess(self, message: List[float]) -> Tuple[List[float], List[float]]:
"""Apply the moving window to the stream data for time series forecasting
problems.
Args:
message (List[float]): stream message.
Returns:
Tuple[Optional[List[float]],Optional[List[float]]]: (`x`, `y`) input and output instances.
"""
x, y = None, None
self.x_window.append(self._get_x(message))
if self.num_messages >= self.past_history + self.shift:
self.y_window.append(self._get_y(message))
if len(self.x_window) > self.past_history:
self.x_window.pop(0)
x = self.x_window
if len(self.y_window) > self.forecasting_horizon:
self.y_window.pop(0)
y = self.y_window
return x, y
run(self, context)
inherited
¶
The function that sends data to ADLStream framework
It gets messages from the stream, preprocesses them and sends to the specific ADLStream context.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
ADLStream.ADLStreamContext |
context where to send the stream data |
required |
Source code in ADLStream/data/moving_window_generator.py
def run(self, context: "ADLStreamContext"):
"""The function that sends data to ADLStream framework
It gets messages from the stream, preprocesses them and sends to the specific
ADLStream context.
Args:
context (ADLStream.ADLStreamContext): context where to send the stream data
"""
self.stream.start()
message = self.next(context)
while message is not None:
(x, y) = self.preprocess(message)
if x is not None or y is not None:
x = self._perform_preprocessing_steps(x)
context.add(x, y)
message = self.next(context)