Infrastructure¶
The protea.infrastructure package implements the persistence and messaging
layer. It is the only package that imports SQLAlchemy, psycopg2, or aio-pika
directly. All other layers interact with the database through the session
factory and with the queue through the publisher interface.
Settings¶
protea.infrastructure.settings loads configuration from
protea/config/system.yaml and applies environment variable overrides.
The two mandatory settings are db_url (PostgreSQL connection string) and
amqp_url (RabbitMQ connection string). Both can be overridden at runtime
via PROTEA_DB_URL and PROTEA_AMQP_URL environment variables, which
take precedence over the YAML file. This makes the same configuration file
usable across local development, CI, and production deployments.
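The override precedence described above can be sketched as follows; apply_env_overrides is a hypothetical helper for illustration, not the actual protea.infrastructure.settings API:

```python
import os

def apply_env_overrides(settings: dict[str, str]) -> dict[str, str]:
    """Return settings with PROTEA_* environment variables taking precedence.

    Illustrative helper; the real module loads protea/config/system.yaml first.
    """
    overrides = {
        "db_url": os.environ.get("PROTEA_DB_URL"),
        "amqp_url": os.environ.get("PROTEA_AMQP_URL"),
    }
    # Environment values win only when actually set.
    return {**settings, **{k: v for k, v in overrides.items() if v is not None}}

yaml_settings = {"db_url": "postgresql://localhost/protea", "amqp_url": "amqp://localhost/"}
os.environ["PROTEA_DB_URL"] = "postgresql://ci-host/protea"
merged = apply_env_overrides(yaml_settings)
print(merged["db_url"])   # the environment variable wins
print(merged["amqp_url"]) # falls back to the YAML value
```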
Session management¶
session_scope() is the single entry point for all database access in
PROTEA. It is a context manager that commits on normal exit and rolls back
on any exception, then always closes the session. Workers open and close
sessions explicitly rather than relying on this context manager for
long-lived operations, but it is used throughout the API layer and in tests.
The build_session_factory() function creates a SQLAlchemy sessionmaker
bound to the given database URL. It is called once at application startup and
stored on app.state.session_factory, keeping the router free of global
state.
- protea.infrastructure.session.build_session_factory(db_url: str) → sessionmaker[Session]¶
Create a SQLAlchemy session factory bound to the given database URL.
- protea.infrastructure.session.session_scope(factory: sessionmaker[Session]) → Iterator[Session]¶
Context manager that commits on success and rolls back on exception.
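The documented semantics — commit on normal exit, roll back on exception, always close — can be sketched with the standard contextlib pattern (illustrative only; the real implementation lives in protea.infrastructure.session, and FakeSession stands in for a SQLAlchemy Session):

```python
from contextlib import contextmanager

@contextmanager
def session_scope(factory):
    """Commit on normal exit, roll back on any exception, always close."""
    session = factory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# Minimal stand-in for a SQLAlchemy Session, recording calls in order.
class FakeSession:
    def __init__(self):
        self.events = []
    def commit(self):
        self.events.append("commit")
    def rollback(self):
        self.events.append("rollback")
    def close(self):
        self.events.append("close")

with session_scope(FakeSession) as s:
    pass
print(s.events)  # ['commit', 'close']
```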
ORM models¶
All models use SQLAlchemy 2.x declarative style with Mapped[] type
annotations. The schema is managed by Alembic; migrations are generated via
alembic revision --autogenerate and stored under alembic/versions/.
Job and JobEvent
Job is the central entity of the job queue. It implements a five-state
machine (QUEUED → RUNNING → SUCCEEDED | FAILED, or QUEUED →
CANCELLED). The parent_job_id foreign key links batch child jobs to
their coordinator parent. payload and meta are PostgreSQL JSONB
columns, allowing arbitrary structured data without schema migrations for
new operation types. progress_current and progress_total are updated
atomically by write workers to drive the frontend progress bar.
JobEvent is an append-only audit log: rows are written by the emit
callback during execution and are never updated or deleted. The frontend
renders them as a chronological event timeline.
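The five-state machine can be expressed as a transition table. This is a sketch inferred from the description above (in particular, only QUEUED jobs can be cancelled), not code taken from PROTEA:

```python
# Allowed transitions of the Job state machine, per the documentation:
# QUEUED -> RUNNING -> SUCCEEDED | FAILED, or QUEUED -> CANCELLED.
VALID_TRANSITIONS: dict[str, set[str]] = {
    "queued": {"running", "cancelled"},
    "running": {"succeeded", "failed"},
    "succeeded": set(),   # terminal
    "failed": set(),      # terminal
    "cancelled": set(),   # terminal
}

def can_transition(current: str, target: str) -> bool:
    return target in VALID_TRANSITIONS.get(current, set())

print(can_transition("queued", "running"))     # True
print(can_transition("running", "cancelled"))  # False: only QUEUED jobs cancel
```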
- class protea.infrastructure.orm.models.job.Job(**kwargs)¶
Bases: Base
- created_at: Mapped[datetime]¶
- error_code: Mapped[str | None]¶
- error_message: Mapped[str | None]¶
- finished_at: Mapped[datetime | None]¶
- id: Mapped[UUID]¶
- meta: Mapped[dict[str, Any]]¶
- operation: Mapped[str]¶
- parent_job_id: Mapped[UUID | None]¶
- payload: Mapped[dict[str, Any]]¶
- progress_current: Mapped[int | None]¶
- progress_total: Mapped[int | None]¶
- queue_name: Mapped[str]¶
- started_at: Mapped[datetime | None]¶
- class protea.infrastructure.orm.models.job.JobEvent(**kwargs)¶
Bases: Base
- event: Mapped[str]¶
- fields: Mapped[dict[str, Any]]¶
- id: Mapped[int]¶
- job_id: Mapped[UUID]¶
- level: Mapped[str]¶
- message: Mapped[str | None]¶
- ts: Mapped[datetime]¶
- class protea.infrastructure.orm.models.job.JobStatus(*values)¶
Bases: StrEnum
- CANCELLED = 'cancelled'¶
- FAILED = 'failed'¶
- QUEUED = 'queued'¶
- RUNNING = 'running'¶
- SUCCEEDED = 'succeeded'¶
- protea.infrastructure.orm.models.job.utcnow() → datetime¶
Protein and Sequence
Sequence stores unique amino-acid strings, deduplicated by MD5 hash.
Multiple Protein rows (canonical accessions and isoforms) may reference
the same Sequence row, preventing redundant embedding computation for
sequences that appear under different accessions.
Protein stores one row per UniProt accession, including isoforms
(<canonical>-<n>). The canonical_accession field groups isoforms
together, and the view-only relationship to ProteinUniProtMetadata is
joined on this field rather than a foreign key, so metadata rows are not
duplicated for each isoform.
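The hash-based deduplication can be sketched as a get-or-create keyed on the MD5 digest. This assumes compute_hash is a plain MD5 of the raw string, and the in-memory dict stands in for the sequence table:

```python
import hashlib

def compute_hash(seq: str) -> str:
    """MD5 of the raw amino-acid string (assumed behaviour of Sequence.compute_hash)."""
    return hashlib.md5(seq.encode("ascii")).hexdigest()

# In-memory stand-in for the sequence table, keyed by sequence_hash.
store: dict[str, str] = {}

def get_or_create(seq: str) -> tuple[str, bool]:
    """Return (hash, created); reuse the existing row when the hash is known."""
    h = compute_hash(seq)
    created = h not in store
    if created:
        store[h] = seq
    return h, created

h1, created1 = get_or_create("MKTAYIAKQR")
h2, created2 = get_or_create("MKTAYIAKQR")  # same sequence under another accession
print(h1 == h2, created1, created2)  # True True False
```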
- class protea.infrastructure.orm.models.protein.protein.Protein(**kwargs)¶
Bases: Base
One row per UniProt accession, including isoforms (<canonical>-<n>). Isoforms are grouped by canonical_accession. Many proteins can share the same Sequence row — sequence_id is deliberately non-unique. The uniprot_metadata relationship is view-only, joined by canonical_accession.
- accession: Mapped[str]¶
- canonical_accession: Mapped[str]¶
- created_at: Mapped[datetime]¶
- entry_name: Mapped[str | None]¶
- gene_name: Mapped[str | None]¶
- is_canonical: Mapped[bool]¶
- isoform_index: Mapped[int | None]¶
- length: Mapped[int | None]¶
- organism: Mapped[str | None]¶
- static parse_isoform(accession: str) → tuple[str, bool, int | None]¶
Parse isoform accession pattern “<canonical>-<n>”. Returns: (canonical_accession, is_canonical, isoform_index)
- reviewed: Mapped[bool | None]¶
- sequence_id: Mapped[int | None]¶
- taxonomy_id: Mapped[str | None]¶
- uniprot_metadata: Mapped[ProteinUniProtMetadata | None]¶
- updated_at: Mapped[datetime]¶
- class protea.infrastructure.orm.models.protein.protein_metadata.ProteinUniProtMetadata(**kwargs)¶
Bases: Base
Raw UniProt metadata stored ONCE per canonical accession.
Primary key: canonical_accession (e.g., X6R8D5)
Isoforms reuse the same metadata via Protein.canonical_accession.
Relationship to Protein is view-only (no FK required).
- absorption: Mapped[str | None]¶
- active_site: Mapped[str | None]¶
- activity_regulation: Mapped[str | None]¶
- binding_site: Mapped[str | None]¶
- canonical_accession: Mapped[str]¶
- catalytic_activity: Mapped[str | None]¶
- cofactor: Mapped[str | None]¶
- created_at: Mapped[datetime]¶
- dna_binding: Mapped[str | None]¶
- ec_number: Mapped[str | None]¶
- features: Mapped[str | None]¶
- function_cc: Mapped[str | None]¶
- keywords: Mapped[str | None]¶
- kinetics: Mapped[str | None]¶
- pathway: Mapped[str | None]¶
- ph_dependence: Mapped[str | None]¶
- redox_potential: Mapped[str | None]¶
- rhea_id: Mapped[str | None]¶
- site: Mapped[str | None]¶
- temperature_dependence: Mapped[str | None]¶
- updated_at: Mapped[datetime]¶
- class protea.infrastructure.orm.models.sequence.sequence.Sequence(*args, **kwargs)¶
Bases: Base
Production-grade Sequence schema.
Key points:
- Stores raw amino-acid sequence.
- Deduplicated by sequence_hash (MD5).
- A Sequence can be referenced by MANY proteins (Sequence.proteins).
- static compute_hash(seq: str) → str¶
- created_at: Mapped[datetime]¶
- id: Mapped[int]¶
- sequence: Mapped[str]¶
- sequence_hash: Mapped[str]¶
- updated_at: Mapped[datetime]¶
GO Ontology
OntologySnapshot records one complete GO OBO release, versioned by the
obo_version string from the OBO file header. The unique constraint on
obo_version makes repeated imports idempotent. GOTerm stores one row
per term per snapshot; GOTermRelationship stores the directed edges of
the GO DAG with their relation types (is_a, part_of, regulates,
etc.). The /annotations/snapshots/{id}/subgraph endpoint uses BFS over
these edges to return ancestor subgraphs for a given set of GO term IDs.
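The ancestor BFS can be sketched as follows, assuming edges are (child, parent, relation) tuples loaded from go_term_relationship; the GO IDs form a toy is_a chain for illustration:

```python
from collections import deque

# Toy edges: apoptosis -> programmed cell death -> cell death -> cellular process.
edges = [
    ("GO:0006915", "GO:0012501", "is_a"),
    ("GO:0012501", "GO:0008219", "is_a"),
    ("GO:0008219", "GO:0009987", "is_a"),
]

def ancestor_subgraph(seeds: set[str], edges) -> set[str]:
    """BFS upwards from the seed terms, collecting every reachable ancestor."""
    parents: dict[str, set[str]] = {}
    for child, parent, _rel in edges:
        parents.setdefault(child, set()).add(parent)
    seen, queue = set(seeds), deque(seeds)
    while queue:
        node = queue.popleft()
        for p in parents.get(node, ()):
            if p not in seen:
                seen.add(p)
                queue.append(p)
    return seen

print(sorted(ancestor_subgraph({"GO:0006915"}, edges)))
```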
- class protea.infrastructure.orm.models.annotation.ontology_snapshot.OntologySnapshot(**kwargs)¶
Bases: Base
One row per loaded go.obo file.
obo_version is extracted from the data-version: header of the OBO file (e.g. releases/2024-01-17). obo_url is the URL from which the file was downloaded, providing full provenance. Multiple AnnotationSet rows can reference the same snapshot when they were built against the same ontology release.
- annotation_sets: Mapped[list[AnnotationSet]]¶
- id: Mapped[uuid.UUID]¶
- loaded_at: Mapped[datetime]¶
- obo_url: Mapped[str]¶
- obo_version: Mapped[str]¶
- class protea.infrastructure.orm.models.annotation.go_term.GOTerm(**kwargs)¶
Bases: Base
One row per GO term per ontology snapshot.
GO terms are scoped to an OntologySnapshot so that the meaning of a term at a specific ontology release is preserved. (go_id, ontology_snapshot_id) is unique — the same GO:XXXXXXX can exist in multiple snapshots with potentially different names or definitions.
- annotations: Mapped[list[ProteinGOAnnotation]]¶
- aspect: Mapped[str | None]¶
- definition: Mapped[str | None]¶
- go_id: Mapped[str]¶
- id: Mapped[int]¶
- is_obsolete: Mapped[bool]¶
- name: Mapped[str | None]¶
- ontology_snapshot: Mapped[OntologySnapshot]¶
- ontology_snapshot_id: Mapped[uuid.UUID]¶
- class protea.infrastructure.orm.models.annotation.go_term_relationship.GOTermRelationship(**kwargs)¶
Bases: Base
Directed edge in the GO DAG for a specific ontology snapshot.
child_go_term_id → parent_go_term_id with a given relation_type (is_a, part_of, regulates, negatively_regulates, positively_regulates).
- child_go_term_id: Mapped[int]¶
- id: Mapped[int]¶
- ontology_snapshot: Mapped[OntologySnapshot]¶
- ontology_snapshot_id: Mapped[uuid.UUID]¶
- parent_go_term_id: Mapped[int]¶
- relation_type: Mapped[str]¶
Annotation Sets
AnnotationSet groups a batch of protein GO annotations by source
(goa or quickgo) and ontology snapshot version. This design allows
side-by-side comparison of annotation sets from different sources or dates
and ties every prediction result to a specific, versioned annotation input.
ProteinGOAnnotation stores all GAF/QuickGO evidence fields verbatim:
qualifier, evidence code, assigned-by, database reference, with/from, and
annotation date.
- class protea.infrastructure.orm.models.annotation.annotation_set.AnnotationSet(**kwargs)¶
Bases: Base
A versioned batch of GO annotations from a single source.
Each load operation (QuickGO download, GOA GAF ingest, CAFA dataset) creates one AnnotationSet row. This allows multiple temporal snapshots of the same source to coexist and be queried independently. ontology_snapshot_id pins the exact GO ontology release used to interpret the annotations in this set. job_id links back to the PROTEA job that created it, providing a full audit trail.
- annotations: Mapped[list[ProteinGOAnnotation]]¶
- created_at: Mapped[datetime]¶
- id: Mapped[uuid.UUID]¶
- job_id: Mapped[uuid.UUID | None]¶
- meta: Mapped[dict[str, Any]]¶
- ontology_snapshot: Mapped[OntologySnapshot]¶
- ontology_snapshot_id: Mapped[uuid.UUID]¶
- source: Mapped[str]¶
- source_version: Mapped[str | None]¶
- class protea.infrastructure.orm.models.annotation.protein_go_annotation.ProteinGOAnnotation(**kwargs)¶
Bases: Base
Association between a protein and a GO term within an annotation set.
Fields map directly from GAF/QuickGO columns:
- qualifier: e.g. enables, involved_in, located_in
- evidence_code: GO evidence code resolved from ECO (IDA, IEA, ISS…)
- assigned_by: database that made the annotation (UniProt, RHEA…)
- db_reference: supporting reference (PMID:…, GO_REF:…)
- with_from: with/from field from GAF column 8
- annotation_date: YYYYMMDD string from the source file
- annotation_date: Mapped[str | None]¶
- annotation_set: Mapped[AnnotationSet]¶
- annotation_set_id: Mapped[uuid.UUID]¶
- assigned_by: Mapped[str | None]¶
- db_reference: Mapped[str | None]¶
- evidence_code: Mapped[str | None]¶
- go_term_id: Mapped[int]¶
- id: Mapped[int]¶
- protein_accession: Mapped[str]¶
- qualifier: Mapped[str | None]¶
- with_from: Mapped[str | None]¶
Embeddings
EmbeddingConfig defines a reproducible embedding recipe: model identifier,
layer selection, pooling strategy, normalisation flags, and chunking
parameters. Its UUID primary key is stable; changing any parameter creates
a new configuration row. Both SequenceEmbedding rows and PredictionSet
rows reference the same EmbeddingConfig, guaranteeing that query and
reference embeddings are always comparable.
SequenceEmbedding stores a pgvector VECTOR for each
(sequence, config, chunk) triple. When chunking is disabled the chunk index
is 0 and the end index is NULL. pgvector is used for storage only;
nearest-neighbour queries are performed in Python via protea.core.knn_search.
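A pure-Python sketch of the nearest-neighbour step follows; the real protea.core.knn_search operates on pgvector rows, and the accessions and vectors here are toy values:

```python
import math

def cosine_distance(a, b) -> float:
    """1 - cosine similarity; lower means more similar."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (na * nb)

# Reference embeddings keyed by protein accession (toy 3-d vectors).
references = {
    "P12345": [1.0, 0.0, 0.0],
    "Q67890": [0.0, 1.0, 0.0],
}
query = [0.9, 0.1, 0.0]

# Nearest neighbour = reference with the smallest cosine distance.
best = min(references, key=lambda acc: cosine_distance(query, references[acc]))
print(best)  # P12345
```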
- class protea.infrastructure.orm.models.embedding.embedding_config.EmbeddingConfig(**kwargs)¶
Bases: Base
Defines a reproducible recipe for computing protein embeddings.
Every SequenceEmbedding row points to exactly one EmbeddingConfig, providing complete provenance: which model, which transformer layers, how layers are aggregated, how the sequence is pooled, and whether the vector is L2-normalised.
Layer indexing convention (reverse, consistent with PIS)¶
layer_indices use reverse indexing: 0 = last (most semantic) layer, 1 = penultimate, 2 = antepenultimate, etc. This matches the convention used across all backends in PIS / FANTASIA.
Layer aggregation strategies¶
- mean: element-wise average across selected layers (dim unchanged).
- concat: concatenation of all selected layers (dim × n_layers).
Sequence pooling strategies¶
- mean: mean over residue representations.
- max: max over residue representations.
- cls: CLS/BOS token only (position 0 of raw hidden states).
- mean_max: concatenation of mean and max (dim × 2).
Model backends¶
- esm: HuggingFace EsmModel (ESM-2 family).
- esm3c: ESM SDK ESMC (ESM3c family). No external tokenizer. Runs FP16 on GPU. CLS and EOS tokens stripped for pooling.
- t5: HuggingFace T5EncoderModel (ProstT5, prot_t5_xl…). ProstT5 mode auto-detected from model_name.
- auto: falls back to esm.
Normalisation¶
- normalize_residues: L2-normalise each residue representation before pooling (applied after layer aggregation).
- normalize: L2-normalise the final pooled vector.
Chunking¶
Long sequences can be split into overlapping chunks before pooling. Each chunk produces one SequenceEmbedding row identified by chunk_index_s and chunk_index_e.
- chunk_overlap: Mapped[int]¶
- chunk_size: Mapped[int]¶
- created_at: Mapped[datetime]¶
- description: Mapped[str | None]¶
- id: Mapped[UUID]¶
- layer_agg: Mapped[str]¶
- layer_indices: Mapped[list[Any]]¶
- max_length: Mapped[int]¶
- model_backend: Mapped[str]¶
- model_name: Mapped[str]¶
- normalize: Mapped[bool]¶
- normalize_residues: Mapped[bool]¶
- pooling: Mapped[str]¶
- use_chunking: Mapped[bool]¶
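The overlapping-chunk layout behind chunk_size and chunk_overlap can be sketched as below; the exact boundary conventions in PROTEA (inclusive vs exclusive end index, minimum final-chunk length) may differ:

```python
def chunk_spans(length: int, chunk_size: int, overlap: int) -> list[tuple[int, int]]:
    """Split [0, length) into overlapping (start, end) spans.

    Each span corresponds to one SequenceEmbedding row; consecutive
    chunks share `overlap` residues.
    """
    step = chunk_size - overlap
    spans, start = [], 0
    while start < length:
        spans.append((start, min(start + chunk_size, length)))
        if start + chunk_size >= length:
            break  # this chunk already reaches the end of the sequence
        start += step
    return spans

print(chunk_spans(10, 4, 2))  # [(0, 4), (2, 6), (4, 8), (6, 10)]
```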
- class protea.infrastructure.orm.models.embedding.sequence_embedding.SequenceEmbedding(**kwargs)¶
Bases: Base
Stores a computed embedding for a sequence under a specific EmbeddingConfig.
One row per (sequence, config, chunk_start). When chunking is disabled, chunk_index_s=0 and chunk_index_e=None (NULL in the DB), meaning the embedding covers the full sequence. When chunking is enabled, each chunk produces a separate row identified by its start/end residue indices.
Proteins sharing the same amino-acid sequence share one set of embedding rows per config (deduplicated at the Sequence level).
Full traceability: embedding_config records the exact model, transformer layers, aggregation strategy, pooling, normalisation, and chunking parameters used.
- chunk_index_e: Mapped[int | None]¶
- chunk_index_s: Mapped[int]¶
- created_at: Mapped[datetime]¶
- embedding: Mapped[Any]¶
- embedding_config: Mapped[EmbeddingConfig]¶
- embedding_config_id: Mapped[uuid.UUID]¶
- embedding_dim: Mapped[int]¶
- id: Mapped[int]¶
- sequence_id: Mapped[int]¶
Predictions
PredictionSet is the result container for one run of
predict_go_terms. It links the query set, embedding configuration,
annotation set, and ontology snapshot used, making every prediction set
fully reproducible. GOPrediction stores one row per (query protein,
GO term, reference protein) triple. The 14 optional feature-engineering
columns (alignment statistics and taxonomy fields) are NULL unless the
corresponding flags were set in the prediction payload.
- class protea.infrastructure.orm.models.embedding.prediction_set.PredictionSet(**kwargs)¶
Bases: Base
Groups GO predictions from a single prediction run.
Records which EmbeddingConfig was used for similarity search and which AnnotationSet was used as the reference, providing complete traceability for every predicted GO term.
- annotation_set: Mapped[AnnotationSet]¶
- annotation_set_id: Mapped[uuid.UUID]¶
- created_at: Mapped[datetime]¶
- distance_threshold: Mapped[float | None]¶
- embedding_config: Mapped[EmbeddingConfig]¶
- embedding_config_id: Mapped[uuid.UUID]¶
- id: Mapped[uuid.UUID]¶
- limit_per_entry: Mapped[int]¶
- meta: Mapped[dict[str, Any]]¶
- ontology_snapshot: Mapped[OntologySnapshot]¶
- ontology_snapshot_id: Mapped[uuid.UUID]¶
- predictions: Mapped[list[GOPrediction]]¶
- query_set_id: Mapped[uuid.UUID | None]¶
- class protea.infrastructure.orm.models.embedding.go_prediction.GOPrediction(**kwargs)¶
Bases: Base
One predicted GO term for a protein within a prediction set.
The prediction is derived by transferring annotations from the nearest reference protein (ref_protein_accession) in embedding space. The distance field records the cosine distance to that neighbor, which serves as a proxy for prediction confidence (lower = more similar).
- alignment_length_nw: Mapped[float | None]¶
- alignment_length_sw: Mapped[float | None]¶
- alignment_score_nw: Mapped[float | None]¶
- alignment_score_sw: Mapped[float | None]¶
- distance: Mapped[float]¶
- evidence_code: Mapped[str | None]¶
- gaps_pct_nw: Mapped[float | None]¶
- gaps_pct_sw: Mapped[float | None]¶
- go_term_id: Mapped[int]¶
- id: Mapped[int]¶
- identity_nw: Mapped[float | None]¶
- identity_sw: Mapped[float | None]¶
- length_query: Mapped[int | None]¶
- length_ref: Mapped[int | None]¶
- prediction_set: Mapped[PredictionSet]¶
- prediction_set_id: Mapped[uuid.UUID]¶
- protein_accession: Mapped[str]¶
- qualifier: Mapped[str | None]¶
- query_taxonomy_id: Mapped[int | None]¶
- ref_protein_accession: Mapped[str]¶
- ref_taxonomy_id: Mapped[int | None]¶
- similarity_nw: Mapped[float | None]¶
- similarity_sw: Mapped[float | None]¶
- taxonomic_common_ancestors: Mapped[int | None]¶
- taxonomic_distance: Mapped[int | None]¶
- taxonomic_lca: Mapped[int | None]¶
- taxonomic_relation: Mapped[str | None]¶
Query Sets
QuerySet represents a user-uploaded FASTA dataset. QuerySetEntry
stores one row per FASTA entry, preserving the original accession header
and linking to the deduplicated Sequence row. If the amino-acid string
already exists in the database, the existing Sequence row is reused,
avoiding redundant embedding computation.
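FASTA parsing for a query-set upload can be sketched as follows; header handling is simplified to "accession = first whitespace-delimited token after >", which may be stricter or looser than PROTEA's actual parser:

```python
def parse_fasta(text: str) -> list[tuple[str, str]]:
    """Return (accession, sequence) pairs, joining wrapped sequence lines."""
    entries: list[tuple[str, str]] = []
    accession, chunks = None, []
    for line in text.splitlines():
        if line.startswith(">"):
            if accession is not None:
                entries.append((accession, "".join(chunks)))
            accession, chunks = line[1:].split()[0], []
        elif line.strip():
            chunks.append(line.strip())
    if accession is not None:
        entries.append((accession, "".join(chunks)))
    return entries

fasta = """>sp|P69905|HBA_HUMAN
MVLSPADKTN
VKAAWGKVGA
>Q8XYZ1
MKTAYIAKQR
"""
print(parse_fasta(fasta))
```

Each returned pair would become one QuerySetEntry, with the sequence string routed through the hash-based Sequence deduplication described earlier.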
- class protea.infrastructure.orm.models.query.query_set.QuerySet(**kwargs)¶
Bases: Base
A named collection of sequences uploaded by the user for GO term prediction.
Each uploaded FASTA file creates one QuerySet row. Entries preserve the original accession strings from the FASTA headers and link to the deduplicated Sequence rows. This allows the same physical sequence to appear in multiple query sets without duplication.
- created_at: Mapped[datetime]¶
- description: Mapped[str | None]¶
- entries: Mapped[list[QuerySetEntry]]¶
- id: Mapped[UUID]¶
- name: Mapped[str]¶
- class protea.infrastructure.orm.models.query.query_set.QuerySetEntry(**kwargs)¶
Bases: Base
One sequence within a QuerySet, preserving the original FASTA accession.
accession is the raw identifier from the FASTA header (may not exist in the protein table). sequence_id links to the deduplicated Sequence row used for embedding computation and similarity search.
- accession: Mapped[str]¶
- created_at: Mapped[datetime]¶
- id: Mapped[int]¶
- query_set_id: Mapped[uuid.UUID]¶
- sequence_id: Mapped[int]¶
Queue¶
The queue layer provides two classes: QueueConsumer and
OperationConsumer.
QueueConsumer reads a job UUID from a RabbitMQ queue and delegates to
BaseWorker.handle_job(). It is used for queues where every message
corresponds to a tracked Job row: protea.ping, protea.jobs, and
protea.embeddings.
OperationConsumer reads a raw serialised operation payload from the
queue and executes it directly, without creating a Job row. It is used
for high-throughput batch queues (protea.embeddings.batch,
protea.embeddings.write, protea.predictions.batch,
protea.predictions.write) where creating thousands of child rows would
cause queue bloat. Progress is tracked at the parent job level only, via
the atomic progress_current increment.
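The effect of the atomic increment can be illustrated in memory, with a lock standing in for the row-level UPDATE; in PROTEA the increment is a single SQL statement (progress_current = progress_current + 1), so concurrent write workers never lose counts. All names here are stand-ins:

```python
import threading

class ParentProgress:
    """In-memory analogue of the parent Job's progress counter."""
    def __init__(self, total: int):
        self.current = 0
        self.total = total
        self._lock = threading.Lock()

    def increment(self) -> int:
        # The DB's row-level locking plays this role in production.
        with self._lock:
            self.current += 1
            return self.current

progress = ParentProgress(total=100)
threads = [
    threading.Thread(target=lambda: [progress.increment() for _ in range(10)])
    for _ in range(10)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(progress.current)  # 100: no increments lost despite concurrency
```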
The publisher module provides publish_job() and publish_operation()
helpers. Both are called by BaseWorker after the DB commit (not before),
guaranteeing that workers always find the DB row before they try to claim it.
- protea.infrastructure.queue.publisher.publish_job(amqp_url: str, queue_name: str, job_id: UUID) → None¶
Publish a job dispatch message {"job_id": "<uuid>"} to a queue.
- protea.infrastructure.queue.publisher.publish_operation(amqp_url: str, queue_name: str, payload: dict[str, Any]) → None¶
Publish an ephemeral operation message to a queue.
Unlike publish_job, publishes a full payload dict consumed by OperationConsumer. Expected format:
{
  "operation": "<name>",
  "job_id": "<parent-uuid>",
  "payload": { ... operation-specific fields ... }
}
- class protea.infrastructure.queue.consumer.OperationConsumer(amqp_url: str, queue_name: str, registry: OperationRegistry, session_factory: sessionmaker[Session], *, prefetch_count: int = 1, requeue_on_failure: bool = False)¶
Bases: object
RabbitMQ consumer for ephemeral operation messages.
Unlike QueueConsumer (which manages the full Job lifecycle via BaseWorker), this consumer handles lightweight operation messages that have no DB Job row of their own. Workers process the operation, write results directly to the DB, and atomically update the parent Job’s progress counter.
Expected message format:
{
  "operation": "<operation-name>",
  "job_id": "<parent-job-uuid>",
  "payload": { ... operation-specific fields ... }
}
- run() → None¶
- class protea.infrastructure.queue.consumer.QueueConsumer(amqp_url: str, queue_name: str, worker: BaseWorker, *, prefetch_count: int = 1, requeue_on_failure: bool = False)¶
Bases: object
Thin RabbitMQ consumer that delegates job execution to BaseWorker.
Responsibilities are strictly limited to transport concerns:
- Connect to RabbitMQ and declare the queue.
- Receive messages containing a JSON {"job_id": "<uuid>"} body.
- Call BaseWorker.handle_job(job_id) for each valid message.
- Ack on success, nack on failure or invalid message.
- Graceful shutdown on SIGINT / SIGTERM.
All business logic, DB state transitions, and event emission happen inside BaseWorker — this class knows nothing about operations.
- run() → None¶