Skip to content

ClassificationStreamGenerator

ADLStream.data.ClassificationStreamGenerator

Classification stream generator.

This class is used for generating streams for classification problems.

Parameters:

Name Type Description Default
stream inherits BaseStream

Stream source to be fed to the ADLStream framework.

required
label_index int or list

The column index/indices of the target label. Defaults to -1.

[-1]
one_hot_labels list or None

Possible label values if one-hot encoding must be done. If None, the target value is not one-hot encoded. Defaults to None.

None
Source code in ADLStream/data/classification_generator.py
class ClassificationStreamGenerator(BaseStreamGenerator):
    """Classification stream generator.

    This class is used for generating streams for classification problems.

    Arguments:
        stream (inherits BaseStream):
            Stream source to be fed to the ADLStream framework.
        label_index (int or list, optional): The column index/indices of the target
            label. Indices refer to positions in the original message; negative
            indices count from the end.
            Defaults to -1.
        one_hot_labels (list or None, optional): Possible label values if one-hot
            encoding must be done. If None, the target value is not one-hot encoded.
            Defaults to None.
    """

    def __init__(
        self,
        stream: Type["BaseStream"],
        label_index: Optional[Union[List[int], int]] = -1,
        one_hot_labels: Optional[List] = None,
        **kwargs
    ) -> None:
        super().__init__(stream, **kwargs)
        # The default is an immutable int (not a shared list) so instances can
        # never end up sharing, and accidentally mutating, the same index list.
        if label_index is None:
            label_index = -1
        if isinstance(label_index, (list, tuple)):
            # Copy so later changes to the caller's sequence do not leak in.
            self.label_index = list(label_index)
        else:
            self.label_index = [label_index]
        self.labels = one_hot_labels
        self.one_hot_encoder = None
        # Explicit None/length checks instead of bare truthiness.
        if self.labels is not None and len(self.labels) > 0:
            self.one_hot_encoder = OneHotEncoder()
            # The encoder is fitted on a single column holding the label values.
            self.one_hot_encoder.fit(np.asarray(self.labels).reshape(-1, 1))

    def preprocess(self, message: List) -> Tuple[List, List]:
        """Divide the message in `X` and `y`.

        It uses the `self.label_index` features as `y` and the rest as `x`.
        Additionally, if indicated, it performs a one hot encoding to the labels.
        The input message is left unmodified.

        Args:
            message (List): stream message.

        Returns:
            Tuple[List, List]: `(x, y)` input value and its class.

        Raises:
            IndexError: if any label index is out of range for the message.
        """
        # Read every label from the untouched message first. Popping them one
        # by one would shift the remaining positions and select wrong columns
        # whenever more than one label index is configured.
        y = [message[i] for i in self.label_index]

        size = len(message)
        label_positions = {i + size if i < 0 else i for i in self.label_index}
        x = [
            value
            for position, value in enumerate(message)
            if position not in label_positions
        ]

        if self.one_hot_encoder is not None:
            encoded = self.one_hot_encoder.transform([y]).toarray()
            y = list(encoded[0])

        return x, y

num_messages: int inherited property writable

Return number of messages processed from the stream.

Returns:

Type Description
int

number of messages.

next(self, context) inherited

Get the next message from the stream.

Parameters:

Name Type Description Default
context ADLStreamContext

ADLStream shared object. Used for logging.

required

Returns:

Type Description
List[float]

new message.

Source code in ADLStream/data/classification_generator.py
def next(self, context: "ADLStreamContext") -> List[float]:
    """Fetch one message from the underlying stream.

    When the stream is exhausted (``StopIteration``) or any other error is
    raised while fetching, the event is logged through ``context`` and the
    context time-out is triggered.

    Args:
        context (ADLStreamContext): ADLStream shared object. Used for logging.

    Returns:
        List[float]: the new message, or None if none could be obtained.
    """
    fetched = None
    try:
        self._check_number_instances()
        fetched = self.stream.next()
        # Count the message only once it has actually been retrieved.
        self.num_messages += 1
    except StopIteration:
        context.log("INFO", "GENERATOR-PROCESS - Stream has finished")
        context.set_time_out()
    except Exception as err:
        error_text = "GENERATOR-PROCESS - Error getting messages from stream {}".format(
            str(err)
        )
        context.log("ERROR", error_text)
        context.set_time_out()
    return fetched

preprocess(self, message)

Divide the message into X and y. It uses the features at the positions given by self.label_index as y and the rest as x. Additionally, if indicated, it performs a one-hot encoding of the labels.

Parameters:

Name Type Description Default
message List

stream message.

required

Returns:

Type Description
Tuple[List, List]

(x, y) input value and its class.

Source code in ADLStream/data/classification_generator.py
def preprocess(self, message: List) -> Tuple[List, List]:
    """Divide the message in `X` and `y`.

    It uses the `self.label_index` features as `y` and the rest as `x`.
    Additionally, if indicated, it performs a one hot encoding to the labels.
    The input message is left unmodified.

    Args:
        message (List): stream message.

    Returns:
        Tuple[List, List]: `(x, y)` input value and its class.

    Raises:
        IndexError: if any label index is out of range for the message.
    """
    # Read every label from the untouched message first. Popping them one by
    # one would shift the remaining positions and select wrong columns
    # whenever more than one label index is configured.
    y = [message[i] for i in self.label_index]

    size = len(message)
    label_positions = {i + size if i < 0 else i for i in self.label_index}
    x = [
        value
        for position, value in enumerate(message)
        if position not in label_positions
    ]

    if self.labels:
        encoded = self.one_hot_encoder.transform([y]).toarray()
        y = list(encoded[0])

    return x, y

run(self, context) inherited

The function that sends data to the ADLStream framework.

It gets messages from the stream, preprocesses them and sends to the specific ADLStream context.

Parameters:

Name Type Description Default
context ADLStream.ADLStreamContext

context where to send the stream data

required
Source code in ADLStream/data/classification_generator.py
def run(self, context: "ADLStreamContext"):
    """Feed the stream data into the ADLStream framework.

    Starts the stream, then repeatedly fetches a message, splits it with
    `preprocess`, applies the preprocessing steps to the input part and adds
    the resulting pair to the given context. The loop ends as soon as `next`
    returns None.

    Args:
        context (ADLStream.ADLStreamContext): context where to send the stream data
    """
    self.stream.start()
    while True:
        message = self.next(context)
        if message is None:
            break

        features, target = self.preprocess(message)
        # Nothing to forward only when both parts are missing.
        if features is None and target is None:
            continue

        features = self._perform_preprocessing_steps(features)
        context.add(features, target)