Skip to content

FakeStream

ADLStream.data.stream.KafkaStream

Fake Stream. This stream returns a sine wave in a specific shape and length.

Parameters:

Name Type Description Default
num_features int >=1

Number of features for each message. Defaults to 1.

1
stream_length int

Maximun number of messages to be returned. Defaults to 1000.

1000
**kwargs

BaseStream arguments.

{}
Source code in ADLStream/data/stream/fake_stream.py
class FakeStream(BaseStream):
    """Fake Stream.
    This stream returns a sine wave in a specific shape and length.

    Arguments:
        num_features (int >=1, optional): Number of features for each message.
            Defaults to 1.
        stream_length (int, optional): Maximun number of messages to be returned.
            Defaults to 1000.
        **kwargs: BaseStream arguments.
    """

    def __init__(
        self,
        num_features=1,
        stream_length=1000,
        stream_period=0,
        timeout=30000,
        **kwargs
    ):
        super().__init__(stream_period=stream_period, timeout=timeout, **kwargs)
        self.num_features = num_features
        self.stream_length = stream_length

        self.messages = []

    def start(self):
        super().start()
        data = list(
            np.sin([[x / 100] * self.num_features for x in range(self.stream_length)])
        )
        self.messages = data

    def get_message(self):
        message = self.messages.pop(0)
        return message

get_message(self)

The function that contains the logic to generate a new message. It must return the message as an array. This function must be override by every custom stream.

Exceptions:

Type Description
NotImplementedError

Abstract function has not been overrided.

Returns:

Type Description
list

message

Source code in ADLStream/data/stream/fake_stream.py
def get_message(self):
    message = self.messages.pop(0)
    return message

next(self) inherited

The function that returns the next stream message.

Exceptions:

Type Description
StopIteration

The stream has finisshed

Returns:

Type Description
list

message

Source code in ADLStream/data/stream/fake_stream.py
def next(self):
    """The function that returns the next stream message.

    Raises:
        StopIteration: The stream has finisshed

    Returns:
        list: message
    """
    if self.stream_period > 0:
        if time.time() - self.last_message_time < self.stream_period / 1000:
            time.sleep(
                self.stream_period / 1000
                - (
                    (time.time() - self.last_message_time)
                    % (self.stream_period / 1000)
                )
            )
    starttime = time.time()

    try:
        message = self.get_message()
    except Exception:
        self.stop()
        raise StopIteration

    self.last_message_time = starttime

    return message

start(self)

Function to be called before asking any message.

Source code in ADLStream/data/stream/fake_stream.py
def start(self):
    super().start()
    data = list(
        np.sin([[x / 100] * self.num_features for x in range(self.stream_length)])
    )
    self.messages = data

stop(self) inherited

Function to be called when stream is finished.

Source code in ADLStream/data/stream/fake_stream.py
def stop(self):
    """Function to be called when stream is finished."""
    pass