Workers¶
The worker layer bridges the message queue and the domain layer. Workers
are long-running Python processes — one per RabbitMQ queue — that consume
messages and delegate execution to registered operations. They are
transport-agnostic with respect to domain logic: operations are resolved
by name from the OperationRegistry and receive only a database session
and an emit callback.
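The operation contract described above can be sketched with a minimal registry. This is an illustrative model only, not the actual protea OperationRegistry API; the register/resolve method names and the (session, payload, emit) call shape are assumptions based on the description.

```python
# Minimal sketch (assumed names, not the real protea API) of operations
# resolved by name and invoked with only a session and an emit callback.
class OperationRegistry:
    def __init__(self):
        self._ops = {}

    def register(self, name):
        def deco(fn):
            self._ops[name] = fn
            return fn
        return deco

    def resolve(self, name):
        return self._ops[name]


registry = OperationRegistry()
events = []


@registry.register("ping")
def ping(session, payload, emit):
    # The operation never sees the transport layer: just a session,
    # a payload, and a callback for progress/result events.
    emit({"type": "pong", "echo": payload})
    return "ok"


result = registry.resolve("ping")(None, {"x": 1}, events.append)
print(result, events)  # ok [{'type': 'pong', 'echo': {'x': 1}}]
```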
Base worker¶
BaseWorker is the core execution engine. It implements the two-session
pattern that decouples job claiming from job execution:
- Session 1 — Claim
Loads the job, asserts it is in QUEUED status, transitions it to RUNNING, writes a job.started event, and commits. After this commit the job is visible as running to any monitoring tool or frontend. The session is then closed.
- Session 2 — Execute
Opens a fresh session, resolves the operation from the registry, and calls
operation.execute(session, payload, emit=emit). On success, transitions the job to SUCCEEDED (or marks it as deferred if the operation returns OperationResult(deferred=True)). On exception, transitions to FAILED, stores the error class name and message, and re-raises.
The two-session design ensures durability: a crash in the execute phase
leaves the claim committed (RUNNING is visible) while the result is
not — which is the correct observable state. No session is held open across
a long-running GPU inference call.
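The claim/execute split can be sketched with an in-memory stand-in for the database. Everything below (the JOBS dict, the session context manager, the status strings' storage) is a simplified illustration of the pattern, not the real BaseWorker implementation.

```python
import contextlib
import uuid

# In-memory stand-in for the jobs table; commits are implicit here.
JOBS: dict[uuid.UUID, dict] = {}


@contextlib.contextmanager
def session():  # placeholder for a sessionmaker-produced Session
    yield JOBS


def handle_job(job_id: uuid.UUID, operations: dict) -> None:
    # Session 1 — claim: QUEUED -> RUNNING, write job.started, commit, close.
    with session() as db:
        job = db.get(job_id)
        if job is None or job["status"] != "QUEUED":
            return  # silently ignore missing or non-queued jobs
        job["status"] = "RUNNING"
        job["events"].append("job.started")
    # Session 2 — execute: fresh session, resolve operation, run it.
    with session() as db:
        job = db[job_id]
        try:
            operations[job["operation"]](db, job["payload"])
            job["status"] = "SUCCEEDED"
        except Exception as exc:
            job["status"] = "FAILED"
            job["error"] = f"{type(exc).__name__}: {exc}"
            raise


job_id = uuid.uuid4()
JOBS[job_id] = {"status": "QUEUED", "operation": "ping",
                "payload": {}, "events": []}
handle_job(job_id, {"ping": lambda db, payload: None})
print(JOBS[job_id]["status"])  # SUCCEEDED
```

A crash between the two sessions leaves the job committed as RUNNING with no result, which is exactly the observable state the text describes.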
Three exceptional flows are handled explicitly:
- RetryLaterError: the job is reset to QUEUED and the consumer re-publishes it after delay_seconds. Used by the embedding coordinator when the GPU is already occupied.
- Parent cancellation: if a child job's parent was cancelled between claim and execute, the child transitions to CANCELLED without running.
- Corrupt execute session: if the execute session fails to commit (e.g. the DB connection drops mid-operation), a fallback session marks the job FAILED so it is never permanently stuck in RUNNING.
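The RetryLaterError flow, in particular, can be sketched as follows. The class and the republish callback are hypothetical names for illustration; only the behaviour (reset to QUEUED, re-publish after delay_seconds) comes from the text above.

```python
# Hedged sketch of the retry flow; RetryLaterError and republish are
# illustrative names, not necessarily the actual protea identifiers.
class RetryLaterError(Exception):
    def __init__(self, delay_seconds: float):
        super().__init__(f"retry in {delay_seconds}s")
        self.delay_seconds = delay_seconds


def execute_with_retry(job: dict, run, republish) -> None:
    try:
        run(job)
        job["status"] = "SUCCEEDED"
    except RetryLaterError as exc:
        job["status"] = "QUEUED"  # reset so a later claim succeeds
        republish(job["id"], exc.delay_seconds)


delayed = []
job = {"id": "j1", "status": "RUNNING"}


def gpu_busy(job):
    # e.g. the embedding coordinator finds the GPU already occupied
    raise RetryLaterError(delay_seconds=30)


execute_with_retry(job, gpu_busy,
                   lambda job_id, d: delayed.append((job_id, d)))
print(job["status"], delayed)  # QUEUED [('j1', 30)]
```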
- class protea.workers.base_worker.BaseWorker(session_factory: sessionmaker[Session], registry: OperationRegistry, config: WorkerConfig, *, amqp_url: str | None = None)¶
Bases: object
Executes queued jobs using a two-session pattern.
Session 1 (claim): transitions the job from QUEUED → RUNNING and commits. Session 2 (execute): resolves the operation, runs it, and transitions to SUCCEEDED or FAILED. Every state change is recorded as a JobEvent row.
This class is transport-agnostic: it receives a job_id and handles the rest. The caller (QueueConsumer) is responsible for acking/nacking.
- handle_job(job_id: UUID) → None¶
Claim and execute a single job identified by job_id.
Silently returns if the job does not exist or is not in QUEUED status. Re-raises any exception from the operation after recording FAILED status.
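The ack/nack contract mentioned above can be sketched with a fake channel. The basic_ack/basic_nack method names mirror pika's channel API; the message body shape and the no-requeue policy are assumptions for illustration, not the confirmed protea behaviour.

```python
import json
import uuid


class FakeChannel:
    """Stand-in for a pika channel, recording acks and nacks."""
    def __init__(self):
        self.acked, self.nacked = [], []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)

    def basic_nack(self, delivery_tag, requeue=False):
        self.nacked.append(delivery_tag)


def on_message(channel, delivery_tag, body, handle_job) -> None:
    job_id = uuid.UUID(json.loads(body)["job_id"])
    try:
        handle_job(job_id)  # BaseWorker records all job state itself
        channel.basic_ack(delivery_tag)
    except Exception:
        # The job row was already marked FAILED before the re-raise, so
        # nack without requeue rather than redeliver in a loop (assumed
        # policy for this sketch).
        channel.basic_nack(delivery_tag, requeue=False)


ch = FakeChannel()
body = json.dumps({"job_id": str(uuid.uuid4())})
on_message(ch, 1, body, lambda jid: None)
print(ch.acked, ch.nacked)  # [1] []
```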
Worker entry points¶
Workers are started by scripts/worker.py via scripts/manage.sh.
Each process is bound to a single RabbitMQ queue and registers all
operations at startup, making every worker capable of executing any
operation routed to its queue.
# Start the full stack (all workers + API + frontend)
bash scripts/manage.sh start [N]
# Start a single worker manually (for debugging)
poetry run python scripts/worker.py protea.jobs
# Run a single queued job by UUID (bypasses RabbitMQ entirely)
poetry run python scripts/run_one_job.py <job-id>
The run_one_job.py script loads the job from the database, executes it
through BaseWorker, and exits. No RabbitMQ connection is required. This
is the recommended way to debug a failing job without re-queuing it.
QueueConsumer vs OperationConsumer¶
Two consumer patterns exist in protea/infrastructure/queue/consumer.py,
selected by the queue configuration in scripts/worker.py:
- QueueConsumer
Reads a job UUID from the queue and delegates to
BaseWorker.handle_job(). Creates full Job rows with status transitions and a JobEvent audit log. Used for queues where observability and traceability matter:
- protea.ping — smoke test
- protea.jobs — all coordinator operations
- protea.embeddings — serialised embedding coordinator
- OperationConsumer
Reads a raw serialised operation payload from the queue and executes it directly, without a Job row. Used for high-throughput batch queues where creating thousands of child rows per pipeline run would cause significant write contention and table bloat. Progress is tracked exclusively through atomic increments to the parent job's progress_current counter:
- protea.embeddings.batch — GPU inference per batch
- protea.embeddings.write — bulk pgvector insert
- protea.predictions.batch — KNN search + GO transfer
- protea.predictions.write — bulk GOPrediction insert
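The atomic progress increment can be sketched with plain SQL. The table and column names below follow the progress_current wording above but are otherwise assumed, and sqlite3 stands in for the real Postgres connection.

```python
import sqlite3

# Illustrative schema; the real protea tables differ.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, "
           "progress_current INTEGER, progress_total INTEGER)")
db.execute("INSERT INTO jobs VALUES ('parent', 0, 1000)")


def record_batch(job_id: str, n_items: int) -> None:
    # The increment happens inside the UPDATE itself, so concurrent batch
    # consumers never read-modify-write a stale counter, and no child Job
    # rows are created per batch.
    db.execute("UPDATE jobs SET progress_current = progress_current + ? "
               "WHERE id = ?", (n_items, job_id))
    db.commit()


for _ in range(4):
    record_batch("parent", 250)

done, total = db.execute("SELECT progress_current, progress_total "
                         "FROM jobs WHERE id = 'parent'").fetchone()
print(done, total)  # 1000 1000
```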