ADLStream.ADLStream
ADLStream.
This is the main object of the framework. Based on a stream generator and a given deep learning model, it runs the training and predicting processes in parallel (ideally on two different GPUs) to obtain accurate predictions as soon as an instance is received.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stream_generator |
ADLStream.data.BaseStreamGenerator |
It is in charge of generating new instances from the stream. |
required |
evaluator |
ADLStream.evaluator.BaseEvaluator |
It will deal with the validation logic. |
required |
batch_size |
int |
Number of instances per batch. |
required |
num_batches_fed |
int |
Maximum number of batches to be used for training. |
required |
model_architecture |
str |
Model architecture to use. Possible options can be found in ADLStream.models. |
required |
model_loss |
tf.keras.Loss |
Loss function to use. For references, check tf.keras.losses. |
required |
model_optimizer |
str or tf.keras.Optimizer |
It defines the training algorithm to use. For references, check tf.keras.optimizers. Defaults to "adam". |
"adam" |
model_parameters |
dict |
It contains all the model-creation parameters. It depends on the model architecture chosen. Defaults to {}. |
{} |
train_gpu_index |
int |
GPU index to be used for training. Defaults to 0. |
0 |
predict_gpu_index |
int |
GPU index to be used for predicting. Defaults to 1. |
1 |
log_file |
str |
Name of log file. If None, the log will be printed on screen and will not be saved. If log_file is given, the log level is set to "DEBUG"; if None, the log level is kept at its default. Defaults to None. |
None |
Source code in ADLStream/adlstream.py
class ADLStream:
    """ADLStream.

    This is the main object of the framework.

    Based on a stream generator and a given deep learning model, it runs the training and
    predicting processes in parallel (ideally on two different GPUs) to obtain accurate
    predictions as soon as an instance is received.

    Parameters:
        stream_generator (ADLStream.data.BaseStreamGenerator):
            It is in charge of generating new instances from the stream.
        evaluator (ADLStream.evaluator.BaseEvaluator):
            It will deal with the validation logic.
        batch_size (int): Number of instances per batch.
        num_batches_fed (int): Maximum number of batches to be used for training.
        model_architecture (str): Model architecture to use.
            Possible options can be found in ADLStream.models.
        model_loss (tf.keras.Loss): Loss function to use.
            For references, check tf.keras.losses.
        model_optimizer (str or tf.keras.Optimizer, optional): It defines the training
            algorithm to use. For references, check tf.keras.optimizers.
            Defaults to "adam".
        model_parameters (dict, optional): It contains all the model-creation parameters.
            It depends on the model architecture chosen.
            Defaults to None, which is treated as an empty dict.
        train_gpu_index (int, optional): GPU index to be used for training.
            Defaults to 0.
        predict_gpu_index (int, optional): GPU index to be used for predicting.
            Defaults to 1.
        log_file (str, optional): Name of log file.
            If None, log will be printed on screen and will not be saved.
            If log_file is given, log level is set to "DEBUG". However, if None,
            log level is kept as default.
            Defaults to None.
    """

    def __init__(
        self,
        stream_generator,
        evaluator,
        batch_size,
        num_batches_fed,
        model_architecture,
        model_loss,
        model_optimizer="adam",
        model_parameters=None,
        train_gpu_index=0,
        predict_gpu_index=1,
        log_file=None,
    ):
        self.stream_generator = stream_generator
        self.evaluator = evaluator
        self.batch_size = batch_size
        self.num_batches_fed = num_batches_fed
        self.model_architecture = model_architecture
        self.model_loss = model_loss
        self.model_optimizer = model_optimizer
        # A literal `{}` default would be a single dict shared by every
        # instance; build a fresh one per instance instead.
        self.model_parameters = {} if model_parameters is None else model_parameters
        self.train_gpu_index = train_gpu_index
        self.predict_gpu_index = predict_gpu_index
        self.log_file = log_file
        self.manager = ADLStreamManager()

    def training_process(self, context, gpu_index):
        """Training process.

        Selects the compute device, then repeatedly fetches training data from
        ``context`` and fits the model on it for one epoch. After each fit, the
        weights are published to ``context`` and flagged as new. The model is
        created from the first non-empty batch of data. Runs until ``context``
        reports the stream as finished.

        Args:
            context (ADLStreamContext): Shared object among processes.
            gpu_index (int): Index of the GPU to use for training.
        """
        import tensorflow as tf
        from ADLStream.models import create_model

        # Select GPU device
        gpus = tf.config.experimental.list_physical_devices("GPU")
        if len(gpus) > gpu_index:
            try:
                tf.config.experimental.set_visible_devices(gpus[gpu_index], "GPU")
                context.log(
                    "INFO",
                    "TRAINING-PROCESS - GPU device using: {}".format(gpus[gpu_index]),
                )
            except RuntimeError as e:
                context.log("ERROR", "TRAINING-PROCESS - {}".format(e))
        else:
            tf.config.experimental.set_visible_devices([], "GPU")
            context.log(
                "WARNING",
                "TRAINING-PROCESS - There are no enough GPU devices, using CPU",
            )

        model = None
        y_shape = None
        output_shape = None
        while not context.is_finished():
            X, y = context.get_training_data()
            if not X:
                continue
            X, y = np.asarray(X), np.asarray(y)
            if model is None:
                # The output size is the number of target values per instance
                # once everything after the first axis is flattened.
                y_shape = y.shape
                output_shape = y.reshape(y_shape[0], -1).shape
                context.set_output_size(output_shape[-1])
                model = create_model(
                    self.model_architecture,
                    X.shape,
                    context.get_output_size(),
                    self.model_loss,
                    self.model_optimizer,
                    **self.model_parameters
                )
            y = y.reshape((y.shape[0], context.get_output_size()))
            context.log(
                "INFO",
                "TRAINING-PROCESS - Training with the last {} instances".format(
                    X.shape[0]
                ),
            )
            model.fit(X, y, context.get_batch_size(), epochs=1, verbose=0)
            context.set_weights(model.get_weights())
            context.set_new_model_available(True)
        context.log("INFO", "TRAINING-PROCESS - Finished stream")

    def predicting_process(self, context, gpu_index):
        """Predicting process.

        Selects the compute device and waits until the training process
        publishes a first model. It then predicts every batch of test
        instances received from ``context``, refreshing the model weights
        whenever a newer model is flagged as available. It marks the stream as
        finished once a time out occurs and no test data is pending.

        Args:
            context (ADLStreamContext): Shared object among processes.
            gpu_index (int): Index of the GPU to use for predicting.
        """
        import tensorflow as tf
        from ADLStream.models import create_model

        # Select GPU device
        gpus = tf.config.experimental.list_physical_devices("GPU")
        if len(gpus) > gpu_index:
            try:
                tf.config.experimental.set_visible_devices(gpus[gpu_index], "GPU")
                context.log(
                    "INFO",
                    "PREDICTING-PROCESS - GPU device using: {}".format(gpus[gpu_index]),
                )
            except RuntimeError as e:
                context.log("ERROR", "PREDICTING-PROCESS - {}".format(e))
        else:
            tf.config.experimental.set_visible_devices([], "GPU")
            context.log(
                "WARNING",
                "PREDICTING-PROCESS - There are no enough GPU devices, using CPU",
            )

        # Wait until model is created and trained for the first time.
        # NOTE(review): if a time out happens while test data is still pending,
        # this keeps polling until a model is published; confirm the training
        # process always produces one in that situation.
        while not context.is_new_model_available():
            if context.is_time_out() and not context.get_remaining_test() > 0:
                context.log(
                    "INFO",
                    "PREDICTING-PROCESS - Time out, no instances were received. Finishing process.",
                )
                # Pass the flag explicitly, matching the end-of-stream path below.
                context.set_finished(True)
                return

        context.log("INFO", "Starting predictions")
        model = None
        while True:
            X = context.get_test_data()
            if not X:
                if context.is_time_out():
                    context.set_finished(True)
                    context.log("INFO", "PREDICTING-PROCESS - Finished stream")
                    break
                continue
            X = np.asarray(X)
            if model is None:
                # Same architecture as the training process; its weights are
                # copied from the context below whenever a new model is flagged.
                model = create_model(
                    self.model_architecture,
                    X.shape,
                    context.get_output_size(),
                    self.model_loss,
                    self.model_optimizer,
                    **self.model_parameters
                )
            if context.is_new_model_available():
                model.set_weights(context.get_weights())
                context.set_new_model_available(False)
            predictions = model.predict(X)
            context.log(
                "INFO", "PREDICTING-PROCESS: {} instances predicted.".format(X.shape[0])
            )
            context.add_predictions(predictions)

    def run(self):
        """Function that runs ADLStream.

        It runs 4 different processes:
            - Stream generator process.
            - Training process.
            - Predicting process.
            - Evaluator process.

        All of them share a context created by the manager. The manager is shut
        down after every process has been joined, or when an error interrupts
        the run.
        """
        self.manager.start()
        try:
            context = self.manager.context(
                self.batch_size, self.num_batches_fed, log_file=self.log_file
            )
            processes = [
                Process(target=self.stream_generator.run, args=[context]),
                Process(
                    target=self.training_process,
                    args=[context, self.train_gpu_index],
                ),
                Process(
                    target=self.predicting_process,
                    args=[context, self.predict_gpu_index],
                ),
                Process(target=self.evaluator.run, args=[context]),
            ]
            for process in processes:
                process.start()
            for process in processes:
                process.join()
        finally:
            # Always release the manager's server process, even if starting or
            # joining a child raises (e.g. KeyboardInterrupt).
            self.manager.shutdown()
predicting_process(self, context, gpu_index)
Predicting process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
ADLStreamContext |
Shared object among processes. |
required |
gpu_index |
int |
Index of the GPU to use for predicting |
required |
Source code in ADLStream/adlstream.py
def predicting_process(self, context, gpu_index):
    """Predicting process.

    Selects the compute device and waits until the training process publishes
    a first model. It then predicts every batch of test instances received from
    ``context``, refreshing the model weights whenever a newer model is flagged
    as available. It marks the stream as finished once a time out occurs and no
    test data is pending.

    Args:
        context (ADLStreamContext): Shared object among processes.
        gpu_index (int): Index of the GPU to use for predicting.
    """
    import tensorflow as tf
    from ADLStream.models import create_model

    # Select GPU device
    gpus = tf.config.experimental.list_physical_devices("GPU")
    if len(gpus) > gpu_index:
        try:
            tf.config.experimental.set_visible_devices(gpus[gpu_index], "GPU")
            context.log(
                "INFO",
                "PREDICTING-PROCESS - GPU device using: {}".format(gpus[gpu_index]),
            )
        except RuntimeError as e:
            context.log("ERROR", "PREDICTING-PROCESS - {}".format(e))
    else:
        tf.config.experimental.set_visible_devices([], "GPU")
        context.log(
            "WARNING",
            "PREDICTING-PROCESS - There are no enough GPU devices, using CPU",
        )

    # Wait until model is created and trained for the first time.
    # NOTE(review): if a time out happens while test data is still pending,
    # this keeps polling until a model is published; confirm the training
    # process always produces one in that situation.
    while not context.is_new_model_available():
        if context.is_time_out() and not context.get_remaining_test() > 0:
            context.log(
                "INFO",
                "PREDICTING-PROCESS - Time out, no instances were received. Finishing process.",
            )
            # Pass the flag explicitly, matching the end-of-stream path below.
            context.set_finished(True)
            return

    context.log("INFO", "Starting predictions")
    model = None
    while True:
        X = context.get_test_data()
        if not X:
            if context.is_time_out():
                context.set_finished(True)
                context.log("INFO", "PREDICTING-PROCESS - Finished stream")
                break
            continue
        X = np.asarray(X)
        if model is None:
            # Same architecture as the training process; its weights are
            # copied from the context below whenever a new model is flagged.
            model = create_model(
                self.model_architecture,
                X.shape,
                context.get_output_size(),
                self.model_loss,
                self.model_optimizer,
                **self.model_parameters
            )
        if context.is_new_model_available():
            model.set_weights(context.get_weights())
            context.set_new_model_available(False)
        predictions = model.predict(X)
        context.log(
            "INFO", "PREDICTING-PROCESS: {} instances predicted.".format(X.shape[0])
        )
        context.add_predictions(predictions)
run(self)
Function that runs ADLStream.
It runs 4 different processes:
- Training process.
- Predicting process.
- Stream generator process.
- Evaluator process.
Source code in ADLStream/adlstream.py
def run(self):
    """Function that runs ADLStream.

    It runs 4 different processes:
        - Stream generator process.
        - Training process.
        - Predicting process.
        - Evaluator process.

    All of them share a context created by the manager. The manager is shut
    down after every process has been joined, or when an error interrupts the
    run.
    """
    self.manager.start()
    try:
        context = self.manager.context(
            self.batch_size, self.num_batches_fed, log_file=self.log_file
        )
        processes = [
            Process(target=self.stream_generator.run, args=[context]),
            Process(
                target=self.training_process,
                args=[context, self.train_gpu_index],
            ),
            Process(
                target=self.predicting_process,
                args=[context, self.predict_gpu_index],
            ),
            Process(target=self.evaluator.run, args=[context]),
        ]
        for process in processes:
            process.start()
        for process in processes:
            process.join()
    finally:
        # Always release the manager's server process, even if starting or
        # joining a child raises (e.g. KeyboardInterrupt).
        self.manager.shutdown()
training_process(self, context, gpu_index)
Training process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
ADLStreamContext |
Shared object among processes. |
required |
gpu_index |
int |
Index of the GPU to use for training |
required |
Source code in ADLStream/adlstream.py
def training_process(self, context, gpu_index):
    """Training process.

    Selects the compute device, then repeatedly fetches training data from
    ``context`` and fits the model on it for one epoch. After each fit, the
    weights are published to ``context`` and flagged as new. The model is
    created from the first non-empty batch of data, once the input shape and
    the flattened target width are known. Runs until ``context`` reports the
    stream as finished.

    Args:
        context (ADLStreamContext): Shared object among processes.
        gpu_index (int): Index of the GPU to use for training.
    """
    import tensorflow as tf
    from ADLStream.models import create_model

    # Pin this process to the requested GPU when it exists; otherwise hide
    # every GPU so that TensorFlow falls back to the CPU.
    experimental = tf.config.experimental
    available_gpus = experimental.list_physical_devices("GPU")
    if gpu_index >= len(available_gpus):
        experimental.set_visible_devices([], "GPU")
        context.log(
            "WARNING",
            "TRAINING-PROCESS - There are no enough GPU devices, using CPU",
        )
    else:
        try:
            experimental.set_visible_devices(available_gpus[gpu_index], "GPU")
            context.log(
                "INFO",
                "TRAINING-PROCESS - GPU device using: {}".format(
                    available_gpus[gpu_index]
                ),
            )
        except RuntimeError as err:
            # Presumably raised when the TF runtime is already initialized.
            context.log("ERROR", "TRAINING-PROCESS - {}".format(err))

    net = None
    while not context.is_finished():
        inputs, targets = context.get_training_data()
        if not inputs:
            # Nothing to train on yet; poll the context again.
            continue
        inputs = np.asarray(inputs)
        targets = np.asarray(targets)

        if net is None:
            # Every target dimension after the first axis is flattened into a
            # single output vector whose width becomes the shared output size.
            flat_targets_shape = targets.reshape(targets.shape[0], -1).shape
            context.set_output_size(flat_targets_shape[-1])
            net = create_model(
                self.model_architecture,
                inputs.shape,
                context.get_output_size(),
                self.model_loss,
                self.model_optimizer,
                **self.model_parameters
            )

        targets = targets.reshape((targets.shape[0], context.get_output_size()))
        context.log(
            "INFO",
            "TRAINING-PROCESS - Training with the last {} instances".format(
                inputs.shape[0]
            ),
        )
        net.fit(inputs, targets, context.get_batch_size(), epochs=1, verbose=0)
        # Share the freshly trained weights and flag them as new.
        context.set_weights(net.get_weights())
        context.set_new_model_available(True)

    context.log("INFO", "TRAINING-PROCESS - Finished stream")