ClassificationStreamGenerator
ADLStream.data.ClassificationStreamGenerator
Classification stream generator.
This class is used for generating streams for classification problems.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream |
inherits BaseStream |
Stream source to be fed to the ADLStream framework. |
required |
label_index |
int or list |
The column index/indices of the target label. Defaults to -1. |
[-1] |
one_hot_labels |
list or None |
Possible label values if one-hot encoding must be done. If None, the target value is not one-hot encoded. Defaults to None. |
None |
Source code in ADLStream/data/classification_generator.py
class ClassificationStreamGenerator(BaseStreamGenerator):
    """Classification stream generator.

    This class is used for generating streams for classification problems.
    Every stream message is split into the input features `x` and the target
    label(s) `y`; the labels can optionally be one-hot encoded.

    Arguments:
        stream (inherits BaseStream):
            Stream source to be fed to the ADLStream framework.
        label_index (int or list, optional): The column index/indices of the target
            label. Indices refer to positions in the original message and may be
            negative.
            Defaults to -1.
        one_hot_labels (list or None, optional): Possible label values if one-hot
            encoding must be done. If None, the target value is not one-hot encoded.
            Defaults to None.
    """

    def __init__(
        self,
        stream: Type["BaseStream"],
        label_index: Optional[Union[List[int], int]] = [-1],
        one_hot_labels: Optional[List] = None,
        **kwargs
    ) -> None:
        super().__init__(stream, **kwargs)
        # Always keep a private list copy: this avoids aliasing the shared default
        # list (or a list owned by the caller) on the instance.
        if isinstance(label_index, (list, tuple)):
            self.label_index = list(label_index)
        else:
            self.label_index = [label_index]
        self.labels = one_hot_labels
        self.one_hot_encoder = None
        if self.labels:
            # The encoder is fitted on a single column holding the possible labels.
            self.one_hot_encoder = OneHotEncoder()
            self.one_hot_encoder.fit(np.asarray(self.labels).reshape(-1, 1))

    def preprocess(self, message: List) -> Tuple[List, List]:
        """Divide the message in `x` and `y`.

        It uses the `self.label_index` features as `y` and the rest as `x`.
        Additionally, if indicated, it performs a one hot encoding to the labels.

        The input message is not modified; `x` is a new list.

        Args:
            message (List): stream message.

        Returns:
            Tuple[List, List]: `(x, y)` input value and its class.

        Raises:
            IndexError: if any label index is out of range for the message.
        """
        # Read every label from the untouched message so that extracting one label
        # cannot shift the position of the following ones.
        y = [message[i] for i in self.label_index]
        size = len(message)
        # Normalise negative indices to absolute positions before filtering.
        label_positions = {i if i >= 0 else i + size for i in self.label_index}
        x = [
            value
            for position, value in enumerate(message)
            if position not in label_positions
        ]
        if self.labels:
            y = self.one_hot_encoder.transform([y]).toarray()
            y = list(y[0])
        return x, y
num_messages: int
inherited
property
writable
¶
Return number of messages processed from the stream.
Returns:
Type | Description |
---|---|
int |
number of messages. |
next(self, context)
inherited
¶
Get the next message from the stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
ADLStreamContext |
ADLStream shared object. Used for logging. |
required |
Returns:
Type | Description |
---|---|
List[float] |
new message. |
Source code in ADLStream/data/classification_generator.py
def next(self, context: "ADLStreamContext") -> List[float]:
    """Fetch the following message from the underlying stream.

    When the stream is exhausted, or any error is raised while reading it, the
    event is logged through the context, the context time-out is set and the
    value fetched so far (`None` unless the read itself succeeded) is returned.

    Args:
        context (ADLStreamContext): ADLStream shared object. Used for logging.

    Returns:
        List[float]: new message, or None if no message could be obtained.
    """
    fetched = None
    try:
        self._check_number_instances()
        fetched = self.stream.next()
        self.num_messages += 1
    except StopIteration:
        # Normal end of the stream.
        context.log("INFO", "GENERATOR-PROCESS - Stream has finished")
        context.set_time_out()
    except Exception as err:
        # Any other failure is reported and also stops the generator.
        error_text = "GENERATOR-PROCESS - Error getting messages from stream {}".format(
            str(err)
        )
        context.log("ERROR", error_text)
        context.set_time_out()
    return fetched
preprocess(self, message)
¶
Divide the message in X
and y
.
It uses the self.label_index
features as y
and the rest as x
.
Additionally, if indicated, it performs a one hot encoding to the labels.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
List |
stream message. |
required |
Returns:
Type | Description |
---|---|
Tuple[List, List] |
(x, y) input value and its class. |
Source code in ADLStream/data/classification_generator.py
def preprocess(self, message: List) -> Tuple[List, List]:
    """Divide the message in `x` and `y`.

    It uses the `self.label_index` features as `y` and the rest as `x`.
    Additionally, if indicated, it performs a one hot encoding to the labels.

    The input message is not modified; `x` is a new list.

    Args:
        message (List): stream message.

    Returns:
        Tuple[List, List]: `(x, y)` input value and its class.

    Raises:
        IndexError: if any label index is out of range for the message.
    """
    # Read every label from the untouched message so that extracting one label
    # cannot shift the position of the following ones.
    y = [message[i] for i in self.label_index]
    size = len(message)
    # Normalise negative indices to absolute positions before filtering.
    label_positions = {i if i >= 0 else i + size for i in self.label_index}
    x = [
        value
        for position, value in enumerate(message)
        if position not in label_positions
    ]
    if self.labels:
        y = self.one_hot_encoder.transform([y]).toarray()
        y = list(y[0])
    return x, y
run(self, context)
inherited
¶
The function that sends data to ADLStream framework
It gets messages from the stream, preprocesses them and sends to the specific ADLStream context.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
ADLStream.ADLStreamContext |
context where to send the stream data |
required |
Source code in ADLStream/data/classification_generator.py
def run(self, context: "ADLStreamContext"):
    """Feed the ADLStream context with the data coming from the stream.

    The stream is started and then consumed message by message until `next`
    returns None. Each message is split with `preprocess`; unless both parts are
    None, the input goes through the preprocessing steps and the pair is added
    to the context.

    Args:
        context (ADLStream.ADLStreamContext): context where to send the stream data
    """
    self.stream.start()
    while True:
        message = self.next(context)
        if message is None:
            # `next` signals the end of the stream (or an error) with None.
            break
        features, target = self.preprocess(message)
        if features is None and target is None:
            # Nothing to send for this message.
            continue
        features = self._perform_preprocessing_steps(features)
        context.add(features, target)