RiverStream
ADLStream.data.stream.RiverStream
River stream wrapper
This class acts as a wrapper for river streams, making it possible to use ADLStream logic with streams from the river framework. This is especially useful for working with the synthetic stream generators implemented in river.
For more information about the river framework, visit its docs (https://github.com/online-ml/river).
Examples:
import ADLStream
from river import synth
dataset = synth.Agrawal(classification_function=0, seed=42)
stream = ADLStream.data.RiverStream(dataset)
generator = ADLStream.data.ClassificationStreamGenerator(stream)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset | inherits from river Dataset class | Original stream from the river framework. | required |
stream_period | int >=0 | Stream time period in milliseconds. It is the minimum time between consecutive messages in the stream. If it is 0, when a message is requested (`next`), it is sent as soon as possible. | 0 |
timeout | int >0 | Stream timeout in milliseconds. It is the maximum time to wait for the next message. If it takes longer, a `StopIteration` exception is raised. | 30000 |
n_instances | int | Number of instances taken from the dataset (`dataset.take(n_instances)`) when the stream is started. | 1000 |
Source code in ADLStream/data/stream/river_stream.py
class RiverStream(BaseStream):
    """River stream wrapper

    This class acts as a wrapper for river streams, making it possible to use
    ADLStream logic with streams from the river framework. This is especially
    useful for working with the synthetic stream generators implemented in river.

    For more information about the river framework, visit its [docs](https://github.com/online-ml/river).

    Examples:
    ```python
    import ADLStream
    from river import synth

    dataset = synth.Agrawal(classification_function=0, seed=42)
    stream = ADLStream.data.RiverStream(dataset)
    generator = ADLStream.data.ClassificationStreamGenerator(stream)
    ```

    Arguments:
        dataset (inherits from river Dataset class): original stream from the
            river framework.
        stream_period (int >=0, optional): Stream time period in milliseconds.
            It is the minimum time between consecutive messages in the stream. If it
            is 0, when a message is requested (`next`), it is sent as soon as possible.
            Defaults to 0.
        timeout (int >0, optional): Stream timeout in milliseconds.
            It is the maximum time to wait for the next message. If it takes longer,
            a `StopIteration` exception is raised.
            Defaults to 30000.
        n_instances (int, optional): Number of instances taken from the dataset
            when the stream is started.
            Defaults to 1000.
    """

    def __init__(
        self, dataset, stream_period=0, timeout=30000, n_instances=1000, **kwargs
    ):
        super().__init__(stream_period=stream_period, timeout=timeout, **kwargs)
        self.dataset = dataset
        self.n_instances = n_instances
        self.data = None

    def start(self):
        self.data = self.dataset.take(self.n_instances)
        super().start()

    def get_message(self):
        # Get X and y from river stream
        X, y = next(self.data)
        # Convert X from dict format to list
        X = list(X.values())
        # Return X and y concatenated as a single list
        return X + [y]
get_message(self)
The function that contains the logic to generate a new message. It must return the message as an array. This function must be overridden by every custom stream.
Exceptions:
Type | Description |
---|---|
NotImplementedError | Abstract function has not been overridden. |
Returns:
Type | Description |
---|---|
list | message |
Source code in ADLStream/data/stream/river_stream.py
def get_message(self):
    # Get X and y from river stream
    X, y = next(self.data)
    # Convert X from dict format to list
    X = list(X.values())
    # Return X and y concatenated as a single list
    return X + [y]
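To illustrate the conversion performed by `get_message`, here is a minimal sketch that uses a hand-written sample in place of a real river stream. The feature names and values are made up for illustration; the only assumption is that the stream yields `(X, y)` pairs where `X` is a dict of features, as the comments in the listing above indicate.

```python
# Hypothetical sample standing in for one item of a river stream:
# a feature dict plus a target value (names and values are made up).
samples = iter([({"salary": 1000.0, "age": 35, "elevel": 2}, 1)])

X, y = next(samples)

# Same conversion as in get_message: feature values first, target last.
message = list(X.values()) + [y]
print(message)  # [1000.0, 35, 2, 1]
```

Because the feature names are dropped, the position of each value in the message depends on the key order of the dict, and the target is always the last element.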
next(self)
inherited
The function that returns the next stream message.
Exceptions:
Type | Description |
---|---|
StopIteration | The stream has finished |
Returns:
Type | Description |
---|---|
list | message |
Source code in ADLStream/data/stream/river_stream.py
def next(self):
    """The function that returns the next stream message.

    Raises:
        StopIteration: The stream has finished

    Returns:
        list: message
    """
    if self.stream_period > 0:
        if time.time() - self.last_message_time < self.stream_period / 1000:
            time.sleep(
                self.stream_period / 1000
                - (
                    (time.time() - self.last_message_time)
                    % (self.stream_period / 1000)
                )
            )
    starttime = time.time()
    try:
        message = self.get_message()
    except Exception:
        self.stop()
        raise StopIteration
    self.last_message_time = starttime
    return message
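The waiting rule at the top of `next` can be isolated in a small function. The sketch below uses a hypothetical helper, `pacing_delay`, which is not part of ADLStream; it reproduces the arithmetic from the listing so that the effect of `stream_period` can be checked without actually sleeping.

```python
def pacing_delay(now, last_message_time, stream_period_ms):
    """Seconds to wait before the next message, following the listing's arithmetic.

    Hypothetical helper for illustration only; times are in seconds,
    the period is in milliseconds as in the stream_period parameter.
    """
    if stream_period_ms <= 0:
        return 0.0  # no pacing: messages are sent as soon as possible
    period = stream_period_ms / 1000  # milliseconds -> seconds
    elapsed = now - last_message_time
    if elapsed >= period:
        return 0.0  # the period has already passed, no need to wait
    return period - (elapsed % period)


# 125 ms after the last message, with a 500 ms period: wait 375 ms more.
print(pacing_delay(10.125, 10.0, 500))  # 0.375
```

In the branch where the listing sleeps, the elapsed time is smaller than the period, so for a non-negative elapsed time the modulo leaves it unchanged and the wait is simply the remainder of the current period.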
start(self)
Function to be called before requesting any message.
Source code in ADLStream/data/stream/river_stream.py
def start(self):
    self.data = self.dataset.take(self.n_instances)
    super().start()
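`start` bounds the stream to `n_instances` samples through the dataset's `take` method. The sketch below imitates that behaviour with `itertools.islice` on a hypothetical endless generator. Treating `take(n)` as equivalent to `islice(dataset, n)` is an assumption made for illustration, and `endless_samples` is not part of river or ADLStream.

```python
import itertools


def endless_samples():
    """Hypothetical stand-in for a synthetic generator that never ends."""
    i = 0
    while True:
        yield {"x0": i, "x1": i * 2}, i % 2
        i += 1


n_instances = 3
# Stand-in for dataset.take(n_instances): a bounded view of the endless source.
data = itertools.islice(endless_samples(), n_instances)

# Build messages the same way get_message does: feature values, then target.
messages = [list(X.values()) + [y] for X, y in data]
print(messages)  # [[0, 0, 0], [1, 2, 1], [2, 4, 0]]

# Any further request finds the bounded iterator exhausted.
exhausted = next(data, None) is None
```

With the default `n_instances=1000`, the wrapped stream therefore ends after 1000 messages even when the underlying generator could produce more.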
stop(self)
inherited
Function to be called when stream is finished.
Source code in ADLStream/data/stream/river_stream.py
def stop(self):
    """Function to be called when stream is finished."""
    pass
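Taken together, the methods on this page suggest a simple lifecycle: call `start`, request messages with `next` until `StopIteration` is raised, at which point `stop` has already been called. The sketch below shows that loop with a hypothetical `ToyStream` class that mirrors the listed logic without importing ADLStream or river. Unlike the `stop` shown above, which does nothing, the toy version sets a flag so the call can be observed.

```python
class ToyStream:
    """Hypothetical stream exposing the start/next/stop methods described above."""

    def __init__(self, rows):
        self.rows = rows
        self.data = None
        self.stopped = False

    def start(self):
        self.data = iter(self.rows)

    def get_message(self):
        return next(self.data)

    def next(self):
        try:
            return self.get_message()
        except Exception:
            # Any failure to produce a message ends the stream.
            self.stop()
            raise StopIteration

    def stop(self):
        self.stopped = True  # flag added only to make the call visible


stream = ToyStream([[0.1, 0.2, 0], [0.3, 0.4, 1]])
stream.start()  # must be called before requesting any message

received = []
while True:
    try:
        received.append(stream.next())
    except StopIteration:
        break  # next() has already called stop() at this point

print(received)  # [[0.1, 0.2, 0], [0.3, 0.4, 1]]
```

Note that the listed `next` converts every exception raised by `get_message` into `StopIteration`, so a consumer written this way cannot distinguish an exhausted stream from a failing one.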