Core

The protea.core package contains all domain logic. It has no dependency on the infrastructure layer: operations receive an open SQLAlchemy session and an emit callback, but they do not manage connections, queues, or transactions themselves. This strict boundary makes every operation independently testable and trivially substitutable.

Contracts

The contracts module defines the interfaces that every operation must satisfy and the shared types used across the entire codebase.

Operation is a structural Protocol — any class that exposes a name string and an execute(session, payload, *, emit) method conforms to it, without needing to inherit from a base class. ProteaPayload is the immutable, strictly-typed Pydantic base class for all operation payloads: strict mode prevents silent type coercion, and frozen configuration prevents accidental mutation after validation. OperationResult is the return value of every execute call; its deferred flag tells BaseWorker that completion will be signalled by child workers rather than immediately. RetryLaterError is raised when a shared resource (e.g. the GPU) is occupied — BaseWorker catches it, resets the job to QUEUED, and re-publishes the message after a configurable delay.
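How these contracts fit together can be sketched with a self-contained, simplified stand-in (OperationResult here is reduced to its result and deferred fields; EchoOperation is a hypothetical example, not a real PROTEA operation):

```python
from dataclasses import dataclass, field
from typing import Any, Protocol, runtime_checkable


@dataclass
class OperationResult:
    # Simplified stand-in: the real class also carries progress and publish lists.
    result: dict[str, Any] = field(default_factory=dict)
    deferred: bool = False


@runtime_checkable
class Operation(Protocol):
    name: str

    def execute(self, session: Any, payload: dict[str, Any], *, emit: Any) -> OperationResult:
        ...


class EchoOperation:
    # No base class needed: exposing the right attributes is enough.
    name = "echo"

    def execute(self, session: Any, payload: dict[str, Any], *, emit: Any) -> OperationResult:
        emit("echo.start", None, {"payload": payload}, "info")
        return OperationResult(result={"echoed": payload})


events: list[tuple] = []
op = EchoOperation()
assert isinstance(op, Operation)  # structural conformance via runtime_checkable
res = op.execute(None, {"x": 1}, emit=lambda *a: events.append(a))
```

Because Operation is structural, tests can substitute any object with a matching shape, with no registry or inheritance involved.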

class protea.core.contracts.operation.Operation(*args, **kwargs)

Bases: Protocol

Protocol that every domain operation must satisfy.

Operations are pure domain logic: they receive an open SQLAlchemy session and an emit callback for structured event logging, and return an OperationResult. They must not manage sessions or queue connections.

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name: str
class protea.core.contracts.operation.OperationResult(result: dict[str, Any] = <factory>, progress_current: int | None = None, progress_total: int | None = None, deferred: bool = False, publish_after_commit: list[tuple[str, UUID]] = <factory>, publish_operations: list[tuple[str, dict[str, Any]]] = <factory>)

Bases: object

Return value of every Operation.execute() call.

result is a free-form dict that gets stored in Job.meta and surfaced in the job detail view. progress_current / progress_total are written back to the Job row so the UI can render a progress bar.

deferred — if True, BaseWorker will NOT transition the job to SUCCEEDED. Use this for coordinator operations that delegate work to child jobs; the last child is responsible for marking the parent SUCCEEDED.

publish_after_commit — list of (queue_name, job_id) pairs that BaseWorker will publish to RabbitMQ after the DB commit, guaranteeing workers always find the child job row before they try to claim it.

publish_operations — list of (queue_name, payload) pairs published after the commit as ephemeral operation messages with no backing Job row (used by coordinator operations).

deferred: bool = False
progress_current: int | None = None
progress_total: int | None = None
publish_after_commit: list[tuple[str, UUID]]
publish_operations: list[tuple[str, dict[str, Any]]]
result: dict[str, Any]
class protea.core.contracts.operation.ProteaPayload

Bases: BaseModel

Immutable, strictly-typed base class for all operation payloads.

Subclass and declare fields using Pydantic annotations. Validation runs automatically via model_validate(dict) — no manual parsing needed. strict=True prevents silent type coercion (e.g. "yes" is not a valid bool).

model_config = {'frozen': True, 'strict': True}

Configuration for the model; a dictionary conforming to pydantic's ConfigDict.

exception protea.core.contracts.operation.RetryLaterError(reason: str, delay_seconds: int = 60)

Bases: Exception

Raised by an operation when it cannot run yet but should be retried.

BaseWorker resets the job to QUEUED and the consumer re-publishes the message after delay_seconds, leaving the GPU free for other work.
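The retry contract can be illustrated with a stdlib-only sketch (the dispatch function is a toy stand-in for BaseWorker, not its real implementation):

```python
class RetryLaterError(Exception):
    # Mirrors the contract above: carries a reason and a delay hint.
    def __init__(self, reason: str, delay_seconds: int = 60):
        super().__init__(reason)
        self.reason = reason
        self.delay_seconds = delay_seconds


def handle_message(run, requeue) -> str:
    """Toy BaseWorker dispatch: reset to QUEUED and re-publish on RetryLaterError."""
    try:
        run()
        return "SUCCEEDED"
    except RetryLaterError as exc:
        requeue(delay_seconds=exc.delay_seconds)
        return "QUEUED"


def gpu_bound_operation():
    # Simulates an operation finding the GPU occupied.
    raise RetryLaterError("GPU busy", delay_seconds=30)


delays: list[int] = []
state = handle_message(run=gpu_bound_operation,
                       requeue=lambda delay_seconds: delays.append(delay_seconds))
```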

OperationRegistry is a simple dict-backed mapping from operation name strings to instances. Workers resolve the correct operation at message dispatch time; new operations are registered at process startup in scripts/worker.py without modifying any worker code.

class protea.core.contracts.registry.OperationRegistry

Bases: object

get(name: str) → Operation
register(op: Operation) → None
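A minimal dict-backed sketch of the registry (the duplicate-name guard is an assumption for illustration, not confirmed by the source):

```python
from typing import Any, Dict


class OperationRegistry:
    # Dict-backed mapping from operation name string to instance.
    def __init__(self) -> None:
        self._ops: Dict[str, Any] = {}

    def register(self, op: Any) -> None:
        if op.name in self._ops:
            raise ValueError(f"duplicate operation: {op.name}")  # assumed guard
        self._ops[op.name] = op

    def get(self, name: str) -> Any:
        try:
            return self._ops[name]
        except KeyError:
            raise KeyError(f"unknown operation: {name}") from None


class Ping:
    name = "ping"


# At process startup (scripts/worker.py in the real system):
registry = OperationRegistry()
registry.register(Ping())
```

Workers then call registry.get(message_operation_name) at dispatch time; adding a new operation touches only the startup registration, never worker code.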

Utilities

protea.core.utils provides three shared utilities used across multiple operations.

utcnow() returns a timezone-aware UTC datetime, avoiding the common mistake of calling datetime.utcnow() which returns a naive object. chunks(seq, n) splits any sequence into fixed-size chunks, used by coordinator operations to partition work into batches. UniProtHttpMixin encapsulates all retry logic for the UniProt REST API: exponential backoff with jitter, Retry-After header parsing, and cursor extraction for paginated endpoints. It is mixed into InsertProteinsOperation and FetchUniProtMetadataOperation.

class protea.core.utils.UniProtHttpMixin

Bases: object

Shared HTTP retry logic for UniProt REST API operations.

Requires the subclass __init__ to set:

self._http_requests: int = 0
self._http_retries: int = 0
self._http: requests.Session = requests.Session()
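The backoff schedule described above (exponential growth with a cap and jitter) can be sketched as follows; the exact formula inside UniProtHttpMixin is an assumption, and the default values mirror the payload defaults shown later:

```python
import random


def backoff_delay(attempt: int, *, base: float = 0.8, cap: float = 20.0,
                  jitter: float = 0.4) -> float:
    """One plausible scheme: exponential in the attempt number, clamped at cap,
    plus uniform jitter to de-synchronise concurrent retries."""
    return min(cap, base * (2 ** attempt)) + random.uniform(0.0, jitter)


# With jitter disabled the schedule is deterministic:
delays = [backoff_delay(a, jitter=0.0) for a in range(6)]
```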

protea.core.utils.chunks(seq: Sequence[Any], n: int) → Iterable[Sequence[Any]]

Yield successive n-sized chunks from seq.

protea.core.utils.utcnow() → datetime

Return the current UTC datetime (timezone-aware).
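Stdlib-only sketches of the two helpers, matching the behaviour described above:

```python
from datetime import datetime, timezone
from typing import Any, Iterable, Sequence


def utcnow() -> datetime:
    # Timezone-aware, unlike the naive datetime.utcnow().
    return datetime.now(timezone.utc)


def chunks(seq: Sequence[Any], n: int) -> Iterable[Sequence[Any]]:
    # Yield successive n-sized chunks; the final chunk may be shorter.
    for i in range(0, len(seq), n):
        yield seq[i : i + n]


batches = list(chunks(list(range(10)), 4))
```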

Feature engineering

protea.core.feature_engineering enriches each query–reference pair in a prediction result with sequence-level and phylogenetic signals. These features are opt-in: they are computed only when compute_alignments=true and/or compute_taxonomy=true are set in the prediction payload.

Pairwise alignment is computed via the parasail library using the BLOSUM62 substitution matrix with gap-open/extend penalties of 10/1. Both global (Needleman–Wunsch) and local (Smith–Waterman) alignments are run for each pair, producing identity, similarity, raw score, gap percentage, and alignment length for each. These metrics capture sequence similarity beyond what the embedding distance alone encodes, which is especially valuable for distant homologues where embedding geometry may be unreliable.

Taxonomic distance is computed via ete3 and the NCBI taxonomy tree (local SQLite, downloaded on first use). For each (query, reference) pair where taxonomy IDs are available from UniProt metadata, PROTEA finds the lowest common ancestor and computes the edge count through it. Results are cached with an LRU cache keyed by taxon-ID pair to avoid redundant tree traversals across a batch.

Feature engineering utilities for functional annotation enrichment.

Provides pairwise alignment metrics (Needleman–Wunsch and Smith–Waterman) via parasail and taxonomic distance computation via ete3 NCBITaxa.

These features complement the embedding-space KNN distance stored in GOPrediction.distance with sequence-level and phylogenetic signals.

Performance notes:

  • Alignment is O(m*n) per pair; parasail uses SIMD acceleration.

  • Taxonomy lookups use an LRU cache over lineage queries (ete3 local SQLite).

First call may trigger a DB download if the ete3 database is absent.

protea.core.feature_engineering.compute_alignment(seq1: str, seq2: str) → dict[str, Any]

Compute both NW and SW alignment metrics in one call.

protea.core.feature_engineering.compute_nw(seq1: str, seq2: str, *, gap_open: int = 10, gap_extend: int = 1) → dict[str, Any]

Global alignment (Needleman–Wunsch) via parasail/BLOSUM62.

Returns a dict with keys:

identity_nw, similarity_nw, alignment_score_nw, gaps_pct_nw, alignment_length_nw, length_query, length_ref
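As a toy illustration of how these metrics fall out of a global alignment, here is a minimal Needleman–Wunsch with linear gaps and unit match/mismatch scores (the real implementation uses parasail with BLOSUM62 and affine 10/1 gap penalties):

```python
def needleman_wunsch(a: str, b: str, *, match=1, mismatch=-1, gap=-1) -> dict:
    """Toy global alignment; returns score, identity %, and alignment length."""
    n, m = len(a), len(b)
    S = [[0] * (m + 1) for _ in range(n + 1)]
    for i in range(1, n + 1):
        S[i][0] = S[i - 1][0] + gap
    for j in range(1, m + 1):
        S[0][j] = S[0][j - 1] + gap
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            sub = match if a[i - 1] == b[j - 1] else mismatch
            S[i][j] = max(S[i - 1][j - 1] + sub, S[i - 1][j] + gap, S[i][j - 1] + gap)
    # Traceback (diagonal preferred) to recover the aligned strings.
    i, j, aln_a, aln_b = n, m, [], []
    while i > 0 or j > 0:
        sub = match if i > 0 and j > 0 and a[i - 1] == b[j - 1] else mismatch
        if i > 0 and j > 0 and S[i][j] == S[i - 1][j - 1] + sub:
            aln_a.append(a[i - 1]); aln_b.append(b[j - 1]); i -= 1; j -= 1
        elif i > 0 and S[i][j] == S[i - 1][j] + gap:
            aln_a.append(a[i - 1]); aln_b.append("-"); i -= 1
        else:
            aln_a.append("-"); aln_b.append(b[j - 1]); j -= 1
    aln_a, aln_b = "".join(reversed(aln_a)), "".join(reversed(aln_b))
    identical = sum(x == y for x, y in zip(aln_a, aln_b))
    return {
        "score": S[n][m],
        "identity_pct": 100.0 * identical / len(aln_a),
        "alignment_length": len(aln_a),
    }


metrics = needleman_wunsch("HEAGAWGHEE", "HEAGAWGHEE")
```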

protea.core.feature_engineering.compute_sw(seq1: str, seq2: str, *, gap_open: int = 10, gap_extend: int = 1) → dict[str, Any]

Local alignment (Smith–Waterman) via parasail/BLOSUM62.

Returns a dict with keys:

identity_sw, similarity_sw, alignment_score_sw, gaps_pct_sw, alignment_length_sw

protea.core.feature_engineering.compute_taxonomy(t1_raw: Any, t2_raw: Any) → dict[str, Any]

Compute taxonomic distance between two NCBI taxonomy IDs.

Returns a dict with keys:

taxonomic_lca, taxonomic_distance, taxonomic_common_ancestors, taxonomic_relation

Evaluation

protea.core.evaluation implements the CAFA5 evaluation protocol for computing the ground-truth delta between two annotation snapshots.

The module’s central data structure is EvaluationData, a frozen dataclass that holds the NK, LK, PK, known, and pk_known annotation dictionaries. Each dictionary maps a protein accession to a set of GO term IDs.

EvaluationData fields:

  • nk — delta annotations for No-Knowledge proteins (no prior annotations in any namespace at t0).

  • lk — delta annotations for Limited-Knowledge proteins (had annotations in some namespaces but gained new terms in a previously empty namespace).

  • pk — novel annotations for Partial-Knowledge proteins (gained new terms in a namespace where they already had annotations).

  • pk_known — old experimental annotations for PK proteins in the relevant namespaces; passed as -known to cafaeval to exclude them from scoring.

  • known — all old experimental annotations flattened across namespaces; available for download via the reference endpoint.

The public entry point is compute_evaluation_data(session, old_annotation_set_id, new_annotation_set_id, ontology_snapshot_id). It loads the GO DAG for NOT-propagation, builds a per-namespace annotation map for both old and new sets, and classifies each (protein, namespace) pair into NK, LK, or PK. The same protein can appear in multiple categories across different namespaces simultaneously (e.g., LK in CCO and PK in BPO).
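The per-(protein, namespace) classification can be sketched with toy annotation maps (a deliberate simplification of compute_evaluation_data: evidence-code filtering and NOT-propagation are omitted):

```python
def classify(old: dict, new: dict) -> dict:
    """old/new: {(protein, namespace): set(go_terms)}, experimental terms only."""
    out = {"nk": {}, "lk": {}, "pk": {}}
    had_any = {p for (p, _ns), terms in old.items() if terms}
    for (p, ns), new_terms in new.items():
        novel = new_terms - old.get((p, ns), set())
        if not novel:
            continue
        if p not in had_any:
            bucket = "nk"   # no prior annotations in any namespace
        elif not old.get((p, ns)):
            bucket = "lk"   # prior annotations, but not in this namespace
        else:
            bucket = "pk"   # prior annotations in this namespace too
        out[bucket].setdefault(p, set()).update(novel)
    return out


old = {
    ("P1", "BPO"): {"GO:1"},           # P1 annotated in BPO only at t0
}
new = {
    ("P1", "CCO"): {"GO:2"},           # gains CCO      -> LK in CCO
    ("P1", "BPO"): {"GO:1", "GO:3"},   # new BPO term   -> PK in BPO
    ("P2", "MFO"): {"GO:4"},           # nothing at t0  -> NK
}
buckets = classify(old, new)
```

Note how P1 lands in both LK and PK simultaneously, matching the multi-category behaviour described above.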

CAFA-style evaluation data computation.

This module computes the ground-truth delta between two AnnotationSets (old → new) following the official CAFA5 evaluation protocol:

  1. Experimental evidence codes only (EXP, IDA, IMP, …)

  2. NOT-qualifier annotations are excluded — including their GO descendants propagated transitively through the is_a / part_of DAG.

  3. Classification is per (protein, namespace), not globally per protein:

    NK — protein had NO experimental annotations in ANY namespace at t0. All novel terms across all namespaces are ground truth.

    LK — protein had annotations in SOME namespaces at t0, but NOT in namespace S. Novel terms in S are ground truth for LK.

    PK — protein had annotations in namespace S at t0 AND gained new terms in S at t1. Novel terms in S are ground truth for PK; old terms in S are the -known file for the CAFA evaluator.

Note: the same protein can be LK in one namespace and PK in another simultaneously (e.g. had MFO+BPO at t0, gains CCO → LK in CCO, gains new BPO → PK in BPO).

Output format (matching the CAFA evaluator): 2-column TSV, no header:

protein_accession<TAB>go_id

class protea.core.evaluation.EvaluationData(nk: dict[str, set[str]] = <factory>, lk: dict[str, set[str]] = <factory>, pk: dict[str, set[str]] = <factory>, known: dict[str, set[str]] = <factory>, pk_known: dict[str, set[str]] = <factory>)

Bases: object

Computed ground-truth delta between two annotation sets.

property delta_proteins: int
known: dict[str, set[str]]
property known_terms_count: int
lk: dict[str, set[str]]
property lk_annotations: int
property lk_proteins: int
nk: dict[str, set[str]]
property nk_annotations: int
property nk_proteins: int
pk: dict[str, set[str]]
property pk_annotations: int
pk_known: dict[str, set[str]]
property pk_proteins: int
stats() → dict
protea.core.evaluation.compute_evaluation_data(session: Session, old_annotation_set_id: UUID, new_annotation_set_id: UUID, ontology_snapshot_id: UUID) → EvaluationData

Compute NK/LK/PK ground truth following the CAFA5 protocol.

Classification is per (protein, namespace):

NK — protein had no experimental annotations in any namespace at t0.

LK — protein had annotations in some namespaces at t0, but not in namespace S; gained new terms in S → those terms are LK ground truth.

PK — protein had annotations in namespace S at t0 and gained new terms in S → those novel terms are PK ground truth; old terms in S are stored in pk_known for the cafaeval -known flag.

The same protein can be simultaneously LK in one namespace and PK in another.

Operations

PROTEA ships ten operations, all registered at worker startup in scripts/worker.py. Each operation is a class that implements the Operation protocol: a name string and an execute method. Operations are stateless with respect to infrastructure — they receive a session and emit structured events, but do not open connections or manage transactions.

ping

Smoke-test operation. Returns immediately with a success result. Used to verify end-to-end connectivity between the API, RabbitMQ, and worker processes.

class protea.core.operations.ping.PingOperation(*args, **kwargs)

Bases: Operation

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name: str = 'ping'

insert_proteins

Fetches protein sequences from the UniProt REST API using cursor-based FASTA streaming. Sequences are deduplicated by MD5 hash before upsert; proteins are upserted by accession. Exponential backoff with jitter and Retry-After header handling are provided by UniProtHttpMixin. Isoforms are parsed and stored separately, sharing the canonical sequence where the amino-acid string is identical.
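The MD5 deduplication step can be sketched as follows (the record shapes are illustrative; the real operation upserts SQLAlchemy Sequence and Protein rows):

```python
import hashlib


def dedupe_sequences(fasta_records: list[tuple[str, str]]) -> tuple[dict, dict]:
    """Store each distinct amino-acid string once; key proteins by accession."""
    sequences: dict[str, str] = {}   # md5 -> amino-acid string
    proteins: dict[str, str] = {}    # accession -> md5 of its sequence
    for accession, seq in fasta_records:
        md5 = hashlib.md5(seq.encode("ascii")).hexdigest()
        sequences.setdefault(md5, seq)   # one Sequence row per distinct string
        proteins[accession] = md5        # upsert keyed by accession
    return sequences, proteins


# An isoform with an identical amino-acid string shares the canonical sequence.
records = [("P12345", "MKT"), ("P12345-2", "MKT"), ("Q99999", "MVL")]
seqs, prots = dedupe_sequences(records)
```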

class protea.core.operations.insert_proteins.InsertProteinsOperation

Bases: UniProtHttpMixin, Operation

Fetches protein sequences from UniProt (FASTA) and upserts them into the DB.

Uses cursor-based pagination, exponential backoff with jitter, and MD5-based sequence deduplication. Many proteins can share one Sequence row. Isoforms (<canonical>-<n>) are stored as separate Protein rows grouped by canonical_accession.

UNIPROT_SEARCH_URL = 'https://rest.uniprot.org/uniprotkb/search'
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name: str = 'insert_proteins'
class protea.core.operations.insert_proteins.InsertProteinsPayload(*, search_criteria: str, page_size: Annotated[int, Gt(gt=0)] = 500, total_limit: Annotated[int, Gt(gt=0)] | None = None, timeout_seconds: Annotated[int, Gt(gt=0)] = 60, include_isoforms: bool = True, compressed: bool = False, max_retries: Annotated[int, Gt(gt=0)] = 6, backoff_base_seconds: Annotated[float, Ge(ge=0.0)] = 0.8, backoff_max_seconds: Annotated[float, Ge(ge=0.0)] = 20.0, jitter_seconds: Annotated[float, Ge(ge=0.0)] = 0.4, user_agent: str = 'PROTEA/insert_proteins (contact: you@example.org)')

Bases: ProteaPayload

backoff_base_seconds: NonNegativeFloat
backoff_max_seconds: NonNegativeFloat
compressed: bool
include_isoforms: bool
jitter_seconds: NonNegativeFloat
max_retries: PositiveInt
model_config = {'frozen': True, 'strict': True}

classmethod must_be_non_empty(v: str) → str
page_size: PositiveInt
search_criteria: str
timeout_seconds: PositiveInt
total_limit: PositiveInt | None
user_agent: str

fetch_uniprot_metadata

Downloads TSV functional annotation data from UniProt and upserts ProteinUniProtMetadata rows keyed by canonical accession. Fields include functional description, EC numbers, pathway membership, and kinetics. Isoforms inherit metadata through the canonical_accession join — no duplicate rows are created.

class protea.core.operations.fetch_uniprot_metadata.FetchUniProtMetadataOperation

Bases: UniProtHttpMixin

Fetches functional annotations from UniProt (TSV) and upserts ProteinUniProtMetadata rows.

One metadata row is stored per canonical accession. Isoforms share the same metadata record. Optionally updates core Protein fields (reviewed, organism, gene_name, length) if they are missing. Uses the same cursor-based pagination and backoff strategy as InsertProteinsOperation.

FIELD_MAP: dict[str, str] = {'absorption': 'Absorption', 'active_site': 'Active site', 'activity_regulation': 'Activity regulation', 'binding_site': 'Binding site', 'catalytic_activity': 'Catalytic activity', 'cofactor': 'Cofactor', 'dna_binding': 'DNA binding', 'ec_number': 'EC number', 'features': 'Features', 'function_cc': 'Function [CC]', 'keywords': 'Keywords', 'kinetics': 'Kinetics', 'pathway': 'Pathway', 'ph_dependence': 'pH dependence', 'redox_potential': 'Redox potential', 'rhea_id': 'Rhea ID', 'site': 'Site', 'temperature_dependence': 'Temperature dependence'}
UNIPROT_SEARCH_URL = 'https://rest.uniprot.org/uniprotkb/search'
execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name = 'fetch_uniprot_metadata'
class protea.core.operations.fetch_uniprot_metadata.FetchUniProtMetadataPayload(*, search_criteria: str, page_size: Annotated[int, Gt(gt=0)] = 500, total_limit: Annotated[int, Gt(gt=0)] | None = None, timeout_seconds: Annotated[int, Gt(gt=0)] = 60, compressed: bool = True, max_retries: Annotated[int, Gt(gt=0)] = 6, backoff_base_seconds: Annotated[float, Ge(ge=0.0)] = 0.8, backoff_max_seconds: Annotated[float, Ge(ge=0.0)] = 20.0, jitter_seconds: Annotated[float, Ge(ge=0.0)] = 0.4, user_agent: str = 'PROTEA/fetch_uniprot_metadata (contact: you@example.org)', commit_every_page: bool = True, update_protein_core: bool = True)

Bases: ProteaPayload

backoff_base_seconds: NonNegativeFloat
backoff_max_seconds: NonNegativeFloat
commit_every_page: bool
compressed: bool
jitter_seconds: NonNegativeFloat
max_retries: PositiveInt
model_config = {'frozen': True, 'strict': True}

classmethod must_be_non_empty(v: str) → str
page_size: PositiveInt
search_criteria: str
timeout_seconds: PositiveInt
total_limit: PositiveInt | None
update_protein_core: bool
user_agent: str

load_ontology_snapshot

Downloads a GO OBO file and populates OntologySnapshot, GOTerm, and GOTermRelationship rows. The obo_version field carries a unique constraint so that re-importing the same release is idempotent. If a snapshot already exists but its relationships are missing, they are backfilled automatically.

class protea.core.operations.load_ontology_snapshot.LoadOntologySnapshotOperation

Bases: object

Downloads a go.obo file and upserts an OntologySnapshot + GOTerm rows.

The data-version: header of the OBO file is used as the canonical version identifier (e.g. releases/2024-01-17). If a snapshot with that version already exists, the operation is a no-op and returns the existing snapshot id — making it safe to re-run.

GO term aspect is mapped from the OBO namespace field: biological_process → P, molecular_function → F, cellular_component → C.

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name = 'load_ontology_snapshot'
class protea.core.operations.load_ontology_snapshot.LoadOntologySnapshotPayload(*, obo_url: str, timeout_seconds: int = 120, force_relationships: bool = False)

Bases: ProteaPayload

field_validator(*fields: str, mode: Literal['before', 'after', 'wrap', 'plain'] = 'after', check_fields: bool | None = None, json_schema_input_type: Any = PydanticUndefined) → Callable[[Any], Any]

force_relationships: bool
model_config = {'frozen': True, 'strict': True}

classmethod must_be_non_empty(v: str) → str
obo_url: str
timeout_seconds: int

load_goa_annotations

Bulk-loads a GAF (Gene Association Format) file. Annotations are filtered against canonical accessions present in the database, avoiding orphaned foreign keys. Each batch is committed independently to bound transaction size.

class protea.core.operations.load_goa_annotations.LoadGOAAnnotationsOperation

Bases: object

Streams a GOA GAF file (gzip or plain) and upserts ProteinGOAnnotation rows.

The GAF file is streamed line by line from gaf_url — it is never fully loaded into memory, making it suitable for the full UniProt GAF (hundreds of millions of lines).

Only accessions present in the protein table are stored; all others are silently skipped. The canonical accession set is loaded once from the DB at the start of the operation.

GAF 2.2 columns used (1-indexed, tab-separated):

2 → DB_Object_ID (accession)
4 → Qualifier
5 → GO ID
6 → DB:Reference
7 → Evidence Code
8 → With/From
14 → Date (YYYYMMDD)
15 → Assigned By

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name = 'load_goa_annotations'
class protea.core.operations.load_goa_annotations.LoadGOAAnnotationsPayload(*, ontology_snapshot_id: str, gaf_url: str, source_version: str, page_size: Annotated[int, Gt(gt=0)] = 10000, timeout_seconds: Annotated[int, Gt(gt=0)] = 300, commit_every_page: bool = True, total_limit: Annotated[int, Gt(gt=0)] | None = None)

Bases: ProteaPayload

commit_every_page: bool
gaf_url: str
model_config = {'frozen': True, 'strict': True}

classmethod must_be_non_empty(v: str) → str
ontology_snapshot_id: str
page_size: PositiveInt
source_version: str
timeout_seconds: PositiveInt
total_limit: PositiveInt | None

load_quickgo_annotations

Streams GO annotations from the QuickGO bulk download API (paginated TSV). Supports optional ECO→GO evidence code mapping and per-page commits. Filters out annotations whose accessions are not already in the database.

class protea.core.operations.load_quickgo_annotations.LoadQuickGOAnnotationsOperation

Bases: object

Streams GO annotations from the QuickGO bulk download API.

Proteins to annotate are determined by the canonical accessions already present in the DB — no external FASTA or accession list is needed.

The QuickGO TSV columns used:

GENE PRODUCT ID → protein accession
GO TERM → GO identifier
QUALIFIER → qualifier (enables, involved_in…)
ECO ID → mapped to evidence_code via eco_mapping_url (or stored raw)
REFERENCE → db_reference
WITH/FROM → with_from
ASSIGNED BY → assigned_by
DATE → annotation_date

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name = 'load_quickgo_annotations'
class protea.core.operations.load_quickgo_annotations.LoadQuickGOAnnotationsPayload(*, ontology_snapshot_id: str, source_version: str, quickgo_base_url: str = 'https://www.ebi.ac.uk/QuickGO/services/annotation/downloadSearch', gene_product_ids: list[str] | None = None, use_db_accessions: bool = True, eco_mapping_url: str | None = None, page_size: Annotated[int, Gt(gt=0)] = 10000, timeout_seconds: Annotated[int, Gt(gt=0)] = 300, commit_every_page: bool = True, total_limit: Annotated[int, Gt(gt=0)] | None = None, gene_product_batch_size: Annotated[int, Gt(gt=0)] = 200)

Bases: ProteaPayload

Payload for loading GO annotations from the QuickGO bulk download endpoint.

QuickGO returns a single streamed TSV filtered by the canonical accessions already present in the DB — no external accession list is needed.

eco_mapping_url (optional) points to a GAF-ECO mapping file (space-separated: ECO:XXXXXXX  CODE). When provided, ECO IDs are resolved to GO evidence codes (IDA, IEA…) before insertion. If omitted, the raw ECO ID is stored as-is in evidence_code.

commit_every_page: bool
eco_mapping_url: str | None
gene_product_batch_size: PositiveInt
gene_product_ids: list[str] | None
model_config = {'frozen': True, 'strict': True}

classmethod must_be_non_empty(v: str) → str
ontology_snapshot_id: str
page_size: PositiveInt
quickgo_base_url: str
source_version: str
timeout_seconds: PositiveInt
total_limit: PositiveInt | None
use_db_accessions: bool

compute_embeddings

Coordinator operation that partitions the target sequence set into batches and dispatches one compute_embeddings_batch message per batch to protea.embeddings.batch. The coordinator serialises on the protea.embeddings queue (one at a time) to prevent concurrent model loads from exhausting GPU memory. Batch and write workers scale independently. Returns deferred=True — the parent job is closed by the last write worker.
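The coordinator's partition-and-dispatch step can be sketched as follows (queue name taken from the text above; the message payload shape is illustrative):

```python
def plan_batches(sequence_ids: list[int], sequences_per_job: int) -> list[tuple[str, dict]]:
    """One ephemeral message per batch, addressed to the batch queue."""
    messages = []
    for i in range(0, len(sequence_ids), sequences_per_job):
        payload = {"sequence_ids": sequence_ids[i : i + sequences_per_job]}
        messages.append(("protea.embeddings.batch", payload))
    return messages


msgs = plan_batches(list(range(10)), sequences_per_job=4)
```

In the real operation this list would be returned via publish_operations, so the messages go out only after the coordinator's DB commit.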

class protea.core.operations.compute_embeddings.ChunkEmbedding(chunk_index_s: int, chunk_index_e: int | None, vector: ndarray)

Bases: object

One pooled embedding for a contiguous residue span of a sequence.

chunk_index_s and chunk_index_e use the same convention as the DB columns: start is 0-based inclusive, end is exclusive. When chunking is disabled, chunk_index_s=0 and chunk_index_e=None (full sequence).

chunk_index_e: int | None
chunk_index_s: int
vector: ndarray
class protea.core.operations.compute_embeddings.ComputeEmbeddingsBatchOperation

Bases: object

Processes one batch of sequences for a parent compute_embeddings job.

Reads sequence_ids from the payload, loads the model, runs inference, stores embeddings, and atomically increments the parent job’s progress_current. The last batch to finish marks the parent SUCCEEDED.

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name = 'compute_embeddings_batch'
class protea.core.operations.compute_embeddings.ComputeEmbeddingsBatchPayload(*, embedding_config_id: str, sequence_ids: list[int], parent_job_id: str, device: str = 'cuda', skip_existing: bool = True, batch_size: Annotated[int, Gt(gt=0)] = 8)

Bases: ProteaPayload

Payload for a single batch operation message published by the coordinator.

batch_size: PositiveInt
device: str
embedding_config_id: str
model_config = {'frozen': True, 'strict': True}

parent_job_id: str
sequence_ids: list[int]
skip_existing: bool
class protea.core.operations.compute_embeddings.ComputeEmbeddingsOperation

Bases: object

Computes protein language model embeddings using a stored EmbeddingConfig.

Backends

  • esm / auto : HuggingFace EsmModel (ESM-2 family). Sequences are processed one at a time. CLS and EOS special tokens are stripped before residue-level pooling.

  • esm3c : ESM SDK ESMC (ESM3c family). No external tokenizer; uses ESMProtein + LogitsConfig. Runs FP16 on GPU; BOS and EOS stripped before pooling.

  • t5 : HuggingFace T5EncoderModel (ProstT5, prot_t5_xl…). Sequences are batched. ProstT5 mode (<AA2fold> prefix) is auto-detected from model_name. EOS token is included in the residue tensor (consistent with PIS behaviour).

Layer indexing (reverse convention, matches PIS)

layer_indices = [0] → last (most semantic) layer.
layer_indices = [1] → penultimate layer.
And so on.

Pipeline per sequence

  1. Forward pass → raw hidden states per layer.

  2. Extract layers using reverse indexing; validate against model depth.

  3. Aggregate layers (mean / last / concat).

  4. Optional per-residue L2 normalisation (normalize_residues).

  5. Apply chunking if use_chunking=True.

  6. Pool each chunk (mean / max / mean_max / cls).

  7. Optional final L2 normalisation (normalize).
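The reverse layer indexing and chunk pooling from the pipeline above can be sketched over plain lists (toy dimensions; the real code works on model hidden-state tensors):

```python
def select_layers(hidden_states: list, layer_indices: list[int]) -> list:
    """Reverse convention: index 0 = last layer, 1 = penultimate, ..."""
    depth = len(hidden_states)
    for idx in layer_indices:
        if idx >= depth:
            raise ValueError(f"layer index {idx} exceeds model depth {depth}")
    return [hidden_states[-(idx + 1)] for idx in layer_indices]


def mean_pool(residues: list[list[float]]) -> list[float]:
    """Mean over the residue axis -> one vector per chunk."""
    n = len(residues)
    return [sum(col) / n for col in zip(*residues)]


# Toy stack of 3 layers, each 2 residues x 2 dims.
hs = [
    [[0.0, 0.0], [0.0, 0.0]],   # first layer
    [[1.0, 1.0], [1.0, 1.0]],   # penultimate layer
    [[2.0, 4.0], [4.0, 2.0]],   # last layer
]
(last,) = select_layers(hs, [0])
pooled = mean_pool(last)
```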

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult

Coordinator: partition sequences into child jobs and dispatch them.

name = 'compute_embeddings'
class protea.core.operations.compute_embeddings.ComputeEmbeddingsPayload(*, embedding_config_id: str, accessions: list[str] | None = None, query_set_id: str | None = None, sequences_per_job: Annotated[int, Gt(gt=0)] = 64, device: str = 'cuda', skip_existing: bool = True, batch_size: Annotated[int, Gt(gt=0)] = 8)

Bases: ProteaPayload

Coordinator payload: decides which sequences to embed and how to batch.

The coordinator publishes N ephemeral operation messages to protea.embeddings.batch. Any worker consuming that queue picks up a message and runs ComputeEmbeddingsBatchOperation — no child Job rows are created in the DB.

Fields

embedding_config_id : str
    UUID of the EmbeddingConfig row that defines the model and strategy.

accessions : list[str] | None
    Restrict to proteins with these UniProt accessions. None = all.

sequences_per_job : int
    How many sequences each batch message processes. Tune to GPU memory.

device : str
    Device passed down to each batch worker ("cuda" or "cpu").

skip_existing : bool
    Skip sequences that already have an embedding for this config.

batch_size : int
    Model forward-pass batch size inside each batch worker.

accessions: list[str] | None
batch_size: PositiveInt
device: str
embedding_config_id: str
model_config = {'frozen': True, 'strict': True}

classmethod must_be_non_empty(v: str) → str
query_set_id: str | None
sequences_per_job: PositiveInt
skip_existing: bool
class protea.core.operations.compute_embeddings.StoreEmbeddingsOperation

Bases: object

Writes pre-computed embeddings to the DB and updates parent job progress.

Runs on a CPU-only worker (protea.embeddings.write queue) so the GPU worker is free to start the next inference batch immediately.

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) → OperationResult
name = 'store_embeddings'
class protea.core.operations.compute_embeddings.StoreEmbeddingsPayload(*, parent_job_id: str, embedding_config_id: str, skip_existing: bool = True, sequences: list[dict[str, Any]])

Bases: ProteaPayload

Payload published by ComputeEmbeddingsBatchOperation after inference.

embedding_config_id: str
model_config = {'frozen': True, 'strict': True}
parent_job_id: str
sequences: list[dict[str, Any]]
skip_existing: bool
predict_go_terms

Coordinator operation that loads reference embeddings into a process-level float16 cache, partitions the query set into batches, and dispatches one predict_go_terms_batch message per batch to protea.predictions.batch. Feature engineering (alignments, taxonomy) is opt-in via payload flags. Returns deferred=True — the parent job is closed by the last write worker.
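The coordinator pattern described above can be sketched as follows. OperationResult is stood in for by a local dataclass with the same fields as the contract in the head, and the batch payload shape is illustrative:

```python
from dataclasses import dataclass, field
from typing import Any

# Stand-in for protea.core.contracts.operation.OperationResult.
@dataclass
class OperationResult:
    result: dict[str, Any] = field(default_factory=dict)
    deferred: bool = False
    publish_operations: list[tuple[str, dict[str, Any]]] = field(default_factory=list)

def dispatch_batches(accessions: list[str], batch_size: int) -> OperationResult:
    """Partition the query set and emit one batch message per chunk."""
    batches = [accessions[i:i + batch_size]
               for i in range(0, len(accessions), batch_size)]
    return OperationResult(
        result={"num_batches": len(batches)},
        deferred=True,  # parent job is closed by the last write worker
        publish_operations=[("predict_go_terms_batch",
                             {"query_accessions": chunk}) for chunk in batches],
    )
```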

class protea.core.operations.predict_go_terms.PredictGOTermsBatchOperation

Bases: object

CPU batch worker: KNN search + GO annotation transfer for one query chunk.

Reference embeddings and their GO annotations are loaded from DB on first access and cached at the process level (_REF_CACHE). Subsequent batch messages reuse the cached reference without any DB round-trip.

Result is published to protea.predictions.write for bulk DB insertion.

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult
name = 'predict_go_terms_batch'
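The process-level cache described above amounts to a module-level dict that outlives individual messages within one worker process. A minimal sketch (the loader callable and cache key are assumptions):

```python
from typing import Any, Callable

# Process-level cache: survives across messages handled by the same worker
# process, so only the first batch pays the DB round-trip.
_REF_CACHE: dict[str, Any] = {}

def get_reference(config_id: str, load_from_db: Callable[[str], Any]) -> Any:
    """Load reference embeddings once per process; later batches reuse them."""
    if config_id not in _REF_CACHE:
        _REF_CACHE[config_id] = load_from_db(config_id)
    return _REF_CACHE[config_id]
```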
class protea.core.operations.predict_go_terms.PredictGOTermsBatchPayload(*, embedding_config_id: str, annotation_set_id: str, prediction_set_id: str, parent_job_id: str, query_accessions: list[str], query_set_id: str | None = None, limit_per_entry: Annotated[int, Gt(gt=0)] = 5, distance_threshold: float | None = None, search_backend: str = 'numpy', metric: str = 'cosine', faiss_index_type: str = 'Flat', faiss_nlist: int = 100, faiss_nprobe: int = 10, faiss_hnsw_m: int = 32, faiss_hnsw_ef_search: int = 64, compute_alignments: bool = False, compute_taxonomy: bool = False)

Bases: ProteaPayload

Payload for one KNN batch dispatched by the coordinator.

annotation_set_id: str
compute_alignments: bool
compute_taxonomy: bool
distance_threshold: float | None
embedding_config_id: str
faiss_hnsw_m: int
faiss_index_type: str
faiss_nlist: int
faiss_nprobe: int
limit_per_entry: PositiveInt
metric: str
model_config = {'frozen': True, 'strict': True}
parent_job_id: str
prediction_set_id: str
query_accessions: list[str]
query_set_id: str | None
search_backend: str
class protea.core.operations.predict_go_terms.PredictGOTermsOperation

Bases: object

Coordinator: validates, creates PredictionSet, dispatches N batch messages.

Pipeline:

  1. Validate EmbeddingConfig / AnnotationSet / OntologySnapshot.

  2. Load query accessions that have embeddings (no embedding data — keeps the coordinator session light).

  3. Create PredictionSet.

  4. Partition accessions into batches and publish to protea.predictions.batch.

The actual KNN search and GO transfer happen inside PredictGOTermsBatchOperation.

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult
name = 'predict_go_terms'
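The batch worker's core step can be sketched as cosine KNN over the cached reference followed by annotation transfer. The neighbour-frequency scoring scheme here is an assumption for illustration; the real scoring may differ:

```python
import numpy as np

def knn_transfer(query: np.ndarray, refs: np.ndarray,
                 ref_go: list[set[str]], k: int) -> dict[str, float]:
    """Cosine KNN over reference embeddings, then GO-term transfer.

    Each predicted term is scored by the fraction of the k nearest
    neighbours annotated with it (illustrative scoring).
    """
    # Cosine distance = 1 - cosine similarity on unit-normalised vectors.
    q = query / np.linalg.norm(query)
    r = refs / np.linalg.norm(refs, axis=1, keepdims=True)
    dist = 1.0 - r @ q
    nearest = np.argsort(dist)[:k]
    scores: dict[str, float] = {}
    for idx in nearest:
        for term in ref_go[idx]:
            scores[term] = scores.get(term, 0.0) + 1.0 / k
    return scores
```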
class protea.core.operations.predict_go_terms.PredictGOTermsPayload(*, embedding_config_id: str, annotation_set_id: str, ontology_snapshot_id: str, query_accessions: list[str] | None = None, query_set_id: str | None = None, limit_per_entry: Annotated[int, Gt(gt=0)] = 5, distance_threshold: float | None = None, batch_size: Annotated[int, Gt(gt=0)] = 1024, search_backend: str = 'numpy', metric: str = 'cosine', faiss_index_type: str = 'Flat', faiss_nlist: int = 100, faiss_nprobe: int = 10, faiss_hnsw_m: int = 32, faiss_hnsw_ef_search: int = 64, compute_alignments: bool = False, compute_taxonomy: bool = False)

Bases: ProteaPayload

Payload for the predict_go_terms coordinator job.

annotation_set_id: str
batch_size: PositiveInt
compute_alignments: bool
compute_taxonomy: bool
distance_threshold: float | None
embedding_config_id: str
faiss_hnsw_m: int
faiss_index_type: str
faiss_nlist: int
faiss_nprobe: int
limit_per_entry: PositiveInt
metric: str
model_config = {'frozen': True, 'strict': True}
classmethod must_be_non_empty(v: str) str
ontology_snapshot_id: str
query_accessions: list[str] | None
query_set_id: str | None
search_backend: str
class protea.core.operations.predict_go_terms.StorePredictionsOperation

Bases: object

Write worker: bulk-inserts GOPrediction rows and updates parent job progress.

Receives serialized prediction dicts from PredictGOTermsBatchOperation, inserts them into the DB, and atomically increments the parent Job’s progress counter. When the last batch is stored the parent Job is closed as SUCCEEDED.

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult
name = 'store_predictions'
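The atomic progress update can be sketched against a hypothetical jobs table. sqlite3 is used here purely for illustration; the real schema, session, and transaction handling live in the infrastructure layer:

```python
import sqlite3

# Hypothetical jobs table standing in for the real Job model.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, "
             "progress_current INT, progress_total INT, status TEXT)")
conn.execute("INSERT INTO jobs VALUES ('parent', 0, 3, 'RUNNING')")

def record_batch_done(conn: sqlite3.Connection, job_id: str) -> str:
    """Increment parent progress in one UPDATE; close the job on the last batch."""
    # A single UPDATE so concurrent write workers never lose an increment.
    conn.execute("UPDATE jobs SET progress_current = progress_current + 1 "
                 "WHERE id = ?", (job_id,))
    done, total = conn.execute(
        "SELECT progress_current, progress_total FROM jobs WHERE id = ?",
        (job_id,)).fetchone()
    if done >= total:  # last batch closes the parent job
        conn.execute("UPDATE jobs SET status = 'SUCCEEDED' WHERE id = ?",
                     (job_id,))
    return conn.execute("SELECT status FROM jobs WHERE id = ?",
                        (job_id,)).fetchone()[0]
```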
class protea.core.operations.predict_go_terms.StorePredictionsPayload(*, parent_job_id: str, prediction_set_id: str, predictions: list[dict[str, Any]])

Bases: ProteaPayload

Payload carrying serialized prediction dicts to the write worker.

model_config = {'frozen': True, 'strict': True}
parent_job_id: str
prediction_set_id: str
predictions: list[dict[str, Any]]
generate_evaluation_set

Computes the NK/LK/PK evaluation delta between two annotation sets using the CAFA5 protocol (experimental evidence only, NOT-propagation through the GO DAG, per-namespace classification). Stores an EvaluationSet row with summary statistics. Ground-truth files are generated on-demand by the download endpoints.

class protea.core.operations.generate_evaluation_set.GenerateEvaluationSetOperation

Bases: object

Computes the CAFA evaluation delta between two GOA annotation sets.

Applies experimental evidence code filtering, NOT-qualifier exclusion with GO DAG descendant propagation, and classifies delta proteins into NK/LK.

Stores an EvaluationSet row with summary statistics. The actual ground-truth rows are computed on-demand by the download endpoints using the same logic.

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult
name = 'generate_evaluation_set'
class protea.core.operations.generate_evaluation_set.GenerateEvaluationSetPayload(*, old_annotation_set_id: str, new_annotation_set_id: str)

Bases: ProteaPayload

model_config = {'frozen': True, 'strict': True}
classmethod must_be_non_empty(v: str) str
new_annotation_set_id: str
old_annotation_set_id: str
run_cafa_evaluation

Runs cafaeval for NK, LK, and PK settings against a given prediction set. Downloads the OBO file, writes ground-truth and prediction TSVs, calls cafa_eval() three times (NK and LK without -known, PK with pk_known_terms.tsv as -known), and persists an EvaluationResult row with per-namespace Fmax, precision, recall, τ, and coverage.

class protea.core.operations.run_cafa_evaluation.RunCafaEvaluationOperation

Bases: object

Runs the CAFA evaluator against NK, LK and PK settings.

Steps:
  1. Load EvaluationSet and PredictionSet from DB.

  2. Compute evaluation data (delta NK/LK + known-terms) with full NOT propagation.

  3. Download the OBO file from the ontology snapshot URL.

  4. Write temp files: ground-truth NK/LK, known-terms, predictions (CAFA format).

  5. Call cafa_eval for each setting (NK, LK, PK).

  6. Parse per-namespace Fmax / precision / recall / coverage from results.

  7. Persist an EvaluationResult row with all metrics.

execute(session: Session, payload: dict[str, Any], *, emit: Callable[[str, str | None, dict[str, Any], Literal['info', 'warning', 'error']], None]) OperationResult
name = 'run_cafa_evaluation'
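The Fmax parsed in step 6 is the maximum harmonic mean of precision and recall over decision thresholds τ. A small helper, assuming an already-parsed (τ, precision, recall) curve:

```python
def fmax(curve: list[tuple[float, float, float]]) -> tuple[float, float]:
    """Return (Fmax, tau) for a curve of (tau, precision, recall) rows,
    e.g. one parsed per namespace from the evaluator output."""
    best_f, best_tau = 0.0, 0.0
    for tau, p, r in curve:
        if p + r > 0:
            f = 2 * p * r / (p + r)  # harmonic mean of precision and recall
            if f > best_f:
                best_f, best_tau = f, tau
    return best_f, best_tau
```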
class protea.core.operations.run_cafa_evaluation.RunCafaEvaluationPayload(*, evaluation_set_id: str, prediction_set_id: str, max_distance: Annotated[float | None, Ge(ge=0.0), Le(le=2.0)] = None, artifacts_dir: str | None = None)

Bases: ProteaPayload

artifacts_dir: str | None
evaluation_set_id: str
max_distance: float | None
model_config = {'frozen': True, 'strict': True}
classmethod must_be_non_empty(v: str) str
prediction_set_id: str