Skip to content

CSVFileStream

ADLStream.data.stream.CSVFileStream

CSV File Stream.

This class creates a stream from a csv file.

Parameters:

Name Type Description Default
filename str

Path of file to read.

required
sep str

Delimiter to use. Defaults to ",".

','
index_col int >=0

Number of columns to use as index. Defaults to 0.

0
header int >=0

Number of rows to use as column names. Defaults to 0.

0
stream_period int >=0

Stream time period in milliseconds. It is the minimun time between consecutive messages in the stream. If it is 0, when a message is required (next), it is sent as soon as possible. Defaults to 100.

100
timeout int >0

Stream time out in milliseconds. It is the maximun time to wait for the next message. If it takes longer, StopIteration exception is raised. Defaults to 30000.

30000
Source code in ADLStream/data/stream/csv_file_stream.py
class CSVFileStream(FileStream):
    """CSV File Stream.

    This class creates a stream from a csv file.

    Arguments:
        filename (str): Path of file to read.
        sep (str): Delimiter to use.
            Defaults to ",".
        index_col (int >=0, optional): Number of columns to use as index.
            Defaults to 0.
        header (int >=0, optional): Number of rows to use as column names.
            Defaults to 0.
        stream_period (int >=0, optional): Stream time period in milliseconds.
            It is the minimun time between consecutive messages in the stream. If it
            is 0, when a message is required (`next`), it is sent as soon as possible.
            Defaults to 100.
        timeout (int >0, optional): Stream time out in milliseconds.
            It is the maximun time to wait for the next message. If it takes longer,
            `StopIteration` exception is raised.
            Defaults to 30000.
    """

    def __init__(
        self,
        filename,
        sep=",",
        index_col=0,
        header=0,
        stream_period=100,
        timeout=30000,
        **kwargs
    ):
        super().__init__(
            filename=filename,
            skip_first=header,
            stream_period=stream_period,
            timeout=timeout,
            **kwargs
        )
        self.sep = sep
        self.index_col = index_col

    def decode(self, line):
        message = line.strip().split(self.sep)[self.index_col :]
        message = [float(x.strip()) for x in message]
        return message

decode(self, line)

Transform file line into a message

Parameters:

Name Type Description Default
line str

Line read from the file.

required

Returns:

Type Description
list

represents the data decoded from the file line.

Source code in ADLStream/data/stream/csv_file_stream.py
def decode(self, line):
    message = line.strip().split(self.sep)[self.index_col :]
    message = [float(x.strip()) for x in message]
    return message

get_message(self) inherited

The function that contains the logic to generate a new message. It must return the message as an array. This function must be override by every custom stream.

Exceptions:

Type Description
NotImplementedError

Abstract function has not been overrided.

Returns:

Type Description
list

message

Source code in ADLStream/data/stream/csv_file_stream.py
def get_message(self):
    line = next(self.file)
    line = line[:-1]  # Delete newlines character
    message = self.decode(line)
    return message

next(self) inherited

The function that returns the next stream message.

Exceptions:

Type Description
StopIteration

The stream has finisshed

Returns:

Type Description
list

message

Source code in ADLStream/data/stream/csv_file_stream.py
def next(self):
    """The function that returns the next stream message.

    Raises:
        StopIteration: The stream has finisshed

    Returns:
        list: message
    """
    if self.stream_period > 0:
        if time.time() - self.last_message_time < self.stream_period / 1000:
            time.sleep(
                self.stream_period / 1000
                - (
                    (time.time() - self.last_message_time)
                    % (self.stream_period / 1000)
                )
            )
    starttime = time.time()

    try:
        message = self.get_message()
    except Exception:
        self.stop()
        raise StopIteration

    self.last_message_time = starttime

    return message

start(self) inherited

Function to be called before asking any message.

Source code in ADLStream/data/stream/csv_file_stream.py
def start(self):
    self.file = open(self.filename, "r")
    for _ in range(self.skip_first):
        next(self.file)
    super().start()

stop(self) inherited

Function to be called when stream is finished.

Source code in ADLStream/data/stream/csv_file_stream.py
def stop(self):
    self.file.close()
    super().stop()