Core¶
The protea.core package contains all domain logic. It has no dependency
on the infrastructure layer: operations receive an open SQLAlchemy session
and an emit callback, but they do not manage connections, queues, or
transactions themselves. This strict boundary makes every operation
independently testable and trivially substitutable.
Contracts¶
The contracts module defines the interfaces that every operation must satisfy and the shared types used across the entire codebase.
Operation is a structural Protocol — any class that exposes a name
string and an execute(session, payload, *, emit) method conforms to it,
without needing to inherit from a base class. ProteaPayload is the
immutable, strictly-typed Pydantic base class for all operation payloads:
strict mode prevents silent type coercion, and frozen configuration prevents
accidental mutation after validation. OperationResult is the return value
of every execute call; its deferred flag tells BaseWorker that
completion will be signalled by child workers rather than immediately.
RetryLaterError is raised when a shared resource (e.g. the GPU) is
occupied — BaseWorker catches it, resets the job to QUEUED, and
re-publishes the message after a configurable delay.
- class protea.core.contracts.operation.Operation(*args, **kwargs)¶
Bases: Protocol

Protocol that every domain operation must satisfy.

Operations are pure domain logic: they receive an open SQLAlchemy session and an emit callback for structured event logging, and return an OperationResult. They must not manage sessions or queue connections.

- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name: str¶
- class protea.core.contracts.operation.OperationResult(result: dict[str, ~typing.Any]=<factory>, progress_current: int | None = None, progress_total: int | None = None, deferred: bool = False, publish_after_commit: list[tuple[str, ~uuid.UUID]]=<factory>, publish_operations: list[tuple[str, dict[str, ~typing.Any]]]=<factory>)¶
Bases: object

Return value of every Operation.execute() call.

- result is a free-form dict that gets stored in Job.meta and surfaced in the job detail view.
- progress_current / progress_total are written back to the Job row so the UI can render a progress bar.
- deferred — if True, BaseWorker will NOT transition the job to SUCCEEDED. Use this for coordinator operations that delegate work to child jobs; the last child is responsible for marking the parent SUCCEEDED.
- publish_after_commit — list of (queue_name, job_id) pairs that BaseWorker will publish to RabbitMQ after the DB commit, guaranteeing workers always find the child job row before they try to claim it.

- deferred: bool = False¶
- progress_current: int | None = None¶
- progress_total: int | None = None¶
- publish_after_commit: list[tuple[str, UUID]]¶
- publish_operations: list[tuple[str, dict[str, Any]]]¶
- result: dict[str, Any]¶
- class protea.core.contracts.operation.ProteaPayload¶
Bases: BaseModel

Immutable, strictly-typed base class for all operation payloads.

Subclass and declare fields using Pydantic annotations. Validation runs automatically via model_validate(dict) — no manual parsing needed. strict=True prevents silent type coercion (e.g. "yes" is not a valid bool).

- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model: a dictionary conforming to Pydantic's ConfigDict.
- exception protea.core.contracts.operation.RetryLaterError(reason: str, delay_seconds: int = 60)¶
Bases: Exception

Raised by an operation when it cannot run yet but should be retried.
BaseWorker resets the job to QUEUED and the consumer re-publishes the message after
delay_seconds, leaving the GPU free for other work.
OperationRegistry is a simple dict-backed mapping from operation name
strings to instances. Workers resolve the correct operation at message
dispatch time; new operations are registered at process startup in
scripts/worker.py without modifying any worker code.
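A dict-backed registry of this shape can be sketched as follows; the class and method names here are illustrative, not protea's real API:

```python
class OperationRegistry:
    """Maps operation name strings to operation instances."""

    def __init__(self) -> None:
        self._ops: dict[str, object] = {}

    def register(self, op) -> None:
        # op must expose a `name` attribute (the Operation protocol).
        self._ops[op.name] = op

    def get(self, name: str):
        # Workers resolve the operation at message dispatch time.
        try:
            return self._ops[name]
        except KeyError:
            raise KeyError(f"no operation registered under {name!r}") from None
```

Startup code would register one instance per operation; worker code never needs to change when a new operation is added.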
Utilities¶
protea.core.utils provides three shared utilities used across multiple
operations.
utcnow() returns a timezone-aware UTC datetime, avoiding the common
mistake of calling datetime.utcnow() which returns a naive object.
chunks(seq, n) splits any sequence into fixed-size chunks, used by
coordinator operations to partition work into batches. UniProtHttpMixin
encapsulates all retry logic for the UniProt REST API: exponential backoff
with jitter, Retry-After header parsing, and cursor extraction for
paginated endpoints. It is mixed into InsertProteinsOperation and
FetchUniProtMetadataOperation.
- class protea.core.utils.UniProtHttpMixin¶
Bases: object

Shared HTTP retry logic for UniProt REST API operations.

Requires the subclass __init__ to set:

self._http_requests: int = 0
self._http_retries: int = 0
self._http: requests.Session = requests.Session()
- protea.core.utils.chunks(seq: Sequence[Any], n: int) Iterable[Sequence[Any]]¶
Yield successive n-sized chunks from seq.
- protea.core.utils.utcnow() datetime¶
Return the current UTC datetime (timezone-aware).
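Possible implementations matching the documented behaviour of these two helpers (the real functions live in protea.core.utils; these are sketches):

```python
from datetime import datetime, timezone
from typing import Any, Iterable, Sequence

def utcnow() -> datetime:
    # Timezone-aware, unlike the deprecated naive datetime.utcnow().
    return datetime.now(timezone.utc)

def chunks(seq: Sequence[Any], n: int) -> Iterable[Sequence[Any]]:
    # Yield successive n-sized chunks; the last chunk may be shorter.
    for i in range(0, len(seq), n):
        yield seq[i:i + n]
```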
KNN search¶
protea.core.knn_search provides the nearest-neighbour search layer used
during GO term prediction. The single public entry point is search_knn(),
which dispatches to one of two backends based on the backend parameter.
The numpy backend computes exact cosine or L2 distances via matrix multiplication. It requires no additional dependencies and is the default. For cosine distance, query and reference matrices are L2-normalised and the distance is computed as \(D = 1 - \cos(\theta) \in [0, 2]\). This is \(O(NQ)\) and is appropriate for reference sets up to approximately 100 000 proteins when embeddings fit in RAM as float16.
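A minimal sketch of the cosine path described above — L2-normalise both matrices, then a single matmul gives all pairwise distances. The function name is illustrative:

```python
import numpy as np

def cosine_distances(query: np.ndarray, ref: np.ndarray) -> np.ndarray:
    """D = 1 - cos(theta) for every (query, ref) pair, via one matmul."""
    q = query / np.linalg.norm(query, axis=1, keepdims=True)
    r = ref / np.linalg.norm(ref, axis=1, keepdims=True)
    return 1.0 - q @ r.T  # shape (n_queries, n_refs), values in [0, 2]
```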
The faiss backend wraps the FAISS library and supports three index
types: Flat (exact), IVFFlat (approximate, Voronoi partitioning),
and HNSW (approximate, hierarchical graph). IVFFlat is recommended
for datasets above 100 000 vectors: it restricts search to the nprobe
nearest Voronoi cells, reducing query time from \(O(N)\) to approximately
\(O(\sqrt{N})\) with negligible recall loss at default settings.
Important
KNN search is never performed at the database layer. pgvector index types (HNSW, IVFFlat) are not used. All search happens in Python after loading reference embeddings into a numpy array. See Predict GO terms in the how-to guides.
K-nearest-neighbor search backends for GO term prediction.
Backends¶
- numpy
Exact brute-force cosine or L2 distance via matrix multiplication. No dependencies beyond NumPy. Suitable for reference sets up to ~100K.
- faiss
Wraps the FAISS library (faiss-cpu). Supports exact (Flat) and approximate (IVFFlat, HNSW) indices. Significantly faster for large reference sets (>100K vectors).
Metric convention¶
Both backends return distances (lower = more similar):
- cosine → D = 1 − cosine_similarity ∈ [0, 2]
- l2 → D = squared Euclidean distance ∈ [0, ∞)
Returned type¶
search_knn returns:
list[list[tuple[str, float]]]
One inner list per query; each tuple is (ref_accession, distance),
sorted ascending by distance, length ≤ k (may be shorter if
distance_threshold filters them out).
- protea.core.knn_search.search_knn(query_embeddings: ndarray, ref_embeddings: ndarray, ref_accessions: list[str], k: int, *, distance_threshold: float | None = None, backend: str = 'numpy', metric: str = 'cosine', faiss_index_type: str = 'Flat', faiss_nlist: int = 100, faiss_nprobe: int = 10, faiss_hnsw_m: int = 32, faiss_hnsw_ef_search: int = 64) list[list[tuple[str, float]]]¶
Search for the k nearest reference proteins for each query embedding.
- Parameters:
query_embeddings – Shape (n_queries, dim). Need not be normalised.
ref_embeddings – Shape (n_refs, dim). Need not be normalised.
ref_accessions – Length n_refs. Maps index positions to accession strings.
k – Maximum number of neighbours to return per query.
distance_threshold – If set, discard neighbours with distance > threshold.
backend – "numpy" (exact brute-force) or "faiss".
metric – "cosine" or "l2".
faiss_index_type – One of "Flat", "IVFFlat", "HNSW" (ignored for numpy).
faiss_nlist – Number of Voronoi cells for IVFFlat.
faiss_nprobe – Cells visited at search time for IVFFlat.
faiss_hnsw_m – Connections per node for HNSW.
faiss_hnsw_ef_search – Beam width at search time for HNSW.
- Returns:
Outer list: one entry per query. Inner list: (ref_accession, distance) sorted ascending by distance.
- Return type:
list[list[tuple[str, float]]]
Feature engineering¶
protea.core.feature_engineering enriches each query–reference pair in a
prediction result with sequence-level and phylogenetic signals. These features
are opt-in: they are computed only when compute_alignments=true and/or
compute_taxonomy=true are set in the prediction payload.
Pairwise alignment is computed via the parasail library using the
BLOSUM62 substitution matrix with gap-open/extend penalties of 10/1. Both
global (Needleman–Wunsch) and local (Smith–Waterman) alignments are run for
each pair, producing identity, similarity, raw score, gap percentage, and
alignment length for each. These metrics capture sequence similarity beyond
what the embedding distance alone encodes, which is especially valuable for
distant homologues where embedding geometry may be unreliable.
Taxonomic distance is computed via ete3 and the NCBI taxonomy tree
(local SQLite, downloaded on first use). For each (query, reference) pair
where taxonomy IDs are available from UniProt metadata, PROTEA finds the
lowest common ancestor and computes the edge count through it. Results are
cached with an LRU cache keyed by taxon-ID pair to avoid redundant tree
traversals across a batch.
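The LCA-and-edge-count computation can be illustrated over two root→leaf lineages (such as those ete3's NCBITaxa.get_lineage returns), with an lru_cache keyed by the taxon-ID pair, as described above. The function is a sketch, not protea's real implementation:

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def taxonomic_distance(lin1: tuple[int, ...], lin2: tuple[int, ...]) -> tuple[int, int]:
    """Return (lca_taxid, edge count through the LCA).

    Both lineages are assumed to start at the same root, so the shared
    prefix is non-empty.
    """
    common = 0
    for a, b in zip(lin1, lin2):
        if a != b:
            break
        common += 1
    lca = lin1[common - 1]
    # Edges from each leaf up to the LCA, summed.
    distance = (len(lin1) - common) + (len(lin2) - common)
    return lca, distance
```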
Feature engineering utilities for functional annotation enrichment.
Provides pairwise alignment metrics (Needleman–Wunsch and Smith–Waterman) via parasail and taxonomic distance computation via ete3 NCBITaxa.
These features complement the embedding-space KNN distance stored in
GOPrediction.distance with sequence-level and phylogenetic signals.
Performance notes:
- Alignment is O(m*n) per pair; parasail uses SIMD acceleration.
- Taxonomy lookups use an LRU cache over lineage queries (ete3 local SQLite). The first call may trigger a DB download if the ete3 database is absent.
- protea.core.feature_engineering.compute_alignment(seq1: str, seq2: str) dict[str, Any]¶
Compute both NW and SW alignment metrics in one call.
- protea.core.feature_engineering.compute_nw(seq1: str, seq2: str, *, gap_open: int = 10, gap_extend: int = 1) dict[str, Any]¶
Global alignment (Needleman–Wunsch) via parasail/BLOSUM62.
- Returns a dict with keys:
identity_nw, similarity_nw, alignment_score_nw, gaps_pct_nw, alignment_length_nw, length_query, length_ref
- protea.core.feature_engineering.compute_sw(seq1: str, seq2: str, *, gap_open: int = 10, gap_extend: int = 1) dict[str, Any]¶
Local alignment (Smith–Waterman) via parasail/BLOSUM62.
- Returns a dict with keys:
identity_sw, similarity_sw, alignment_score_sw, gaps_pct_sw, alignment_length_sw
- protea.core.feature_engineering.compute_taxonomy(t1_raw: Any, t2_raw: Any) dict[str, Any]¶
Compute taxonomic distance between two NCBI taxonomy IDs.
- Returns a dict with keys:
taxonomic_lca, taxonomic_distance, taxonomic_common_ancestors, taxonomic_relation
Evaluation¶
protea.core.evaluation implements the CAFA5 evaluation protocol for
computing the ground-truth delta between two annotation snapshots.
The module’s central data structure is EvaluationData, a frozen dataclass
that holds the NK, LK, PK, known, and pk_known annotation dictionaries.
Each dictionary maps a protein accession to a set of GO term IDs.
EvaluationData fields:
- nk — delta annotations for No-Knowledge proteins (no prior annotations in any namespace at t0).
- lk — delta annotations for Limited-Knowledge proteins (had annotations in some namespaces but gained new terms in a previously empty namespace).
- pk — novel annotations for Partial-Knowledge proteins (gained new terms in a namespace where they already had annotations).
- pk_known — old experimental annotations for PK proteins in the relevant namespaces; passed as -known to cafaeval to exclude them from scoring.
- known — all old experimental annotations flattened across namespaces; available for download via the reference endpoint.
The public entry point is compute_evaluation_data(session,
old_annotation_set_id, new_annotation_set_id, ontology_snapshot_id).
It loads the GO DAG for NOT-propagation, builds a per-namespace annotation
map for both old and new sets, and classifies each (protein, namespace) pair
into NK, LK, or PK. The same protein can appear in multiple categories across
different namespaces simultaneously (e.g., LK in CCO and PK in BPO).
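The per-(protein, namespace) rule can be written down directly. A sketch, where `old` maps each namespace to the protein's experimental terms at t0 and `ns` is the namespace that gained novel terms at t1 (names are illustrative):

```python
def classify(old: dict[str, set[str]], ns: str) -> str:
    """CAFA5-style NK/LK/PK classification for one (protein, namespace) pair."""
    if not any(old.values()):
        return "NK"   # no prior annotations in any namespace at t0
    if not old.get(ns):
        return "LK"   # annotated elsewhere, but namespace ns was empty
    return "PK"       # already annotated in ns and gained new terms there
```

Because the rule is evaluated per namespace, the same `old` dict can yield LK for one namespace and PK for another, as the text above notes.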
CAFA-style evaluation data computation.
This module computes the ground-truth delta between two AnnotationSets (old → new) following the official CAFA5 evaluation protocol:
Experimental evidence codes only (EXP, IDA, IMP, …)
NOT-qualifier annotations are excluded — including their GO descendants propagated transitively through the is_a / part_of DAG.
Classification is per (protein, namespace), not globally per protein:
- NK — protein had NO experimental annotations in ANY namespace at t0.
All novel terms across all namespaces are ground truth.
- LK — protein had annotations in SOME namespaces at t0, but NOT in
namespace S. Novel terms in S are ground truth for LK.
- PK — protein had annotations in namespace S at t0 AND gained new terms
in S at t1. Novel terms in S are ground truth for PK; old terms in S are the
-known file for the CAFA evaluator.
Note: the same protein can be LK in one namespace and PK in another simultaneously (e.g. had MFO+BPO at t0, gains CCO → LK in CCO, gains new BPO → PK in BPO).
- Output format (matching CAFA evaluator): 2-column TSV, no header.
protein_accession <tab> go_id
- class protea.core.evaluation.EvaluationData(nk: dict[str, set[str]]=<factory>, lk: dict[str, set[str]]=<factory>, pk: dict[str, set[str]]=<factory>, known: dict[str, set[str]]=<factory>, pk_known: dict[str, set[str]]=<factory>)¶
Bases: object

Computed ground-truth delta between two annotation sets.
- property delta_proteins: int¶
- known: dict[str, set[str]]¶
- property known_terms_count: int¶
- lk: dict[str, set[str]]¶
- property lk_annotations: int¶
- property lk_proteins: int¶
- nk: dict[str, set[str]]¶
- property nk_annotations: int¶
- property nk_proteins: int¶
- pk: dict[str, set[str]]¶
- property pk_annotations: int¶
- pk_known: dict[str, set[str]]¶
- property pk_proteins: int¶
- stats() dict¶
- protea.core.evaluation.compute_evaluation_data(session: Session, old_annotation_set_id: UUID, new_annotation_set_id: UUID, ontology_snapshot_id: UUID) EvaluationData¶
Compute NK/LK/PK ground truth following the CAFA5 protocol.
Classification is per (protein, namespace):
- NK — protein had no experimental annotations in any namespace at t0.
- LK — protein had annotations in some namespaces at t0, but not in namespace S; gained new terms in S → those terms are LK ground truth.
- PK — protein had annotations in namespace S at t0 and gained new terms in S → those novel terms are PK ground truth; old terms in S are stored in pk_known for the cafaeval -known flag.
The same protein can be simultaneously LK in one namespace and PK in another.
Operations¶
PROTEA ships ten operations, all registered at worker startup in
scripts/worker.py. Each operation is a class that implements the
Operation protocol: a name string and an execute method.
Operations are stateless with respect to infrastructure — they receive a
session and emit structured events, but do not open connections or manage
transactions.
- ping
Smoke-test operation. Returns immediately with a success result. Used to verify end-to-end connectivity between the API, RabbitMQ, and worker processes.
- class protea.core.operations.ping.PingOperation(*args, **kwargs)¶
Bases: Operation

- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name: str = 'ping'¶
- insert_proteins
Fetches protein sequences from the UniProt REST API using cursor-based FASTA streaming. Sequences are deduplicated by MD5 hash before upsert; proteins are upserted by accession. Exponential backoff with jitter and Retry-After header handling are provided by UniProtHttpMixin. Isoforms are parsed and stored separately, sharing the canonical sequence where the amino-acid string is identical.
- class protea.core.operations.insert_proteins.InsertProteinsOperation¶
Bases: UniProtHttpMixin, Operation

Fetches protein sequences from UniProt (FASTA) and upserts them into the DB.

Uses cursor-based pagination, exponential backoff with jitter, and MD5-based sequence deduplication. Many proteins can share one Sequence row. Isoforms (<canonical>-<n>) are stored as separate Protein rows grouped by canonical_accession.

- UNIPROT_SEARCH_URL = 'https://rest.uniprot.org/uniprotkb/search'¶
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name: str = 'insert_proteins'¶
- class protea.core.operations.insert_proteins.InsertProteinsPayload(*, search_criteria: str, page_size: Annotated[int, Gt(gt=0)] = 500, total_limit: Annotated[int, FieldInfo(annotation=NoneType, required=True, metadata=[Gt(gt=0)])] | None = None, timeout_seconds: Annotated[int, Gt(gt=0)] = 60, include_isoforms: bool = True, compressed: bool = False, max_retries: Annotated[int, Gt(gt=0)] = 6, backoff_base_seconds: Annotated[float, Ge(ge=0.0)] = 0.8, backoff_max_seconds: Annotated[float, Ge(ge=0.0)] = 20.0, jitter_seconds: Annotated[float, Ge(ge=0.0)] = 0.4, user_agent: str = 'PROTEA/insert_proteins (contact: you@example.org)')¶
Bases: ProteaPayload

- backoff_base_seconds: NonNegativeFloat¶
- backoff_max_seconds: NonNegativeFloat¶
- compressed: bool¶
- include_isoforms: bool¶
- jitter_seconds: NonNegativeFloat¶
- max_retries: PositiveInt¶
- model_config = {'frozen': True, 'strict': True}¶
- classmethod must_be_non_empty(v: str) str¶
- page_size: PositiveInt¶
- search_criteria: str¶
- timeout_seconds: PositiveInt¶
- total_limit: PositiveInt | None¶
- user_agent: str¶
- fetch_uniprot_metadata
Downloads TSV functional annotation data from UniProt and upserts ProteinUniProtMetadata rows keyed by canonical accession. Fields include functional description, EC numbers, pathway membership, and kinetics. Isoforms inherit metadata through the canonical_accession join — no duplicate rows are created.
- class protea.core.operations.fetch_uniprot_metadata.FetchUniProtMetadataOperation¶
Bases: UniProtHttpMixin

Fetches functional annotations from UniProt (TSV) and upserts ProteinUniProtMetadata rows.
One metadata row is stored per canonical accession. Isoforms share the same metadata record. Optionally updates core Protein fields (reviewed, organism, gene_name, length) if they are missing. Uses the same cursor-based pagination and backoff strategy as InsertProteinsOperation.
- FIELD_MAP: dict[str, str] = {'absorption': 'Absorption', 'active_site': 'Active site', 'activity_regulation': 'Activity regulation', 'binding_site': 'Binding site', 'catalytic_activity': 'Catalytic activity', 'cofactor': 'Cofactor', 'dna_binding': 'DNA binding', 'ec_number': 'EC number', 'features': 'Features', 'function_cc': 'Function [CC]', 'keywords': 'Keywords', 'kinetics': 'Kinetics', 'pathway': 'Pathway', 'ph_dependence': 'pH dependence', 'redox_potential': 'Redox potential', 'rhea_id': 'Rhea ID', 'site': 'Site', 'temperature_dependence': 'Temperature dependence'}¶
- UNIPROT_SEARCH_URL = 'https://rest.uniprot.org/uniprotkb/search'¶
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'fetch_uniprot_metadata'¶
- class protea.core.operations.fetch_uniprot_metadata.FetchUniProtMetadataPayload(*, search_criteria: str, page_size: Annotated[int, Gt(gt=0)] = 500, total_limit: Annotated[int, FieldInfo(annotation=NoneType, required=True, metadata=[Gt(gt=0)])] | None = None, timeout_seconds: Annotated[int, Gt(gt=0)] = 60, compressed: bool = True, max_retries: Annotated[int, Gt(gt=0)] = 6, backoff_base_seconds: Annotated[float, Ge(ge=0.0)] = 0.8, backoff_max_seconds: Annotated[float, Ge(ge=0.0)] = 20.0, jitter_seconds: Annotated[float, Ge(ge=0.0)] = 0.4, user_agent: str = 'PROTEA/fetch_uniprot_metadata (contact: you@example.org)', commit_every_page: bool = True, update_protein_core: bool = True)¶
Bases: ProteaPayload

- backoff_base_seconds: NonNegativeFloat¶
- backoff_max_seconds: NonNegativeFloat¶
- commit_every_page: bool¶
- compressed: bool¶
- jitter_seconds: NonNegativeFloat¶
- max_retries: PositiveInt¶
- model_config = {'frozen': True, 'strict': True}¶
- classmethod must_be_non_empty(v: str) str¶
- page_size: PositiveInt¶
- search_criteria: str¶
- timeout_seconds: PositiveInt¶
- total_limit: PositiveInt | None¶
- update_protein_core: bool¶
- user_agent: str¶
- load_ontology_snapshot
Downloads a GO OBO file and populates OntologySnapshot, GOTerm, and GOTermRelationship rows. The obo_version field carries a unique constraint so that re-importing the same release is idempotent. If a snapshot already exists but its relationships are missing, they are backfilled automatically.
- class protea.core.operations.load_ontology_snapshot.LoadOntologySnapshotOperation¶
Bases: object

Downloads a go.obo file and upserts an OntologySnapshot + GOTerm rows.
The data-version: header of the OBO file is used as the canonical version identifier (e.g. releases/2024-01-17). If a snapshot with that version already exists, the operation is a no-op and returns the existing snapshot id — making it safe to re-run.

GO term aspect is mapped from the OBO namespace field: biological_process → P, molecular_function → F, cellular_component → C.

- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'load_ontology_snapshot'¶
- class protea.core.operations.load_ontology_snapshot.LoadOntologySnapshotPayload(*, obo_url: str, timeout_seconds: int = 120, force_relationships: bool = False)¶
Bases: ProteaPayload
- force_relationships: bool¶
- model_config = {'frozen': True, 'strict': True}¶
- classmethod must_be_non_empty(v: str) str¶
- obo_url: str¶
- timeout_seconds: int¶
- load_goa_annotations
Bulk-loads a GAF (Gene Association Format) file. Annotations are filtered against canonical accessions present in the database, avoiding orphaned foreign keys. Each batch is committed independently to bound transaction size.
- class protea.core.operations.load_goa_annotations.LoadGOAAnnotationsOperation¶
Bases: object

Streams a GOA GAF file (gzip or plain) and upserts ProteinGOAnnotation rows.
The GAF file is streamed line by line from gaf_url — it is never fully loaded into memory, making it suitable for the full UniProt GAF (hundreds of millions of lines).

Only accessions present in the protein table are stored; all others are silently skipped. The canonical accession set is loaded once from the DB at the start of the operation.

- GAF 2.2 columns used (1-indexed, tab-separated):
2 → DB_Object_ID (accession)
4 → Qualifier
5 → GO ID
6 → DB:Reference
7 → Evidence Code
8 → With/From
14 → Date (YYYYMMDD)
15 → Assigned By
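Pulling those columns out of one GAF line is a straightforward split; a sketch (the function name and returned keys are illustrative, not the operation's real field names):

```python
def parse_gaf_line(line: str) -> dict[str, str]:
    """Extract the documented GAF 2.2 columns (1-indexed → 0-indexed after split)."""
    cols = line.rstrip("\n").split("\t")
    return {
        "accession": cols[1],       # col 2: DB_Object_ID
        "qualifier": cols[3],       # col 4: Qualifier
        "go_id": cols[4],           # col 5: GO ID
        "db_reference": cols[5],    # col 6: DB:Reference
        "evidence_code": cols[6],   # col 7: Evidence Code
        "with_from": cols[7],       # col 8: With/From
        "date": cols[13],           # col 14: Date (YYYYMMDD)
        "assigned_by": cols[14],    # col 15: Assigned By
    }
```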
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'load_goa_annotations'¶
- class protea.core.operations.load_goa_annotations.LoadGOAAnnotationsPayload(*, ontology_snapshot_id: str, gaf_url: str, source_version: str, page_size: Annotated[int, Gt(gt=0)] = 10000, timeout_seconds: Annotated[int, Gt(gt=0)] = 300, commit_every_page: bool = True, total_limit: Annotated[int, FieldInfo(annotation=NoneType, required=True, metadata=[Gt(gt=0)])] | None = None)¶
Bases: ProteaPayload

- commit_every_page: bool¶
- gaf_url: str¶
- model_config = {'frozen': True, 'strict': True}¶
- classmethod must_be_non_empty(v: str) str¶
- ontology_snapshot_id: str¶
- page_size: PositiveInt¶
- source_version: str¶
- timeout_seconds: PositiveInt¶
- total_limit: PositiveInt | None¶
- load_quickgo_annotations
Streams GO annotations from the QuickGO bulk download API (paginated TSV). Supports optional ECO→GO evidence code mapping and per-page commits. Filters out annotations whose accessions are not already in the database.
- class protea.core.operations.load_quickgo_annotations.LoadQuickGOAnnotationsOperation¶
Bases: object

Streams GO annotations from the QuickGO bulk download API.
Proteins to annotate are determined by the canonical accessions already present in the DB — no external FASTA or accession list is needed.
- The QuickGO TSV columns used:
GENE PRODUCT ID → protein accession
GO TERM → GO identifier
QUALIFIER → qualifier (enables, involved_in…)
ECO ID → mapped to evidence_code via eco_mapping_url (or stored raw)
REFERENCE → db_reference
WITH/FROM → with_from
ASSIGNED BY → assigned_by
DATE → annotation_date
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'load_quickgo_annotations'¶
- class protea.core.operations.load_quickgo_annotations.LoadQuickGOAnnotationsPayload(*, ontology_snapshot_id: str, source_version: str, quickgo_base_url: str = 'https://www.ebi.ac.uk/QuickGO/services/annotation/downloadSearch', gene_product_ids: list[str] | None = None, use_db_accessions: bool = True, eco_mapping_url: str | None = None, page_size: Annotated[int, Gt(gt=0)] = 10000, timeout_seconds: Annotated[int, Gt(gt=0)] = 300, commit_every_page: bool = True, total_limit: Annotated[int, FieldInfo(annotation=NoneType, required=True, metadata=[Gt(gt=0)])] | None = None, gene_product_batch_size: Annotated[int, Gt(gt=0)] = 200)¶
Bases: ProteaPayload

Payload for loading GO annotations from the QuickGO bulk download endpoint.

QuickGO returns a single streamed TSV filtered by the canonical accessions already present in the DB — no external accession list is needed.

eco_mapping_url (optional) points to a GAF-ECO mapping file (space-separated: ECO:XXXXXXX CODE). When provided, ECO IDs are resolved to GO evidence codes (IDA, IEA…) before insertion. If omitted, the raw ECO ID is stored as-is in evidence_code.

- commit_every_page: bool¶
- eco_mapping_url: str | None¶
- gene_product_batch_size: PositiveInt¶
- gene_product_ids: list[str] | None¶
- model_config = {'frozen': True, 'strict': True}¶
- classmethod must_be_non_empty(v: str) str¶
- ontology_snapshot_id: str¶
- page_size: PositiveInt¶
- quickgo_base_url: str¶
- source_version: str¶
- timeout_seconds: PositiveInt¶
- total_limit: PositiveInt | None¶
- use_db_accessions: bool¶
- compute_embeddings
Coordinator operation that partitions the target sequence set into batches and dispatches one compute_embeddings_batch message per batch to protea.embeddings.batch. The coordinator serialises on the protea.embeddings queue (one at a time) to prevent concurrent model loads from exhausting GPU memory. Batch and write workers scale independently. Returns deferred=True — the parent job is closed by the last write worker.
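The coordinator's partitioning step can be sketched as follows. The queue name and payload keys follow the batch payload documented below; the helper itself is illustrative:

```python
def make_batch_messages(sequence_ids: list[int], sequences_per_job: int,
                        parent_job_id: str,
                        embedding_config_id: str) -> list[tuple[str, dict]]:
    """One (queue_name, payload) message per fixed-size chunk of sequence ids."""
    messages = []
    for i in range(0, len(sequence_ids), sequences_per_job):
        payload = {
            "embedding_config_id": embedding_config_id,
            "sequence_ids": sequence_ids[i:i + sequences_per_job],
            "parent_job_id": parent_job_id,  # children report progress here
        }
        messages.append(("protea.embeddings.batch", payload))
    return messages
```

The coordinator then returns deferred=True and lets the last batch worker mark the parent job SUCCEEDED.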
- class protea.core.operations.compute_embeddings.ChunkEmbedding(chunk_index_s: int, chunk_index_e: int | None, vector: ndarray)¶
Bases: object

One pooled embedding for a contiguous residue span of a sequence.

chunk_index_s and chunk_index_e use the same convention as the DB columns: start is 0-based inclusive, end is exclusive. When chunking is disabled, chunk_index_s=0 and chunk_index_e=None (full sequence).

- chunk_index_e: int | None¶
- chunk_index_s: int¶
- vector: ndarray¶
- class protea.core.operations.compute_embeddings.ComputeEmbeddingsBatchOperation¶
Bases: object

Processes one batch of sequences for a parent compute_embeddings job.

Reads sequence_ids from the payload, loads the model, runs inference, stores embeddings, and atomically increments the parent job’s progress_current. The last batch to finish marks the parent SUCCEEDED.

- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'compute_embeddings_batch'¶
- class protea.core.operations.compute_embeddings.ComputeEmbeddingsBatchPayload(*, embedding_config_id: str, sequence_ids: list[int], parent_job_id: str, device: str = 'cuda', skip_existing: bool = True, batch_size: Annotated[int, Gt(gt=0)] = 8)¶
Bases: ProteaPayload

Payload for a single batch operation message published by the coordinator.
- batch_size: PositiveInt¶
- device: str¶
- embedding_config_id: str¶
- model_config = {'frozen': True, 'strict': True}¶
- parent_job_id: str¶
- sequence_ids: list[int]¶
- skip_existing: bool¶
- class protea.core.operations.compute_embeddings.ComputeEmbeddingsOperation¶
Bases: object

Computes protein language model embeddings using a stored EmbeddingConfig.
Backends¶
- esm / auto : HuggingFace EsmModel (ESM-2 family). Sequences are processed one at a time. CLS and EOS special tokens are stripped before residue-level pooling.
- esm3c : ESM SDK ESMC (ESM3c family). No external tokenizer; uses ESMProtein + LogitsConfig. Runs FP16 on GPU; BOS and EOS stripped before pooling.
- t5 : HuggingFace T5EncoderModel (ProstT5, prot_t5_xl…). Sequences are batched. ProstT5 mode (<AA2fold> prefix) is auto-detected from model_name. EOS token is included in the residue tensor (consistent with PIS behaviour).
Layer indexing (reverse convention, matches PIS)¶
layer_indices = [0] → last (most semantic) layer. layer_indices = [1] → penultimate layer. And so on.

Pipeline per sequence¶
1. Forward pass → raw hidden states per layer.
2. Extract layers using reverse indexing; validate against model depth.
3. Aggregate layers (mean/last/concat).
4. Optional per-residue L2 normalisation (normalize_residues).
5. Apply chunking if use_chunking=True.
6. Pool each chunk (mean/max/mean_max/cls).
7. Optional final L2 normalisation (normalize).
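A minimal NumPy sketch of the reverse layer indexing, aggregation, and pooling steps (chunking omitted; the `pool_sequence` helper is illustrative, not the real implementation):

```python
import numpy as np

def pool_sequence(hidden_states: list[np.ndarray], layer_indices: list[int],
                  aggregate: str = "mean", pool: str = "mean",
                  normalize: bool = True) -> np.ndarray:
    """Reverse layer indexing → layer aggregation → residue pooling."""
    # Reverse convention: 0 → last layer, 1 → penultimate, and so on.
    depth = len(hidden_states)
    assert all(0 <= i < depth for i in layer_indices), "layer index exceeds model depth"
    layers = [hidden_states[depth - 1 - i] for i in layer_indices]

    # Aggregate the selected layers into one (seq_len, dim) array.
    if aggregate == "mean":
        residues = np.mean(layers, axis=0)
    elif aggregate == "last":
        residues = layers[-1]
    else:  # "concat"
        residues = np.concatenate(layers, axis=-1)

    # Pool residues into a single vector (here: whole sequence as one chunk).
    pooled = residues.max(axis=0) if pool == "max" else residues.mean(axis=0)

    if normalize:
        pooled = pooled / np.linalg.norm(pooled)  # final L2 normalisation
    return pooled

# Toy model: 4 layers of (5 residues, dim 8); layer_indices=[0] selects the last layer.
hs = [np.random.rand(5, 8) for _ in range(4)]
vec = pool_sequence(hs, layer_indices=[0])
print(vec.shape)  # → (8,)
```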
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
Coordinator: partition sequences into child jobs and dispatch them.
- name = 'compute_embeddings'¶
- class protea.core.operations.compute_embeddings.ComputeEmbeddingsPayload(*, embedding_config_id: str, accessions: list[str] | None = None, query_set_id: str | None = None, sequences_per_job: Annotated[int, Gt(gt=0)] = 64, device: str = 'cuda', skip_existing: bool = True, batch_size: Annotated[int, Gt(gt=0)] = 8)¶
Bases:
ProteaPayload

Coordinator payload: decides which sequences to embed and how to batch.

The coordinator publishes N ephemeral operation messages to protea.embeddings.batch. Any worker consuming that queue picks up a message and runs ComputeEmbeddingsBatchOperation — no child Job rows are created in the DB.

Fields¶
- embedding_config_id : str
UUID of the EmbeddingConfig row that defines the model and strategy.
- accessions : list[str] | None
Restrict to proteins with these UniProt accessions. None = all.
- sequences_per_job : int
How many sequences each batch message processes. Tune to GPU memory.
- device : str
Device passed down to each batch worker ("cuda" or "cpu").
- skip_existing : bool
Skip sequences that already have an embedding for this config.
- batch_size : int
Model forward-pass batch size inside each batch worker.
- accessions: list[str] | None¶
- batch_size: PositiveInt¶
- device: str¶
- embedding_config_id: str¶
- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- classmethod must_be_non_empty(v: str) str¶
- query_set_id: str | None¶
- sequences_per_job: PositiveInt¶
- skip_existing: bool¶
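The strict + frozen behaviour shared by all payloads can be demonstrated with a small stand-in model; `DemoPayload` below is illustrative (not the real class), but mirrors model_config = {'frozen': True, 'strict': True}:

```python
from pydantic import BaseModel, ConfigDict, ValidationError

class DemoPayload(BaseModel):
    """Illustrative stand-in for a ProteaPayload subclass."""
    model_config = ConfigDict(frozen=True, strict=True)
    embedding_config_id: str
    device: str = "cuda"

payload = DemoPayload(embedding_config_id="cfg-123")  # validated once, then immutable

strict_err = frozen_err = None
try:
    DemoPayload(embedding_config_id=123)  # strict mode: no silent int→str coercion
except ValidationError as e:
    strict_err = e.errors()[0]["type"]

try:
    payload.device = "cpu"  # frozen: mutation after validation is rejected
except ValidationError as e:
    frozen_err = e.errors()[0]["type"]

print(strict_err, frozen_err)
```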
- class protea.core.operations.compute_embeddings.StoreEmbeddingsOperation¶
Bases:
object

Writes pre-computed embeddings to the DB and updates parent job progress.
Runs on a CPU-only worker (protea.embeddings.write queue) so the GPU worker is free to start the next inference batch immediately.
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'store_embeddings'¶
- class protea.core.operations.compute_embeddings.StoreEmbeddingsPayload(*, parent_job_id: str, embedding_config_id: str, skip_existing: bool = True, sequences: list[dict[str, Any]])¶
Bases:
ProteaPayload

Payload published by ComputeEmbeddingsBatchOperation after inference.
- embedding_config_id: str¶
- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- parent_job_id: str¶
- sequences: list[dict[str, Any]]¶
- skip_existing: bool¶
- predict_go_terms
Coordinator operation that loads reference embeddings into a process-level float16 cache, partitions the query set into batches, and dispatches one predict_go_terms_batch message per batch to protea.predictions.batch. Feature engineering (alignments, taxonomy) is opt-in via payload flags. Returns deferred=True — the parent job is closed by the last write worker.
- class protea.core.operations.predict_go_terms.PredictGOTermsBatchOperation¶
Bases:
object

CPU batch worker: KNN search + GO annotation transfer for one query chunk.
Reference embeddings and their GO annotations are loaded from DB on first access and cached at the process level (_REF_CACHE). Subsequent batch messages reuse the cached reference without any DB round-trip.
Result is published to protea.predictions.write for bulk DB insertion.
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'predict_go_terms_batch'¶
- class protea.core.operations.predict_go_terms.PredictGOTermsBatchPayload(*, embedding_config_id: str, annotation_set_id: str, prediction_set_id: str, parent_job_id: str, query_accessions: list[str], query_set_id: str | None = None, limit_per_entry: Annotated[int, Gt(gt=0)] = 5, distance_threshold: float | None = None, search_backend: str = 'numpy', metric: str = 'cosine', faiss_index_type: str = 'Flat', faiss_nlist: int = 100, faiss_nprobe: int = 10, faiss_hnsw_m: int = 32, faiss_hnsw_ef_search: int = 64, compute_alignments: bool = False, compute_taxonomy: bool = False)¶
Bases:
ProteaPayload

Payload for one KNN batch dispatched by the coordinator.
- annotation_set_id: str¶
- compute_alignments: bool¶
- compute_taxonomy: bool¶
- distance_threshold: float | None¶
- embedding_config_id: str¶
- faiss_hnsw_ef_search: int¶
- faiss_hnsw_m: int¶
- faiss_index_type: str¶
- faiss_nlist: int¶
- faiss_nprobe: int¶
- limit_per_entry: PositiveInt¶
- metric: str¶
- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- parent_job_id: str¶
- prediction_set_id: str¶
- query_accessions: list[str]¶
- query_set_id: str | None¶
- search_backend: str¶
- class protea.core.operations.predict_go_terms.PredictGOTermsOperation¶
Bases:
object

Coordinator: validates, creates PredictionSet, dispatches N batch messages.

Pipeline:
1. Validate EmbeddingConfig / AnnotationSet / OntologySnapshot.
2. Load query accessions that have embeddings (no embedding data — keeps the coordinator session light).
3. Create PredictionSet.
4. Partition accessions into batches and publish to protea.predictions.batch.
The actual KNN search and GO transfer happen inside PredictGOTermsBatchOperation.
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'predict_go_terms'¶
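The partition step amounts to simple slicing; a sketch (helper name illustrative):

```python
def partition(accessions: list[str], batch_size: int) -> list[list[str]]:
    """Split query accessions into the per-message batches the coordinator publishes."""
    return [accessions[i:i + batch_size] for i in range(0, len(accessions), batch_size)]

batches = partition([f"P{i:05d}" for i in range(10)], batch_size=4)
print([len(b) for b in batches])  # → [4, 4, 2]
```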
- class protea.core.operations.predict_go_terms.PredictGOTermsPayload(*, embedding_config_id: str, annotation_set_id: str, ontology_snapshot_id: str, query_accessions: list[str] | None = None, query_set_id: str | None = None, limit_per_entry: Annotated[int, Gt(gt=0)] = 5, distance_threshold: float | None = None, batch_size: Annotated[int, Gt(gt=0)] = 1024, search_backend: str = 'numpy', metric: str = 'cosine', faiss_index_type: str = 'Flat', faiss_nlist: int = 100, faiss_nprobe: int = 10, faiss_hnsw_m: int = 32, faiss_hnsw_ef_search: int = 64, compute_alignments: bool = False, compute_taxonomy: bool = False)¶
Bases:
ProteaPayload

Payload for the predict_go_terms coordinator job.
- annotation_set_id: str¶
- batch_size: PositiveInt¶
- compute_alignments: bool¶
- compute_taxonomy: bool¶
- distance_threshold: float | None¶
- embedding_config_id: str¶
- faiss_hnsw_ef_search: int¶
- faiss_hnsw_m: int¶
- faiss_index_type: str¶
- faiss_nlist: int¶
- faiss_nprobe: int¶
- limit_per_entry: PositiveInt¶
- metric: str¶
- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- classmethod must_be_non_empty(v: str) str¶
- ontology_snapshot_id: str¶
- query_accessions: list[str] | None¶
- query_set_id: str | None¶
- search_backend: str¶
- class protea.core.operations.predict_go_terms.StorePredictionsOperation¶
Bases:
object

Write worker: bulk-inserts GOPrediction rows and updates parent job progress.
Receives serialized prediction dicts from PredictGOTermsBatchOperation, inserts them into the DB, and atomically increments the parent Job’s progress counter. When the last batch is stored the parent Job is closed as SUCCEEDED.
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'store_predictions'¶
- class protea.core.operations.predict_go_terms.StorePredictionsPayload(*, parent_job_id: str, prediction_set_id: str, predictions: list[dict[str, Any]])¶
Bases:
ProteaPayload

Payload carrying serialized prediction dicts to the write worker.
- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- parent_job_id: str¶
- prediction_set_id: str¶
- predictions: list[dict[str, Any]]¶
- generate_evaluation_set
Computes the NK/LK/PK evaluation delta between two annotation sets using the CAFA5 protocol (experimental evidence only, NOT-propagation through the GO DAG, per-namespace classification). Stores an EvaluationSet row with summary statistics. Ground-truth files are generated on-demand by the download endpoints.
- class protea.core.operations.generate_evaluation_set.GenerateEvaluationSetOperation¶
Bases:
object

Computes the CAFA evaluation delta between two GOA annotation sets.
Applies experimental evidence code filtering, NOT-qualifier exclusion with GO DAG descendant propagation, and classifies delta proteins into NK/LK.
Stores an EvaluationSet row with summary statistics. The actual ground-truth rows are computed on-demand by the download endpoints using the same logic.
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'generate_evaluation_set'¶
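The per-namespace NK/LK classification can be sketched as below. The exact rule is an assumption here (NK: no experimental annotation in any namespace in the old set; LK: experimental annotations in some other namespace but none in the evaluated one), not a quote of the implementation:

```python
NAMESPACES = ("BPO", "CCO", "MFO")

def classify(old_terms: dict[str, set[str]], namespace: str) -> str:
    """old_terms maps namespace → experimental GO terms the protein had
    in the old annotation set. Returns NK, LK, or PK for `namespace`."""
    if all(not old_terms.get(ns) for ns in NAMESPACES):
        return "NK"  # no prior knowledge anywhere
    if not old_terms.get(namespace):
        return "LK"  # knowledge elsewhere, but not in this namespace
    return "PK"      # already annotated in this namespace

nk = classify({}, "MFO")
lk = classify({"BPO": {"GO:0008150"}}, "MFO")
pk = classify({"MFO": {"GO:0003674"}}, "MFO")
print(nk, lk, pk)  # → NK LK PK
```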
- class protea.core.operations.generate_evaluation_set.GenerateEvaluationSetPayload(*, old_annotation_set_id: str, new_annotation_set_id: str)¶
Bases:
ProteaPayload

- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- classmethod must_be_non_empty(v: str) str¶
- new_annotation_set_id: str¶
- old_annotation_set_id: str¶
- run_cafa_evaluation
Runs cafaeval for NK, LK, and PK settings against a given prediction set. Downloads the OBO file, writes ground-truth and prediction TSVs, calls cafa_eval() three times (NK and LK without -known, PK with pk_known_terms.tsv as -known), and persists an EvaluationResult row with per-namespace Fmax, precision, recall, τ, and coverage.
- class protea.core.operations.run_cafa_evaluation.RunCafaEvaluationOperation¶
Bases:
object

Runs the CAFA evaluator against NK, LK and PK settings.
Steps:
1. Load EvaluationSet and PredictionSet from DB.
2. Compute evaluation data (delta NK/LK + known-terms) with full NOT propagation.
3. Download the OBO file from the ontology snapshot URL.
4. Write temp files: ground-truth NK/LK, known-terms, predictions (CAFA format).
5. Call cafa_eval for each setting (NK, LK, PK).
6. Parse per-namespace Fmax / precision / recall / coverage from results.
7. Persist an EvaluationResult row with all metrics.
- execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult¶
- name = 'run_cafa_evaluation'¶
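Step 4 writes tab-separated files in the standard CAFA layout (target accession, GO term, score for predictions; no score column for ground truth). A stdlib sketch of the prediction file; the helper name and file name are illustrative:

```python
import csv
import pathlib
import tempfile

def write_cafa_predictions(rows: list[tuple[str, str, float]], path: pathlib.Path) -> None:
    """Write predictions as 3-column CAFA TSV: target, GO term, score."""
    with open(path, "w", newline="") as fh:
        writer = csv.writer(fh, delimiter="\t")
        for accession, go_id, score in rows:
            writer.writerow([accession, go_id, f"{score:.3f}"])

tmp = pathlib.Path(tempfile.mkdtemp()) / "predictions.tsv"
write_cafa_predictions([("P12345", "GO:0005515", 0.91),
                        ("P12345", "GO:0003677", 0.42)], tmp)
print(tmp.read_text().splitlines()[0])
```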
- class protea.core.operations.run_cafa_evaluation.RunCafaEvaluationPayload(*, evaluation_set_id: str, prediction_set_id: str, max_distance: Annotated[float | None, Ge(ge=0.0), Le(le=2.0)] = None, artifacts_dir: str | None = None)¶
Bases:
ProteaPayload

- artifacts_dir: str | None¶
- evaluation_set_id: str¶
- max_distance: float | None¶
- model_config = {'frozen': True, 'strict': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- classmethod must_be_non_empty(v: str) str¶
- prediction_set_id: str¶