ADLStream
ADLStream¶
ADLStream.ADLStream
ADLStream. This is the main object of the framework. Based on a stream generator and a given deep learning model, it runs the training and predicting process in parallel (ideally in two different GPU) to obtain accurate predictions as soon as an instance is received.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream_generator |
ADLStream.data.BaseStreamGenerator |
It is in charge of generating new instances from the stream. |
required |
evaluator |
ADLStream.evaluator.BaseEvaluator |
It will deal with the validation logic. |
required |
batch_size |
int |
Number of instances per batch. |
required |
num_batches_fed |
int |
Maximum number of batches to be used for training. |
required |
model_architecture |
str |
Model architecture to use. Possible options can be found in ADLStream.models. |
required |
model_loss |
tf.keras.Loss |
Loss function to use. For references, check tf.keras.losses. |
required |
model_optimizer |
tf.keras.Optimizer |
It defines the training algorithm to use. For references, check tf.keras.optimizers. Defaults to "adam". |
required |
model_parameters |
dict |
It contains all the model-creation parameters. It depends on the model architecture chosen. Defaults to {}. |
{} |
train_gpu_index |
int |
GPU index to be used for training. Defaults to 0. |
0 |
predict_gpu_index |
int |
GPU index to be used for predicting. Defaults to 1. |
1 |
log_file |
str |
Name of log file. If None, log will be printed on screen and will not be saved. If log_file is given, log level is set to "DEBUG". However if None, log level is kept as default. Defaults to None. |
None |
Source code in ADLStream/adlstream.py
class ADLStream:
    """ADLStream.

    This is the main object of the framework.
    Based on a stream generator and a given deep learning model, it runs the
    training and predicting process in parallel (ideally in two different GPU)
    to obtain accurate predictions as soon as an instance is received.

    Parameters:
        stream_generator (ADLStream.data.BaseStreamGenerator):
            It is in charge of generating new instances from the stream.
        evaluator (ADLStream.evaluator.BaseEvaluator):
            It will deal with the validation logic.
        batch_size (int): Number of instances per batch.
        num_batches_fed (int): Maximum number of batches to be used for training.
        model_architecture (str): Model architecture to use.
            Possible options can be found in ADLStream.models.
        model_loss (tf.keras.Loss): Loss function to use.
            For references, check tf.keras.losses.
        model_optimizer (tf.keras.Optimizer, optional): It defines the training
            algorithm to use. For references, check tf.keras.optimizers.
            Defaults to "adam".
        model_parameters (dict, optional): It contains all the model-creation
            parameters. It depends on the model architecture chosen.
            Defaults to None, which is interpreted as an empty dict.
        train_gpu_index (int, optional): GPU index to be used for training.
            Defaults to 0.
        predict_gpu_index (int, optional): GPU index to be used for predicting.
            Defaults to 1.
        log_file (str, optional): Name of log file.
            If None, log will be printed on screen and will not be saved.
            If log_file is given, log level is set to "DEBUG". However if None,
            log level is kept as default.
            Defaults to None.
    """

    def __init__(
        self,
        stream_generator: Type["BaseStreamGenerator"],
        evaluator: Type["BaseEvaluator"],
        batch_size: int,
        num_batches_fed: int,
        model_architecture: str,
        model_loss: Union[str, Callable],
        model_optimizer: str = "adam",
        model_parameters: Optional[dict] = None,
        train_gpu_index: int = 0,
        predict_gpu_index: int = 1,
        log_file: Optional[str] = None,
    ) -> None:
        self.stream_generator = stream_generator
        self.evaluator = evaluator
        self.batch_size = batch_size
        self.num_batches_fed = num_batches_fed
        self.model_architecture = model_architecture
        self.model_loss = model_loss
        self.model_optimizer = model_optimizer
        # `None` default instead of a mutable `{}` default: a shared dict
        # default would be aliased across every ADLStream instance.
        self.model_parameters = {} if model_parameters is None else model_parameters
        self.train_gpu_index = train_gpu_index
        self.predict_gpu_index = predict_gpu_index
        self.log_file = log_file
        # Filled in after `run()` finishes, so `get_model()` can rebuild
        # the trained model in the parent process.
        self.x_shape = None
        self.output_size = None
        self.weights = None
        self.manager = ADLStreamManager()

    def training_process(self, context: ADLStreamContext, gpu_index: int) -> None:
        """Training process.

        Repeatedly pulls the newest instances from the shared context, fits
        the model for one epoch and publishes the updated weights.

        Args:
            context (ADLStreamContext): Shared object among processes.
            gpu_index (int): Index of the GPU to use for training
        """
        # TensorFlow is imported inside the process so each child process
        # initializes its own TF runtime with its own visible devices.
        import tensorflow as tf
        from ADLStream.models import create_model

        # Select GPU device
        gpus = tf.config.experimental.list_physical_devices("GPU")
        if len(gpus) > gpu_index:
            try:
                tf.config.experimental.set_visible_devices(gpus[gpu_index], "GPU")
                context.log(
                    "INFO",
                    "TRAINING-PROCESS - GPU device using: {}".format(gpus[gpu_index]),
                )
            except RuntimeError as e:
                context.log("ERROR", "TRAINING-PROCESS - {}".format(e))
        else:
            # Requested GPU does not exist: fall back to CPU.
            tf.config.experimental.set_visible_devices([], "GPU")
            context.log(
                "WARNING",
                "TRAINING-PROCESS - There are no enough GPU devices, using CPU",
            )

        model = None
        while not context.is_finished():
            X, y = context.get_training_data()
            if not X:
                # Busy-wait until training data is available.
                continue
            X, y = np.asarray(X), np.asarray(y)
            if model is None:
                # First batch: infer output size from the flattened targets
                # and build the model.
                context.set_output_size(y.reshape(y.shape[0], -1).shape[-1])
                model = create_model(
                    self.model_architecture,
                    X.shape,
                    context.get_output_size(),
                    self.model_loss,
                    self.model_optimizer,
                    **self.model_parameters
                )
                # NOTE(review): this assignment happens in the child process,
                # so it is not visible to the parent; `run()` re-reads the
                # shape from the shared context instead.
                self.x_shape = X.shape
            y = y.reshape((y.shape[0], context.get_output_size()))
            context.log(
                "INFO",
                "TRAINING-PROCESS - Training with the last {} instances".format(
                    X.shape[0]
                ),
            )
            model.fit(X, y, context.get_batch_size(), epochs=1, verbose=0)
            # Publish the new weights so the predicting process can pick
            # them up.
            context.set_weights(model.get_weights())
            context.set_new_model_available(True)
        context.log("INFO", "TRAINING-PROCESS - Finished stream")

    def predicting_process(self, context: ADLStreamContext, gpu_index: int) -> None:
        """Predicting process.

        Waits for the first trained model, then predicts every incoming
        batch with the latest available weights.

        Args:
            context (ADLStreamContext): Shared object among processes.
            gpu_index (int): Index of the GPU to use for predicting
        """
        import tensorflow as tf
        from ADLStream.models import create_model

        # Select GPU device
        gpus = tf.config.experimental.list_physical_devices("GPU")
        if len(gpus) > gpu_index:
            try:
                tf.config.experimental.set_visible_devices(gpus[gpu_index], "GPU")
                context.log(
                    "INFO",
                    "PREDICTING-PROCESS - GPU device using: {}".format(gpus[gpu_index]),
                )
            except RuntimeError as e:
                context.log("ERROR", "PREDICTING-PROCESS - {}".format(e))
        else:
            tf.config.experimental.set_visible_devices([], "GPU")
            context.log(
                "WARNING",
                "PREDICTING-PROCESS - There are no enough GPU devices, using CPU",
            )

        # Wait until model is created and trained for the first time.
        while not context.is_new_model_available():
            if context.is_time_out():
                if not context.get_remaining_test() > 0:
                    context.log(
                        "INFO",
                        "PREDICTING-PROCESS - Time out, no instances were received. Finishing process.",
                    )
                    context.set_finished()
                    return

        context.log("INFO", "Starting predictions")
        model = None
        while True:
            X = context.get_test_data()
            if not X:
                if context.is_time_out():
                    # Stream exhausted: signal the training process to stop.
                    context.set_finished(True)
                    context.log("INFO", "PREDICTING-PROCESS - Finished stream")
                    break
                continue
            X = np.asarray(X)
            if model is None:
                model = create_model(
                    self.model_architecture,
                    X.shape,
                    context.get_output_size(),
                    self.model_loss,
                    self.model_optimizer,
                    **self.model_parameters
                )
            if context.is_new_model_available():
                # Refresh weights published by the training process.
                model.set_weights(context.get_weights())
                context.set_new_model_available(False)
            predictions = model.predict(X)
            context.log(
                "INFO", "PREDICTING-PROCESS: {} instances predicted.".format(X.shape[0])
            )
            context.add_predictions(predictions)

    def run(self) -> None:
        """Function that runs ADLStream.

        It runs 4 different processes:
          - Training process.
          - Predicting process.
          - Stream generator process.
          - Evaluator process.
        """
        self.manager.start()
        context = self.manager.context(
            self.batch_size, self.num_batches_fed, log_file=self.log_file
        )
        process_stream = Process(
            target=self.stream_generator.run,
            args=[context],
        )
        process_train = Process(
            target=self.training_process, args=[context, self.train_gpu_index]
        )
        process_predict = Process(
            target=self.predicting_process, args=[context, self.predict_gpu_index]
        )
        process_evaluator = Process(target=self.evaluator.run, args=[context])

        process_stream.start()
        process_train.start()
        process_predict.start()
        process_evaluator.start()

        process_stream.join()
        process_train.join()
        process_predict.join()
        process_evaluator.join()

        # Copy final state out of the shared context so get_model() works.
        # NOTE(review): `get_shape` is not defined on the documented
        # ADLStreamContext class — confirm it exists in the real context.
        self.x_shape = context.get_shape()
        self.output_size = context.get_output_size()
        self.weights = context.get_weights()
        self.manager.shutdown()

    def get_model(self) -> object:
        """Returns model with the latest weights.

        Returns:
            tf.model: Model.
        """
        from ADLStream.models import create_model

        model = create_model(
            self.model_architecture,
            self.x_shape,
            self.output_size,
            self.model_loss,
            self.model_optimizer,
            **self.model_parameters
        )
        model.set_weights(self.weights)
        return model
get_model(self)
¶
Returns model with the latest weights.
Returns:
Type | Description |
---|---|
tf.model |
Model. |
Source code in ADLStream/adlstream.py
def get_model(self) -> object:
    """Build a model with the ADLStream architecture and load the latest
    trained weights into it.

    Returns:
        tf.model: Model.
    """
    from ADLStream.models import create_model

    latest_model = create_model(
        self.model_architecture,
        self.x_shape,
        self.output_size,
        self.model_loss,
        self.model_optimizer,
        **self.model_parameters
    )
    latest_model.set_weights(self.weights)
    return latest_model
predicting_process(self, context, gpu_index)
¶
Predicting process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
ADLStreamContext |
Shared object among processes. |
required |
gpu_index |
int |
Index of the GPU to use for predicting |
required |
Source code in ADLStream/adlstream.py
def predicting_process(self, context: "ADLStreamContext", gpu_index: int) -> None:
    """Predicting process.

    Waits for the first trained model, then predicts every incoming batch
    with the latest available weights.

    Args:
        context (ADLStreamContext): Shared object among processes.
        gpu_index (int): Index of the GPU to use for predicting
    """
    # TensorFlow is imported inside the process so the child initializes
    # its own TF runtime with its own visible devices.
    import tensorflow as tf
    from ADLStream.models import create_model

    # Select GPU device
    gpus = tf.config.experimental.list_physical_devices("GPU")
    if len(gpus) > gpu_index:
        try:
            tf.config.experimental.set_visible_devices(gpus[gpu_index], "GPU")
            context.log(
                "INFO",
                "PREDICTING-PROCESS - GPU device using: {}".format(gpus[gpu_index]),
            )
        except RuntimeError as e:
            context.log("ERROR", "PREDICTING-PROCESS - {}".format(e))
    else:
        # Requested GPU does not exist: fall back to CPU.
        tf.config.experimental.set_visible_devices([], "GPU")
        context.log(
            "WARNING",
            "PREDICTING-PROCESS - There are no enough GPU devices, using CPU",
        )

    # Wait until model is created and trained for the first time.
    while not context.is_new_model_available():
        if context.is_time_out():
            if not context.get_remaining_test() > 0:
                context.log(
                    "INFO",
                    "PREDICTING-PROCESS - Time out, no instances were received. Finishing process.",
                )
                context.set_finished()
                return

    context.log("INFO", "Starting predictions")
    model = None
    while True:
        X = context.get_test_data()
        if not X:
            if context.is_time_out():
                # Stream exhausted: signal the other processes to stop.
                context.set_finished(True)
                context.log("INFO", "PREDICTING-PROCESS - Finished stream")
                break
            continue
        X = np.asarray(X)
        if model is None:
            model = create_model(
                self.model_architecture,
                X.shape,
                context.get_output_size(),
                self.model_loss,
                self.model_optimizer,
                **self.model_parameters
            )
        if context.is_new_model_available():
            # Refresh weights published by the training process.
            model.set_weights(context.get_weights())
            context.set_new_model_available(False)
        predictions = model.predict(X)
        context.log(
            "INFO", "PREDICTING-PROCESS: {} instances predicted.".format(X.shape[0])
        )
        context.add_predictions(predictions)
run(self)
¶
Function that runs ADLStream. It runs 4 different processes: - Training process. - Predicting process. - Stream generator process. - Evaluator process.
Source code in ADLStream/adlstream.py
def run(self) -> None:
    """Function that runs ADLStream.

    It runs 4 different processes:
      - Training process.
      - Predicting process.
      - Stream generator process.
      - Evaluator process.
    """
    self.manager.start()
    context = self.manager.context(
        self.batch_size, self.num_batches_fed, log_file=self.log_file
    )

    # Build all four workers, then start them together and wait for all
    # of them to finish.
    workers = [
        Process(target=self.stream_generator.run, args=[context]),
        Process(target=self.training_process, args=[context, self.train_gpu_index]),
        Process(
            target=self.predicting_process, args=[context, self.predict_gpu_index]
        ),
        Process(target=self.evaluator.run, args=[context]),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    # Copy final state out of the shared context before shutting the
    # manager down, so get_model() can rebuild the trained model.
    self.x_shape = context.get_shape()
    self.output_size = context.get_output_size()
    self.weights = context.get_weights()
    self.manager.shutdown()
training_process(self, context, gpu_index)
¶
Training process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
ADLStreamContext |
Shared object among processes. |
required |
gpu_index |
int |
Index of the GPU to use for training |
required |
Source code in ADLStream/adlstream.py
def training_process(self, context: ADLStreamContext, gpu_index: int) -> None:
    """Training process.

    Repeatedly pulls the newest training instances from the shared context,
    fits the model for one epoch and publishes the updated weights.

    Args:
        context (ADLStreamContext): Shared object among processes.
        gpu_index (int): Index of the GPU to use for training
    """
    # TensorFlow is imported inside the process so the child initializes
    # its own TF runtime with its own visible devices.
    import tensorflow as tf
    from ADLStream.models import create_model

    # Select GPU device
    gpus = tf.config.experimental.list_physical_devices("GPU")
    if gpu_index < len(gpus):
        try:
            tf.config.experimental.set_visible_devices(gpus[gpu_index], "GPU")
            context.log(
                "INFO",
                "TRAINING-PROCESS - GPU device using: {}".format(gpus[gpu_index]),
            )
        except RuntimeError as e:
            context.log("ERROR", "TRAINING-PROCESS - {}".format(e))
    else:
        # Requested GPU does not exist: fall back to CPU.
        tf.config.experimental.set_visible_devices([], "GPU")
        context.log(
            "WARNING",
            "TRAINING-PROCESS - There are no enough GPU devices, using CPU",
        )

    model = None
    while not context.is_finished():
        X, y = context.get_training_data()
        if not X:
            # No training data yet; poll again.
            continue
        X, y = np.asarray(X), np.asarray(y)
        if model is None:
            # First batch: infer the output size from the flattened
            # targets and build the model.
            flat_targets = y.reshape(y.shape[0], -1)
            context.set_output_size(flat_targets.shape[-1])
            model = create_model(
                self.model_architecture,
                X.shape,
                context.get_output_size(),
                self.model_loss,
                self.model_optimizer,
                **self.model_parameters
            )
            self.x_shape = X.shape
        y = y.reshape((y.shape[0], context.get_output_size()))
        context.log(
            "INFO",
            "TRAINING-PROCESS - Training with the last {} instances".format(
                X.shape[0]
            ),
        )
        model.fit(X, y, context.get_batch_size(), epochs=1, verbose=0)
        # Publish the new weights for the predicting process.
        context.set_weights(model.get_weights())
        context.set_new_model_available(True)
    context.log("INFO", "TRAINING-PROCESS - Finished stream")
ADLStreamContext¶
ADLStream.adlstream.ADLStreamContext
ADLStream context. This object is shared among training, predicting, stream-generator and validator processes. It is used to send the data from the stream generator to the predicting process, then it is used for training and finally the validator has access to the output predictions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
batch_size |
int |
Number of instances per batch. |
required |
num_batches_fed |
int |
Maximum number of batches to be used for training. |
required |
log_file |
str |
Name of log file. If None, log will be printed on screen and will not be saved. If log_file is given, log level is set to "DEBUG". However if None, log level is kept as default. Defaults to None. |
None |
Source code in ADLStream/adlstream.py
class ADLStreamContext:
    """ADLStream context.

    This object is shared among training, predicting, stream-generator and
    validator processes. It is used to send the data from the stream generator
    to the predicting process, then it is used for training and finally the
    validator has access to the output predictions.

    Parameters:
        batch_size (int): Number of instances per batch.
        num_batches_fed (int): Maximum number of batches to be used for training.
        log_file (str, optional): Name of log file.
            If None, log will be printed on screen and will not be saved.
            If log_file is given, log level is set to "DEBUG". However if None,
            log level is kept as default.
            Defaults to None.
    """

    def __init__(
        self,
        batch_size: int,
        num_batches_fed: int,
        log_file: Optional[str] = None,
    ) -> None:
        self.batch_size = batch_size
        self.num_batches_fed = num_batches_fed
        # Process-coordination flags.
        self.time_out = False
        self.finished = False
        self.new_model_available = False
        self.output_size = None
        self.weights = None
        # Data buffers: training, prediction (test) and evaluation.
        self.x_train = []
        self.y_train = []
        self.x_test = []
        self.y_test = []
        self.y_eval = []
        self.o_eval = []
        self.x_eval = []
        # data_lock guards the train/test buffers; eval_lock the eval ones.
        self.data_lock = Lock()
        self.eval_lock = Lock()
        # Upper bound on how many instances are kept for training.
        self.num_instances_to_train = num_batches_fed * batch_size
        self._configure_logging(log_file)

    def _configure_logging(self, log_file: Optional[str]) -> None:
        # Truncate any previous log file before configuring logging.
        if log_file is not None:
            with open(log_file, "w"):
                pass
            logging.basicConfig(
                filename=log_file,
                format="%(asctime)s %(levelname)-8s %(message)s",
                level=logging.DEBUG,
            )

    def log(
        self,
        level: str,
        message: str,
    ) -> None:
        """Log message.

        Args:
            level (str): Log level - "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL".
            message (str): Logging message.

        Raises:
            Exception: Level is not valid.
        """
        # Dispatch table instead of an if/elif chain.
        log_functions = {
            "DEBUG": logging.debug,
            "INFO": logging.info,
            "WARNING": logging.warning,
            "ERROR": logging.error,
            "CRITICAL": logging.critical,
        }
        if level not in log_functions:
            raise Exception("{} is not a valid logging level".format(level))
        log_functions[level](message)

    def get_batch_size(self) -> int:
        """Get the batch size.

        Returns:
            int: Batch size.
        """
        return self.batch_size

    def get_num_batches_fed(self) -> int:
        """Get the number of batches to feed for training.

        Returns:
            int: Number of batches to feed.
        """
        return self.num_batches_fed

    def set_time_out(self, time_out: bool = True) -> None:
        """Set time out to true by the stream generator.

        Args:
            time_out (bool, optional): Whether is time out. Defaults to True.
        """
        self.time_out = time_out

    def is_time_out(self) -> bool:
        """Whether the stream has timed out.

        Returns:
            bool: Time out.
        """
        return self.time_out

    def set_finished(self, finished: bool = True) -> None:
        """Indicate that the stream has finished.

        Args:
            finished (bool, optional): Whether the stream has finished. Defaults to True.
        """
        self.finished = finished

    def is_finished(self) -> bool:
        """Whether the stream has finished or not.

        Returns:
            bool: True if the stream has finished.
        """
        return self.finished

    def set_new_model_available(self, new_model_available: bool) -> None:
        """Indicate that there are new weights available for the model.

        Args:
            new_model_available (bool): True if there is a new model. False otherwise.
        """
        self.new_model_available = new_model_available

    def is_new_model_available(self) -> bool:
        """Whether there are updated weights available for the model.

        Returns:
            bool: True if there is a new model.
        """
        return self.new_model_available

    def get_output_size(self) -> int:
        """Get output size of the model (number of neurons of the last layer).

        Returns:
            int: output size.
        """
        return self.output_size

    def set_output_size(self, output_size: int) -> None:
        """Indicate the number of features of the output.

        Args:
            output_size (int): output size.
        """
        self.output_size = output_size

    def set_weights(self, w: List[np.ndarray]) -> None:
        """Update the model's weights.

        Args:
            w (List[np.ndarray]): model's weights.
        """
        self.weights = w

    def get_weights(self) -> List[np.ndarray]:
        """Get the most updated model's weights.

        Returns:
            List[np.ndarray]: model's weights.
        """
        return self.weights

    def add(self, x: List[float], y: Optional[List[float]] = None):
        """Add a new instance from the stream.

        The instances will be added to the prediction buffer. The inputs (x)
        and outputs (y) do not need to arrive at the same time. However they
        have to arrive in the same order. So that the first output (y_0)
        corresponds to the first input (x_0).

        Args:
            x (List[float]): input instance of the model.
            y (List[float], optional): output instance of the model. Defaults to None.
        """
        with self.data_lock:
            if x is not None:
                if len(self.x_train) < self.batch_size:
                    # Fill initial training data
                    self.x_train.append(x)
                else:
                    # Get prediction before using it for training
                    self.x_test.append(x)
            if y is not None:
                if len(self.y_train) < self.batch_size:
                    # Fill initial training data
                    self.y_train.append(y)
                else:
                    self.y_test.append(y)

    def get_test_data(self) -> List[List[float]]:
        """Get the test data (prediction). Once the test data is provided to the
        prediction process, the data is added to the training buffer.

        Returns:
            List[List[float]]: X_test -- model's input.
        """
        X, y = [], []
        with self.data_lock:
            # Drain the test buffers and promote the data to training.
            X = self.x_test
            y = self.y_test
            self.x_test, self.y_test = [], []
            self.x_train += X
            self.y_train += y
        with self.eval_lock:
            # Same instances are queued for evaluation.
            self.y_eval += y
            self.x_eval += X
        return X

    def get_remaining_test(self) -> int:
        """Get length of the test buffer.

        Returns:
            int: length of test buffer.
        """
        return len(self.x_test)

    def get_training_data(self) -> Tuple[List[List[float]], List[List[float]]]:
        """Get data for training the model.

        Returns a tuple with two lists such as (X_train, y_train). The length
        of both lists is given by `min(len(y_train), batch_size*num_batches_fed)`.
        If the training buffer has more instances than the maximum
        (`batch_size*num_batches_fed`), it returns the newest instances.

        Returns:
            (List[List[float]], List[List[float]]): X_train, y_train
        """
        X, y = [], []
        with self.data_lock:
            X = self.x_train
            y = self.y_train
            if len(y) > self.num_instances_to_train:
                # Keep only the newest instances (sliding window).
                index = len(y) - self.num_instances_to_train
                X = X[index:]
                y = y[index:]
                self.x_train = X
                self.y_train = y
            elif len(y) < self.batch_size:
                # Not enough data for a full batch yet.
                return [], []
            # x may be longer than y (inputs can arrive before outputs).
            return X[: len(self.y_train)], y

    def get_predictions(
        self,
    ) -> Tuple[List[List[float]], List[List[float]], List[List[float]]]:
        """Returns the input instances (x), expected output (y) and models predictions(o).

        Returns:
            (List[List[float]], List[List[float]], List[List[float]]): (x, y, o)
        """
        with self.eval_lock:
            x_eval, self.x_eval = self.x_eval, []
            y_eval, self.y_eval = self.y_eval, []
            o_eval, self.o_eval = self.o_eval, []
        return x_eval, y_eval, o_eval

    def add_predictions(self, o_eval):
        # Normalize predictions (e.g. numpy rows) to plain lists.
        o_eval = [list(p) for p in o_eval]
        with self.eval_lock:
            self.o_eval += o_eval
add(self, x, y=None)
¶
Add a new instance from the stream. The instances will be added to the prediction buffer. The inputs (x) and outputs (y) do not need to arrive at the same time. However they have to arrive in the same order. So that the first output (y_0) corresponds to the first input (x_0).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
x |
List[float] |
input instance of the model. |
required |
y |
List[float] |
output instance of the model. Defaults to None. |
None |
Source code in ADLStream/adlstream.py
def add(self, x: List[float], y: Optional[List[float]] = None):
    """Feed one stream instance into the shared buffers.

    Inputs (x) and outputs (y) may arrive separately, but they must keep
    the same ordering so that y_0 corresponds to x_0. Until a first full
    batch of training data exists, instances seed the training buffer;
    afterwards they are queued for prediction first.

    Args:
        x (List[float]): input instance of the model.
        y (List[float], optional): output instance of the model. Defaults to None.
    """
    with self.data_lock:
        if x is not None:
            x_target = (
                self.x_train
                if len(self.x_train) < self.batch_size
                else self.x_test
            )
            x_target.append(x)
        if y is not None:
            y_target = (
                self.y_train
                if len(self.y_train) < self.batch_size
                else self.y_test
            )
            y_target.append(y)
get_batch_size(self)
¶
Get the batch size.
Returns:
Type | Description |
---|---|
int |
Batch size. |
Source code in ADLStream/adlstream.py
def get_batch_size(self) -> int:
    """Return the configured number of instances per batch.

    Returns:
        int: Batch size.
    """
    size = self.batch_size
    return size
get_num_batches_fed(self)
¶
Get the number of batches to feed for training.
Returns:
Type | Description |
---|---|
int |
Number of batches to feed. |
Source code in ADLStream/adlstream.py
def get_num_batches_fed(self) -> int:
    """Return how many batches are fed for each training round.

    Returns:
        int: Number of batches to feed.
    """
    count = self.num_batches_fed
    return count
get_output_size(self)
¶
Get output size of the model (number of neurons of the last layer).
Returns:
Type | Description |
---|---|
int |
output size. |
Source code in ADLStream/adlstream.py
def get_output_size(self) -> int:
    """Return the model's output size (neurons in the last layer).

    Returns:
        int: output size.
    """
    size = self.output_size
    return size
get_predictions(self)
¶
Returns the input instances (x), expected output (y) and models predictions(o).
Returns:
Type | Description |
---|---|
(List[List[float]], List[List[float]], List[List[float]]) |
(x, y, o) |
Source code in ADLStream/adlstream.py
def get_predictions(
    self,
) -> Tuple[List[List[float]], List[List[float]], List[List[float]]]:
    """Drain and return the evaluation buffers.

    Returns:
        (List[List[float]], List[List[float]], List[List[float]]): inputs (x),
        expected outputs (y) and model predictions (o).
    """
    with self.eval_lock:
        # Atomically swap each buffer for an empty one.
        drained_x = self.x_eval
        drained_y = self.y_eval
        drained_o = self.o_eval
        self.x_eval = []
        self.y_eval = []
        self.o_eval = []
    return drained_x, drained_y, drained_o
get_remaining_test(self)
¶
Get length of the test buffer.
Returns:
Type | Description |
---|---|
int |
length of test buffer. |
Source code in ADLStream/adlstream.py
def get_remaining_test(self) -> int:
    """Return how many instances are still waiting for prediction.

    Returns:
        int: length of test buffer.
    """
    pending = len(self.x_test)
    return pending
get_test_data(self)
¶
Get the test data (prediction). Once the test data is provided to the prediction process, the data is added to the training buffer.
Returns:
Type | Description |
---|---|
List[List[float]] |
X_test -- model's input. |
Source code in ADLStream/adlstream.py
def get_test_data(self) -> List[List[float]]:
    """Drain the prediction buffer.

    The drained instances are promoted to the training buffer and queued
    for evaluation before being returned.

    Returns:
        List[List[float]]: X_test -- model's input.
    """
    with self.data_lock:
        pending_x = self.x_test
        pending_y = self.y_test
        self.x_test, self.y_test = [], []
        # Once served for prediction, the data becomes training data.
        self.x_train += pending_x
        self.y_train += pending_y
    with self.eval_lock:
        self.y_eval += pending_y
        self.x_eval += pending_x
    return pending_x
get_training_data(self)
¶
Get data for training the model.
Returns a tuple with two lists such as (X_train, y_train). The length of both lists, is
given by min(len(y_train), batch_size*num_batches_fed)
. If the training buffer has more
instance than the maximun (batch_size*num_batches_fed
), it returns the newest instances.
Returns:
Type | Description |
---|---|
(List[List[float]], List[List[float]]) |
X_train, y_train |
Source code in ADLStream/adlstream.py
def get_training_data(self) -> Tuple[List[List[float]], List[List[float]]]:
    """Get data for training the model.

    Returns a tuple (X_train, y_train) whose length is
    `min(len(y_train), batch_size*num_batches_fed)`. When the training
    buffer exceeds the maximum (`batch_size*num_batches_fed`), only the
    newest instances are kept and returned.

    Returns:
        (List[List[float]], List[List[float]]): X_train, y_train
    """
    features, labels = [], []
    with self.data_lock:
        features = self.x_train
        labels = self.y_train
        if len(labels) > self.num_instances_to_train:
            # Sliding window: drop the oldest instances.
            start = len(labels) - self.num_instances_to_train
            features = features[start:]
            labels = labels[start:]
            self.x_train = features
            self.y_train = labels
        elif len(labels) < self.batch_size:
            # Not enough labelled data for a single batch yet.
            return [], []
        # Inputs may outnumber labels; trim to the labelled prefix.
        return features[: len(self.y_train)], labels
get_weights(self)
¶
Get the most updated model's weights.
Returns:
Type | Description |
---|---|
List[np.ndarray] |
model's weights. |
Source code in ADLStream/adlstream.py
def get_weights(self) -> List[np.ndarray]:
    """Return the most recently published model weights.

    Returns:
        List[np.ndarray]: model's weights.
    """
    current = self.weights
    return current
is_finished(self)
¶
Whether the stream has finished or not.
Returns:
Type | Description |
---|---|
bool |
True if the stream has finished. |
Source code in ADLStream/adlstream.py
def is_finished(self) -> bool:
    """Tell whether the stream has finished.

    Returns:
        bool: True if the stream has finished.
    """
    done = self.finished
    return done
is_new_model_available(self)
¶
Whether there are updated weights available for the model.
Returns:
Type | Description |
---|---|
bool |
True if there is a new model. |
Source code in ADLStream/adlstream.py
def is_new_model_available(self) -> bool:
    """Tell whether updated weights are available for the model.

    Returns:
        bool: True if there is a new model.
    """
    available = self.new_model_available
    return available
is_time_out(self)
¶
Whether the stream has timed out.
Returns:
Type | Description |
---|---|
bool |
Time out. |
Source code in ADLStream/adlstream.py
def is_time_out(self) -> bool:
    """Tell whether the stream has timed out.

    Returns:
        bool: Time out.
    """
    timed_out = self.time_out
    return timed_out
log(self, level, message)
¶
Log message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
level |
str |
Log level - "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". |
required |
message |
str |
Logging message. |
required |
Exceptions:
Type | Description |
---|---|
Exception |
Level is not valid. |
Source code in ADLStream/adlstream.py
def log(
    self,
    level: str,
    message: str,
) -> None:
    """Log message.

    Args:
        level (str): Log level - "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL".
        message (str): Logging message.

    Raises:
        Exception: Level is not valid.
    """
    # Dispatch table instead of an if/elif chain.
    log_functions = {
        "DEBUG": logging.debug,
        "INFO": logging.info,
        "WARNING": logging.warning,
        "ERROR": logging.error,
        "CRITICAL": logging.critical,
    }
    if level not in log_functions:
        raise Exception("{} is not a valid logging level".format(level))
    log_functions[level](message)
set_finished(self, finished=True)
¶
Indicate that the stream has finished.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
finished |
bool |
Whether the stream has finished. Defaults to True. |
True |
Source code in ADLStream/adlstream.py
def set_finished(self, finished: bool = True) -> None:
    """Mark the stream as finished (or un-finished).

    Args:
        finished (bool, optional): Whether the stream has finished. Defaults to True.
    """
    self.finished = finished
set_new_model_available(self, new_model_available)
¶
Indicate that there are new weights available for the model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
new_model_available |
bool |
True if there is a new model. False otherwise. |
required |
Source code in ADLStream/adlstream.py
def set_new_model_available(self, new_model_available: bool) -> None:
    """Flag whether new weights are available for the model.

    Args:
        new_model_available (bool): True if there is a new model. False otherwise.
    """
    self.new_model_available = new_model_available
set_output_size(self, output_size)
¶
Indicate the number of features of the output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_size |
int |
output size. |
required |
Source code in ADLStream/adlstream.py
def set_output_size(self, output_size: int) -> None:
    """Record the number of output features of the model.

    Args:
        output_size (int): output size.
    """
    self.output_size = output_size
set_time_out(self, time_out=True)
¶
Set time out to true by the stream generator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
time_out |
bool |
Whether is time out. Defaults to True. |
True |
Source code in ADLStream/adlstream.py
def set_time_out(self, time_out: bool = True) -> None:
    """Flag a stream time out (called by the stream generator).

    Args:
        time_out (bool, optional): Whether is time out. Defaults to True.
    """
    self.time_out = time_out
set_weights(self, w)
¶
Update the model's weights.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
w |
List[np.ndarray] |
model's weights. |
required |
Source code in ADLStream/adlstream.py
def set_weights(self, w: List[np.ndarray]) -> None:
    """Store the latest model weights.

    Args:
        w (List[np.ndarray]): model's weights.
    """
    self.weights = w
ADLStreamManager¶
ADLStream.adlstream.ADLStreamManager
ADLStream Manager. Manager server which holds the ADLStreamContext object. It allows other processes to manipulate the shared context.
self.register("context", ADLStreamContext)
Source code in ADLStream/adlstream.py
class ADLStreamManager(BaseManager):
    """ADLStream Manager

    Manager server which holds the shared ADLStreamContext object and lets
    other processes manipulate it through proxies. The shared context is
    exposed under the ``context`` typeid:

    ```
    self.register("context", ADLStreamContext)
    ```
    """

    def __init__(self):
        super().__init__()
        # Register the shared-context factory so callers can do
        # `manager.context(...)` and receive a proxy.
        self.register("context", ADLStreamContext)
connect(self)
inherited
¶
Connect manager object to the server process
Source code in ADLStream/adlstream.py
def connect(self):
    '''
    Connect manager object to the server process
    '''
    # Pick the Listener/Client pair matching this manager's serializer.
    Listener, Client = listener_client[self._serializer]
    conn = Client(self._address, authkey=self._authkey)
    # A throwaway "dummy" dispatch verifies the server is reachable.
    dispatch(conn, None, 'dummy')
    self._state.value = State.STARTED
get_server(self)
inherited
¶
Return server object with serve_forever() method and address attribute
Source code in ADLStream/adlstream.py
def get_server(self):
    '''
    Return server object with serve_forever() method and address attribute
    '''
    # A server may only be created while the manager is still INITIAL;
    # any other state means it was already started or shut down.
    if self._state.value != State.INITIAL:
        if self._state.value == State.STARTED:
            raise ProcessError("Already started server")
        elif self._state.value == State.SHUTDOWN:
            raise ProcessError("Manager has shut down")
        else:
            raise ProcessError(
                "Unknown state {!r}".format(self._state.value))
    return Server(self._registry, self._address,
                  self._authkey, self._serializer)
join(self, timeout=None)
inherited
¶
Join the manager process (if it has been spawned)
Source code in ADLStream/adlstream.py
def join(self, timeout=None):
    '''
    Join the manager process (if it has been spawned)
    '''
    if self._process is not None:
        self._process.join(timeout)
        # Only forget the process handle once it has actually exited
        # (a timed-out join leaves it alive).
        if not self._process.is_alive():
            self._process = None
start(self, initializer=None, initargs=())
inherited
¶
Spawn a server process for this manager object
Source code in ADLStream/adlstream.py
def start(self, initializer=None, initargs=()):
    '''
    Spawn a server process for this manager object
    '''
    # Refuse to start twice or after shutdown.
    if self._state.value != State.INITIAL:
        if self._state.value == State.STARTED:
            raise ProcessError("Already started server")
        elif self._state.value == State.SHUTDOWN:
            raise ProcessError("Manager has shut down")
        else:
            raise ProcessError(
                "Unknown state {!r}".format(self._state.value))

    if initializer is not None and not callable(initializer):
        raise TypeError('initializer must be a callable')

    # pipe over which we will retrieve address of server
    reader, writer = connection.Pipe(duplex=False)

    # spawn process which runs a server
    self._process = self._ctx.Process(
        target=type(self)._run_server,
        args=(self._registry, self._address, self._authkey,
              self._serializer, writer, initializer, initargs),
    )
    ident = ':'.join(str(i) for i in self._process._identity)
    self._process.name = type(self).__name__ + '-' + ident
    self._process.start()

    # get address of server
    writer.close()
    self._address = reader.recv()
    reader.close()

    # register a finalizer
    self._state.value = State.STARTED
    self.shutdown = util.Finalize(
        self, type(self)._finalize_manager,
        args=(self._process, self._address, self._authkey,
              self._state, self._Client),
        exitpriority=0
    )