ADLStream.data.stream.FakeStream

Fake Stream. This stream returns a sine wave with a configurable number of features and length.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| num_features | int >=1 | Number of features for each message. | 1 |
| stream_length | int | Maximum number of messages to be returned. | 1000 |
| **kwargs | | BaseStream arguments. | {} |

Source code in ADLStream/data/stream/fake_stream.py
import numpy as np

from ADLStream.data.stream import BaseStream


class FakeStream(BaseStream):
    """Fake Stream.
    This stream returns a sine wave in a specific shape and length.

    Arguments:
        num_features (int >=1, optional): Number of features for each message.
            Defaults to 1.
        stream_length (int, optional): Maximum number of messages to be returned.
            Defaults to 1000.
        **kwargs: BaseStream arguments.
    """

    def __init__(
        self,
        num_features=1,
        stream_length=1000,
        stream_period=0,
        timeout=30000,
        **kwargs
    ):
        super().__init__(stream_period=stream_period, timeout=timeout, **kwargs)
        self.num_features = num_features
        self.stream_length = stream_length

        self.messages = []

    def start(self):
        super().start()
        data = list(
            np.sin([[x / 100] * self.num_features for x in range(self.stream_length)])
        )
        self.messages = data

    def get_message(self):
        message = self.messages.pop(0)
        return message
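
To make the shape of the generated data concrete, here is a minimal sketch that rebuilds the same values as `start()` using only the standard library. The helper name `fake_messages` is hypothetical and not part of ADLStream. It swaps `np.sin` for `math.sin`, so each message is a plain list here, whereas the class above stores NumPy arrays.

```python
import math


def fake_messages(num_features=1, stream_length=1000):
    """Hypothetical helper: rebuild FakeStream's messages without NumPy."""
    # Message number x holds sin(x / 100), repeated once per feature.
    return [[math.sin(x / 100)] * num_features for x in range(stream_length)]


messages = fake_messages(num_features=2, stream_length=5)
print(len(messages))     # number of messages equals stream_length
print(len(messages[0]))  # each message has num_features values
print(messages[0])       # first message is sin(0) for every feature
```

Every feature in a message carries the same value, so `num_features` only widens the message; it does not add independent signals.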

get_message(self)

The function that contains the logic to generate a new message. It must return the message as an array. This function must be overridden by every custom stream.

Exceptions:

| Type | Description |
|---|---|
| NotImplementedError | Abstract function has not been overridden. |

Returns:

| Type | Description |
|---|---|
| list | The generated message. |

Source code in ADLStream/data/stream/fake_stream.py
def get_message(self):
    message = self.messages.pop(0)
    return message
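
The override contract described above can be sketched as follows. This is an illustration under stated assumptions, not ADLStream's implementation: `StandInBaseStream` is a hypothetical stand-in reduced to the abstract method, and `CounterStream` is a hypothetical custom stream. The real `BaseStream` takes further arguments (such as `stream_period` and `timeout`) that are omitted here.

```python
class StandInBaseStream:
    """Hypothetical stand-in for BaseStream, reduced to the abstract method."""

    def get_message(self):
        # Subclasses are expected to replace this method.
        raise NotImplementedError("Abstract method")


class CounterStream(StandInBaseStream):
    """Hypothetical custom stream that emits [0], [1], [2], ..."""

    def __init__(self):
        self.count = 0

    def get_message(self):
        # Return the message as a list, then advance the counter.
        message = [self.count]
        self.count += 1
        return message


stream = CounterStream()
first = stream.get_message()
second = stream.get_message()

# Calling the method on the base class itself signals the missing override.
try:
    StandInBaseStream().get_message()
    base_raised = False
except NotImplementedError:
    base_raised = True
```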

start(self)

Function to be called before requesting any message. It generates the sine-wave data and stores it in self.messages.

Source code in ADLStream/data/stream/fake_stream.py
def start(self):
    super().start()
    data = list(
        np.sin([[x / 100] * self.num_features for x in range(self.stream_length)])
    )
    self.messages = data
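
The interplay between `start()` and `get_message()` can be tried out with a small stand-alone sketch. `MiniFakeStream` is a hypothetical class that mirrors the logic above without `BaseStream` or NumPy, so it runs on its own. It shows that messages exist only after `start()` and that `pop(0)` on an empty list raises `IndexError` once the stream is exhausted. How the real `BaseStream` reacts to that exception is not shown in this page, so the sketch makes no claim about it.

```python
import math


class MiniFakeStream:
    """Hypothetical stand-in mirroring FakeStream without BaseStream or NumPy."""

    def __init__(self, num_features=1, stream_length=1000):
        self.num_features = num_features
        self.stream_length = stream_length
        self.messages = []  # stays empty until start() is called

    def start(self):
        # Precompute every message up front, as FakeStream.start() does.
        self.messages = [
            [math.sin(x / 100)] * self.num_features
            for x in range(self.stream_length)
        ]

    def get_message(self):
        # pop(0) removes and returns the oldest message (FIFO order).
        return self.messages.pop(0)


stream = MiniFakeStream(num_features=1, stream_length=3)
stream.start()
received = [stream.get_message() for _ in range(3)]

# A fourth request finds the list empty.
try:
    stream.get_message()
    exhausted = False
except IndexError:
    exhausted = True
```

Because `list.pop(0)` shifts all remaining elements, a `collections.deque` with `popleft()` would be a cheaper choice for very long streams; the list keeps the sketch close to the original code.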