Workers
The worker layer bridges the message queue and the domain layer. Workers
are long-running Python processes (one per RabbitMQ queue) that consume
messages and delegate execution to registered operations. Domain logic
stays transport-agnostic: operations are resolved by name from the
OperationRegistry and receive only a database session and an emit
callback.
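Here is a minimal sketch of a registry and one operation that makes the name-based contract concrete. It illustrates the idea under stated assumptions and is not the actual OperationRegistry. The Operation protocol, the EmitFn callback signature, the register/get method names, and PingOperation are all hypothetical. Only the lookup by name and the execute(session, payload, emit=emit) call shape come from this page.

```python
from typing import Any, Callable, Dict, Protocol

# Hypothetical emit callback signature: (event_name, fields) -> None.
EmitFn = Callable[[str, Dict[str, Any]], None]


class Operation(Protocol):
    """Assumed shape of an operation: a unique name plus an execute method."""

    name: str

    def execute(self, session: Any, payload: Dict[str, Any], *, emit: EmitFn) -> Any: ...


class OperationRegistry:
    """Illustrative name -> operation lookup (method names are assumptions)."""

    def __init__(self) -> None:
        self._ops: Dict[str, Operation] = {}

    def register(self, op: Operation) -> None:
        if op.name in self._ops:
            raise ValueError(f"duplicate operation name: {op.name!r}")
        self._ops[op.name] = op

    def get(self, name: str) -> Operation:
        try:
            return self._ops[name]
        except KeyError:
            raise KeyError(f"unknown operation: {name!r}") from None


class PingOperation:
    """Toy operation: it sees only the session, the payload, and emit."""

    name = "ping"

    def execute(self, session: Any, payload: Dict[str, Any], *, emit: EmitFn) -> str:
        emit("ping.received", {"payload": payload})
        return "pong"


registry = OperationRegistry()
registry.register(PingOperation())

events = []
result = registry.get("ping").execute(
    None, {"x": 1}, emit=lambda name, fields: events.append((name, fields))
)
```

In this sketch the worker-facing surface is just registry.get(name) plus the execute call, so a worker process never needs to know which domain module an operation lives in.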
Base worker
BaseWorker is the core execution engine. It implements the two-session
pattern that decouples job claiming from job execution:
- Session 1. Claim.
  Loads the job, asserts it is in QUEUED status, transitions it to
  RUNNING, writes a job.started event, and commits. After this commit
  the job is visible as running to any monitoring tool or frontend. The
  session is then closed.
- Session 2. Execute.
  Opens a fresh session, resolves the operation from the registry, and
  calls operation.execute(session, payload, emit=emit). On success, it
  transitions the job to SUCCEEDED (or marks it as deferred if the
  operation returns OperationResult(deferred=True)). On exception, it
  transitions the job to FAILED, stores the error class name and
  message, and re-raises.
The two-session design ensures durability: a crash in the execute phase
leaves the claim committed (RUNNING is visible) while the result is
not, which is the correct observable state. No session is held open across
a long-running GPU inference call.
Three exceptional flows are handled explicitly:
- RetryLaterError: the job is reset to QUEUED and the consumer
  re-publishes it after delay_seconds. Used by the embedding coordinator
  when the GPU is already occupied.
- Parent cancellation: if a child job's parent was cancelled between
  claim and execute, the child transitions to CANCELLED without running.
- Corrupt execute session: if the execute session fails to commit (e.g.
  the DB connection drops mid-operation), a fallback session marks the
  job FAILED so it is never permanently stuck in RUNNING.
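The sketch below illustrates the three flows with an in-memory job store. It is hypothetical throughout. The Job dataclass, the store dict, the republish callback, CommitError, and the injectable commit callable that simulates a broken execute session were all invented for this illustration. Only RetryLaterError, delay_seconds, and the status names come from the text.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple


class RetryLaterError(Exception):
    """Raised by an operation that cannot run yet, e.g. because the GPU is busy."""

    def __init__(self, delay_seconds: float) -> None:
        super().__init__(f"retry in {delay_seconds}s")
        self.delay_seconds = delay_seconds


class CommitError(Exception):
    """Hypothetical stand-in for a commit that fails on the execute session."""


@dataclass
class Job:
    id: str
    status: str = "RUNNING"  # session 1 has already claimed it
    parent_id: Optional[str] = None
    error: Optional[str] = None


def _mark_failed(job: Job, exc: BaseException) -> None:
    job.status = "FAILED"
    job.error = f"{type(exc).__name__}: {exc}"


def run_claimed_job(
    job: Job,
    store: Dict[str, Job],
    operation: Callable[[], None],
    republish: Callable[[str, float], None],
    commit: Callable[[], None] = lambda: None,
) -> None:
    # Parent cancellation: the parent may have been cancelled after this job was claimed.
    parent = store.get(job.parent_id) if job.parent_id else None
    if parent is not None and parent.status == "CANCELLED":
        job.status = "CANCELLED"
        return
    try:
        operation()
    except RetryLaterError as exc:
        job.status = "QUEUED"  # back to the queue; the consumer re-publishes later
        republish(job.id, exc.delay_seconds)
        return
    except Exception as exc:
        _mark_failed(job, exc)
        raise
    job.status = "SUCCEEDED"
    try:
        commit()
    except CommitError as exc:
        # Fallback path: record FAILED so the job is never left stuck in RUNNING.
        _mark_failed(job, exc)


def gpu_busy() -> None:
    raise RetryLaterError(delay_seconds=30)


def broken_commit() -> None:
    raise CommitError("connection dropped")


store = {
    "parent": Job("parent", status="CANCELLED"),
    "child": Job("child", parent_id="parent"),
    "embed": Job("embed"),
    "write": Job("write"),
}
republished: List[Tuple[str, float]] = []


def republish(job_id: str, delay: float) -> None:
    republished.append((job_id, delay))


run_claimed_job(store["child"], store, lambda: None, republish)
run_claimed_job(store["embed"], store, gpu_busy, republish)
run_claimed_job(store["write"], store, lambda: None, republish, commit=broken_commit)
```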
Worker entry points
Workers are started by scripts/worker.py via scripts/manage.sh.
Each process is bound to a single RabbitMQ queue and registers all
operations at startup, making every worker capable of executing any
operation routed to its queue.
# Start the full stack (all workers + API + frontend)
bash scripts/manage.sh start [N]
# Start a single worker manually (for debugging)
poetry run python scripts/worker.py --queue protea.jobs
# Start the stale job reaper (periodic cleanup process)
poetry run python scripts/worker.py --queue reaper
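When debugging, it can help to see how a --queue value might map to a process type. The sketch below is hypothetical and does not reproduce the contents of scripts/worker.py. It only parses the flag and returns the name of what would be started. The queue assignments come from the lists for each consumer type on this page, plus the special reaper value. QUEUE_CONSUMERS and main() are illustrative names.

```python
import argparse
from typing import Optional, Sequence

# Queue -> consumer type, following the QueueConsumer / OperationConsumer lists on this page.
QUEUE_CONSUMERS = {
    "protea.ping": "QueueConsumer",
    "protea.jobs": "QueueConsumer",
    "protea.embeddings": "QueueConsumer",
    "protea.predictions": "QueueConsumer",
    "protea.training": "QueueConsumer",
    "protea.evaluations": "QueueConsumer",
    "protea.embeddings.batch": "OperationConsumer",
    "protea.embeddings.write": "OperationConsumer",
    "protea.predictions.batch": "OperationConsumer",
    "protea.predictions.write": "OperationConsumer",
}


def main(argv: Optional[Sequence[str]] = None) -> str:
    parser = argparse.ArgumentParser(description="Start a single worker process.")
    parser.add_argument(
        "--queue",
        required=True,
        choices=[*QUEUE_CONSUMERS, "reaper"],
        help="queue to consume, or 'reaper' for the stale job reaper",
    )
    args = parser.parse_args(argv)
    if args.queue == "reaper":
        return "StaleJobReaper"  # a real script would start the periodic reaper loop here
    return QUEUE_CONSUMERS[args.queue]  # a real script would build and run this consumer
```

Restricting --queue to known names makes a typo fail at start-up with a usage error instead of leaving a process that silently binds to an empty queue.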
QueueConsumer vs OperationConsumer
Two consumer patterns exist in protea/infrastructure/queue/consumer.py,
selected by the queue configuration in scripts/worker.py:
- QueueConsumer
  Reads a job UUID from the queue and delegates to
  BaseWorker.handle_job(). Creates full Job rows with status transitions
  and a JobEvent audit log. Used for queues where observability and
  traceability matter:
  - protea.ping: smoke test
  - protea.jobs: ingestion, ontology / annotation loaders, and
    generate_evaluation_set
  - protea.embeddings: serialised embedding coordinator
  - protea.predictions: serialised prediction coordinator
  - protea.training: serialised dataset-export coordinator
    (export_research_dataset)
  - protea.evaluations: run_cafa_evaluation runner
- OperationConsumer
  Reads a raw serialised operation payload from the queue and executes
  it directly, without a Job row. Used for high-throughput batch queues
  where creating thousands of child rows per pipeline run would cause
  significant write contention and table bloat. Progress is tracked
  exclusively through atomic increments to the parent job's
  progress_current counter:
  - protea.embeddings.batch: GPU inference per batch
  - protea.embeddings.write: bulk pgvector insert
  - protea.predictions.batch: KNN search + GO transfer
  - protea.predictions.write: bulk GOPrediction insert
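The atomic increment is what lets OperationConsumer skip per-batch Job rows. Each batch adds its count to the parent row in a single UPDATE, so concurrent workers never read-modify-write a stale value. The sketch below uses sqlite3 and threads as stand-ins for PostgreSQL and separate worker processes. The single-column table layout and the record_batch_progress name are assumptions.

```python
import os
import sqlite3
import tempfile
import threading
from contextlib import closing

DB_PATH = os.path.join(tempfile.mkdtemp(), "progress.db")

with closing(sqlite3.connect(DB_PATH)) as conn, conn:
    conn.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, progress_current INTEGER NOT NULL)")
    conn.execute("INSERT INTO jobs VALUES ('parent', 0)")


def record_batch_progress(parent_id: str, done: int) -> None:
    # The addition happens inside the database (x = x + ?), not in Python,
    # so two batches finishing at once cannot overwrite each other's update.
    with closing(sqlite3.connect(DB_PATH, timeout=30)) as conn, conn:
        conn.execute(
            "UPDATE jobs SET progress_current = progress_current + ? WHERE id = ?",
            (done, parent_id),
        )


# Twenty concurrent "batches" of ten items each, one connection per thread.
threads = [threading.Thread(target=record_batch_progress, args=("parent", 10)) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()

with closing(sqlite3.connect(DB_PATH)) as conn:
    progress = conn.execute("SELECT progress_current FROM jobs WHERE id = 'parent'").fetchone()[0]
```

A version that SELECTs the counter, adds in Python, and writes it back could lose updates under the same concurrency. The in-database expression avoids that without any extra locking in application code.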
Graceful shutdown
protea.workers.shutdown provides the signal-handler setup that
enables graceful shutdown of long-running worker processes. It installs
handlers for SIGTERM and SIGINT that set a shared stop_event,
allowing the main worker loop to drain the current batch and exit cleanly
rather than being killed mid-operation.
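The sketch below shows a minimal version of this pattern using only the standard signal and threading modules. The function names install_shutdown_handlers and worker_loop are hypothetical, since this page does not list the API of protea.workers.shutdown. The process callable stands in for real batch work. The handler only sets the event, and the loop checks it between batches, so a batch that is already running always finishes.

```python
import signal
import threading
from types import FrameType
from typing import Callable, Iterable, List, Optional


def install_shutdown_handlers(stop_event: threading.Event) -> None:
    """Make SIGTERM and SIGINT request a graceful stop instead of killing the process."""

    def _request_stop(signum: int, frame: Optional[FrameType]) -> None:
        # Only set the flag; the worker loop decides when it is safe to stop.
        stop_event.set()

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, _request_stop)


def worker_loop(
    stop_event: threading.Event,
    batches: Iterable[str],
    process: Callable[[str], None],
) -> List[str]:
    """Process batches in order, checking for a stop request only between batches."""
    done: List[str] = []
    for batch in batches:
        if stop_event.is_set():
            break  # exit cleanly between batches, never in the middle of one
        process(batch)  # a signal arriving here only sets the flag
        done.append(batch)
    return done
```

Python only allows signal.signal() to be called from the main thread, so handlers like these have to be installed at process start-up, before the consumer loop begins.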
Stale job reaper
The StaleJobReaper is a periodic background process that scans for jobs
stuck in RUNNING status beyond a configurable timeout (default: 21 600
seconds = 6 hours). It marks them as FAILED with error code
JobTimeout. The reaper runs every 60 seconds and is started via
scripts/worker.py --queue reaper.
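One reaper pass can be expressed as a single UPDATE. The sketch below uses sqlite3 and assumes hypothetical started_at (epoch seconds) and error_code columns. The 6-hour default, the 60-second interval, the RUNNING to FAILED transition, and the JobTimeout code all come from the description above. The loop waits on a threading.Event, so it can also stop cleanly between scans.

```python
import sqlite3
import threading
import time
from typing import Optional

DEFAULT_TIMEOUT_SECONDS = 21_600  # documented default: 6 hours
SCAN_INTERVAL_SECONDS = 60  # documented scan period


def reap_stale_jobs(
    conn: sqlite3.Connection,
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
    now: Optional[float] = None,
) -> int:
    """Mark RUNNING jobs started more than timeout_seconds ago as FAILED; return how many."""
    now = time.time() if now is None else now
    with conn:  # one transaction per pass
        cur = conn.execute(
            "UPDATE jobs SET status = 'FAILED', error_code = 'JobTimeout' "
            "WHERE status = 'RUNNING' AND started_at < ?",
            (now - timeout_seconds,),
        )
    return cur.rowcount


def run_reaper(conn: sqlite3.Connection, stop_event: threading.Event) -> None:
    """Scan every SCAN_INTERVAL_SECONDS until stop_event is set."""
    while not stop_event.is_set():
        reap_stale_jobs(conn)
        stop_event.wait(SCAN_INTERVAL_SECONDS)  # sleeps, but wakes early on shutdown


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, status TEXT, started_at REAL, error_code TEXT)")
now = 1_000_000.0
conn.executemany(
    "INSERT INTO jobs (id, status, started_at) VALUES (?, ?, ?)",
    [
        ("stale", "RUNNING", now - 7 * 3600),  # 7 h: past the 6 h timeout
        ("fresh", "RUNNING", now - 60),
        ("done", "SUCCEEDED", now - 7 * 3600),  # finished jobs are never touched
    ],
)
reaped = reap_stale_jobs(conn, now=now)
```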
See also
- Job Lifecycle: the two-session lifecycle and parent-child coordinator
  pattern that BaseWorker implements.
- Operations: what these workers actually run.
- ADR-002: Two-session worker pattern: design rationale.
- ADR-003: Two types of consumer: when each consumer subclass applies.