Infrastructure

The protea.infrastructure package implements the persistence and messaging layer. It is the only package that imports SQLAlchemy, psycopg2, or aio-pika directly. All other layers interact with the database through the session factory and with the queue through the publisher interface.

Settings

protea.infrastructure.settings loads configuration from protea/config/system.yaml and applies environment variable overrides. The two mandatory settings are db_url (PostgreSQL connection string) and amqp_url (RabbitMQ connection string). Both can be overridden at runtime via the PROTEA_DB_URL and PROTEA_AMQP_URL environment variables, which take precedence over the YAML file. This makes the same configuration file usable across local development, CI, and production deployments.

class protea.infrastructure.settings.Settings(db_url: 'str', amqp_url: 'str', artifacts_dir: 'Path')

Bases: object

amqp_url: str
artifacts_dir: Path
db_url: str
protea.infrastructure.settings.load_settings(project_root: Path, *, env_prefix: str = 'PROTEA_') → Settings
Load settings from:
  1. protea/config/system.yaml (relative to project root)

  2. environment variables (override YAML values)

Expected env vars:
  • PROTEA_DB_URL

  • PROTEA_AMQP_URL
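The override rule can be sketched as follows. This is illustrative only: apply_env_overrides is a hypothetical helper, not the real internals of load_settings().

```python
import os

# Hedged sketch of the override rule in load_settings(): values read
# from system.yaml are replaced by PROTEA_-prefixed environment
# variables when those are set. apply_env_overrides is an assumed
# name for illustration, not the actual implementation.
def apply_env_overrides(yaml_values: dict, env_prefix: str = "PROTEA_") -> dict:
    merged = dict(yaml_values)
    for key in ("db_url", "amqp_url"):
        env_val = os.environ.get(env_prefix + key.upper())
        if env_val is not None:
            merged[key] = env_val
    return merged

os.environ["PROTEA_DB_URL"] = "postgresql://ci:ci@localhost/protea_test"
settings = apply_env_overrides(
    {"db_url": "postgresql://dev@localhost/protea",
     "amqp_url": "amqp://guest@localhost/"}
)
assert settings["db_url"].endswith("protea_test")         # env var wins
assert settings["amqp_url"] == "amqp://guest@localhost/"  # YAML value kept
```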

Session management

session_scope() is the single entry point for all database access in PROTEA. It is a context manager that commits on normal exit and rolls back on any exception, then always closes the session. Workers open and close sessions explicitly rather than relying on this context manager for long-lived operations, but it is used throughout the API layer and in tests.

The build_session_factory() function creates a SQLAlchemy sessionmaker bound to the given database URL. It is called once at application startup and stored on app.state.session_factory, keeping the router free of global state.

protea.infrastructure.session.build_session_factory(db_url: str) → sessionmaker[Session]

Create a SQLAlchemy session factory bound to the given database URL.

protea.infrastructure.session.session_scope(factory: sessionmaker[Session]) → Iterator[Session]

Context manager that commits on success and rolls back on exception.
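The commit/rollback/close contract can be made concrete with a minimal sketch (not the actual implementation, which binds a SQLAlchemy sessionmaker):

```python
from contextlib import contextmanager

# Minimal sketch of the session_scope() contract described above:
# commit on clean exit, roll back on any exception, always close.
@contextmanager
def session_scope(factory):
    session = factory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# A fake session makes the call ordering visible without a database:
class FakeSession:
    def __init__(self):
        self.calls = []
    def commit(self):
        self.calls.append("commit")
    def rollback(self):
        self.calls.append("rollback")
    def close(self):
        self.calls.append("close")

fake = FakeSession()
with session_scope(lambda: fake):
    pass
assert fake.calls == ["commit", "close"]
```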

ORM models

All models use SQLAlchemy 2.x declarative style with Mapped[] type annotations. The schema is managed by Alembic; migrations are generated via alembic revision --autogenerate and stored under alembic/versions/.

Job and JobEvent

Job is the central entity of the job queue. It implements a five-state machine (QUEUED → RUNNING → SUCCEEDED | FAILED, or QUEUED → CANCELLED). The parent_job_id foreign key links batch child jobs to their coordinator parent. payload and meta are PostgreSQL JSONB columns, allowing arbitrary structured data without schema migrations for new operation types. progress_current and progress_total are updated atomically by write workers to drive the frontend progress bar.

JobEvent is an append-only audit log: rows are written by the emit callback during execution and are never updated or deleted. The frontend renders them as a chronological event timeline.
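The five-state machine can be encoded as a small transition table. This is a hypothetical sketch for illustration; the real transitions are enforced in the worker layer, not in the ORM model.

```python
# Hypothetical encoding of the five-state machine described above.
# Terminal states (succeeded, failed, cancelled) have no outgoing edges.
ALLOWED = {
    "queued": {"running", "cancelled"},
    "running": {"succeeded", "failed"},
    "succeeded": set(),
    "failed": set(),
    "cancelled": set(),
}

def can_transition(current: str, target: str) -> bool:
    return target in ALLOWED.get(current, set())

assert can_transition("queued", "running")
assert can_transition("running", "failed")
assert not can_transition("succeeded", "running")  # terminal states never leave
```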

class protea.infrastructure.orm.models.job.Job(**kwargs)

Bases: Base

created_at: Mapped[datetime]
error_code: Mapped[str | None]
error_message: Mapped[str | None]
events: Mapped[list[JobEvent]]
finished_at: Mapped[datetime | None]
id: Mapped[UUID]
meta: Mapped[dict[str, Any]]
operation: Mapped[str]
parent_job_id: Mapped[UUID | None]
payload: Mapped[dict[str, Any]]
progress_current: Mapped[int | None]
progress_total: Mapped[int | None]
queue_name: Mapped[str]
started_at: Mapped[datetime | None]
status: Mapped[JobStatus]
class protea.infrastructure.orm.models.job.JobEvent(**kwargs)

Bases: Base

event: Mapped[str]
fields: Mapped[dict[str, Any]]
id: Mapped[int]
job: Mapped[Job]
job_id: Mapped[UUID]
level: Mapped[str]
message: Mapped[str | None]
ts: Mapped[datetime]
class protea.infrastructure.orm.models.job.JobStatus(*values)

Bases: StrEnum

CANCELLED = 'cancelled'
FAILED = 'failed'
QUEUED = 'queued'
RUNNING = 'running'
SUCCEEDED = 'succeeded'
protea.infrastructure.orm.models.job.utcnow() → datetime

Protein and Sequence

Sequence stores unique amino-acid strings, deduplicated by MD5 hash. Multiple Protein rows (canonical accessions and isoforms) may reference the same Sequence row, preventing redundant embedding computation for sequences that appear under different accessions.

Protein stores one row per UniProt accession, including isoforms (<canonical>-<n>). The canonical_accession field groups isoforms together, and the view-only relationship to ProteinUniProtMetadata is joined on this field rather than a foreign key, so metadata rows are not duplicated for each isoform.
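The dedup rule can be shown with an in-memory sketch: one Sequence row per distinct amino-acid string, keyed by MD5 (mirroring Sequence.compute_hash). The get-or-create helper is illustrative, not the real ingest code.

```python
import hashlib

# In-memory sketch of the dedup rule described above. _store stands in
# for the sequence table; get_or_create_sequence_id is a hypothetical
# helper for illustration.
_store: dict[str, int] = {}

def compute_hash(seq: str) -> str:
    return hashlib.md5(seq.encode("ascii")).hexdigest()

def get_or_create_sequence_id(seq: str) -> int:
    h = compute_hash(seq)
    if h not in _store:
        _store[h] = len(_store) + 1  # stand-in for an INSERT returning id
    return _store[h]

# Two accessions sharing one physical sequence reuse the same row,
# so the embedding is computed only once:
a = get_or_create_sequence_id("MKTAYIAKQR")
b = get_or_create_sequence_id("MKTAYIAKQR")
assert a == b
```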

class protea.infrastructure.orm.models.protein.protein.Protein(**kwargs)

Bases: Base

One row per UniProt accession, including isoforms (<canonical>-<n>).

Isoforms are grouped by canonical_accession. Many proteins can share the same Sequence row — sequence_id is deliberately non-unique. The uniprot_metadata relationship is view-only, joined by canonical_accession.

accession: Mapped[str]
canonical_accession: Mapped[str]
created_at: Mapped[datetime]
entry_name: Mapped[str | None]
gene_name: Mapped[str | None]
is_canonical: Mapped[bool]
isoform_index: Mapped[int | None]
length: Mapped[int | None]
organism: Mapped[str | None]
static parse_isoform(accession: str) → tuple[str, bool, int | None]

Parse isoform accession pattern “<canonical>-<n>”. Returns: (canonical_accession, is_canonical, isoform_index)

reviewed: Mapped[bool | None]
sequence: Mapped[Sequence | None]
sequence_id: Mapped[int | None]
taxonomy_id: Mapped[str | None]
uniprot_metadata: Mapped[ProteinUniProtMetadata | None]
updated_at: Mapped[datetime]
class protea.infrastructure.orm.models.protein.protein_metadata.ProteinUniProtMetadata(**kwargs)

Bases: Base

Raw UniProt metadata stored ONCE per canonical accession.

  • Primary key: canonical_accession (e.g., X6R8D5)

  • Isoforms reuse the same metadata via Protein.canonical_accession.

  • Relationship to Protein is view-only (no FK required).

absorption: Mapped[str | None]
active_site: Mapped[str | None]
activity_regulation: Mapped[str | None]
binding_site: Mapped[str | None]
canonical_accession: Mapped[str]
catalytic_activity: Mapped[str | None]
cofactor: Mapped[str | None]
created_at: Mapped[datetime]
dna_binding: Mapped[str | None]
ec_number: Mapped[str | None]
features: Mapped[str | None]
function_cc: Mapped[str | None]
keywords: Mapped[str | None]
kinetics: Mapped[str | None]
pathway: Mapped[str | None]
ph_dependence: Mapped[str | None]
proteins: Mapped[list[Protein]]
redox_potential: Mapped[str | None]
rhea_id: Mapped[str | None]
site: Mapped[str | None]
temperature_dependence: Mapped[str | None]
updated_at: Mapped[datetime]
class protea.infrastructure.orm.models.sequence.sequence.Sequence(*args, **kwargs)

Bases: Base

Production-grade Sequence schema.

Key points:

  • Stores raw amino-acid sequence.

  • Deduplicated by sequence_hash (MD5).

  • A Sequence can be referenced by MANY proteins (Sequence.proteins).

static compute_hash(seq: str) → str
created_at: Mapped[datetime]
id: Mapped[int]
proteins: Mapped[list[Protein]]
sequence: Mapped[str]
sequence_hash: Mapped[str]
updated_at: Mapped[datetime]

GO Ontology

OntologySnapshot records one complete GO OBO release, versioned by the obo_version string from the OBO file header. The unique constraint on obo_version makes repeated imports idempotent. GOTerm stores one row per term per snapshot; GOTermRelationship stores the directed edges of the GO DAG with their relation types (is_a, part_of, regulates, etc.). The /annotations/snapshots/{id}/subgraph endpoint uses BFS over these edges to return ancestor subgraphs for a given set of GO term IDs.
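The BFS over the DAG edges can be sketched over in-memory (child, parent, relation_type) tuples rather than GOTermRelationship rows; the real endpoint queries the table, but the traversal logic is the same shape.

```python
from collections import deque

# Sketch of the ancestor-subgraph BFS described above. Edges point
# child → parent, so walking parent links from the seed terms yields
# the ancestor closure.
def ancestor_subgraph(edges, seed_ids):
    parents: dict[str, set[str]] = {}
    for child, parent, _relation in edges:
        parents.setdefault(child, set()).add(parent)
    seen, queue = set(seed_ids), deque(seed_ids)
    while queue:
        term = queue.popleft()
        for p in parents.get(term, ()):
            if p not in seen:
                seen.add(p)
                queue.append(p)
    return seen

edges = [
    ("GO:0000003", "GO:0000002", "is_a"),
    ("GO:0000002", "GO:0000001", "part_of"),
    ("GO:0000004", "GO:0000001", "is_a"),
]
assert ancestor_subgraph(edges, {"GO:0000003"}) == {
    "GO:0000003", "GO:0000002", "GO:0000001"
}
```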

class protea.infrastructure.orm.models.annotation.ontology_snapshot.OntologySnapshot(**kwargs)

Bases: Base

One row per loaded go.obo file.

obo_version is extracted from the data-version: header of the OBO file (e.g. releases/2024-01-17). obo_url is the URL from which the file was downloaded, providing full provenance. Multiple AnnotationSet rows can reference the same snapshot when they were built against the same ontology release.

annotation_sets: Mapped[list[AnnotationSet]]
go_terms: Mapped[list[GOTerm]]
id: Mapped[uuid.UUID]
loaded_at: Mapped[datetime]
obo_url: Mapped[str]
obo_version: Mapped[str]
class protea.infrastructure.orm.models.annotation.go_term.GOTerm(**kwargs)

Bases: Base

One row per GO term per ontology snapshot.

GO terms are scoped to an OntologySnapshot so that the meaning of a term at a specific ontology release is preserved. (go_id, ontology_snapshot_id) is unique — the same GO:XXXXXXX can exist in multiple snapshots with potentially different names or definitions.

annotations: Mapped[list[ProteinGOAnnotation]]
aspect: Mapped[str | None]
definition: Mapped[str | None]
go_id: Mapped[str]
id: Mapped[int]
is_obsolete: Mapped[bool]
name: Mapped[str | None]
ontology_snapshot: Mapped[OntologySnapshot]
ontology_snapshot_id: Mapped[uuid.UUID]
class protea.infrastructure.orm.models.annotation.go_term_relationship.GOTermRelationship(**kwargs)

Bases: Base

Directed edge in the GO DAG for a specific ontology snapshot.

child_go_term_id → parent_go_term_id with a given relation_type (is_a, part_of, regulates, negatively_regulates, positively_regulates).

child: Mapped[GOTerm]
child_go_term_id: Mapped[int]
id: Mapped[int]
ontology_snapshot: Mapped[OntologySnapshot]
ontology_snapshot_id: Mapped[uuid.UUID]
parent: Mapped[GOTerm]
parent_go_term_id: Mapped[int]
relation_type: Mapped[str]

Annotation Sets

AnnotationSet groups a batch of protein GO annotations by source (goa or quickgo) and ontology snapshot version. This design allows side-by-side comparison of annotation sets from different sources or dates and ties every prediction result to a specific, versioned annotation input. ProteinGOAnnotation stores all GAF/QuickGO evidence fields verbatim: qualifier, evidence code, assigned-by, database reference, with/from, and annotation date.

class protea.infrastructure.orm.models.annotation.annotation_set.AnnotationSet(**kwargs)

Bases: Base

A versioned batch of GO annotations from a single source.

Each load operation (QuickGO download, GOA GAF ingest, CAFA dataset) creates one AnnotationSet row. This allows multiple temporal snapshots of the same source to coexist and be queried independently.

ontology_snapshot_id pins the exact GO ontology release used to interpret the annotations in this set. job_id links back to the PROTEA job that created it, providing full audit trail.

annotations: Mapped[list[ProteinGOAnnotation]]
created_at: Mapped[datetime]
id: Mapped[uuid.UUID]
job: Mapped[Job | None]
job_id: Mapped[uuid.UUID | None]
meta: Mapped[dict[str, Any]]
ontology_snapshot: Mapped[OntologySnapshot]
ontology_snapshot_id: Mapped[uuid.UUID]
source: Mapped[str]
source_version: Mapped[str | None]
class protea.infrastructure.orm.models.annotation.protein_go_annotation.ProteinGOAnnotation(**kwargs)

Bases: Base

Association between a protein and a GO term within an annotation set.

Fields map directly from GAF/QuickGO columns:

  • qualifier: e.g. enables, involved_in, located_in

  • evidence_code: GO evidence code resolved from ECO (IDA, IEA, ISS…)

  • assigned_by: database that made the annotation (UniProt, RHEA…)

  • db_reference: supporting reference (PMID:…, GO_REF:…)

  • with_from: with/from field from GAF column 8

  • annotation_date: YYYYMMDD string from the source file

annotation_date: Mapped[str | None]
annotation_set: Mapped[AnnotationSet]
annotation_set_id: Mapped[uuid.UUID]
assigned_by: Mapped[str | None]
db_reference: Mapped[str | None]
evidence_code: Mapped[str | None]
go_term: Mapped[GOTerm]
go_term_id: Mapped[int]
id: Mapped[int]
protein: Mapped[Protein]
protein_accession: Mapped[str]
qualifier: Mapped[str | None]
with_from: Mapped[str | None]

Embeddings

EmbeddingConfig defines a reproducible embedding recipe: model identifier, layer selection, pooling strategy, normalisation flags, and chunking parameters. Its UUID primary key is stable; changing any parameter creates a new configuration row. Both SequenceEmbedding rows and PredictionSet rows reference the same EmbeddingConfig, guaranteeing that query and reference embeddings are always comparable.

SequenceEmbedding stores a pgvector VECTOR for each (sequence, config, chunk) triple. When chunking is disabled the chunk index is 0 and the end index is NULL. pgvector is used for storage only; nearest-neighbour queries are performed in Python via protea.core.knn_search.
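The chunk-boundary arithmetic implied by chunk_size and chunk_overlap can be sketched as below. This is an assumption about the worker's splitting logic; treat it as an illustration of the (chunk_index_s, chunk_index_e) convention only.

```python
# Assumed chunk-boundary arithmetic for the chunking parameters above;
# the actual splitting lives in the embedding worker.
def chunk_bounds(seq_len: int, chunk_size: int, chunk_overlap: int):
    if seq_len <= chunk_size:
        return [(0, None)]  # single row covering the full sequence (end index NULL)
    step = chunk_size - chunk_overlap
    bounds, start = [], 0
    while start < seq_len:
        end = min(start + chunk_size, seq_len)
        bounds.append((start, end))
        if end == seq_len:
            break
        start += step
    return bounds

assert chunk_bounds(8, 10, 2) == [(0, None)]
assert chunk_bounds(10, 4, 1) == [(0, 4), (3, 7), (6, 10)]
```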

class protea.infrastructure.orm.models.embedding.embedding_config.EmbeddingConfig(**kwargs)

Bases: Base

Defines a reproducible recipe for computing protein embeddings.

Every SequenceEmbedding row points to exactly one EmbeddingConfig, providing complete provenance: which model, which transformer layers, how layers are aggregated, how the sequence is pooled, and whether the vector is L2-normalised.

Layer indexing convention (reverse, consistent with PIS)

layer_indices use reverse indexing: 0 = last (most semantic) layer, 1 = penultimate, 2 = antepenultimate, etc. This matches the convention used across all backends in PIS / FANTASIA.

Layer aggregation strategies

  • mean : element-wise average across selected layers (dim unchanged).

  • concat : concatenation of all selected layers (dim × n_layers).

Sequence pooling strategies

  • mean : mean over residue representations.

  • max : max over residue representations.

  • cls : CLS/BOS token only (position 0 of raw hidden states).

  • mean_max : concatenation of mean and max (dim × 2).

Model backends

  • esm : HuggingFace EsmModel (ESM-2 family).

  • esm3c : ESM SDK ESMC (ESM3c family). No external tokenizer.

    Runs FP16 on GPU. CLS and EOS tokens stripped for pooling.

  • t5 : HuggingFace T5EncoderModel (ProstT5, prot_t5_xl…).

    ProstT5 mode auto-detected from model_name.

  • auto : falls back to esm.

Normalisation

  • normalize_residues : L2-normalise each residue representation before

    pooling (applied after layer aggregation).

  • normalize : L2-normalise the final pooled vector.

Chunking

Long sequences can be split into overlapping chunks before pooling. Each chunk produces one SequenceEmbedding row identified by chunk_index_s and chunk_index_e.

chunk_overlap: Mapped[int]
chunk_size: Mapped[int]
created_at: Mapped[datetime]
description: Mapped[str | None]
id: Mapped[UUID]
layer_agg: Mapped[str]
layer_indices: Mapped[list[Any]]
max_length: Mapped[int]
model_backend: Mapped[str]
model_name: Mapped[str]
normalize: Mapped[bool]
normalize_residues: Mapped[bool]
pooling: Mapped[str]
use_chunking: Mapped[bool]
class protea.infrastructure.orm.models.embedding.sequence_embedding.SequenceEmbedding(**kwargs)

Bases: Base

Stores a computed embedding for a sequence under a specific EmbeddingConfig.

One row per (sequence, config, chunk_start). When chunking is disabled chunk_index_s=0 and chunk_index_e=None (NULL in the DB), meaning the embedding covers the full sequence. When chunking is enabled, each chunk produces a separate row identified by its start/end residue indices.

Proteins sharing the same amino-acid sequence share one set of embedding rows per config (deduplicated at the Sequence level).

Full traceability: embedding_config records the exact model, transformer layers, aggregation strategy, pooling, normalisation, and chunking parameters used.

chunk_index_e: Mapped[int | None]
chunk_index_s: Mapped[int]
created_at: Mapped[datetime]
embedding: Mapped[Any]
embedding_config: Mapped[EmbeddingConfig]
embedding_config_id: Mapped[uuid.UUID]
embedding_dim: Mapped[int]
id: Mapped[int]
sequence: Mapped[Sequence]
sequence_id: Mapped[int]

Predictions

PredictionSet is the result container for one run of predict_go_terms. It links the query set, embedding configuration, annotation set, and ontology snapshot used, making every prediction set fully reproducible. GOPrediction stores one row per (query protein, GO term, reference protein) triple. The 14 optional feature-engineering columns (alignment statistics and taxonomy fields) are NULL unless the corresponding flags were set in the prediction payload.
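The transfer step can be illustrated with a toy sketch: pick the nearest reference in embedding space by cosine distance and copy its GO terms, keeping the distance as the confidence proxy. The function names and tuple layout are illustrative, not the real predict_go_terms internals.

```python
import math

# Toy sketch of annotation transfer by nearest neighbour in embedding
# space (names and data layout are assumptions for illustration).
def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / norm

def transfer_annotations(query_vec, references):
    # references: (accession, embedding, go_ids) triples
    acc, vec, go_ids = min(references, key=lambda r: cosine_distance(query_vec, r[1]))
    d = cosine_distance(query_vec, vec)
    # One (go_term, ref_protein_accession, distance) row per GO term:
    return [(go_id, acc, d) for go_id in go_ids]

refs = [
    ("P12345", [1.0, 0.0], ["GO:0003824"]),
    ("Q67890", [0.0, 1.0], ["GO:0005515", "GO:0016020"]),
]
preds = transfer_annotations([0.9, 0.1], refs)
assert [p[0] for p in preds] == ["GO:0003824"]  # nearest reference is P12345
```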

class protea.infrastructure.orm.models.embedding.prediction_set.PredictionSet(**kwargs)

Bases: Base

Groups GO predictions from a single prediction run.

Records which EmbeddingConfig was used for similarity search and which AnnotationSet was used as the reference, providing complete traceability for every predicted GO term.

annotation_set: Mapped[AnnotationSet]
annotation_set_id: Mapped[uuid.UUID]
created_at: Mapped[datetime]
distance_threshold: Mapped[float | None]
embedding_config: Mapped[EmbeddingConfig]
embedding_config_id: Mapped[uuid.UUID]
id: Mapped[uuid.UUID]
limit_per_entry: Mapped[int]
meta: Mapped[dict[str, Any]]
ontology_snapshot: Mapped[OntologySnapshot]
ontology_snapshot_id: Mapped[uuid.UUID]
predictions: Mapped[list[GOPrediction]]
query_set: Mapped[QuerySet | None]
query_set_id: Mapped[uuid.UUID | None]
class protea.infrastructure.orm.models.embedding.go_prediction.GOPrediction(**kwargs)

Bases: Base

One predicted GO term for a protein within a prediction set.

The prediction is derived by transferring annotations from the nearest reference protein (ref_protein_accession) in embedding space. The distance field records the cosine distance to that neighbour, which serves as a proxy for prediction confidence (lower = more similar).

alignment_length_nw: Mapped[float | None]
alignment_length_sw: Mapped[float | None]
alignment_score_nw: Mapped[float | None]
alignment_score_sw: Mapped[float | None]
distance: Mapped[float]
evidence_code: Mapped[str | None]
gaps_pct_nw: Mapped[float | None]
gaps_pct_sw: Mapped[float | None]
go_term: Mapped[GOTerm]
go_term_id: Mapped[int]
id: Mapped[int]
identity_nw: Mapped[float | None]
identity_sw: Mapped[float | None]
length_query: Mapped[int | None]
length_ref: Mapped[int | None]
prediction_set: Mapped[PredictionSet]
prediction_set_id: Mapped[uuid.UUID]
protein_accession: Mapped[str]
qualifier: Mapped[str | None]
query_taxonomy_id: Mapped[int | None]
ref_protein_accession: Mapped[str]
ref_taxonomy_id: Mapped[int | None]
similarity_nw: Mapped[float | None]
similarity_sw: Mapped[float | None]
taxonomic_common_ancestors: Mapped[int | None]
taxonomic_distance: Mapped[int | None]
taxonomic_lca: Mapped[int | None]
taxonomic_relation: Mapped[str | None]

Query Sets

QuerySet represents a user-uploaded FASTA dataset. QuerySetEntry stores one row per FASTA entry, preserving the original accession header and linking to the deduplicated Sequence row. If the amino-acid string already exists in the database, the existing Sequence row is reused, avoiding redundant embedding computation.

class protea.infrastructure.orm.models.query.query_set.QuerySet(**kwargs)

Bases: Base

A named collection of sequences uploaded by the user for GO term prediction.

Each uploaded FASTA file creates one QuerySet row. Entries preserve the original accession strings from the FASTA headers and link to the deduplicated Sequence rows. This allows the same physical sequence to appear in multiple query sets without duplication.

created_at: Mapped[datetime]
description: Mapped[str | None]
entries: Mapped[list[QuerySetEntry]]
id: Mapped[UUID]
name: Mapped[str]
class protea.infrastructure.orm.models.query.query_set.QuerySetEntry(**kwargs)

Bases: Base

One sequence within a QuerySet, preserving the original FASTA accession.

accession is the raw identifier from the FASTA header (may not exist in the protein table). sequence_id links to the deduplicated Sequence row used for embedding computation and similarity search.

accession: Mapped[str]
created_at: Mapped[datetime]
id: Mapped[int]
query_set: Mapped[QuerySet]
query_set_id: Mapped[uuid.UUID]
sequence: Mapped[Sequence]
sequence_id: Mapped[int]

Queue

The queue layer provides two classes: QueueConsumer and OperationConsumer.

QueueConsumer reads a job UUID from a RabbitMQ queue and delegates to BaseWorker.handle_job(). It is used for queues where every message corresponds to a tracked Job row: protea.ping, protea.jobs, and protea.embeddings.

OperationConsumer reads a raw serialised operation payload from the queue and executes it directly, without creating a Job row. It is used for high-throughput batch queues (protea.embeddings.batch, protea.embeddings.write, protea.predictions.batch, protea.predictions.write) where creating thousands of child rows would cause queue bloat. Progress is tracked at the parent job level only, via the atomic progress_current increment.
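The atomic increment can be sketched with sqlite3 for illustration; PROTEA issues the equivalent single UPDATE through SQLAlchemy against PostgreSQL, so the schema and SQL here are a stand-in.

```python
import sqlite3

# Sketch of the atomic parent-progress increment described above,
# using sqlite3 as a stand-in for the real PostgreSQL table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE job (id TEXT PRIMARY KEY,"
    " progress_current INTEGER, progress_total INTEGER)"
)
conn.execute("INSERT INTO job VALUES ('parent-1', 0, 100)")

def increment_progress(conn, job_id: str, n: int = 1) -> None:
    # A single UPDATE lets the database serialise concurrent
    # increments, so write workers never race on read-modify-write.
    conn.execute(
        "UPDATE job SET progress_current = progress_current + ? WHERE id = ?",
        (n, job_id),
    )
    conn.commit()

increment_progress(conn, "parent-1", 25)
assert conn.execute("SELECT progress_current FROM job").fetchone()[0] == 25
```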

The publisher module provides publish_job() and publish_operation() helpers. Both are called by BaseWorker after the DB commit (not before), guaranteeing that workers always find the DB row before they try to claim it.
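The commit-before-publish ordering can be made explicit with stand-ins for the session and publisher (all names here are illustrative, not the BaseWorker API):

```python
# Ordering sketch for the commit-before-publish rule above.
def enqueue_job(session, publish, queue_name: str, job) -> None:
    session.add(job)
    session.commit()                 # the Job row is now visible to workers
    publish(queue_name, job["id"])   # only now may a consumer claim it

calls = []

class RecordingSession:
    def add(self, job):
        calls.append("add")
    def commit(self):
        calls.append("commit")

enqueue_job(
    RecordingSession(),
    lambda queue, job_id: calls.append("publish"),
    "protea.jobs",
    {"id": "job-uuid"},
)
assert calls == ["add", "commit", "publish"]  # publish strictly after commit
```

Publishing first would open a window in which a consumer dequeues the job_id, queries the database, and finds no row to claim; committing first closes that window.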

protea.infrastructure.queue.publisher.publish_job(amqp_url: str, queue_name: str, job_id: UUID) → None

Publish a job dispatch message {"job_id": "<uuid>"} to a queue.

protea.infrastructure.queue.publisher.publish_operation(amqp_url: str, queue_name: str, payload: dict[str, Any]) → None

Publish an ephemeral operation message to a queue.

Unlike publish_job, publishes a full payload dict consumed by OperationConsumer. Expected format:

{
    "operation": "<name>",
    "job_id":    "<parent-uuid>",
    "payload":   { ... operation-specific fields ... },
}
class protea.infrastructure.queue.consumer.OperationConsumer(amqp_url: str, queue_name: str, registry: OperationRegistry, session_factory: sessionmaker[Session], *, prefetch_count: int = 1, requeue_on_failure: bool = False)

Bases: object

RabbitMQ consumer for ephemeral operation messages.

Unlike QueueConsumer (which manages the full Job lifecycle via BaseWorker), this consumer handles lightweight operation messages that have no DB Job row of their own. Workers process the operation, write results directly to the DB, and atomically update the parent Job’s progress counter.

Expected message format:

{
    "operation": "<operation-name>",
    "job_id":    "<parent-job-uuid>",
    "payload":   { ... operation-specific fields ... }
}
run() → None
class protea.infrastructure.queue.consumer.QueueConsumer(amqp_url: str, queue_name: str, worker: BaseWorker, *, prefetch_count: int = 1, requeue_on_failure: bool = False)

Bases: object

Thin RabbitMQ consumer that delegates job execution to BaseWorker.

Responsibilities are strictly limited to transport concerns:

  • Connect to RabbitMQ and declare the queue.

  • Receive messages containing a JSON {"job_id": "<uuid>"} body.

  • Call BaseWorker.handle_job(job_id) for each valid message.

  • Ack on success, nack on failure or invalid message.

  • Graceful shutdown on SIGINT / SIGTERM.

All business logic, DB state transitions, and event emission happen inside BaseWorker — this class knows nothing about operations.

run() → None