DataStream
ADLStream.data.stream.DataStream
Data stream.
This class implements a stream given a list of messages.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages |
iterable |
List with the stream messages. |
required |
stream_period |
int >=0 |
Stream time period in milliseconds.
It is the minimun time between consecutive messages in the stream. If it
is 0, when a message is required ( |
0 |
timeout |
int >0 |
Stream time out in milliseconds.
It is the maximun time to wait for the next message. If it takes longer,
|
30000 |
Source code in ADLStream/data/stream/data_stream.py
class DataStream(BaseStream):
"""Data stream.
This class implements a stream given a list of messages.
Arguments:
messages (iterable): List with the stream messages.
stream_period (int >=0, optional): Stream time period in milliseconds.
It is the minimun time between consecutive messages in the stream. If it
is 0, when a message is required (`next`), it is sent as soon as possible.
Defaults to 0.
timeout (int >0, optional): Stream time out in milliseconds.
It is the maximun time to wait for the next message. If it takes longer,
`StopIteration` exception is raised.
Defaults to 30000.
"""
def __init__(self, messages, stream_period=0, timeout=30000, **kwargs):
super().__init__(stream_period=stream_period, timeout=timeout, **kwargs)
self.messages = messages.copy()
self.iterator = None
def start(self):
self.iterator = iter(self.messages)
return super().start()
def get_message(self):
return next(self.iterator)
get_message(self)
¶
The function that contains the logic to generate a new message. It must return the message as an array. This function must be override by every custom stream.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
Abstract function has not been overrided. |
Returns:
Type | Description |
---|---|
list |
message |
Source code in ADLStream/data/stream/data_stream.py
def get_message(self):
return next(self.iterator)
next(self)
inherited
¶
The function that returns the next stream message.
Exceptions:
Type | Description |
---|---|
StopIteration |
The stream has finisshed |
Returns:
Type | Description |
---|---|
list |
message |
Source code in ADLStream/data/stream/data_stream.py
def next(self):
"""The function that returns the next stream message.
Raises:
StopIteration: The stream has finisshed
Returns:
list: message
"""
if self.stream_period > 0:
if time.time() - self.last_message_time < self.stream_period / 1000:
time.sleep(
self.stream_period / 1000
- (
(time.time() - self.last_message_time)
% (self.stream_period / 1000)
)
)
starttime = time.time()
try:
message = self.get_message()
except Exception:
self.stop()
raise StopIteration
self.last_message_time = starttime
return message
start(self)
¶
Function to be called before asking any message.
Source code in ADLStream/data/stream/data_stream.py
def start(self):
self.iterator = iter(self.messages)
return super().start()
stop(self)
inherited
¶
Function to be called when stream is finished.
Source code in ADLStream/data/stream/data_stream.py
def stop(self):
"""Function to be called when stream is finished."""
pass