FakeStream
ADLStream.data.stream.FakeStream
Fake Stream. This stream returns a sine wave in a specific shape and length.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
num_features |
int >=1 |
Number of features for each message. Defaults to 1. |
1 |
stream_length |
int |
Maximum number of messages to be returned. Defaults to 1000. |
1000 |
**kwargs |
BaseStream arguments. |
{} |
Source code in ADLStream/data/stream/fake_stream.py
class FakeStream(BaseStream):
    """Fake Stream.

    This stream returns a sine wave in a specific shape and length.

    Arguments:
        num_features (int >=1, optional): Number of features for each message.
            Defaults to 1.
        stream_length (int, optional): Maximum number of messages to be returned.
            Defaults to 1000.
        stream_period (int, optional): Passed through unchanged to BaseStream.
            Defaults to 0.
        timeout (int, optional): Passed through unchanged to BaseStream.
            Defaults to 30000.
        **kwargs: BaseStream arguments.
    """

    def __init__(
        self,
        num_features=1,
        stream_length=1000,
        stream_period=0,
        timeout=30000,
        **kwargs
    ):
        super().__init__(stream_period=stream_period, timeout=timeout, **kwargs)
        self.num_features = num_features
        self.stream_length = stream_length
        # Stays empty until start() fills it with the pre-computed wave.
        self.messages = []

    def start(self):
        """Run the parent start-up, then pre-compute every message of the wave."""
        super().start()
        # Message number x holds sin(x / 100) repeated once per feature;
        # list() splits the 2-D result into one 1-D array per message.
        data = list(
            np.sin([[x / 100] * self.num_features for x in range(self.stream_length)])
        )
        self.messages = data

    def get_message(self):
        """Remove and return the oldest pending message.

        Returns:
            The first element of `self.messages`.

        Raises:
            IndexError: `self.messages` is empty.
        """
        message = self.messages.pop(0)
        return message
get_message(self)
¶
The function that contains the logic to generate a new message. It must return the message as an array. This function must be overridden by every custom stream.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
Abstract function has not been overridden. |
Returns:
Type | Description |
---|---|
list |
message |
Source code in ADLStream/data/stream/fake_stream.py
def get_message(self):
    """Remove and return the first element of `self.messages`.

    Returns:
        The message that was at the front of `self.messages`.
    """
    # pop(0) takes from the front, so messages leave in the order they were stored.
    message = self.messages.pop(0)
    return message
next(self)
inherited
¶
The function that returns the next stream message.
Exceptions:
Type | Description |
---|---|
StopIteration |
The stream has finished |
Returns:
Type | Description |
---|---|
list |
message |
Source code in ADLStream/data/stream/fake_stream.py
def next(self):
    """The function that returns the next stream message.

    When `stream_period` (milliseconds) is positive and less than one period
    has elapsed since `last_message_time`, the call sleeps before fetching
    the message.

    Raises:
        StopIteration: The stream has finished

    Returns:
        list: message
    """
    if self.stream_period > 0:
        # time.time() works in seconds, stream_period is in milliseconds.
        period_s = self.stream_period / 1000
        if time.time() - self.last_message_time < period_s:
            # Sleep for the part of the period that has not elapsed yet.
            time.sleep(
                period_s - ((time.time() - self.last_message_time) % period_s)
            )
    # Taken before fetching, so the fetch duration counts towards the next period.
    starttime = time.time()
    try:
        message = self.get_message()
    except Exception:
        # Any failure while fetching is treated as the end of the stream.
        self.stop()
        raise StopIteration
    self.last_message_time = starttime
    return message
start(self)
¶
Function to be called before asking any message.
Source code in ADLStream/data/stream/fake_stream.py
def start(self):
    """Function to be called before asking any message.

    Runs the parent `start()` and then fills `self.messages` with the
    pre-computed sine wave.
    """
    super().start()
    # Message number x holds sin(x / 100) repeated once per feature;
    # list() splits the 2-D result into one 1-D array per message.
    data = list(
        np.sin([[x / 100] * self.num_features for x in range(self.stream_length)])
    )
    self.messages = data
stop(self)
inherited
¶
Function to be called when stream is finished.
Source code in ADLStream/data/stream/fake_stream.py
def stop(self):
    """Function to be called when stream is finished.

    This implementation does nothing and returns None.
    """