Skip to content

BaseStream

ADLStream.data.stream.BaseStream

Abstract Base Stream.

This is the base class for implementing streams from custom sources.

Every Stream must have the properties below and implement get_message, which should return a stream message as an array of any dimensions. Other methods such as start or stop may be overrided if additional logic is needed.

Examples:

    class RandomStream(BaseStream):
        def __init__(self, seed=1, **kwargs):
            self.seed = seed
            self.super().__init__(**kwargs)

        def start(self):
            random.seed(self.seed)
            self.super().start()

        def get_message(self):
            message = [random.random()]
            return message

    stream = RandomStream()
    stream.start()
    message = stream.next()

Parameters:

Name Type Description Default
stream_period int >=0

Stream time period in milliseconds. It is the minimun time between consecutive messages in the stream. If it is 0, when a message is required (next), it is sent as soon as possible. Defaults to 0.

0
timeout int >0

Stream time out in milliseconds. It is the maximun time to wait for the next message. If it takes longer, StopIteration exception is raised. Defaults to 30000.

30000
Source code in ADLStream/data/stream/base_stream.py
class BaseStream(ABC):
    """Abstract Base Stream.

    This is the base class for implementing streams from custom sources.

    Every `Stream` must have the properties below and implement `get_message`, which
    should return a stream message as an array of any dimensions. Other methods such as
    `start` or `stop` may be overrided if additional logic is needed.

    Examples:
    ```python
        class RandomStream(BaseStream):
            def __init__(self, seed=1, **kwargs):
                self.seed = seed
                self.super().__init__(**kwargs)

            def start(self):
                random.seed(self.seed)
                self.super().start()

            def get_message(self):
                message = [random.random()]
                return message

        stream = RandomStream()
        stream.start()
        message = stream.next()

    ```

    Arguments:
        stream_period (int >=0, optional): Stream time period in milliseconds.
            It is the minimun time between consecutive messages in the stream. If it
            is 0, when a message is required (`next`), it is sent as soon as possible.
            Defaults to 0.
        timeout (int >0, optional): Stream time out in milliseconds.
            It is the maximun time to wait for the next message. If it takes longer,
            `StopIteration` exception is raised.
            Defaults to 30000.
    """

    def __init__(self, stream_period=0, timeout=30000):
        self.stream_period = stream_period
        self.timeout = timeout
        self.last_message_time = None

    def start(self):
        """Function to be called before asking any message."""
        self.last_message_time = time.time()

    def stop(self):
        """Function to be called when stream is finished."""
        pass

    @abstractmethod
    def get_message(self):
        """The function that contains the logic to generate a new message.
        It must return the message as an array.
        This function must be override by every custom stream.

        Raises:
            NotImplementedError: Abstract function has not been overrided.

        Returns:
            list: message
        """
        raise NotImplementedError("Abstract method")

    def next(self):
        """The function that returns the next stream message.

        Raises:
            StopIteration: The stream has finisshed

        Returns:
            list: message
        """
        if self.stream_period > 0:
            if time.time() - self.last_message_time < self.stream_period / 1000:
                time.sleep(
                    self.stream_period / 1000
                    - (
                        (time.time() - self.last_message_time)
                        % (self.stream_period / 1000)
                    )
                )
        starttime = time.time()

        try:
            message = self.get_message()
        except Exception:
            self.stop()
            raise StopIteration

        self.last_message_time = starttime

        return message

get_message(self)

The function that contains the logic to generate a new message. It must return the message as an array. This function must be override by every custom stream.

Exceptions:

Type Description
NotImplementedError

Abstract function has not been overrided.

Returns:

Type Description
list

message

Source code in ADLStream/data/stream/base_stream.py
@abstractmethod
def get_message(self):
    """The function that contains the logic to generate a new message.
    It must return the message as an array.
    This function must be override by every custom stream.

    Raises:
        NotImplementedError: Abstract function has not been overrided.

    Returns:
        list: message
    """
    raise NotImplementedError("Abstract method")

next(self)

The function that returns the next stream message.

Exceptions:

Type Description
StopIteration

The stream has finisshed

Returns:

Type Description
list

message

Source code in ADLStream/data/stream/base_stream.py
def next(self):
    """The function that returns the next stream message.

    Raises:
        StopIteration: The stream has finisshed

    Returns:
        list: message
    """
    if self.stream_period > 0:
        if time.time() - self.last_message_time < self.stream_period / 1000:
            time.sleep(
                self.stream_period / 1000
                - (
                    (time.time() - self.last_message_time)
                    % (self.stream_period / 1000)
                )
            )
    starttime = time.time()

    try:
        message = self.get_message()
    except Exception:
        self.stop()
        raise StopIteration

    self.last_message_time = starttime

    return message

start(self)

Function to be called before asking any message.

Source code in ADLStream/data/stream/base_stream.py
def start(self):
    """Function to be called before asking any message."""
    self.last_message_time = time.time()

stop(self)

Function to be called when stream is finished.

Source code in ADLStream/data/stream/base_stream.py
def stop(self):
    """Function to be called when stream is finished."""
    pass