GPU Tasks
GPU tasks manage and execute computationally intensive operations on Graphics Processing Units (GPUs). They are designed for large-scale data processing, particularly model training and inference, where high-performance computing is critical.
Purpose
The GPUTaskInitializer class extends the QueueTaskInitializer to provide the necessary framework for managing GPU-accelerated tasks. It handles the dynamic loading and unloading of GPU models, manages worker processes, and ensures that tasks are processed efficiently using GPU resources.
Customization
To create a custom GPU-based task, subclass GPUTaskInitializer and implement the enqueue, process, and store_entry methods. These methods define the logic for enqueuing tasks, processing them using GPUs, and storing the results in the database.
Key Features
- Efficient Resource Management: Dynamically load and unload GPU models to maximize resource utilization.
- Scalable Task Execution: Handle large batches of data, optimizing throughput during model training and inference.
- Seamless Integration: Fully integrates with RabbitMQ for distributed task management, ensuring reliable and scalable task execution.
- Worker Management: Manages GPU-specific worker processes to handle tasks in parallel.
- Extensibility: Abstract methods are provided to allow developers to define custom GPU task logic.
Example Usage
Here is an example of how to subclass GPUTaskInitializer:
    from protein_information_system.tasks.gpu import GPUTaskInitializer


    class MyCustomGPUTask(GPUTaskInitializer):
        def enqueue(self):
            # Implementation of the task enqueuing logic for GPU processing
            pass

        def process(self, target):
            # Processing logic for the target data using GPU
            pass

        def store_entry(self, record):
            # Logic to store the processed record in the database
            pass
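To make the roles of the three hooks concrete, here is a minimal, self-contained sketch of how they might fit together. ToyInitializer is a hypothetical stand-in for GPUTaskInitializer, used so the snippet runs without the package, RabbitMQ, or a GPU. The run method and the order in which it calls the hooks are assumptions for illustration, not the library's actual control flow.

```python
from abc import ABC, abstractmethod


class ToyInitializer(ABC):
    """Hypothetical stand-in for GPUTaskInitializer; not the library's code."""

    def __init__(self):
        self.pending = []  # stands in for the RabbitMQ queue
        self.stored = []   # stands in for the database

    @abstractmethod
    def enqueue(self): ...

    @abstractmethod
    def process(self, target): ...

    @abstractmethod
    def store_entry(self, record): ...

    def run(self):
        # Assumed order: enqueue everything, then process and store each task.
        self.enqueue()
        for target in self.pending:
            self.store_entry(self.process(target))


class SequenceLengthTask(ToyInitializer):
    def enqueue(self):
        self.pending.extend(["MKT", "MKTAYIAK"])

    def process(self, target):
        # A real task would run a GPU model here; len() keeps the sketch runnable.
        return {"sequence": target, "length": len(target)}

    def store_entry(self, record):
        self.stored.append(record)


task = SequenceLengthTask()
task.run()
print(task.stored)
```

In a real subclass, process would typically use the model loaded for the current model type, and store_entry would write through the database session.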
- class protein_information_system.tasks.gpu.GPUTaskInitializer(conf, session_required=True)
Bases:
QueueTaskInitializer
The GPUTaskInitializer class extends QueueTaskInitializer to manage tasks that are specifically designed for GPU-based processing.
This class provides the necessary infrastructure for setting up RabbitMQ queues, coordinating GPU-specific worker processes, and ensuring that tasks are efficiently processed using GPUs.
- stop_event
An event to signal workers and threads to stop.
- Type:
multiprocessing.Event
- model_instances
Dictionary storing loaded models for each type.
- Type:
dict
- tokenizer_instances
Dictionary storing loaded tokenizers for each type.
- Type:
dict
- cleanup()
Clean up resources and stop worker processes.
This method ensures that all worker processes and threads are properly terminated.
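The stop-signal pattern described here can be sketched as follows. This is an illustration only: the worker loop and the cleanup function below are hypothetical, and threads with threading.Event are used to keep the snippet portable. multiprocessing.Event offers the same set(), is_set(), and wait() methods for signalling across processes.

```python
import threading

# threading.Event keeps the sketch portable; multiprocessing.Event offers the
# same set(), is_set(), and wait() methods for use across processes.
stop_event = threading.Event()
iterations = [0, 0]


def worker(index):
    # Keep working until the stop signal arrives (assumed worker loop).
    while not stop_event.is_set():
        iterations[index] += 1
        stop_event.wait(0.01)  # short pause that ends early once the event is set


workers = [threading.Thread(target=worker, args=(i,)) for i in range(2)]
for w in workers:
    w.start()


def cleanup():
    stop_event.set()         # tell every worker to stop
    for w in workers:
        w.join(timeout=2.0)  # wait for each worker to exit


cleanup()
print(all(not w.is_alive() for w in workers))
```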
- abstract enqueue()
Abstract method to enqueue tasks. Must be overridden by subclasses.
- load_model(model_type)
Load the GPU model into memory.
This method loads the specified model and its tokenizer into memory for processing tasks.
- Parameters:
model_type (str) – The type of model to load.
- abstract process(target)
Abstract method to process tasks. Must be overridden by subclasses.
- publish_task(batch_data, model_type)
Publish a task to the GPU processing queue.
This method serializes the task data and publishes it to the appropriate queue for the specified model type.
- Parameters:
batch_data (any) – The task data to be processed.
model_type (str) – The type of model for which the task is intended.
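As an illustration of "serialize, then publish to the queue for a model type", the sketch below uses JSON and a per-model queue name. Both are assumptions, as are the "gpu_tasks_" prefix and the "esm" model type; the source does not state the wire format or the queue naming scheme. RecordingChannel is a hypothetical test double whose basic_publish keywords are shaped like those of pika's channel, so the snippet runs without a RabbitMQ server.

```python
import json


class RecordingChannel:
    """Hypothetical test double that records publishes instead of sending them."""

    def __init__(self):
        self.published = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


def publish_task(channel, batch_data, model_type):
    # Assumption: one queue per model type, with JSON as the wire format.
    queue_name = f"gpu_tasks_{model_type}"
    body = json.dumps(batch_data)
    channel.basic_publish(exchange="", routing_key=queue_name, body=body)
    return queue_name


channel = RecordingChannel()
publish_task(channel, [{"id": 1, "sequence": "MKT"}], "esm")
exchange, routing_key, body = channel.published[0]
print(routing_key, json.loads(body))
```

Routing by model type keeps tasks for different models in separate queues, so a worker only needs one model in memory to drain its queue.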
- run_processor_worker_sequential(model_type)
Run the processor worker sequentially for a specific GPU model type.
This method manages the loading and unloading of models for each task in the GPU processing pipeline, ensuring efficient GPU usage.
- Parameters:
model_type (str) – The type of GPU model to be used for processing.
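One way to picture the sequential worker is a load, drain, unload cycle. The sketch below is a simplified assumption of that cycle, not the library's implementation. The function signature, the in-memory deque standing in for the RabbitMQ queue, and the toy load, unload, and process callables are all hypothetical.

```python
from collections import deque


def run_worker_sequential(model_type, queue, load_model, unload_model, process):
    """Load one model, drain its queue, then unload the model."""
    model = load_model(model_type)
    results = []
    try:
        while queue:
            results.append(process(model, queue.popleft()))
    finally:
        unload_model(model_type)  # free memory even if processing fails
    return results


events = []


def load_model(model_type):
    events.append(f"load {model_type}")
    return model_type.upper()  # a string stands in for a real model object


def unload_model(model_type):
    events.append(f"unload {model_type}")


def process(model, task):
    return f"{model}:{task}"


results = run_worker_sequential("toy", deque([1, 2]), load_model, unload_model, process)
print(events)
print(results)
```

The try/finally block ensures the model is unloaded even when a task raises, which matters when several model types share one GPU.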
- setup_rabbitmq()
Set up RabbitMQ by declaring the necessary queues for GPU tasks.
This method connects to RabbitMQ using the provided credentials and declares the necessary queues for each GPU model type, as well as the queue for data insertion.
- Raises:
Exception – If there is an issue setting up RabbitMQ.
- start_workers()
Start the worker processes for GPU task processing and database insertion.
This method spawns worker processes that handle GPU-based task processing and insert the processed data into the database. It also starts a monitoring thread to oversee the queues.
The method ensures that models are loaded and unloaded as needed to optimize GPU usage.
- abstract store_entry(record)
Abstract method to store processed entries. Must be overridden by subclasses.
- unload_model(model_type)
Unload the GPU model from memory.
This method removes the specified model and its tokenizer from memory.
- Parameters:
model_type (str) – The type of model to unload.
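The bookkeeping behind load_model and unload_model can be sketched with the two dictionaries documented above, model_instances and tokenizer_instances. Everything else here is hypothetical: the ModelRegistry class, the loaders mapping, and the toy loader are illustrations, and the library's actual loading code is not shown in this reference.

```python
import gc


class ModelRegistry:
    """Hypothetical holder for loaded models and tokenizers, keyed by model type."""

    def __init__(self, loaders):
        self.loaders = loaders  # model_type -> callable returning (model, tokenizer)
        self.model_instances = {}
        self.tokenizer_instances = {}

    def load_model(self, model_type):
        if model_type not in self.model_instances:  # skip if already loaded
            model, tokenizer = self.loaders[model_type]()
            self.model_instances[model_type] = model
            self.tokenizer_instances[model_type] = tokenizer
        return self.model_instances[model_type]

    def unload_model(self, model_type):
        # Drop both references so the objects can be garbage collected.
        self.model_instances.pop(model_type, None)
        self.tokenizer_instances.pop(model_type, None)
        gc.collect()  # with PyTorch, torch.cuda.empty_cache() is a common extra step


registry = ModelRegistry({"toy": lambda: (object(), str.split)})
registry.load_model("toy")
loaded = sorted(registry.model_instances)
registry.unload_model("toy")
print(loaded, registry.model_instances)
```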