Skip to content

ADLStream.data.stream.FileStream

Abstract File Stream.

This class implements the logic to read lines from a file.

Every class inheriting FileStream must have the properties below and implement decode function with the signature message = decode(line), where line is a string read from the file and message must be of type list.

Examples:

    class CSVFileStream(FileStream):

        def __init__(self, filename, stream_period=0, timeout=30000, **kwargs):
            super().__init__(filename, stream_period, timeout, **kwargs)

        def decode(self, line):
            return line.split(",")

Parameters:

Name Type Description Default
filename str

Path of file to read.

required
skip_first int >=0

Number of lines to skip at the begining. Defaults to 0.

0
stream_period int >=0

Stream time period in milliseconds. It is the minimun time between consecutive messages in the stream. If it is 0, when a message is required (next), it is sent as soon as possible. Defaults to 100.

100
timeout int >0

Stream time out in milliseconds. It is the maximun time to wait for the next message. If it takes longer, StopIteration exception is raised. Defaults to 30000.

30000
Source code in ADLStream/data/stream/file_stream.py
class FileStream(BaseStream, ABC):
    """Abstract File Stream.

    This class implements the logic to read lines from a file.

    Every class inheriting `FileStream` must have the properties below and implement
    `decode` function with the signature `message = decode(line)`, where `line` is a
    string read from the file and `message` must be of type list.

    Examples:
    ```
        class CSVFileStream(FileStream):

            def __init__(self, filename, stream_period=0, timeout=30000, **kwargs):
                super().__init__(filename, stream_period, timeout, **kwargs)

            def decode(self, line):
                return line.split(",")
    ```

    Arguments:
        filename (str): Path of file to read.
        skip_first (int >=0, optional): Number of lines to skip at the begining.
            Defaults to 0.
        stream_period (int >=0, optional): Stream time period in milliseconds. 
            It is the minimun time between consecutive messages in the stream. If it 
            is 0, when a message is required (`next`), it is sent as soon as possible.
            Defaults to 100.
        timeout (int >0, optional): Stream time out in milliseconds.
            It is the maximun time to wait for the next message. If it takes longer,
            `StopIteration` exception is raised.
            Defaults to 30000.
    """

    def __init__(
        self, filename, skip_first=0, stream_period=100, timeout=30000, **kwargs
    ):
        super().__init__(stream_period=stream_period, timeout=timeout, **kwargs)
        self.filename = filename
        self.skip_first = skip_first

    def start(self):
        self.file = open(self.filename, "r")
        for _ in range(self.skip_first):
            next(self.file)
        super().start()

    def stop(self):
        self.file.close()
        super().stop()

    @abstractmethod
    def decode(self, line):
        """Transform file line into a message

        Args:
            line (str): Line read from the file. 

        Returns:
            list: represents the data decoded from the file line.
        """
        raise NotImplementedError

    def get_message(self):
        line = next(self.file)
        line = line[:-1]  # Delete newlines character
        message = self.decode(line)
        return message

decode(self, line)

Transform file line into a message

Parameters:

Name Type Description Default
line str

Line read from the file.

required

Returns:

Type Description
list

represents the data decoded from the file line.

Source code in ADLStream/data/stream/file_stream.py
@abstractmethod
def decode(self, line):
    """Transform file line into a message

    Args:
        line (str): Line read from the file. 

    Returns:
        list: represents the data decoded from the file line.
    """
    raise NotImplementedError

get_message(self)

The function that contains the logic to generate a new message. It must return the message as an array. This function must be override by every custom stream.

Exceptions:

Type Description
NotImplementedError

Abstract function has not been overrided.

Returns:

Type Description
list

message

Source code in ADLStream/data/stream/file_stream.py
def get_message(self):
    line = next(self.file)
    line = line[:-1]  # Delete newlines character
    message = self.decode(line)
    return message

start(self)

Function to be called before asking any message.

Source code in ADLStream/data/stream/file_stream.py
def start(self):
    self.file = open(self.filename, "r")
    for _ in range(self.skip_first):
        next(self.file)
    super().start()

stop(self)

Function to be called when stream is finished.

Source code in ADLStream/data/stream/file_stream.py
def stop(self):
    self.file.close()
    super().stop()