Operations¶
An Operation is the fundamental unit of domain logic in PROTEA. Every task
that a worker can execute — from a health-check ping to a full UniProt ingest —
is encapsulated in a class that satisfies the Operation protocol.
The Operation protocol¶
class Operation(Protocol):
    name: str

    def execute(
        self,
        session: Session,
        payload: dict[str, Any],
        *,
        emit: EmitFn,
    ) -> OperationResult: ...
name
A stable string identifier used to route jobs. Must be unique across all registered operations and must match the operation field in the Job row.
execute
Receives an open SQLAlchemy session, a raw dict payload (validated internally), and an emit callback. Returns an OperationResult. Must not manage sessions, queue connections, or threads.
EmitFn
Type alias for Callable[[str, str | None, dict[str, Any], Level], None]. Calling emit(event, message, fields, level) writes a JobEvent row in real time, visible on the frontend timeline.
OperationResult
Frozen dataclass with three fields: result (stored in Job.meta) and the optional progress_current / progress_total, written back to the Job row for the progress bar.
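The protocol above can be satisfied by any plain class with a `name` attribute and an `execute` method. A minimal sketch, assuming stand-in types for `Session`, `EmitFn`, and `OperationResult` (the hypothetical `NoopOperation` is not part of PROTEA):

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

# Stand-in for the real EmitFn alias described above.
EmitFn = Callable[[str, Optional[str], dict, str], None]


@dataclass(frozen=True)
class OperationResult:
    """Mirror of the frozen result dataclass: result plus optional progress."""
    result: dict[str, Any]
    progress_current: Optional[int] = None
    progress_total: Optional[int] = None


class NoopOperation:
    """Hypothetical operation: emits one event and returns a result."""

    name = "noop"

    def execute(self, session: Any, payload: dict[str, Any], *, emit: EmitFn) -> OperationResult:
        emit("noop.done", "nothing to do", {}, "info")
        return OperationResult(result={"ok": True})


events: list[str] = []
op = NoopOperation()
res = op.execute(None, {}, emit=lambda event, msg, fields, level: events.append(event))
```

The operation never touches sessions or queues itself; the worker owns both and hands them in.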
Payload validation¶
Every operation defines a payload class that extends ProteaPayload:
class ProteaPayload(BaseModel, frozen=True):
    model_config = ConfigDict(strict=True)
ProteaPayload is an immutable, strictly-typed Pydantic v2 base. Strict
mode prevents silent coercions ("yes" is not a valid bool). Each
operation calls MyPayload.model_validate(payload) at the top of
execute() — validation errors surface as FAILED jobs with a clear
error message, before any DB writes occur.
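The "no silent coercions" behaviour can be mimicked with the standard library alone. This sketch is a stdlib stand-in, not the real Pydantic base; the `DemoPayload` fields are illustrative:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DemoPayload:
    """Toy strict payload: wrong types raise instead of being coerced."""
    query: str
    compress: bool = True

    def __post_init__(self) -> None:
        # Strict mode: reject values of the wrong type instead of coercing.
        if not isinstance(self.query, str):
            raise TypeError("query must be a str")
        if not isinstance(self.compress, bool):
            raise TypeError('compress must be a bool ("yes" is not coerced)')


ok = DemoPayload(query="organism_id:9606")
try:
    DemoPayload(query="organism_id:9606", compress="yes")  # type: ignore[arg-type]
    rejected = False
except TypeError:
    rejected = True  # "yes" is not a valid bool, exactly as strict mode behaves
```

In the real operations this failure surfaces as a FAILED job before any database write happens.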
insert_proteins¶
Operation name: insert_proteins — queue: protea.jobs
Fetches protein sequences from the UniProt REST API in FASTA format and
upserts them into the protein and sequence tables.
Payload fields¶
| Field | Default | Description |
|---|---|---|
|  | (required) | Raw UniProt query string. Example: |
|  |  | Results per page (1 – ∞). Larger values reduce round-trips. |
|  |  | Stop after this many records (useful for testing). |
|  |  | HTTP request timeout per page. |
|  |  | Append |
|  |  | Request gzip-compressed responses. |
|  |  | Maximum retry attempts per page before raising. |
|  |  | Exponential backoff base (seconds). |
|  |  | Maximum wait between retries (seconds). |
|  |  | Random jitter added to each backoff wait. |
|  | PROTEA/insert_proteins … |  |
Execution flow¶
1. validate payload (Pydantic)
2. for each FASTA page from UniProt:
a. parse FASTA → list of records (accession, sequence, metadata)
b. compute MD5 hash per sequence
c. bulk-load existing Sequence rows by hash (chunks of 5 000)
d. INSERT missing Sequence rows → obtain IDs
e. bulk-load existing Protein rows by accession (chunks of 5 000)
f. INSERT new Protein rows / conservative UPDATE existing rows
g. session.flush() — no commit per page (commit on job success)
h. emit("insert_proteins.page_done", ...)
3. if total_limit reached → emit warning + break
4. emit("insert_proteins.done", ...)
5. return OperationResult(result={counts and timing})
Sequence deduplication¶
Sequence rows are keyed by sequence_hash (MD5 of the amino-acid
string). Many Protein rows can point to the same Sequence row —
sequence_id is deliberately non-unique. This eliminates redundant storage
for identical sequences across species or isoforms.
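The dedup step reduces to a hash lookup. A sketch assuming `sequence_hash` is the MD5 hex digest of the raw amino-acid string (names are illustrative):

```python
import hashlib


def sequence_hash(seq: str) -> str:
    """MD5 hex digest of the amino-acid string, the Sequence dedup key."""
    return hashlib.md5(seq.encode("ascii")).hexdigest()


# hash -> sequence_id rows already in the DB (step c of the flow above)
existing = {sequence_hash("MKTAYIAK"): 1}

# Incoming page: the first sequence already exists, so only one INSERT is needed.
incoming = ["MKTAYIAK", "MWLLLLPL"]
to_insert = [s for s in incoming if sequence_hash(s) not in existing]
```

Two proteins carrying the identical sequence then simply share one `sequence_id`.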
Conservative protein update¶
For proteins that already exist, insert_proteins applies a
fill-in-blanks policy: it only overwrites a field if the current DB value
is None or empty. Existing non-null values are never overwritten. This
prevents a re-ingestion from degrading data that was enriched by a later step.
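The fill-in-blanks policy amounts to a one-sided merge. A sketch with illustrative field names (the real code applies this per ORM column):

```python
def conservative_update(current: dict, fresh: dict) -> dict:
    """Only fill fields whose current value is None or empty; never overwrite."""
    merged = dict(current)
    for key, value in fresh.items():
        if merged.get(key) in (None, ""):
            merged[key] = value
    return merged


row = {"gene_name": "TP53", "organism": None}
merged = conservative_update(row, {"gene_name": "IGNORED", "organism": "Homo sapiens"})
# gene_name keeps its enriched value; only the empty organism field is filled.
```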
Isoform handling¶
Accessions of the form <canonical>-<n> are parsed by
Protein.parse_isoform(). Both the isoform accession and the canonical
accession are stored. is_canonical = False for isoforms;
isoform_index stores the numeric suffix.
fetch_uniprot_metadata¶
Operation name: fetch_uniprot_metadata — queue: protea.jobs
Fetches functional annotations from the UniProt REST API in TSV format and
upserts ProteinUniProtMetadata rows, one per canonical accession.
Payload fields¶
| Field | Default | Description |
|---|---|---|
|  | (required) | Raw UniProt query string. |
|  |  | Results per TSV page. |
|  |  | Stop after this many rows. |
|  |  | HTTP timeout per page. |
|  |  | Request gzip-compressed TSV. |
|  |  | Maximum retry attempts. |
|  |  | Backoff base (seconds). |
|  |  | Maximum wait (seconds). |
|  |  | Jitter added to backoff. |
|  |  | Commit after each page (reduces memory pressure on large ingests). |
|  |  | Backfill |
|  | PROTEA/fetch_uniprot_metadata … |  |
TSV field mapping¶
The operation requests 25 TSV fields from UniProt and maps them to
ProteinUniProtMetadata columns:
| DB column | UniProt TSV header |
|---|---|
|  | Function [CC] |
|  | Catalytic activity |
|  | EC number |
|  | Pathway |
|  | Kinetics |
|  | Absorption |
|  | Active site |
|  | Binding site |
|  | Cofactor |
|  | DNA binding |
|  | Activity regulation |
|  | pH dependence |
|  | Redox potential |
|  | Rhea ID |
|  | Site |
|  | Temperature dependence |
|  | Keywords |
|  | Features |
Isoform scoping¶
Because ProteinUniProtMetadata is keyed by canonical_accession, all
isoforms of a protein share a single metadata record. The canonical accession
is resolved via Protein.parse_isoform(accession)[0] for each row in the
TSV response.
load_ontology_snapshot¶
Operation name: load_ontology_snapshot — queue: protea.jobs
Downloads a GO OBO file and populates OntologySnapshot + GOTerm +
GOTermRelationship rows. Idempotent: if a snapshot with the same
obo_version already exists and its relationships are present, the
operation returns immediately without writing anything.
Payload fields¶
| Field | Default | Description |
|---|---|---|
|  | (required) | Direct HTTP(S) URL to the |
|  |  | HTTP download timeout in seconds. |
|  |  | Re-insert relationships even if the snapshot already exists with relationships. |
Execution flow¶
1. validate payload
2. download OBO text (HTTP GET, single request)
3. extract data-version header → obo_version
4. check DB for existing OntologySnapshot with that obo_version
a. exists + has relationships → skip (idempotent)
b. exists + no relationships → backfill relationships only
c. does not exist → full insert
5. parse OBO stanzas → GOTerm rows (aspect mapped from namespace)
6. session.add_all(go_terms)
7. parse relationship edges → GOTermRelationship rows
8. session.add_all(relationships)
9. emit("load_ontology_snapshot.done", ...)
10. return OperationResult(result={snapshot_id, term_count, rel_count})
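Step 5's stanza parsing can be sketched with a toy parser. This assumes only the standard `[Term]` layout with `id` / `name` / `namespace` / `is_a` lines; the real parser handles more tags:

```python
def parse_obo_terms(text: str) -> list[dict]:
    """Collect [Term] stanzas into dicts; is_a edges drop the ' ! label' suffix."""
    terms: list[dict] = []
    current: dict | None = None
    for line in text.splitlines():
        line = line.strip()
        if line == "[Term]":
            current = {"is_a": []}
            terms.append(current)
        elif current is not None and ": " in line:
            key, value = line.split(": ", 1)
            if key == "is_a":
                current["is_a"].append(value.split(" ! ")[0])
            elif key in ("id", "name", "namespace"):
                current[key] = value
    return terms


sample = """[Term]
id: GO:0008150
name: biological_process
namespace: biological_process

[Term]
id: GO:0009987
name: cellular process
namespace: biological_process
is_a: GO:0008150 ! biological_process
"""
terms = parse_obo_terms(sample)
```

The `namespace` value is what gets mapped to the `GOTerm` aspect, and each `is_a` edge becomes a `GOTermRelationship` row.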
load_goa_annotations¶
Operation name: load_goa_annotations — queue: protea.jobs
Streams a GOA GAF 2.2 file (plain or gzip) and bulk-inserts
AnnotationSet + ProteinGOAnnotation rows. Only accessions already
present in the protein table are retained; all others are silently
skipped.
Payload fields¶
| Field | Default | Description |
|---|---|---|
|  | (required) | UUID of the |
|  | (required) | HTTP(S) URL to the GAF file (plain or |
|  | (required) | Human-readable version label stored in |
|  |  | Lines buffered per commit cycle. |
|  |  | HTTP stream timeout. |
|  |  | Commit after each page to bound memory use (recommended for large GAFs). |
|  |  | Stop after this many annotation rows (for testing). |
Execution flow¶
1. validate payload; resolve OntologySnapshot and canonical accession set
2. create AnnotationSet row (source="goa")
3. stream GAF lines:
a. skip comment lines (starting with "!")
b. parse 15-column tab-separated record
c. filter against canonical accessions — skip unknown
d. resolve go_term_id from go_id; skip if term unknown in snapshot
e. buffer ProteinGOAnnotation rows
f. flush + commit every page_size rows
4. emit("load_goa_annotations.done", ...)
5. return OperationResult(result={annotation_set_id, inserted, skipped})
load_quickgo_annotations¶
Operation name: load_quickgo_annotations — queue: protea.jobs
Streams GO annotations from the QuickGO bulk download TSV API. Proteins are determined by the canonical accessions already in the DB — no external accession list is needed. Supports optional ECO ID → evidence code mapping, taxon filtering, and aspect filtering.
Payload fields¶
| Field | Default | Description |
|---|---|---|
|  | (required) | UUID of the |
|  | (required) | Version label for |
|  | EBI QuickGO | Base URL for the QuickGO download endpoint. |
|  |  | Explicit accession filter; |
|  |  | Pull the accession filter from the |
|  |  | URL to a GAF-ECO mapping file for evidence code resolution. |
|  |  | Rows buffered per commit. |
|  |  | Commit after each page. |
|  |  | Row cap (for testing). |
|  |  | Accessions per QuickGO API request when using |
compute_embeddings¶
Operation name: compute_embeddings — queue: protea.embeddings
(coordinator, serialised; one at a time via RetryLaterError if GPU busy)
Coordinator operation: determines which sequences need embeddings and fans
out ComputeEmbeddingsBatchOperation messages to protea.embeddings.batch.
Does not run GPU inference directly; returns OperationResult(deferred=True)
immediately after publishing batch messages.
Payload fields¶
| Field | Default | Description |
|---|---|---|
|  | (required) | UUID of the |
|  |  | List of UniProt accessions to embed; |
|  |  | UUID of a |
|  |  | Sequences per batch message. Tune to GPU memory. |
|  |  | Device for batch workers ( |
|  |  | Skip sequences that already have an embedding for this config. |
|  |  | Model forward-pass batch size inside each batch worker. |
Execution flow¶
1. resolve embedding config; raise RetryLaterError(delay=60s) if another
embedding job is RUNNING (GPU exclusive lock)
2. resolve sequence IDs from accessions or query_set_id
3. if skip_existing: filter out sequence IDs that already have embeddings
4. partition sequence IDs into batches of sequences_per_job
5. publish N ComputeEmbeddingsBatch messages to protea.embeddings.batch
6. publish N StoreEmbeddings slots to protea.embeddings.write
7. update Job.progress_total = N
8. return OperationResult(deferred=True, result={"batches": N})
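Step 4's partitioning is a plain fixed-size chunking; the batch count then becomes `Job.progress_total`. A sketch (`sequences_per_job` is the payload field described above):

```python
def partition(sequence_ids: list[int], sequences_per_job: int) -> list[list[int]]:
    """Split sequence IDs into batches of at most sequences_per_job each."""
    return [
        sequence_ids[i:i + sequences_per_job]
        for i in range(0, len(sequence_ids), sequences_per_job)
    ]


batches = partition(list(range(10)), sequences_per_job=4)
progress_total = len(batches)  # one batch message per chunk
```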
Batch and write workers¶
The compute_embeddings_batch (OperationConsumer on protea.embeddings.batch)
runs GPU inference per batch and publishes float32 vectors to protea.embeddings.write.
The store_embeddings (OperationConsumer on protea.embeddings.write) bulk-inserts
SequenceEmbedding rows and atomically increments Job.progress_current,
closing the parent job when all batches are done.
predict_go_terms¶
Operation name: predict_go_terms — queue: protea.jobs
(coordinator; fans out KNN batch workers)
Coordinator operation: loads reference embeddings into a process-level cache,
partitions query proteins into batches, and fans out PredictGOTermsBatch
messages to protea.predictions.batch. Returns OperationResult(deferred=True).
Payload fields¶
| Field | Default | Description |
|---|---|---|
|  | (required) | UUID of the |
|  | (required) | UUID of the |
|  | (required) | UUID of the |
|  |  | List of query protein accessions; |
|  |  | UUID of a |
|  |  | Maximum GO predictions per query protein. |
|  |  | Discard neighbors beyond this distance; |
|  |  | Query proteins per batch message. |
|  |  | KNN backend: |
|  |  | Distance metric ( |
|  |  | FAISS index type: |
|  |  | Compute NW + SW pairwise alignments (parasail) for each prediction. |
|  |  | Compute taxonomic distance (ete3 NCBITaxa) for each prediction. |
Reference cache¶
Reference embeddings are loaded once per (embedding_config_id,
annotation_set_id) pair and stored as a process-level float16 numpy
array (max 1 entry, LRU-evicted on config change). This avoids reloading
hundreds of thousands of vectors for every batch.
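The single-slot, evict-on-key-change behaviour can be sketched as below. This is a simplified stand-in, not the actual cache class, and the loader here returns a string instead of a numpy array:

```python
class SingleSlotCache:
    """Cache of max one entry, keyed by (embedding_config_id, annotation_set_id);
    a different key evicts the previous entry and reloads."""

    def __init__(self, loader):
        self._loader = loader
        self._key = None
        self._value = None
        self.loads = 0  # how many times the expensive loader actually ran

    def get(self, key):
        if key != self._key:              # miss or config change -> reload
            self._value = self._loader(key)
            self._key = key
            self.loads += 1
        return self._value


cache = SingleSlotCache(loader=lambda key: f"embeddings for {key}")
a = cache.get(("cfg1", "ann1"))   # first batch: loads the reference matrix
b = cache.get(("cfg1", "ann1"))   # subsequent batches: served from cache
c = cache.get(("cfg2", "ann1"))   # new config: old entry evicted, one reload
```

With per-batch workers sharing a process, every batch after the first skips the expensive load.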
Batch and write workers¶
The predict_go_terms_batch (OperationConsumer on protea.predictions.batch)
runs KNN search and GO transfer per batch. The store_predictions
(OperationConsumer on protea.predictions.write) bulk-inserts
GOPrediction rows and atomically increments Job.progress_current.
ping¶
Operation name: ping — queue: protea.ping
Smoke-test operation. Accepts no required payload fields, emits a single
ping.pong event, and returns immediately. Used to verify end-to-end
connectivity of the job queue without touching the protein data tables.
generate_evaluation_set¶
Operation name: generate_evaluation_set — queue: protea.jobs
Computes the CAFA-style evaluation delta between two AnnotationSet rows
(old → new) and stores an EvaluationSet row with summary statistics.
Follows the CAFA5 evaluation protocol exactly.
Payload fields¶
| Field | Default | Description |
|---|---|---|
|  | (required) | UUID of the older annotation set (t0, used as reference). |
|  | (required) | UUID of the newer annotation set (t1, ground truth source). |
Both annotation sets must share the same ontology_snapshot_id; the
operation raises ValueError otherwise.
Execution flow¶
1. validate payload; load AnnotationSet rows; assert same ontology snapshot
2. call compute_evaluation_data() — see protea.core.evaluation:
a. load GO DAG children map (is_a / part_of only)
b. build NOT-propagation exclusion set (negated terms + GO descendants)
c. load experimental annotations for old and new sets, filtered by
exclusion set and evidence codes (EXP, IDA, IMP, IGI, IEP, IPI, …)
d. classify per (protein, namespace) into NK / LK / PK:
- NK: protein had NO annotations in any namespace at t0
- LK: protein had annotations in some namespaces but not in namespace S;
novel terms in S are LK ground truth
- PK: protein had annotations in namespace S at t0 and gained new terms
in S; novel terms are PK ground truth, old terms go to pk_known
3. emit stats (delta_proteins, nk/lk/pk counts and annotations)
4. INSERT EvaluationSet row with stats stored in JSONB
5. return OperationResult(result={evaluation_set_id, ...stats})
NK/LK/PK classification note¶
Classification is per (protein, namespace), not globally per protein. A protein that had MFO and BPO annotations at t0 and gains new CCO and BPO annotations at t1 will be LK in CCO (no prior CCO annotations) and PK in BPO (had prior BPO annotations) simultaneously. This matches the CAFA5 evaluation protocol.
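The per-(protein, namespace) rule reduces to three cases. A toy classifier following the NK/LK/PK definitions above, where `t0` maps each protein to the set of namespaces it was annotated in at t0:

```python
def classify(protein: str, namespace: str, t0: dict[str, set[str]]) -> str:
    """NK/LK/PK status of one (protein, namespace) pair at evaluation time."""
    namespaces_at_t0 = t0.get(protein, set())
    if not namespaces_at_t0:
        return "NK"   # no annotations in any namespace at t0
    if namespace not in namespaces_at_t0:
        return "LK"   # annotated elsewhere, but not in this namespace
    return "PK"       # already annotated in this namespace at t0


t0 = {"P1": {"MFO", "BPO"}}
labels = (
    classify("P1", "CCO", t0),  # no prior CCO annotations
    classify("P1", "BPO", t0),  # had prior BPO annotations
    classify("P2", "BPO", t0),  # unknown protein at t0
)
```

This reproduces the example in the note: the same protein is LK in one namespace and PK in another at the same time.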
Ground truth files are computed on-demand by the download endpoints (not
stored in the DB) using the same compute_evaluation_data logic.
run_cafa_evaluation¶
Operation name: run_cafa_evaluation — queue: protea.jobs
Runs the cafaeval evaluator against NK, LK, and PK ground-truth settings
for a given EvaluationSet × PredictionSet pair. Persists an
EvaluationResult row with per-namespace Fmax, precision, recall, τ, and
coverage for each setting.
Payload fields¶
| Field | Default | Description |
|---|---|---|
|  | (required) | UUID of the |
|  | (required) | UUID of the |
|  |  | Discard predictions with cosine distance above this threshold before scoring (range 0 – 2). |
|  |  | Filesystem path where cafaeval output files (PR curves, TSVs) are written. |
Execution flow¶
1. load EvaluationSet + PredictionSet + OntologySnapshot from DB
2. compute_evaluation_data() — same delta as generate_evaluation_set
3. download OBO file from snapshot.obo_url to a temp directory
4. write ground-truth TSVs: gt_NK.tsv, gt_LK.tsv, gt_PK.tsv,
known_terms.tsv, pk_known_terms.tsv
5. write predictions in CAFA format (protein \t go_id \t score),
   score = max(0, 1 - distance), filtered to delta proteins only
6. session.commit() — releases DB connection before cafaeval forks workers
7. for each setting in (NK, LK, PK):
a. NK pass: cafa_eval(obo, predictions/, gt_NK.tsv)
b. LK pass: cafa_eval(obo, predictions/, gt_LK.tsv)
c. PK pass: cafa_eval(obo, predictions/, gt_PK.tsv, exclude=pk_known_terms.tsv)
d. parse per-namespace Fmax / precision / recall / τ / coverage
e. if artifacts_dir: write cafaeval PR curves and metric TSVs
8. INSERT EvaluationResult row with results dict (NK → {BPO, MFO, CCO})
9. return OperationResult(result={evaluation_result_id, results})
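Step 5's score conversion is a one-liner: a cosine distance in [0, 2] becomes a CAFA score in [0, 1], clamped at zero for anti-correlated neighbors.

```python
def distance_to_score(distance: float) -> float:
    """Map cosine distance (0 = identical, 2 = opposite) to a CAFA score."""
    return max(0.0, 1.0 - distance)


scores = [distance_to_score(d) for d in (0.0, 0.25, 1.5)]
```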
PK known-terms¶
For the PK pass, pk_known_terms.tsv is passed to cafa_eval as the
-known (exclude) argument. This file contains all experimental
annotations from the old snapshot for PK proteins in the relevant namespace.
cafaeval uses this to exclude known annotations from scoring — methods
that simply repeat prior annotations receive no credit for PK predictions.
SIGTERM handling¶
cafaeval uses Python multiprocessing.Pool internally. Before each
cafa_eval() call, the operation temporarily resets SIGTERM and
SIGINT handlers to defaults so that forked pool workers can be terminated
cleanly. The original handlers are restored afterwards.
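The save/reset/restore pattern described above can be sketched as a context manager. This is an illustrative re-implementation, not the operation's actual code, which may do the same thing inline:

```python
import signal
from contextlib import contextmanager


@contextmanager
def default_signal_handlers(*signums):
    """Temporarily reset the given signals to SIG_DFL, restoring on exit."""
    saved = {s: signal.getsignal(s) for s in signums}
    try:
        for s in signums:
            signal.signal(s, signal.SIG_DFL)   # forked pool workers can now die cleanly
        yield
    finally:
        for s, handler in saved.items():
            signal.signal(s, handler)          # restore the worker's own handlers


before = signal.getsignal(signal.SIGINT)
with default_signal_handlers(signal.SIGTERM, signal.SIGINT):
    inside = signal.getsignal(signal.SIGINT)   # SIG_DFL while cafa_eval would run
after = signal.getsignal(signal.SIGINT)
```

Note that `signal.signal` must be called from the main thread, which is where the operation runs the evaluator.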
Registering a new operation¶
Create a module under
protea/core/operations/.Define a payload class extending
ProteaPayload.Implement the
Operationprotocol (nameattribute +execute).Register the instance in the worker startup script:
registry.register(MyOperation())Route jobs to the correct queue by setting
queue_namein thePOST /jobsrequest body.
No changes to BaseWorker, the API, or the infrastructure layer are required.
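A registry along these lines ties the steps together; this is a hypothetical sketch of the lookup-by-name behaviour, not PROTEA's actual registry class:

```python
class Registry:
    """Maps operation names to instances; the worker resolves jobs through it."""

    def __init__(self):
        self._ops = {}

    def register(self, op) -> None:
        if op.name in self._ops:
            # name must be unique across all registered operations
            raise ValueError(f"duplicate operation name: {op.name}")
        self._ops[op.name] = op

    def get(self, name: str):
        return self._ops[name]


class MyOperation:
    """Illustrative operation stub; a real one also implements execute()."""
    name = "my_operation"


registry = Registry()
registry.register(MyOperation())
found = registry.get("my_operation")   # routed by the Job row's operation field
```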