Skip to content

ADLStream.ADLStream

ADLStream.

This is the main object of the framework. Based on a stream generator and a given deep learning model, it runs the training and predicting processes in parallel (ideally on two different GPUs) to obtain accurate predictions as soon as an instance is received.

Parameters:

Name Type Description Default
stream_generator ADLStream.data.BaseStreamGenerator

It is in charge of generating new instances from the stream.

required
evaluator ADLStream.evaluator.BaseEvaluator

It will deal with the validation logic.

required
batch_size int

Number of instances per batch.

required
num_batches_fed int

Maximum number of batches to be used for training.

required
model_architecture str

Model architecture to use. Possible options can be found in ADLStream.models.

required
model_loss tf.keras.Loss

Loss function to use. For references, check tf.keras.losses.

required
model_optimizer tf.keras.Optimizer

It defines the training algorithm to use. For references, check tf.keras.optimizers. Defaults to "adam".

"adam"
model_parameters dict

It contains all the model-creation parameters. It depends on the model architecture chosen. Defaults to {}.

{}
train_gpu_index int

GPU index to be used for training. Defaults to 0.

0
predict_gpu_index int

GPU index to be used for predicting. Defaults to 1.

1
log_file str

Name of log file. If None, log will be printed on screen and will not be saved. If log_file is given, log level is set to "DEBUG". However if None, log level is kept as default. Defaults to None.

None
Source code in ADLStream/adlstream.py
class ADLStream:
    """ADLStream.

    This is the main object of the framework.
    Based on a stream generator and a given deep learning model, it runs the training and
    predicting processes in parallel (ideally on two different GPUs) to obtain accurate
    predictions as soon as an instance is received.

    Parameters:
        stream_generator (ADLStream.data.BaseStreamGenerator):
            It is in charge of generating new instances from the stream.
        evaluator (ADLStream.evaluator.BaseEvaluator):
            It will deal with the validation logic.
        batch_size (int): Number of instances per batch.
        num_batches_fed (int): Maximum number of batches to be used for training.
        model_architecture (str): Model architecture to use.
            Possible options can be found in ADLStream.models.
        model_loss (tf.keras.Loss): Loss function to use.
            For references, check tf.keras.losses.
        model_optimizer (tf.keras.Optimizer, optional): It defines the training algorithm to use.
            For references, check tf.keras.optimizers.
            Defaults to "adam".
        model_parameters (dict, optional): It contains all the model-creation parameters.
            It depends on the model architecture chosen.
            Defaults to None, which is treated as an empty dict.
        train_gpu_index (int, optional): GPU index to be used for training.
            Defaults to 0.
        predict_gpu_index (int, optional): GPU index to be used for predicting.
            Defaults to 1.
        log_file (str, optional): Name of log file.
            If None, log will be printed on screen and will not be saved.
            If log_file is given, log level is set to "DEBUG". However if None,
            log level is kept as default.
            Defaults to None.

    """

    def __init__(
        self,
        stream_generator,
        evaluator,
        batch_size,
        num_batches_fed,
        model_architecture,
        model_loss,
        model_optimizer="adam",
        model_parameters=None,
        train_gpu_index=0,
        predict_gpu_index=1,
        log_file=None,
    ):
        self.stream_generator = stream_generator
        self.evaluator = evaluator
        self.batch_size = batch_size
        self.num_batches_fed = num_batches_fed
        self.model_architecture = model_architecture
        self.model_loss = model_loss
        self.model_optimizer = model_optimizer
        # Build the empty dict here rather than in the signature: a `{}` default
        # is created once and would be shared by every instance relying on it.
        self.model_parameters = {} if model_parameters is None else model_parameters
        self.train_gpu_index = train_gpu_index
        self.predict_gpu_index = predict_gpu_index
        self.log_file = log_file

        self.manager = ADLStreamManager()

    @staticmethod
    def _select_device(context, gpu_index, process_name):
        """Make only the GPU at `gpu_index` visible to TensorFlow, or fall back to CPU.

        Args:
            context (ADLStreamContext): Shared object among processes, used for logging.
            gpu_index (int): Index of the GPU to use within the detected GPU list.
            process_name (str): Prefix of the log messages, e.g. "TRAINING-PROCESS".
        """
        import tensorflow as tf

        gpus = tf.config.experimental.list_physical_devices("GPU")
        if len(gpus) > gpu_index:
            try:
                tf.config.experimental.set_visible_devices(gpus[gpu_index], "GPU")
                context.log(
                    "INFO",
                    "{} - GPU device using: {}".format(process_name, gpus[gpu_index]),
                )
            except RuntimeError as e:
                # The failure is only logged; the process carries on with whatever
                # device configuration TensorFlow already has.
                context.log("ERROR", "{} - {}".format(process_name, e))
        else:
            # Not enough GPUs for the requested index: hide them all so CPU is used.
            tf.config.experimental.set_visible_devices([], "GPU")
            context.log(
                "WARNING",
                "{} - There are no enough GPU devices, using CPU".format(process_name),
            )

    def training_process(self, context, gpu_index):
        """Training process.

        While the context is not marked as finished, it takes the training data
        currently offered by the context, fits the model on it for one epoch and
        publishes the new weights through the context. The model is created when
        the first non-empty training data is received.

        Args:
            context (ADLStreamContext): Shared object among processes.
            gpu_index (int): Index of the GPU to use for training
        """
        # Imported inside the method, presumably so that the deep learning stack
        # is only loaded in the process that runs it.
        from ADLStream.models import create_model

        self._select_device(context, gpu_index, "TRAINING-PROCESS")

        model = None
        while not context.is_finished():
            X, y = context.get_training_data()
            if not X:
                continue

            X, y = np.asarray(X), np.asarray(y)

            if model is None:
                # Flatten each target to one dimension; its length is the output
                # size shared with the predicting process through the context.
                context.set_output_size(y.reshape(y.shape[0], -1).shape[-1])

                model = create_model(
                    self.model_architecture,
                    X.shape,
                    context.get_output_size(),
                    self.model_loss,
                    self.model_optimizer,
                    **self.model_parameters
                )

            y = y.reshape((y.shape[0], context.get_output_size()))

            context.log(
                "INFO",
                "TRAINING-PROCESS - Training with the last {} instances".format(
                    X.shape[0]
                ),
            )
            model.fit(X, y, context.get_batch_size(), epochs=1, verbose=0)

            # Publish the updated weights and flag them for the predicting process.
            context.set_weights(model.get_weights())
            context.set_new_model_available(True)

        context.log("INFO", "TRAINING-PROCESS - Finished stream")

    def predicting_process(self, context, gpu_index):
        """Predicting process.

        It waits until a first trained model is available, then keeps predicting
        the test data offered by the context, loading the newest weights whenever
        the context reports a new model. It marks the context as finished and
        stops when the context times out with no test data left.

        Args:
            context (ADLStreamContext): Shared object among processes.
            gpu_index (int): Index of the GPU to use for predicting
        """
        # Imported inside the method, presumably so that the deep learning stack
        # is only loaded in the process that runs it.
        from ADLStream.models import create_model

        self._select_device(context, gpu_index, "PREDICTING-PROCESS")

        # Wait until model is created and trained for the first time.
        # NOTE(review): this is a busy-wait that polls the context continuously.
        while not context.is_new_model_available():
            if context.is_time_out() and context.get_remaining_test() <= 0:
                context.log(
                    "INFO",
                    "PREDICTING-PROCESS - Time out, no instances were received. Finishing process.",
                )
                context.set_finished(True)
                return

        context.log("INFO", "Starting predictions")

        model = None
        while True:
            X = context.get_test_data()

            if not X:
                if context.is_time_out():
                    context.set_finished(True)
                    context.log("INFO", "PREDICTING-PROCESS - Finished stream")
                    break
                continue

            X = np.asarray(X)

            if model is None:
                # Built from the first test batch; weights are loaded just below
                # because the new-model flag is still set at this point.
                model = create_model(
                    self.model_architecture,
                    X.shape,
                    context.get_output_size(),
                    self.model_loss,
                    self.model_optimizer,
                    **self.model_parameters
                )

            if context.is_new_model_available():
                model.set_weights(context.get_weights())
                context.set_new_model_available(False)

            predictions = model.predict(X)
            context.log(
                "INFO", "PREDICTING-PROCESS: {} instances predicted.".format(X.shape[0])
            )

            context.add_predictions(predictions)

    def run(self):
        """Function that runs ADLStream.

        It runs 4 different processes:

        - Training process.
        - Predicting process.
        - Stream generator process.
        - Evaluator process.

        The call blocks until the four processes have ended. The manager is shut
        down afterwards, also when creating, starting or joining them raises.
        """
        self.manager.start()
        try:
            context = self.manager.context(
                self.batch_size, self.num_batches_fed, log_file=self.log_file
            )

            # Kept in start order: stream, training, predicting, evaluator.
            processes = [
                Process(target=self.stream_generator.run, args=[context]),
                Process(
                    target=self.training_process,
                    args=[context, self.train_gpu_index],
                ),
                Process(
                    target=self.predicting_process,
                    args=[context, self.predict_gpu_index],
                ),
                Process(target=self.evaluator.run, args=[context]),
            ]

            for process in processes:
                process.start()

            for process in processes:
                process.join()
        finally:
            self.manager.shutdown()

predicting_process(self, context, gpu_index)

Predicting process.

Parameters:

Name Type Description Default
context ADLStreamContext

Shared object among processes.

required
gpu_index int

Index of the GPU to use for predicting

required
Source code in ADLStream/adlstream.py
def predicting_process(self, context, gpu_index):
    """Predicting process.

    It waits until a first trained model is available, then keeps predicting
    the test data offered by the context, loading the newest weights whenever
    the context reports a new model. It marks the context as finished and
    stops when the context times out with no test data left.

    Args:
        context (ADLStreamContext): Shared object among processes.
        gpu_index (int): Index of the GPU to use for predicting
    """
    # Imported inside the method, presumably so that TensorFlow is only loaded
    # in the process that runs it.
    import tensorflow as tf
    from ADLStream.models import create_model

    # Select GPU device
    gpus = tf.config.experimental.list_physical_devices("GPU")
    if len(gpus) > gpu_index:
        try:
            tf.config.experimental.set_visible_devices(gpus[gpu_index], "GPU")
            context.log(
                "INFO",
                "PREDICTING-PROCESS - GPU device using: {}".format(gpus[gpu_index]),
            )
        except RuntimeError as e:
            # The failure is only logged; prediction carries on with whatever
            # device configuration TensorFlow already has.
            context.log("ERROR", "PREDICTING-PROCESS - {}".format(e))
    else:
        # Not enough GPUs for the requested index: hide them all so CPU is used.
        tf.config.experimental.set_visible_devices([], "GPU")
        context.log(
            "WARNING",
            "PREDICTING-PROCESS - There are no enough GPU devices, using CPU",
        )

    # Wait until model is created and trained for the first time.
    # NOTE(review): this is a busy-wait that polls the context continuously.
    while not context.is_new_model_available():
        if context.is_time_out() and context.get_remaining_test() <= 0:
            context.log(
                "INFO",
                "PREDICTING-PROCESS - Time out, no instances were received. Finishing process.",
            )
            # Same explicit flag as the end-of-stream branch further down.
            context.set_finished(True)
            return

    context.log("INFO", "Starting predictions")

    model = None
    while True:
        X = context.get_test_data()

        if not X:
            if context.is_time_out():
                context.set_finished(True)
                context.log("INFO", "PREDICTING-PROCESS - Finished stream")
                break
            continue

        X = np.asarray(X)

        if model is None:
            # Built from the first test batch; weights are loaded just below
            # because the new-model flag is still set at this point.
            model = create_model(
                self.model_architecture,
                X.shape,
                context.get_output_size(),
                self.model_loss,
                self.model_optimizer,
                **self.model_parameters
            )

        if context.is_new_model_available():
            model.set_weights(context.get_weights())
            context.set_new_model_available(False)

        predictions = model.predict(X)
        context.log(
            "INFO", "PREDICTING-PROCESS: {} instances predicted.".format(X.shape[0])
        )

        context.add_predictions(predictions)

run(self)

Function that runs ADLStream.

It runs 4 different processes:

  • Training process.
  • Predicting process.
  • Stream generator process.
  • Evaluator process.
Source code in ADLStream/adlstream.py
def run(self):
    """Function that runs ADLStream.

    It runs 4 different processes:

    - Training process.
    - Predicting process.
    - Stream generator process.
    - Evaluator process.

    The call blocks until the four processes have ended. The manager is shut
    down afterwards, also when creating, starting or joining them raises.
    """
    self.manager.start()
    try:
        context = self.manager.context(
            self.batch_size, self.num_batches_fed, log_file=self.log_file
        )

        # Kept in start order: stream, training, predicting, evaluator.
        processes = [
            Process(target=self.stream_generator.run, args=[context]),
            Process(
                target=self.training_process, args=[context, self.train_gpu_index]
            ),
            Process(
                target=self.predicting_process,
                args=[context, self.predict_gpu_index],
            ),
            Process(target=self.evaluator.run, args=[context]),
        ]

        for process in processes:
            process.start()

        for process in processes:
            process.join()
    finally:
        self.manager.shutdown()

training_process(self, context, gpu_index)

Training process.

Parameters:

Name Type Description Default
context ADLStreamContext

Shared object among processes.

required
gpu_index int

Index of the GPU to use for training

required
Source code in ADLStream/adlstream.py
def training_process(self, context, gpu_index):
    """Training loop of ADLStream.

    While the context is not marked as finished, it takes the training data
    currently offered by the context, fits the model on it for one epoch and
    publishes the new weights through the context. The model is created when
    the first non-empty training data is received.

    Args:
        context (ADLStreamContext): Shared object among processes.
        gpu_index (int): Index of the GPU to use for training
    """
    import tensorflow as tf
    from ADLStream.models import create_model

    # Device selection: expose a single GPU, or none when the index is out of range
    available_gpus = tf.config.experimental.list_physical_devices("GPU")
    if not len(available_gpus) > gpu_index:
        tf.config.experimental.set_visible_devices([], "GPU")
        context.log(
            "WARNING",
            "TRAINING-PROCESS - There are no enough GPU devices, using CPU",
        )
    else:
        try:
            tf.config.experimental.set_visible_devices(
                available_gpus[gpu_index], "GPU"
            )
            gpu_message = "TRAINING-PROCESS - GPU device using: {}".format(
                available_gpus[gpu_index]
            )
            context.log("INFO", gpu_message)
        except RuntimeError as error:
            # Only logged; training goes on with the current device setup.
            context.log("ERROR", "TRAINING-PROCESS - {}".format(error))

    network = None
    while not context.is_finished():
        raw_inputs, raw_targets = context.get_training_data()
        if not raw_inputs:
            # Nothing to train on yet, poll the context again.
            continue

        inputs = np.asarray(raw_inputs)
        targets = np.asarray(raw_targets)

        if network is None:
            # Flatten each target to one dimension; its length is the output
            # size stored in the context.
            flat_targets = targets.reshape(targets.shape[0], -1)
            context.set_output_size(flat_targets.shape[-1])

            network = create_model(
                self.model_architecture,
                inputs.shape,
                context.get_output_size(),
                self.model_loss,
                self.model_optimizer,
                **self.model_parameters
            )

        targets = targets.reshape((targets.shape[0], context.get_output_size()))

        training_message = (
            "TRAINING-PROCESS - Training with the last {} instances".format(
                inputs.shape[0]
            )
        )
        context.log("INFO", training_message)
        network.fit(inputs, targets, context.get_batch_size(), epochs=1, verbose=0)

        # Publish the updated weights and flag them as available.
        context.set_weights(network.get_weights())
        context.set_new_model_available(True)

    context.log("INFO", "TRAINING-PROCESS - Finished stream")