Skip to content

BaseStreamGenerator

ADLStream.data.BaseStreamGenerator

Abstract Base Stream Generator

This is the base class for implementing stream generators with custom behavior.

Every StreamGenerator must have the properties below and implement preprocess with the signature (x, y) = preprocess(message).

Examples:

    class MinimalBaseGenerator(BaseStreamGenerator):
        """Minimal generator: one element of each message is the target."""

        def __init__(self, stream, class_index=-1, **kwargs):
            # Position of the target value inside every incoming message.
            self.class_index = class_index
            super().__init__(stream, **kwargs)

        def preprocess(self, message):
            # `pop` removes the target in place, so `x` keeps only the inputs.
            x = message
            y = x.pop(self.class_index)
            return x, y

Parameters:

Name Type Description Default
stream inherits [BaseStream]

Stream source to be fed to the ADLStream framework.

required
preprocessing_steps list of BasePreprocessor

List of operations to be performed sequentially over the input data (x). Defaults to [].

[]
max_instances int > 0

Maximum number of instances to generate. If None, it will continue generating new messages until the stream stops. Defaults to None.

None
Source code in ADLStream/data/base_stream_generator.py
class BaseStreamGenerator(ABC):
    """Abstract Base Stream Generator

    This is the base class for implementing stream generators with custom behavior.

    Every `StreamGenerator` must have the properties below and implement `preprocess` with
    the signature `(x, y) = preprocess(message)`.

    Examples:
    ```python
        class MinimalBaseGenerator(BaseStreamGenerator):

            def __init__(self, stream, class_index=-1, **kwargs):
                self.class_index = class_index
                super().__init__(stream, **kwargs)

            def preprocess(self, message):
                x = message
                y = x.pop(self.class_index)
                return x, y
    ```

    Arguments:
        stream (inherits [BaseStream]):
            Stream source to be fed to the ADLStream framework.
        preprocessing_steps (list of BasePreprocessor):
            List of operations to be performed sequentially over the input data (x).
            Defaults to None, which is treated as an empty list.
        max_instances (int > 0): Maximum number of instances to generate. If None, it
            will continue generating new messages until the stream stops.
            Defaults to None.
    """

    def __init__(
        self,
        stream: Type["BaseStream"],
        preprocessing_steps: List[Type["BasePreprocessor"]] = None,
        max_instances: int = None,
    ) -> None:
        self.stream = stream
        # Assigned through the `num_messages` property setter below.
        self.num_messages = 0
        # Build the empty list here rather than in the signature: a `[]` default
        # is created once and would be shared by every generator instance.
        self.preprocessing_steps = (
            [] if preprocessing_steps is None else preprocessing_steps
        )
        self.max_messages = max_instances

    @property
    def num_messages(self) -> int:
        """Return number of messages processed from the stream.

        Returns:
            int: number of messages.
        """
        return self._num_messages

    @num_messages.setter
    def num_messages(self, value: int) -> None:
        self._num_messages = value

    def _check_number_instances(self):
        """Stop when it reaches the maximum number of messages.

        Does nothing when no maximum was configured (`max_messages` is None).

        Raises:
            StopIteration: if `num_messages` has reached `max_messages`. The
                stream is stopped before raising.
        """
        if self.max_messages is not None:
            if self.num_messages >= self.max_messages:
                self.stream.stop()
                raise StopIteration

    def next(self, context: "ADLStreamContext") -> List[float]:
        """Get the next message from the stream.

        Any failure (end of stream, instance limit reached, or an unexpected
        error) is logged through `context` and reported by calling
        `context.set_time_out()`; it is never propagated to the caller.

        Args:
            context (ADLStreamContext): ADLStream shared object. Used for logging.

        Returns:
            List[float]: new message, or None if no message could be obtained.
        """
        message = None
        try:
            self._check_number_instances()
            message = self.stream.next()
            self.num_messages += 1
        except StopIteration:
            context.log("INFO", "GENERATOR-PROCESS - Stream has finished")
            context.set_time_out()
        except Exception as e:
            context.log(
                "ERROR",
                "GENERATOR-PROCESS - Error getting messages from stream {}".format(
                    str(e)
                ),
            )
            context.set_time_out()
        return message

    @abstractmethod
    def preprocess(self, message: List[float]) -> Tuple[List[float], List[float]]:
        """The function that contains the logic to transform a stream message into
        model input and target data `(x, y)`.

        Either output, `x` or `y`, can be `None`, which means it should not be added
        to the context.

        The target data `y` can be delayed. Although we are sending `x` and `y` at
        the same time, it does not mean that `y` is the corresponding target value
        of `x`. However, input data and target data should be in order: `y_i` is the
        target value of `x_i`. So the first target data sent (`y_0`) corresponds with
        the first input sent (`x_0`).

        Args:
            message (list): message received from the stream

        Raises:
            NotImplementedError: This is an abstract method which should be implemented.

        Returns:
            x (list): instance of model's input data.
            y (list): instance of model's target data.
        """
        raise NotImplementedError("Abstract method")

    def _perform_preprocessing_steps(self, x: List[float]) -> List[float]:
        """Apply the different preprocessors to a message.

        Each preprocessor first learns from the current value (`learn_one`) and
        then transforms it (`transform_one`); the result feeds the next one.

        Args:
            x (List[float]): message.

        Returns:
            List[float]: updated message.
        """
        for preprocessor in self.preprocessing_steps:
            x = preprocessor.learn_one(x).transform_one(x)
        return x

    def run(self, context: "ADLStreamContext"):
        """The function that sends data to ADLStream framework

        It gets messages from the stream, preprocesses them and sends to the specific
        ADLStream context. The loop ends as soon as `next` returns None.

        Args:
            context (ADLStream.ADLStreamContext): context where to send the stream data
        """
        self.stream.start()
        message = self.next(context)
        while message is not None:
            x, y = self.preprocess(message)
            if x is not None or y is not None:
                # `preprocess` may return only a (delayed) target; in that case
                # there is no input for the preprocessors to learn from or transform.
                if x is not None:
                    x = self._perform_preprocessing_steps(x)
                context.add(x, y)
            message = self.next(context)

num_messages: int property writable

Return number of messages processed from the stream.

Returns:

Type Description
int

number of messages.

next(self, context)

Get the next message from the stream.

Parameters:

Name Type Description Default
context ADLStreamContext

ADLStream shared object. Used for logging.

required

Returns:

Type Description
List[float]

new message.

Source code in ADLStream/data/base_stream_generator.py
def next(self, context: "ADLStreamContext") -> List[float]:
    """Fetch one more message from the underlying stream.

    The instance limit is checked first, then the stream is asked for a
    message and the processed-message counter is incremented. A finished
    stream or any other error is logged on `context`, which is also told to
    time out; nothing is raised to the caller.

    Args:
        context (ADLStreamContext): ADLStream shared object. Used for logging.

    Returns:
        List[float]: the new message, or None when none could be obtained.
    """
    fetched = None
    try:
        self._check_number_instances()
        fetched = self.stream.next()
        self.num_messages += 1
    except StopIteration:
        # Raised by the limit check or by the stream itself once it is exhausted.
        context.log("INFO", "GENERATOR-PROCESS - Stream has finished")
        context.set_time_out()
    except Exception as error:
        template = "GENERATOR-PROCESS - Error getting messages from stream {}"
        context.log("ERROR", template.format(str(error)))
        context.set_time_out()
    return fetched

preprocess(self, message)

The function that contains the logic to transform a stream message into model input and target data (x, y).

Either output, x or y, can be None, which means it should not be added to the context.

The target data y can be delayed. Although we are sending x and y at the same time, it does not mean that y is the corresponding target value of x. However, input data and target data should be in order: y_i is the target value of x_i. So the first target data sent (y_0) corresponds with the first input sent (x_0).

Parameters:

Name Type Description Default
message list

message received from the stream

required

Exceptions:

Type Description
NotImplementedError

This is an abstract method which should be implemented.

Returns:

Type Description
x (list)

instance of model's input data. y (list): instance of model's target data.

Source code in ADLStream/data/base_stream_generator.py
@abstractmethod
def preprocess(self, message: List[float]) -> Tuple[List[float], List[float]]:
    """The function that contains the logic to transform a stream message into
    model input and target data `(x, y)`.

    Either output, `x` or `y`, can be `None`, which means it should not be added
    to the context.

    The target data `y` can be delayed. Although we are sending `x` and `y` at
    the same time, it does not mean that `y` is the corresponding target value
    of `x`. However, input data and target data should be in order: `y_i` is the
    target value of `x_i`. So the first target data sent (`y_0`) corresponds with
    the first input sent (`x_0`).

    Args:
        message (list): message received from the stream

    Raises:
        NotImplementedError: This is an abstract method which should be implemented.

    Returns:
        x (list): instance of model's input data.
        y (list): instance of model's target data.
    """
    raise NotImplementedError("Abstract method")

run(self, context)

The function that sends data to ADLStream framework

It gets messages from the stream, preprocesses them and sends to the specific ADLStream context.

Parameters:

Name Type Description Default
context ADLStream.ADLStreamContext

context where to send the stream data

required
Source code in ADLStream/data/base_stream_generator.py
def run(self, context: "ADLStreamContext"):
    """The function that sends data to ADLStream framework

    It gets messages from the stream, preprocesses them and sends to the specific
    ADLStream context. The loop ends as soon as `next` returns None.

    Args:
        context (ADLStream.ADLStreamContext): context where to send the stream data
    """
    self.stream.start()
    message = self.next(context)
    while message is not None:
        x, y = self.preprocess(message)
        if x is not None or y is not None:
            # `preprocess` may return only a (delayed) target; in that case
            # there is no input for the preprocessors to learn from or transform.
            if x is not None:
                x = self._perform_preprocessing_steps(x)
            context.add(x, y)
        message = self.next(context)