DataStream

ADLStream.data.stream.DataStream

Data stream.

This class implements a stream given a list of messages.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| messages | iterable | List with the stream messages. | required |
| stream_period | int >=0 | Stream time period in milliseconds: the minimum time between consecutive messages in the stream. If it is 0, a message is sent as soon as possible when one is requested (next). | 0 |
| timeout | int >0 | Stream timeout in milliseconds: the maximum time to wait for the next message. If it takes longer, a StopIteration exception is raised. | 30000 |
Source code in ADLStream/data/stream/data_stream.py
class DataStream(BaseStream):
    """Data stream.

    This class implements a stream given a list of messages.


    Arguments:
        messages (iterable): List with the stream messages.
        stream_period (int >=0, optional): Stream time period in milliseconds.
            It is the minimum time between consecutive messages in the stream. If it
            is 0, when a message is required (`next`), it is sent as soon as possible.
            Defaults to 0.
        timeout (int >0, optional): Stream timeout in milliseconds.
            It is the maximum time to wait for the next message. If it takes longer,
            a `StopIteration` exception is raised.
            Defaults to 30000.
    """

    def __init__(self, messages, stream_period=0, timeout=30000, **kwargs):
        super().__init__(stream_period=stream_period, timeout=timeout, **kwargs)
        self.messages = messages.copy()
        self.iterator = None

    def start(self):
        self.iterator = iter(self.messages)
        return super().start()

    def get_message(self):
        return next(self.iterator)
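
The class above can be exercised roughly as follows. This is a minimal sketch rather than the library itself: `SketchBaseStream` and `SketchDataStream` are hypothetical stand-ins that copy only the behaviour visible in the source listings on this page (no pacing, no timeout handling), so the block runs without ADLStream installed. It illustrates two details of the listing: the constructor stores a copy of `messages`, and `start()` creates a fresh iterator, so calling it again replays the stream from the beginning.

```python
class SketchBaseStream:
    """Hypothetical stand-in for BaseStream; not the library class."""

    def __init__(self, stream_period=0, timeout=30000):
        self.stream_period = stream_period
        self.timeout = timeout

    def start(self):
        pass

    def stop(self):
        pass

    def next(self):
        # Any exception from get_message() ends the stream.
        try:
            return self.get_message()
        except Exception:
            self.stop()
            raise StopIteration


class SketchDataStream(SketchBaseStream):
    """Mirrors the DataStream listing above on top of the stand-in base."""

    def __init__(self, messages, stream_period=0, timeout=30000):
        super().__init__(stream_period=stream_period, timeout=timeout)
        self.messages = messages.copy()  # later changes to the caller's list are ignored
        self.iterator = None

    def start(self):
        self.iterator = iter(self.messages)  # fresh iterator on every start()
        return super().start()

    def get_message(self):
        return next(self.iterator)


def drain(stream):
    """Start the stream and collect messages until StopIteration."""
    stream.start()
    received = []
    while True:
        try:
            received.append(stream.next())
        except StopIteration:
            return received


source = [[1.0, 2.0], [3.0, 4.0]]
stream = SketchDataStream(source)
source.append([5.0, 6.0])      # appended after construction, so not part of the stream

first_pass = drain(stream)
second_pass = drain(stream)    # start() is called again, so the messages are replayed
```

Note that `messages.copy()` is a shallow copy: the list of messages is duplicated, but the individual messages are shared with the caller.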

get_message(self)

The function that contains the logic to generate a new message. It must return the message as an array. This function must be overridden by every custom stream.

Exceptions:

| Type | Description |
| --- | --- |
| NotImplementedError | Abstract function has not been overridden. |

Returns:

| Type | Description |
| --- | --- |
| list | message |

Source code in ADLStream/data/stream/data_stream.py
def get_message(self):
    return next(self.iterator)
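
To illustrate what overriding `get_message` looks like in a custom stream, here is a small sketch. `SketchBaseStream` is a hypothetical stand-in, assumed to behave as this page describes (an abstract `get_message` that raises `NotImplementedError`, and a `next` that converts any exception into `StopIteration`); `SquaresStream` is an invented example and not part of ADLStream.

```python
class SketchBaseStream:
    """Hypothetical stand-in for BaseStream; not the library class."""

    def start(self):
        pass

    def stop(self):
        pass

    def get_message(self):
        # Assumption: the abstract version raises, as the documentation states.
        raise NotImplementedError("Abstract function has not been overridden.")

    def next(self):
        # Same try/except shape as the inherited next() listed on this page.
        try:
            return self.get_message()
        except Exception:
            self.stop()
            raise StopIteration


class SquaresStream(SketchBaseStream):
    """Hypothetical custom stream that emits [i, i * i] for i in range(limit)."""

    def __init__(self, limit):
        self.limit = limit
        self.i = 0

    def get_message(self):
        if self.i >= self.limit:
            raise StopIteration  # nothing left to emit
        message = [self.i, self.i * self.i]
        self.i += 1
        return message


def collect(stream):
    """Start the stream and gather messages until it reports StopIteration."""
    stream.start()
    out = []
    while True:
        try:
            out.append(stream.next())
        except StopIteration:
            return out


squares = collect(SquaresStream(3))
unimplemented = collect(SketchBaseStream())  # get_message was never overridden
```

Under these assumptions, a stream that forgets to override `get_message` does not surface `NotImplementedError` to the caller of `next`: the broad `except Exception` turns it into `StopIteration`, so the stream simply appears empty.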

next(self) inherited

The function that returns the next stream message.

Exceptions:

| Type | Description |
| --- | --- |
| StopIteration | The stream has finished. |

Returns:

| Type | Description |
| --- | --- |
| list | message |

Source code in ADLStream/data/stream/data_stream.py
def next(self):
    """The function that returns the next stream message.

    Raises:
        StopIteration: The stream has finished.

    Returns:
        list: message
    """
    if self.stream_period > 0:
        if time.time() - self.last_message_time < self.stream_period / 1000:
            time.sleep(
                self.stream_period / 1000
                - (
                    (time.time() - self.last_message_time)
                    % (self.stream_period / 1000)
                )
            )
    starttime = time.time()

    try:
        message = self.get_message()
    except Exception:
        self.stop()
        raise StopIteration

    self.last_message_time = starttime

    return message
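
The pacing arithmetic at the top of `next` can be isolated as a pure function to make it easier to follow. The helper below, `pacing_delay`, is a hypothetical name introduced only for this sketch; it takes the time elapsed since the last message (in seconds) and the stream period (in milliseconds) and returns how long the listing above would sleep.

```python
def pacing_delay(elapsed_s, stream_period_ms):
    """Seconds to sleep before fetching the next message (sketch of the logic above)."""
    if stream_period_ms > 0:
        period_s = stream_period_ms / 1000  # the period is given in milliseconds
        if elapsed_s < period_s:
            # Wait for the remainder of the current period.
            return period_s - (elapsed_s % period_s)
    # Period disabled, or the period has already elapsed: no waiting.
    return 0.0


early = pacing_delay(0.03, 100)    # 30 ms after the last message, 100 ms period
late = pacing_delay(0.25, 100)     # the period has already passed
unpaced = pacing_delay(5.0, 0)     # stream_period == 0 disables pacing
```

With a 100 ms period and 30 ms elapsed, the delay is about 70 ms. Because the sleep branch is only taken when the elapsed time is below one period, the modulo leaves a non-negative elapsed time unchanged, so the expression reduces to `period - elapsed` in that case.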

start(self)

Function to be called before requesting any message.

Source code in ADLStream/data/stream/data_stream.py
def start(self):
    self.iterator = iter(self.messages)
    return super().start()

stop(self) inherited

Function to be called when the stream is finished.

Source code in ADLStream/data/stream/data_stream.py
def stop(self):
    """Function to be called when the stream is finished."""
    pass