CSVFileStream
ADLStream.data.stream.CSVFileStream
CSV File Stream.
This class creates a stream from a csv file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | Path of file to read. | required |
sep | str | Delimiter to use. Defaults to ",". | ',' |
index_col | int >=0 | Number of columns to use as index. Defaults to 0. | 0 |
header | int >=0 | Number of rows to use as column names. Defaults to 0. | 0 |
stream_period | int >=0 | Stream time period in milliseconds. It is the minimum time between consecutive messages in the stream. If it is 0, when a message is required (`next`), it is sent as soon as possible. Defaults to 100. | 100 |
timeout | int >0 | Stream timeout in milliseconds. It is the maximum time to wait for the next message. If it takes longer, a `StopIteration` exception is raised. Defaults to 30000. | 30000 |
Source code in ADLStream/data/stream/csv_file_stream.py
class CSVFileStream(FileStream):
    """CSV File Stream.

    This class creates a stream from a csv file.

    Arguments:
        filename (str): Path of file to read.
        sep (str, optional): Delimiter to use.
            Defaults to ",".
        index_col (int >=0, optional): Number of columns to use as index.
            Defaults to 0.
        header (int >=0, optional): Number of rows to use as column names.
            Defaults to 0.
        stream_period (int >=0, optional): Stream time period in milliseconds.
            It is the minimum time between consecutive messages in the stream. If it
            is 0, when a message is required (`next`), it is sent as soon as possible.
            Defaults to 100.
        timeout (int >0, optional): Stream timeout in milliseconds.
            It is the maximum time to wait for the next message. If it takes longer,
            a `StopIteration` exception is raised.
            Defaults to 30000.
    """

    def __init__(
        self,
        filename,
        sep=",",
        index_col=0,
        header=0,
        stream_period=100,
        timeout=30000,
        **kwargs
    ):
        super().__init__(
            filename=filename,
            skip_first=header,
            stream_period=stream_period,
            timeout=timeout,
            **kwargs
        )
        self.sep = sep
        self.index_col = index_col

    def decode(self, line):
        message = line.strip().split(self.sep)[self.index_col :]
        message = [float(x.strip()) for x in message]
        return message
decode(self, line)
Transforms a file line into a message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
line | str | Line read from the file. | required |
Returns:
Type | Description |
---|---|
list | Data decoded from the file line, as a list of floats. |
Source code in ADLStream/data/stream/csv_file_stream.py
def decode(self, line):
    message = line.strip().split(self.sep)[self.index_col :]
    message = [float(x.strip()) for x in message]
    return message
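To make the decoding step concrete, here is a minimal standalone sketch that mirrors the `decode` logic shown above. `decode_line` is a hypothetical helper written for illustration, not part of ADLStream, and it assumes every field after the index columns is numeric.

```python
def decode_line(line, sep=",", index_col=0):
    """Standalone mirror of the decode logic (hypothetical helper, not ADLStream API)."""
    # Split on the delimiter and drop the first `index_col` fields.
    fields = line.strip().split(sep)[index_col:]
    # Every remaining field must parse as a float, otherwise ValueError is raised.
    return [float(field.strip()) for field in fields]


row = decode_line("2021-01-01, 1.5, 2\n", index_col=1)
print(row)  # [1.5, 2.0]
```

With `index_col=0` the date field in this example would reach `float()` and raise `ValueError`. Since `next` in the source on this page catches any exception from `get_message` and raises `StopIteration`, a malformed row would likely end the stream rather than surface the parsing error.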
get_message(self)
inherited
The function that contains the logic to generate a new message. It must return the message as an array. This function must be overridden by every custom stream.
Exceptions:
Type | Description |
---|---|
NotImplementedError | The abstract function has not been overridden. |
Returns:
Type | Description |
---|---|
list | message |
Source code in ADLStream/data/stream/csv_file_stream.py
def get_message(self):
    line = next(self.file)
    line = line[:-1]  # Delete newline character
    message = self.decode(line)
    return message
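One detail of the source above is worth checking against your data: `line[:-1]` removes the last character unconditionally, so if the final line of the file has no trailing newline, the last character of the data is dropped instead. The sketch below only illustrates that slicing behavior with plain strings; `rstrip("\n")` is shown as one possible alternative, not as what the library does.

```python
# Plain-string illustration; no ADLStream code is involved.
with_newline = "1,2,35\n"
without_newline = "1,2,35"  # e.g. last line of a file with no final newline

# Unconditional slicing, as in the get_message source shown above.
print(repr(with_newline[:-1]))     # '1,2,35'
print(repr(without_newline[:-1]))  # '1,2,3'

# One possible alternative (an assumption, not library behavior):
# strip only an actual newline character.
print(repr(without_newline.rstrip("\n")))  # '1,2,35'
```

Ending the CSV file with a newline avoids the issue without touching any code.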
next(self)
inherited
The function that returns the next stream message.
Exceptions:
Type | Description |
---|---|
StopIteration | The stream has finished. |
Returns:
Type | Description |
---|---|
list | message |
Source code in ADLStream/data/stream/csv_file_stream.py
def next(self):
    """The function that returns the next stream message.

    Raises:
        StopIteration: The stream has finished

    Returns:
        list: message
    """
    if self.stream_period > 0:
        if time.time() - self.last_message_time < self.stream_period / 1000:
            time.sleep(
                self.stream_period / 1000
                - (
                    (time.time() - self.last_message_time)
                    % (self.stream_period / 1000)
                )
            )
    starttime = time.time()
    try:
        message = self.get_message()
    except Exception:
        self.stop()
        raise StopIteration
    self.last_message_time = starttime
    return message
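The pacing rule at the top of `next` can be isolated into a small pure function. This is a sketch for illustration only: `seconds_to_wait` is a hypothetical name, and a single timestamp is passed in explicitly instead of calling `time.time()` twice as the source does, so the arithmetic is easy to check.

```python
def seconds_to_wait(now, last_message_time, stream_period_ms):
    """Return how long `next` would sleep before fetching a message.

    Hypothetical helper mirroring the arithmetic in the source above.
    """
    if stream_period_ms <= 0:
        return 0.0  # period 0: send as soon as possible
    period = stream_period_ms / 1000  # milliseconds -> seconds
    elapsed = now - last_message_time
    if elapsed >= period:
        return 0.0  # already waited long enough
    # elapsed < period here, so `elapsed % period` equals `elapsed`
    return period - (elapsed % period)


print(seconds_to_wait(now=1.25, last_message_time=1.0, stream_period_ms=500))  # 0.25
print(seconds_to_wait(now=2.0, last_message_time=1.0, stream_period_ms=500))   # 0.0
print(seconds_to_wait(now=1.25, last_message_time=1.0, stream_period_ms=0))    # 0.0
```

In other words, `stream_period` acts as a lower bound on the gap between messages: a caller that asks too early is delayed for the remainder of the period, and a caller that asks late is served immediately.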
start(self)
inherited
Function to be called before asking any message.
Source code in ADLStream/data/stream/csv_file_stream.py
def start(self):
    self.file = open(self.filename, "r")
    for _ in range(self.skip_first):
        next(self.file)
    super().start()
stop(self)
inherited
Function to be called when stream is finished.
Source code in ADLStream/data/stream/csv_file_stream.py
def stop(self):
    self.file.close()
    super().stop()
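The `start`, `next`, and `stop` methods suggest a simple lifecycle: open the file, pull messages until `StopIteration`, then close. The sketch below walks through that sequence with `TinyCSVStream`, a hypothetical stand-in written for this example. It is not the ADLStream class; it only copies the file handling and decoding shown on this page and leaves out pacing, timeouts, and whatever the base classes do.

```python
import os
import tempfile


class TinyCSVStream:
    """Hypothetical stand-in mirroring start/next/stop; not the ADLStream class."""

    def __init__(self, filename, sep=",", index_col=0, header=0):
        self.filename = filename
        self.sep = sep
        self.index_col = index_col
        self.skip_first = header

    def start(self):
        # Open the file and skip the header rows, as in the source above.
        self.file = open(self.filename, "r")
        for _ in range(self.skip_first):
            next(self.file)

    def next(self):
        try:
            line = next(self.file)
            fields = line.strip().split(self.sep)[self.index_col:]
            return [float(x.strip()) for x in fields]
        except Exception:
            # Any failure (including end of file) stops the stream.
            self.stop()
            raise StopIteration

    def stop(self):
        self.file.close()


# Write a small CSV file: one header row, one index column.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
    tmp.write("t,a,b\n0,1.0,2.0\n1,3.0,4.0\n")

stream = TinyCSVStream(tmp.name, index_col=1, header=1)
stream.start()
rows = []
while True:
    try:
        rows.append(stream.next())
    except StopIteration:
        break
os.remove(tmp.name)
print(rows)  # [[1.0, 2.0], [3.0, 4.0]]
```

The real `CSVFileStream` takes the same `filename`, `sep`, `index_col`, and `header` arguments, so a similar loop should presumably apply to it, with the added delay governed by `stream_period`.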