Queue Tasks

class protein_information_system.tasks.queue.QueueTaskInitializer(conf, session_required=True)

Bases: BaseTaskInitializer

The QueueTaskInitializer class extends BaseTaskInitializer to manage tasks that are distributed and processed via RabbitMQ queues.

It provides the necessary infrastructure to configure RabbitMQ, coordinate worker processes, and ensure that tasks are handled efficiently and reliably.

processes

A list of multiprocessing.Process objects representing the worker processes.

Type:

list

threads

A list of threading.Thread objects for auxiliary tasks like monitoring queues.

Type:

list

stop_event

An event to signal workers and threads to stop.

Type:

multiprocessing.Event

connection_params

Parameters for connecting to the RabbitMQ server.

Type:

pika.ConnectionParameters

callback(ch, method, properties, body)

Handle messages received in the computing queue, process them, and optionally publish to the inserting queue.
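
The flow described above can be sketched as follows. This is a minimal illustration, not the library's implementation: it assumes message bodies are JSON, that each message is acknowledged after processing, and that a result is forwarded only when processing returns one. FakeChannel, make_callback, and the queue name are hypothetical; the basic_publish and basic_ack calls mirror the pika channel methods of the same names.

```python
import json
from types import SimpleNamespace


class FakeChannel:
    """Hypothetical stand-in for a pika channel that records calls."""

    def __init__(self):
        self.published = []
        self.acked = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((routing_key, body))

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


def make_callback(process, inserting_queue):
    """Build a handler with the (ch, method, properties, body) signature."""

    def callback(ch, method, properties, body):
        task = json.loads(body)  # assumption: the body is JSON-encoded
        result = process(task)
        if result is not None:  # publishing to the inserting queue is optional
            ch.basic_publish(
                exchange="",
                routing_key=inserting_queue,
                body=json.dumps(result),
            )
        # Acknowledge only after the task has been handled.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback


def sequence_length(task):
    """Toy processing step used only for this example."""
    return {"id": task["id"], "length": len(task["seq"])}


channel = FakeChannel()
handler = make_callback(sequence_length, inserting_queue="example_inserter")
handler(channel, SimpleNamespace(delivery_tag=1), None, b'{"id": 7, "seq": "MKV"}')
```

Acknowledging after the publish means a crash during processing leaves the message unacknowledged, so the broker can redeliver it.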

property computing_queue

Property to get the name of the computing queue.

consume_messages(channel, stop_event)

Consume messages from the RabbitMQ queue until the stop event is set.
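
One way to consume until a stop event is set is to poll with an inactivity timeout, so the loop can re-check the event even when the queue is idle. The sketch below assumes pika's BlockingChannel.consume generator, which yields (None, None, None) when the timeout elapses; consume_until_stopped and FakeChannel are hypothetical names, and a threading.Event stands in for multiprocessing.Event because both expose set() and is_set().

```python
import threading


def consume_until_stopped(channel, queue, on_message, stop_event, poll_seconds=1.0):
    """Hand messages to on_message until stop_event is set."""
    for method, properties, body in channel.consume(queue, inactivity_timeout=poll_seconds):
        if stop_event.is_set():
            break  # stop requested; any message in hand is left unacknowledged
        if method is None:
            continue  # timeout tuple: no message arrived, check the event again
        on_message(channel, method, properties, body)
    channel.cancel()  # end the consumer started by consume()


class FakeChannel:
    """Hypothetical stand-in that mimics the consume()/cancel() pair."""

    def __init__(self, bodies):
        self._bodies = list(bodies)
        self.cancelled = False

    def consume(self, queue, inactivity_timeout=None):
        for tag, body in enumerate(self._bodies, start=1):
            yield tag, None, body
        while True:
            yield None, None, None  # queue is idle

    def cancel(self):
        self.cancelled = True


stop_event = threading.Event()
seen = []


def on_message(ch, method, properties, body):
    seen.append(body)
    if len(seen) == 2:
        stop_event.set()  # simulate an external stop request


fake = FakeChannel([b"a", b"b", b"c"])
consume_until_stopped(fake, "example_queue", on_message, stop_event, poll_seconds=0.1)
```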

create_rabbitmq_connection()

Create a new RabbitMQ connection.

db_inserter_callback(ch, method, properties, body)

Handle messages received in the inserting queue, storing them in the DB.

delete_all_queues()

Delete all queues from the RabbitMQ server using the HTTP API.

Raises:

Exception – If an error occurs during queue deletion.
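
For orientation, the RabbitMQ management plugin lists queues at /api/queues and deletes one with a DELETE request to /api/queues/{vhost}/{name}, where the virtual host must be percent-encoded (the default "/" becomes "%2F"). The sketch below only builds those URLs; the base URL, the helper names, and the sample listing are assumptions, and the requests this method actually sends are not shown in this reference.

```python
from urllib.parse import quote


def queue_delete_url(base_url, vhost, queue_name):
    """Return the management-API URL that deletes one queue."""
    return "{}/api/queues/{}/{}".format(
        base_url.rstrip("/"),
        quote(vhost, safe=""),  # safe="" so that "/" is encoded as %2F
        quote(queue_name, safe=""),
    )


def delete_urls(base_url, queues):
    """Map queue descriptions (dicts with 'vhost' and 'name') to DELETE URLs."""
    return [queue_delete_url(base_url, q["vhost"], q["name"]) for q in queues]


# Hypothetical listing, shaped like entries returned by GET /api/queues.
listing = [
    {"vhost": "/", "name": "computing"},
    {"vhost": "/", "name": "inserting"},
]
urls = delete_urls("http://localhost:15672", listing)
```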

abstract enqueue()

Abstract method to enqueue tasks. Must be overridden by subclasses.

property inserting_queue

Property to get the name of the inserting queue.

monitor_queues()

Dynamically monitor the available queues and filter them against the defined name patterns. Send a stop signal once all relevant queues are empty.
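
The filter-then-check logic can be sketched as below. The shell-style patterns, the helper names, and the rule that an empty match never triggers a stop are assumptions made for illustration; the snapshot imitates the name and messages fields that the RabbitMQ management API reports per queue.

```python
from fnmatch import fnmatch


def relevant_queues(queues, patterns):
    """Keep only the queues whose name matches at least one pattern."""
    return [q for q in queues if any(fnmatch(q["name"], p) for p in patterns)]


def all_drained(queues, patterns):
    """True when at least one queue matches and every match holds no messages."""
    matched = relevant_queues(queues, patterns)
    # Assumption: with no matching queue there is nothing to conclude,
    # so no stop signal is sent.
    return bool(matched) and all(q["messages"] == 0 for q in matched)


# Hypothetical snapshot of the broker's queues.
snapshot = [
    {"name": "example_extractor", "messages": 0},
    {"name": "example_inserter", "messages": 0},
    {"name": "unrelated", "messages": 12},
]
patterns = ["example_*"]

should_stop = all_drained(snapshot, patterns)
```

In a monitoring thread, a result of True would be followed by stop_event.set().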

post_processing() → None

Finalization hook executed after all workers have finished and queues are drained. Subclasses may override to run post-processing steps (e.g., consolidations, reports). Default: no-op.

abstract process(target)

Abstract method to process tasks. Must be overridden by subclasses.

publish_task(data)

Publish a task to the computing queue.

run_db_inserter_worker(stop_event)

Run the database inserter worker: consumes messages and inserts them into DB.

run_processor_worker(stop_event)

Run the processor worker: consumes messages and processes tasks.

setup_rabbitmq()

Set up RabbitMQ by declaring the necessary queues.

start()

Start the task processing pipeline:

- Initialize RabbitMQ
- Enqueue tasks
- Launch workers and wait for completion
- Run post-processing hook
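
The ordering of these steps can be pictured as a template method. The sketch below is a hypothetical stand-in that only records which step ran and when; it reuses the method names from this page but does not reproduce their real bodies, and the mapping of each step to a single method call is an assumption.

```python
class PipelineSketch:
    """Hypothetical stand-in: each step appends its name to a call log."""

    def __init__(self):
        self.calls = []

    def setup_rabbitmq(self):
        self.calls.append("setup_rabbitmq")

    def enqueue(self):
        self.calls.append("enqueue")

    def start_workers(self):
        self.calls.append("start_workers")  # would also wait for completion

    def post_processing(self):
        self.calls.append("post_processing")  # the default would be a no-op

    def start(self):
        self.setup_rabbitmq()
        self.enqueue()
        self.start_workers()
        self.post_processing()


class WithReport(PipelineSketch):
    """Override the finalization hook to add a step after the workers finish."""

    def post_processing(self):
        super().post_processing()
        self.calls.append("report")


pipeline = WithReport()
pipeline.start()
```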

start_workers()

Start worker processes for task computing and database insertion.

abstract store_entry(record)

Abstract method to store processed entries. Must be overridden by subclasses.
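
How the three abstract methods divide the work can be illustrated with a self-contained stand-in. InMemoryInitializer below is hypothetical: it replaces the RabbitMQ queues with two deques and runs everything in one process, so it only shows which method is responsible for which stage, not how QueueTaskInitializer is implemented.

```python
from abc import ABC, abstractmethod
from collections import deque


class InMemoryInitializer(ABC):
    """Hypothetical stand-in: two deques replace the RabbitMQ queues."""

    def __init__(self):
        self.computing = deque()
        self.inserting = deque()

    def publish_task(self, data):
        self.computing.append(data)

    @abstractmethod
    def enqueue(self):
        """Publish every task that should be processed."""

    @abstractmethod
    def process(self, target):
        """Turn one task into a record."""

    @abstractmethod
    def store_entry(self, record):
        """Persist one processed record."""

    def run(self):
        self.enqueue()
        while self.computing:  # computing stage
            self.inserting.append(self.process(self.computing.popleft()))
        while self.inserting:  # inserting stage
            self.store_entry(self.inserting.popleft())


class SequenceLengths(InMemoryInitializer):
    """Toy subclass: compute the length of each sequence."""

    def __init__(self, sequences):
        super().__init__()
        self.sequences = sequences
        self.stored = []  # stands in for a database table

    def enqueue(self):
        for name, seq in self.sequences.items():
            self.publish_task({"name": name, "seq": seq})

    def process(self, target):
        return {"name": target["name"], "length": len(target["seq"])}

    def store_entry(self, record):
        self.stored.append(record)


task = SequenceLengths({"P1": "MKV", "P2": "MKVLA"})
task.run()
```

Because the three methods are abstract, instantiating the base class directly raises TypeError, which is the usual way an omitted override shows up.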