Infrastructure
The protea.infrastructure package implements the persistence and messaging
layer. It is the only package that imports SQLAlchemy, psycopg2, or aio-pika
directly. All other layers interact with the database through the session
factory and with the queue through the publisher interface.
Database engine
protea.infrastructure.database.engine creates the SQLAlchemy engine
from the configured database URL. The engine is constructed once at
application startup and shared across all session factories.
Settings
protea.infrastructure.settings loads configuration from
protea/config/system.yaml and applies environment variable overrides.
The two mandatory settings are db_url (PostgreSQL connection string) and
amqp_url (RabbitMQ connection string). Both can be overridden at runtime
via the PROTEA_DB_URL and PROTEA_AMQP_URL environment variables, which
take precedence over the YAML file. This makes the same configuration file
usable across local development, CI, and production deployments.
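The precedence rule can be sketched as a small merge over an already-parsed YAML mapping (parsing itself needs PyYAML and is left out so the example stays stdlib-only). The function name apply_env_overrides, the override table, and the treatment of empty variables as unset are assumptions for illustration, not PROTEA's actual settings code.

```python
from __future__ import annotations

import os
from collections.abc import Mapping

# Hypothetical table: environment variable -> settings key it overrides.
ENV_OVERRIDES = {"PROTEA_DB_URL": "db_url", "PROTEA_AMQP_URL": "amqp_url"}
MANDATORY = ("db_url", "amqp_url")


def apply_env_overrides(
    yaml_settings: Mapping[str, object], env: Mapping[str, str] | None = None
) -> dict[str, object]:
    """Return a merged copy in which non-empty environment variables win over YAML."""
    env = os.environ if env is None else env
    merged = dict(yaml_settings)
    for var, key in ENV_OVERRIDES.items():
        value = env.get(var)
        if value:  # assumption: an empty variable counts as unset
            merged[key] = value
    missing = [key for key in MANDATORY if not merged.get(key)]
    if missing:
        raise ValueError(f"missing mandatory settings: {', '.join(missing)}")
    return merged
```

Because env defaults to os.environ but can be passed explicitly, tests can exercise the override without mutating the process environment.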
Session management
session_scope() is the single entry point for all database access in
PROTEA. It is a context manager that commits on normal exit and rolls back
on any exception, then always closes the session. Workers open and close
sessions explicitly rather than relying on this context manager for
long-lived operations, but it is used throughout the API layer and in tests.
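The commit/rollback/close contract can be sketched with contextlib. This assumes the factory is passed in explicitly (PROTEA's real signature may differ) and that it is any zero-argument callable returning an object with commit(), rollback() and close(), which a SQLAlchemy sessionmaker satisfies. It illustrates the contract rather than reproducing PROTEA's source.

```python
from __future__ import annotations

from collections.abc import Callable, Iterator
from contextlib import contextmanager
from typing import Any


@contextmanager
def session_scope(session_factory: Callable[[], Any]) -> Iterator[Any]:
    """Yield a session; commit on normal exit, roll back on error, always close."""
    session = session_factory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise  # re-raise so the caller still sees the original error
    finally:
        session.close()
```

A caller would write `with session_scope(app.state.session_factory) as session: ...` and never call commit() itself.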
The build_session_factory() function creates a SQLAlchemy sessionmaker
bound to the given database URL. It is called once at application startup and
stored on app.state.session_factory, keeping the router free of global
state.
Telemetry
protea.infrastructure.telemetry configures OpenTelemetry tracing and
Prometheus metrics collection. It instruments the FastAPI application,
SQLAlchemy engine, and aio-pika AMQP client, emitting spans and counters
for every request, query, and queue publish.
OpenTelemetry SDK boot for PROTEA (T5.1a + T5.1b).
This module is the single entry point for tracing in PROTEA. It wires
a global opentelemetry.sdk.trace.TracerProvider configured
from environment variables (per ADR D07) and instruments FastAPI,
SQLAlchemy engines, and pika.
The OTel SDK is treated as optional at import time: if the libraries
are not installed (e.g. minimal worker images), configure_telemetry()
logs a single warning and returns the resolved config instead of
raising. This keeps the existing developer workflow (poetry install)
green until the F-OPS stack rolls out and is the pattern recommended
by the OTel docs for SDK-optional applications.
T5.1a scope: SDK boot + env-driven exporter URL + FastAPI instrumentation.
T5.1b scope: SQLAlchemy engine instrumentation + pika producer/consumer
context propagation via the W3C traceparent header so spans stitch
across HTTP -> queue -> worker boundaries.
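The producer and consumer halves of that propagation can be sketched with the opentelemetry.propagate module, degrading to a no-op when the package is missing, in line with the behaviour described for inject_trace_context() and extract_trace_context(). The helper names inject_headers and extract_context are hypothetical; this is not PROTEA's implementation.

```python
from __future__ import annotations

from typing import Any


def inject_headers(headers: dict[str, Any]) -> dict[str, Any]:
    """Write the active trace context (traceparent/tracestate) into AMQP headers."""
    try:
        from opentelemetry import propagate
    except ImportError:  # OTel API not installed: leave the headers untouched
        return headers
    propagate.inject(headers)  # no-op when no span is active
    return headers


def extract_context(headers: dict[str, Any] | None) -> Any:
    """Rebuild an OTel Context from inbound AMQP headers, or None without OTel."""
    try:
        from opentelemetry import propagate
    except ImportError:
        return None
    return propagate.extract(headers or {})
```

On the producer side the headers dict would go into pika.BasicProperties(headers=...); on the consumer side the recovered context is passed as tracer.start_as_current_span(name, context=ctx), where ctx=None simply means "use the current context".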
Environment variables consumed
- PROTEA_OTEL_ENABLED: truthy values (1, true, yes, on) enable tracing. Default false, so opting in is explicit and the developer workflow never blocks on a running collector.
- PROTEA_OTEL_ENDPOINT: OTLP HTTP exporter endpoint (e.g. http://otel-collector:4318). When unset and tracing is enabled, the OTLP HTTP exporter falls back to its own default (http://localhost:4318).
- PROTEA_OTEL_SERVICE_NAME: service.name resource attribute. Defaults to protea-api. Workers set this to protea-worker-<queue> at boot.
- PROTEA_OTEL_SAMPLE_RATIO: ParentBased(TraceIdRatioBased(<ratio>)) sampler ratio. 0.0 disables sampling, 1.0 samples every trace. Default 1.0 (sampling is expected to be tuned via the collector once F-OPS sets up budgets).
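The variable rules above can be sketched as a pure function over a mapping, in the spirit of resolve_telemetry_config(). ResolvedTelemetry mirrors the documented TelemetryConfig fields but is a stand-in; the case-insensitive truthy check, the fallback on a malformed ratio, and the clamping to [0, 1] are assumptions that the real function may handle differently.

```python
from __future__ import annotations

import os
from collections.abc import Mapping
from dataclasses import dataclass

TRUTHY = {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class ResolvedTelemetry:
    """Stand-in with the same fields as TelemetryConfig."""
    enabled: bool
    endpoint: str | None
    service_name: str
    sample_ratio: float


def resolve_config(
    env: Mapping[str, str] | None = None, *, default_service_name: str = "protea-api"
) -> ResolvedTelemetry:
    env = os.environ if env is None else env
    enabled = env.get("PROTEA_OTEL_ENABLED", "false").strip().lower() in TRUTHY
    endpoint = env.get("PROTEA_OTEL_ENDPOINT") or None  # None: exporter uses its own default
    service_name = env.get("PROTEA_OTEL_SERVICE_NAME") or default_service_name
    try:
        ratio = float(env.get("PROTEA_OTEL_SAMPLE_RATIO", "1.0"))
    except ValueError:
        ratio = 1.0  # assumption: malformed values fall back to the default
    ratio = min(max(ratio, 0.0), 1.0)  # assumption: clamp into [0.0, 1.0]
    return ResolvedTelemetry(enabled, endpoint, service_name, ratio)
```

A worker would call resolve_config(default_service_name=f"protea-worker-{queue}") so its spans are attributed to the right service unless the environment says otherwise.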
- class protea.infrastructure.telemetry.MetricRegistry(registry: Any, jobs_total: Any, job_duration_seconds: Any, embeddings_batch_seconds: Any, predictions_batch_seconds: Any, db_pool_in_use: Any, http_requests_total: Any, http_request_duration_seconds: Any, http_requests_in_flight: Any)
Bases: object
Bag of the Prometheus collectors exposed by /v1/metrics (job, batch, DB-pool and HTTP metrics), plus the registry that holds them.
The registry is intentionally a Parameter Object (not a module-level set of globals) so tests can build a throwaway one and the API can stash a single instance on app.state.metrics.
- db_pool_in_use: Any
- embeddings_batch_seconds: Any
- http_request_duration_seconds: Any
- http_requests_in_flight: Any
- http_requests_total: Any
- job_duration_seconds: Any
- jobs_total: Any
- predictions_batch_seconds: Any
- registry: Any
- class protea.infrastructure.telemetry.SdkBundle(TracerProvider: Any, Resource: Any, BatchSpanProcessor: Any, OTLPSpanExporter: Any, ParentBased: Any, TraceIdRatioBased: Any)
Bases: object
Bag of OTel SDK classes used by _build_provider().
Wrapped in a Parameter Object so _build_provider()’s signature stays under the project-wide 6-arg ceiling (see check_smells.py) while remaining testable with stand-ins.
- BatchSpanProcessor: Any
- OTLPSpanExporter: Any
- ParentBased: Any
- Resource: Any
- TraceIdRatioBased: Any
- TracerProvider: Any
- class protea.infrastructure.telemetry.TelemetryConfig(enabled: bool, endpoint: str | None, service_name: str, sample_ratio: float)
Bases: object
Resolved telemetry settings.
Built by resolve_telemetry_config() from the environment so callers can introspect what the SDK boot will actually do without triggering the boot itself (handy for /health reporting and tests).
- enabled: bool
- endpoint: str | None
- sample_ratio: float
- service_name: str
- protea.infrastructure.telemetry.build_metric_registry() → MetricRegistry | None
Construct the MetricRegistry, returning None when prometheus_client is not installed.
Mirrors the soft-fail pattern of configure_telemetry(): a minimal worker image that does not pull the observability extras keeps booting, and the /v1/metrics router degrades to a 503 instead of crashing the whole API.
- protea.infrastructure.telemetry.configure_telemetry(app: Any | None = None, *, config: TelemetryConfig | None = None, default_service_name: str = 'protea-api') → TelemetryConfig
Boot the OTel SDK and (optionally) instrument a FastAPI app.
Idempotent: a second call with an already-configured global provider is a no-op (logged at DEBUG). Returns the resolved TelemetryConfig regardless of whether tracing was actually enabled, so callers can stash it on app.state and surface it via /health.
- protea.infrastructure.telemetry.extract_trace_context(headers: dict[str, Any] | None) → Any
Return an OTel context extracted from inbound AMQP headers.
The returned object is opaque (opentelemetry.context.Context); callers pass it to tracer.start_as_current_span(..., context=ctx). Returns None when the OTel API is not installed so callers can short-circuit to a no-op path.
- protea.infrastructure.telemetry.get_tracer(name: str) → Any
Return an OTel tracer for name, or a no-op tracer when the API is not installed.
Keeps call sites (queue consumer, etc.) free of optional-import boilerplate: they unconditionally call get_tracer(__name__) and use the resulting object via the standard tracer API.
- protea.infrastructure.telemetry.inject_trace_context(headers: dict[str, Any]) → dict[str, Any]
Inject the current trace context into a mutable header mapping.
Used by the pika publisher to propagate traceparent (and tracestate) onto outbound AMQP messages so consumers can stitch spans back to the producing HTTP span. Mutates the mapping in place and returns it for convenience. When the OTel API is not installed or no context is active, this is a no-op.
- protea.infrastructure.telemetry.instrument_sqlalchemy_engine(engine: Any) → None
Wrap a SQLAlchemy Engine with the OTel instrumentor.
Safe to call when telemetry is disabled or the instrumentor is not installed: both cases log (at DEBUG or WARNING) and return. Workers and the API boot call this from build_engine() so every engine created in the process emits db.* spans without each call site needing to know about OTel.
- protea.infrastructure.telemetry.refresh_db_pool_gauge(metrics: MetricRegistry, engine: Any) → None
Read the SQLAlchemy pool’s checked-out count and set the gauge.
Called from the /v1/metrics handler on every scrape so the gauge reflects the live pool state without requiring event-listener plumbing. engine is expected to be a SQLAlchemy Engine; we duck-type via engine.pool.checkedout() so tests can pass a minimal stub. Any AttributeError is swallowed (the pool may not expose the method on some dialects).
- protea.infrastructure.telemetry.render_metrics(metrics: MetricRegistry) → tuple[bytes, str]
Generate the Prometheus exposition payload for metrics.
Returns the body bytes and the canonical Content-Type header value so the router can return a Response without re-importing prometheus_client itself.
- protea.infrastructure.telemetry.resolve_telemetry_config(env: dict[str, str] | None = None, *, default_service_name: str = 'protea-api') → TelemetryConfig
Resolve telemetry settings from the environment.
env defaults to os.environ. default_service_name lets workers override the default protea-api value at boot.
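The soft-fail registry, the scrape-time pool gauge refresh, and the rendering step fit together roughly as follows. MiniMetrics is a hypothetical two-collector cut of MetricRegistry and the metric names are illustrative; the prometheus_client calls (CollectorRegistry, Counter, Gauge, generate_latest, CONTENT_TYPE_LATEST) are real, but this is not PROTEA's implementation.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class MiniMetrics:
    """Hypothetical two-collector stand-in for MetricRegistry."""
    registry: Any
    jobs_total: Any
    db_pool_in_use: Any


def build_mini_metrics() -> MiniMetrics | None:
    try:
        from prometheus_client import CollectorRegistry, Counter, Gauge
    except ImportError:
        return None  # soft-fail: the metrics route would answer 503 instead
    registry = CollectorRegistry()  # private registry, not the process-global one
    return MiniMetrics(
        registry=registry,
        jobs_total=Counter("protea_jobs_total", "Jobs by final status.", ["status"], registry=registry),
        db_pool_in_use=Gauge("protea_db_pool_in_use", "Checked-out pool connections.", registry=registry),
    )


def refresh_pool_gauge(metrics: MiniMetrics, engine: Any) -> None:
    """Duck-type engine.pool.checkedout(); ignore engines that lack it."""
    try:
        metrics.db_pool_in_use.set(engine.pool.checkedout())
    except AttributeError:
        pass


def render(metrics: MiniMetrics) -> tuple[bytes, str]:
    from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
    return generate_latest(metrics.registry), CONTENT_TYPE_LATEST
```

Keeping collectors on a private CollectorRegistry, rather than the global default, is what lets tests build a throwaway instance without "duplicated timeseries" errors.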
Benchmark configuration
protea.infrastructure.benchmark_config loads the
protea/config/benchmark.yaml file that drives the /benchmark
router. It maps scoring-config (stage) display names, evaluation categories
(NK/LK/PK), GO aspects (BPO/MFO/CCO), and the baseline tag used in the Fmax comparison grid.
Loader for protea/config/benchmark.yaml.
Mirrors the load_settings pattern in settings.py but returns a typed
dataclass specific to the benchmark matrix view. Consumed by the
/benchmark/matrix router to avoid hardcoding stage taxonomy, labels,
or GO-namespace constants.
- class protea.infrastructure.benchmark_config.BenchmarkConfig(preferred_default_stages: 'tuple[str, ...]', baseline_scoring_name: 'str | None', hidden_stages: 'frozenset[str]', stage_labels: 'dict[str, str]', eval_set_labels: 'dict[str, str]', categories: 'tuple[str, ...]', aspects: 'tuple[str, ...]')
Bases: object
- aspects: tuple[str, ...]
- baseline_scoring_name: str | None
- categories: tuple[str, ...]
- eval_set_labels: dict[str, str]
- label_for_stage(name: str) → str
Return the human-readable label for a stage.
Falls back to a Title-Cased version of the raw name if the YAML does not define an explicit label.
- preferred_default_stages: tuple[str, ...]
- stage_labels: dict[str, str]
- protea.infrastructure.benchmark_config.load_benchmark_config(project_root: Path) → BenchmarkConfig
Load protea/config/benchmark.yaml into a BenchmarkConfig.
A missing file yields sane defaults (no hidden stages, no custom labels, categories = NK/LK/PK, aspects = BPO/MFO/CCO). This keeps the API functional even in fresh checkouts without the YAML.
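The defaults-on-missing-file behaviour and the label fallback can be sketched over an already-parsed YAML mapping (None standing for a missing file). MiniBenchmarkConfig, config_from_yaml_dict, and the YAML key names are hypothetical; only the default values and the Title-Case fallback come from the description above.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class MiniBenchmarkConfig:
    """Hypothetical subset of BenchmarkConfig with the documented defaults."""
    stage_labels: dict[str, str] = field(default_factory=dict)
    hidden_stages: frozenset[str] = frozenset()
    categories: tuple[str, ...] = ("NK", "LK", "PK")
    aspects: tuple[str, ...] = ("BPO", "MFO", "CCO")

    def label_for_stage(self, name: str) -> str:
        # Explicit YAML label first, otherwise a Title-Cased raw name.
        return self.stage_labels.get(name, name.title())


def config_from_yaml_dict(raw: dict | None) -> MiniBenchmarkConfig:
    """Build the config from parsed YAML; None or {} means 'use defaults'."""
    if not raw:
        return MiniBenchmarkConfig()
    return MiniBenchmarkConfig(
        stage_labels=dict(raw.get("stage_labels", {})),
        hidden_stages=frozenset(raw.get("hidden_stages", [])),
        categories=tuple(raw.get("categories", ("NK", "LK", "PK"))),
        aspects=tuple(raw.get("aspects", ("BPO", "MFO", "CCO"))),
    )
```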
ORM models
All models use SQLAlchemy 2.x declarative style with Mapped[] type
annotations. The schema is managed by Alembic; migrations are generated via
alembic revision --autogenerate and stored under alembic/versions/.
The ORM declarative base is defined in protea.infrastructure.orm.base.
All models inherit from this base, which applies the default naming
convention (snake-case table names, consistent constraint prefixes).
Job, JobEvent, and JobComment
Job is the central entity of the job queue. It implements a five-state
machine (QUEUED → RUNNING → SUCCEEDED | FAILED, or QUEUED →
CANCELLED). The parent_job_id foreign key links batch child jobs to
their coordinator parent. payload and meta are PostgreSQL JSONB
columns, allowing arbitrary structured data without schema migrations for
new operation types. progress_current and progress_total are updated
atomically by write workers to drive the frontend progress bar. The
narrative columns description / findings (Text, nullable) and
tags (Text[], default empty) carry operator-supplied context
and never affect dispatch (T3.9 / D11).
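The transitions listed above can be written as an explicit table. This sketch encodes only the five states and edges named in the text; the enum values and the transition() guard are hypothetical, and the real Job model may enforce transitions elsewhere (or not at all).

```python
from __future__ import annotations

from enum import Enum


class JobStatus(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Allowed transitions; terminal states map to an empty set.
TRANSITIONS: dict[JobStatus, frozenset[JobStatus]] = {
    JobStatus.QUEUED: frozenset({JobStatus.RUNNING, JobStatus.CANCELLED}),
    JobStatus.RUNNING: frozenset({JobStatus.SUCCEEDED, JobStatus.FAILED}),
    JobStatus.SUCCEEDED: frozenset(),
    JobStatus.FAILED: frozenset(),
    JobStatus.CANCELLED: frozenset(),
}


def transition(current: JobStatus, target: JobStatus) -> JobStatus:
    """Return target if the move is allowed, otherwise raise ValueError."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal job transition {current.value} -> {target.value}")
    return target
```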
JobEvent is an append-only audit log: rows are written by the emit
callback during execution and are never updated or deleted. The frontend
renders them as a chronological event timeline.
JobComment is the human-authored note thread attached to a Job
(T3.10 / D11). One row per comment with author (Text, nullable),
body (Text, required), and created_at (Timestamptz). Foreign
key to job(id) cascades on delete. Written via
POST /jobs/{job_id}/comments and read chronologically via
GET /jobs/{job_id}/comments.
Protein and Sequence
Sequence stores unique amino-acid strings, deduplicated by MD5 hash.
Multiple Protein rows (canonical accessions and isoforms) may reference
the same Sequence row, preventing redundant embedding computation for
sequences that appear under different accessions.
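Deduplication by hash can be sketched with a dict standing in for the table's unique hash column: two accessions with the same amino-acid string get the same id, so the sequence is embedded once. SequenceRegistry is hypothetical, and since the text does not say whether PROTEA normalises case or whitespace before hashing, this sketch hashes the string exactly as given.

```python
from __future__ import annotations

import hashlib


def sequence_hash(aa: str) -> str:
    """MD5 hex digest used as the deduplication key (no normalisation applied)."""
    return hashlib.md5(aa.encode("ascii")).hexdigest()


class SequenceRegistry:
    """Hypothetical in-memory stand-in for the Sequence table."""

    def __init__(self) -> None:
        self._by_hash: dict[str, int] = {}
        self.sequences: list[str] = []

    def get_or_create(self, aa: str) -> int:
        key = sequence_hash(aa)
        if key not in self._by_hash:  # first time this exact string is seen
            self.sequences.append(aa)
            self._by_hash[key] = len(self.sequences) - 1
        return self._by_hash[key]
```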
Protein stores one row per UniProt accession, including isoforms
(<canonical>-<n>). The canonical_accession field groups isoforms
together, and the view-only relationship to ProteinUniProtMetadata is
joined on this field rather than a foreign key, so metadata rows are not
duplicated for each isoform.
GO Ontology
OntologySnapshot records one complete GO OBO release, versioned by the
obo_version string from the OBO file header. The unique constraint on
obo_version makes repeated imports idempotent. GOTerm stores one row
per term per snapshot; GOTermRelationship stores the directed edges of
the GO DAG with their relation types (is_a, part_of, regulates,
etc.). The /annotations/snapshots/{id}/subgraph endpoint uses BFS over
these edges to return ancestor subgraphs for a given set of GO term IDs.
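The ancestor walk can be sketched as a breadth-first search from the seed terms along child-to-parent edges, collecting every visited term and traversed edge. The function name and the (child, parent, relation) tuple layout are hypothetical, and since the text does not say which relation types the endpoint follows, this sketch follows all of them.

```python
from __future__ import annotations

from collections import defaultdict, deque
from collections.abc import Iterable


def ancestor_subgraph(
    edges: Iterable[tuple[str, str, str]], seeds: Iterable[str]
) -> tuple[set[str], list[tuple[str, str, str]]]:
    """BFS from seed terms towards the roots along (child, parent, relation) edges."""
    parents: dict[str, list[tuple[str, str]]] = defaultdict(list)
    for child, parent, relation in edges:
        parents[child].append((parent, relation))

    seen = set(seeds)
    queue = deque(seen)
    kept: list[tuple[str, str, str]] = []
    while queue:
        term = queue.popleft()
        for parent, relation in parents.get(term, []):
            kept.append((term, parent, relation))
            if parent not in seen:  # each ancestor is expanded only once
                seen.add(parent)
                queue.append(parent)
    return seen, kept
```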
InterPro Annotations
InterproAnnotation stores the domain signature results returned by
the EBI InterProScan API for each protein. InterproGoMapping stores
the static mapping from InterPro domain entries to their associated GO
terms, as distributed by InterPro’s interpro2go file.
Annotation Sets
AnnotationSet groups a batch of protein GO annotations by source
(goa or quickgo) and ontology snapshot version. This design allows
side-by-side comparison of annotation sets from different sources or dates
and ties every prediction result to a specific, versioned annotation input.
ProteinGOAnnotation stores all GAF/QuickGO evidence fields verbatim:
qualifier, evidence code, assigned-by, database reference, with/from, and
annotation date.
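For reference, the GAF 2.x columns that carry these fields can be picked out as below. The column positions follow the GO Consortium GAF format (tab-separated, "!" header lines); the dataclass and function are a hypothetical illustration rather than PROTEA's loader, and QuickGO exports are not covered.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class GafFields:
    accession: str
    qualifier: str
    go_id: str
    db_reference: str
    evidence_code: str
    with_from: str
    annotation_date: str
    assigned_by: str


def parse_gaf_line(line: str) -> GafFields | None:
    """Keep the evidence columns verbatim; return None for '!' header lines."""
    if line.startswith("!"):
        return None
    cols = line.rstrip("\n").split("\t")
    if len(cols) < 15:
        raise ValueError(f"GAF row has {len(cols)} columns, expected at least 15")
    return GafFields(
        accession=cols[1],         # column 2: DB Object ID
        qualifier=cols[3],         # column 4
        go_id=cols[4],             # column 5
        db_reference=cols[5],      # column 6
        evidence_code=cols[6],     # column 7
        with_from=cols[7],         # column 8
        annotation_date=cols[13],  # column 14 (YYYYMMDD)
        assigned_by=cols[14],      # column 15
    )
```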
Evaluation Sets
EvaluationSet stores the CAFA-style temporal holdout delta between two
annotation sets (old → new). It contains summary statistics (NK/LK/PK protein and
annotation counts) in a JSONB stats column. A DB-level
UNIQUE(old_annotation_set_id, new_annotation_set_id) constraint (alembic
revision b8e3f1a7c2d9) enforces that each (old, new) pair can have at most
one EvaluationSet, making generate_evaluation_set idempotent at the
schema layer. EvaluationResult stores the output of running cafaeval
against a prediction set: per-namespace Fmax, precision, recall, τ, and
coverage for NK, LK, and PK settings.
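The NK/LK/PK split can be sketched for one GO namespace. The definitions used here are the usual CAFA convention, assumed rather than taken from PROTEA's code: NK proteins had no annotation in any namespace before, LK proteins were annotated only in other namespaces, and PK proteins already had terms in this namespace and gained more. Real evaluation sets are typically restricted to experimental evidence codes; this hypothetical classify_delta skips that filtering.

```python
from __future__ import annotations

from collections.abc import Mapping

# annotations[protein][namespace] -> set of GO ids (namespace e.g. "BPO")
Annots = Mapping[str, Mapping[str, set[str]]]


def classify_delta(old: Annots, new: Annots, namespace: str) -> dict[str, str]:
    """Label proteins that gained terms in `namespace` as NK, LK or PK."""
    labels: dict[str, str] = {}
    for protein, by_ns in new.items():
        before = old.get(protein, {})
        gained = by_ns.get(namespace, set()) - before.get(namespace, set())
        if not gained:
            continue  # nothing new in this namespace: not part of the delta
        if not any(before.values()):
            labels[protein] = "NK"  # no annotation in any namespace before
        elif not before.get(namespace):
            labels[protein] = "LK"  # annotated elsewhere, not in this namespace
        else:
            labels[protein] = "PK"  # already annotated here, gained more terms
    return labels
```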
Embeddings
EmbeddingConfig defines a reproducible embedding recipe: model identifier,
layer selection, pooling strategy, normalisation flags, and chunking
parameters. Its UUID primary key is stable; changing any parameter creates
a new configuration row. Both SequenceEmbedding rows and PredictionSet
rows reference the same EmbeddingConfig, guaranteeing that query and
reference embeddings are always comparable.
SequenceEmbedding stores a pgvector VECTOR for each
(sequence, config, chunk) triple. When chunking is disabled the chunk index
is 0 and the end index is NULL. pgvector is used for storage only;
nearest-neighbour queries are performed in Python via protea.core.knn_search.
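How the (chunk index, start, end) coordinates could be laid out is sketched below. The chunk_size and overlap parameter names and the start offset of 0 for the unchunked case are assumptions; only "index 0 and end NULL when chunking is disabled" comes from the text.

```python
from __future__ import annotations


def chunk_spans(
    length: int, chunk_size: int | None = None, overlap: int = 0
) -> list[tuple[int, int, int | None]]:
    """Return (chunk_index, start, end) triples; end is None when chunking is off."""
    if chunk_size is None:
        return [(0, 0, None)]  # whole sequence: index 0, end stored as NULL
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    spans = []
    start, index = 0, 0
    while True:
        end = min(start + chunk_size, length)
        spans.append((index, start, end))
        if end >= length:
            return spans
        start, index = end - overlap, index + 1  # next window re-reads `overlap` residues
```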
GO Prediction Features
GOPredictionFeatures stores the per-prediction feature vector used
by the re-ranker as a separate table, normalised away from
GOPrediction to keep the hot predictions table slim. Each row
carries the full 56-column feature set produced by the feature-enrichment
pipeline.
Predictions
PredictionSet is the result container for one run of
predict_go_terms. It links the query set, embedding configuration,
annotation set, and ontology snapshot used, making every prediction set
fully reproducible. GOPrediction stores one row per (query protein,
GO term, reference protein) triple. The 14 optional feature-engineering
columns (alignment statistics and taxonomy fields) and 5 re-ranker aggregate
features (vote_count, k_position, go_term_frequency,
ref_annotation_density, neighbor_distance_std) are NULL unless the
corresponding flags were set in the prediction payload.
Re-ranker Models
RerankerModel stores a trained LightGBM re-ranker for re-scoring GO
term predictions. The booster itself can live either inline in the
legacy model_data (Text, nullable) column or by reference in
artifact_uri (String(512)) resolved through an ArtifactStore.
New rows registered via scripts/register_reranker.py from a
protea-reranker-lab run always use the artifact-backed path.
Provenance columns travel with every artifact-backed row:
feature_schema_sha (String(16), load-bearing at inference time),
embedding_config_id / ontology_snapshot_id (FKs, both
SET NULL), producer_version (String(64)), producer_git_sha
(String(40)) and spec_yaml (Text). metrics and
feature_importance remain JSONB.
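Resolving a booster from either storage path can be sketched as below. Preferring artifact_uri over model_data, treating the payload as UTF-8 text, and the fetch_uri callable (standing in for URI resolution through an ArtifactStore) are all assumptions; RerankerRow and load_booster_text are hypothetical names.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class RerankerRow:
    """Hypothetical view of the two storage columns on RerankerModel."""
    model_data: str | None = None
    artifact_uri: str | None = None


def load_booster_text(row: RerankerRow, fetch_uri: Callable[[str], bytes]) -> str:
    """Prefer the artifact-backed path; fall back to the legacy inline column."""
    if row.artifact_uri:
        return fetch_uri(row.artifact_uri).decode("utf-8")
    if row.model_data:
        return row.model_data
    raise LookupError("reranker row has neither artifact_uri nor model_data")
```

If the stored payload is a LightGBM text model, the returned string could be handed to lightgbm.Booster(model_str=...).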
Scoring Configurations
ScoringConfig defines a set of feature weights and parameters for scoring
GO predictions. Each config is a named, immutable recipe that can be applied
to any prediction set to produce a composite score per prediction row.
Datasets
Dataset is the durable handle for a frozen re-ranker training dataset
published to the artifact store. One row per export_research_dataset
run that completes successfully; protea-reranker-lab’s
pull_dataset.py resolves either a UUID or a human name against
this table to fetch the exact train.parquet / eval.parquet /
manifest.json triple that produced a given booster.
Storage is backend-agnostic: train_uri / eval_uri /
manifest_uri are opaque URIs (file://… for the local backend,
s3://bucket/key for MinIO) resolved through the
ArtifactStore interface; callers never need to know which
backend is active. Two content fingerprints provide drift detection:
schema_sha (16-char) records the feature-set version (must match
the booster’s feature_schema_sha at inference time) and
manifest_sha (64-char) is the sha256 of the serialized manifest
bytes. Provenance lives in producer_version / producer_git_sha
so any registered booster can be traced back to the PROTEA HEAD that
emitted its dataset.
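The two fingerprints can be sketched with hashlib. manifest_sha follows the text directly (sha256 of the serialized manifest bytes, 64 hex chars); how the 16-char schema_sha is derived is not stated, so the ordered-feature-name recipe below is purely hypothetical, as is the check_compatible helper.

```python
from __future__ import annotations

import hashlib


def manifest_sha(manifest_bytes: bytes) -> str:
    """64-char sha256 hex digest of the serialized manifest."""
    return hashlib.sha256(manifest_bytes).hexdigest()


def schema_sha(feature_names: list[str]) -> str:
    """Hypothetical 16-char feature-set fingerprint over the ordered feature names."""
    return hashlib.sha256("\n".join(feature_names).encode("utf-8")).hexdigest()[:16]


def check_compatible(dataset_schema_sha: str, booster_feature_schema_sha: str) -> None:
    """Refuse to score with a booster trained on a different feature set."""
    if dataset_schema_sha != booster_feature_schema_sha:
        raise ValueError(
            f"feature schema mismatch: dataset {dataset_schema_sha} "
            f"vs booster {booster_feature_schema_sha}"
        )
```

Hashing the names in order means a reordered feature list counts as a different schema, which matters when features are fed to the booster positionally.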
API Keys
APIKey stores hashed API keys together with an optional name
label and last_used_at timestamp. The raw key is never stored; only
the SHA-256 hex digest is persisted. The is_active flag allows
revocation without deletion.
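Issuing and checking hashed keys can be sketched with the standard library. The key format (secrets.token_urlsafe(32)) and the function names are assumptions, and the database lookup by digest is left out; the point is that only the SHA-256 hex digest is kept and comparisons run in constant time.

```python
from __future__ import annotations

import hashlib
import hmac
import secrets


def new_api_key() -> tuple[str, str]:
    """Return (raw_key, sha256_hex). Show raw_key once; persist only the digest."""
    raw = secrets.token_urlsafe(32)
    return raw, hashlib.sha256(raw.encode("utf-8")).hexdigest()


def key_matches(presented: str, stored_digest: str, is_active: bool) -> bool:
    """Hash the presented key and compare digests in constant time."""
    if not is_active:  # revoked keys fail without deleting the row
        return False
    digest = hashlib.sha256(presented.encode("utf-8")).hexdigest()
    return hmac.compare_digest(digest, stored_digest)
```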
Support Entries
SupportEntry stores community feedback: a thumbs-up with an optional
comment. Used by the /support router.
Query Sets
QuerySet represents a user-uploaded FASTA dataset. QuerySetEntry
stores one row per FASTA entry, preserving the original accession header
and linking to the deduplicated Sequence row. If the amino-acid string
already exists in the database, the existing Sequence row is reused,
avoiding redundant embedding computation.
Visitor Events
VisitorEvent is the append-only log used by the Grafana
“unique visitors” dashboard. One row is written per HTTP GET to a
non-asset path. The schema deliberately omits IP addresses: each row
stores only visitor_hash, the first 16 hex chars of
sha256(daily_salt || client_ip) where daily_salt is a 32-byte
random value held in process memory and rotated every calendar day.
Once the day rolls over the salt is gone, so cross-day correlation of a
visitor becomes cryptographically infeasible. This is the same
no-PII model used by Plausible and Fathom.
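The rotating-salt scheme can be sketched as a small class that keeps the salt only in memory and replaces it on the first request of a new day. The class name and the use of the UTC date as the "calendar day" are assumptions.

```python
from __future__ import annotations

import datetime as dt
import hashlib
import secrets


class DailyVisitorHasher:
    """Hypothetical sketch: salt lives only in memory, replaced when the UTC date changes."""

    def __init__(self) -> None:
        self._day: dt.date | None = None
        self._salt = b""

    def visitor_hash(self, client_ip: str, today: dt.date | None = None) -> str:
        today = today or dt.datetime.now(dt.timezone.utc).date()
        if today != self._day:
            self._day = today
            self._salt = secrets.token_bytes(32)  # previous salt is discarded
        digest = hashlib.sha256(self._salt + client_ip.encode("utf-8")).hexdigest()
        return digest[:16]  # first 16 hex chars, as stored in visitor_hash
```

Within a day the same IP maps to the same hash, so unique-visitor counts work; once the salt is replaced the old hashes can no longer be linked to new ones.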
Experiment Runs
ExperimentRun is the per-research-run narrative + provenance
anchor introduced in T3.8 (Phase 4). A single row aggregates multiple
Job / EvaluationResult / RerankerModel rows under one
human name (unique). The narrative trio
(description / hypothesis / findings) mirrors Job’s
T3.9 D11 columns, and the JSONB config + provenance bags
are designed to receive snapshots from
protea.core.provenance.capture_provenance(). The
ExperimentRunStatus enum is planned → running → done
or abandoned; planned (rather than queued) reflects the
draft-first lifecycle of a research run, and abandoned (rather
than failed) covers stops without a hard error. Linkage to sibling
rows lands in T4.7-T4.9.
Logging
protea.infrastructure.logging provides structured JSON logging via a
custom JSONFormatter. The configure_logging() function sets up the
root logger with either JSON or plain text output, used by worker processes
and the API server.
Structured logging configuration for PROTEA.
Provides a JSON formatter using only the Python standard library and a
configure_logging() helper that workers and the API can call at startup.
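A stripped-down formatter with the same output contract (timestamp, level, logger, message, plus merged extra fields) fits in a few lines of stdlib code. MiniJSONFormatter is a hypothetical illustration; PROTEA's JSONFormatter may format timestamps or handle fields differently.

```python
from __future__ import annotations

import datetime as dt
import json
import logging

# Attributes every LogRecord carries; anything else arrived via `extra=`.
_RESERVED = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {"message", "asctime"}


class MiniJSONFormatter(logging.Formatter):
    """Hypothetical single-line JSON formatter."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": dt.datetime.fromtimestamp(record.created, dt.timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key, value in vars(record).items():
            if key not in _RESERVED and key not in payload:
                payload[key] = value  # structured context from extra={...}
        if record.exc_info:
            payload["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(payload, default=str)  # default=str keeps odd values serialisable
```

Wiring it up could look like handler = logging.StreamHandler(); handler.setFormatter(MiniJSONFormatter()); logging.basicConfig(handlers=[handler], level="INFO", force=True), which is roughly the job configure_logging(json=True) is described as doing.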
- class protea.infrastructure.logging.JSONFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)
Bases: Formatter
Formats log records as single-line JSON objects.
Each line contains at least timestamp, level, logger, and message. Any extra fields attached to the record are merged into the top-level JSON object, making it easy to add structured context (e.g. logger.info("started", extra={"queue": "protea.jobs"})).
- format(record: LogRecord) → str
Format the specified record as text.
The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
- protea.infrastructure.logging.configure_logging(*, json: bool = True, level: str = 'INFO') → None
Configure the root logger for the process.
- Parameters:
  - json – When True (the default), use JSONFormatter so that every log line is a valid JSON object. When False, fall back to the plain-text format used during local development.
  - level – Root log level name (e.g. "INFO", "DEBUG").
Artifact storage
protea.infrastructure.storage defines the ArtifactStore Protocol
and its two concrete backends. It is the single surface for writing and
reading large produced blobs (re-ranker boosters, frozen datasets) and
is kept strictly separate from the evaluation-artifacts directory
consumed by run_cafa_evaluation.
ArtifactStore is a typing.Protocol with four methods:
- put(key: str, src: Path | bytes) -> str: store a blob under key and return its URI.
- get(key: str) -> bytes: fetch the raw bytes stored at key.
- url(key: str) -> str: return the backend-specific URI for key without performing I/O.
- exists(key: str) -> bool: check whether key is present.
URIs are always persisted verbatim in the database so consumers can resolve them without knowing the concrete backend:
- LocalFsArtifactStore emits file:///absolute/path/... URIs.
- MinioArtifactStore emits s3://<bucket>/<key> URIs.
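A local backend satisfying the four-method Protocol can be sketched with pathlib. The Protocol below reproduces the signatures listed above; LocalFsStore is a hypothetical stand-in for LocalFsArtifactStore, and treating keys as relative paths under a root (with a guard against keys escaping it) is an assumption.

```python
from __future__ import annotations

from pathlib import Path
from typing import Protocol


class ArtifactStore(Protocol):
    def put(self, key: str, src: Path | bytes) -> str: ...
    def get(self, key: str) -> bytes: ...
    def url(self, key: str) -> str: ...
    def exists(self, key: str) -> bool: ...


class LocalFsStore:
    """Hypothetical local backend: keys are relative paths under `root`."""

    def __init__(self, root: Path) -> None:
        self.root = root.resolve()

    def _path(self, key: str) -> Path:
        path = (self.root / key).resolve()
        if not path.is_relative_to(self.root):  # refuse keys like "../etc/passwd"
            raise ValueError(f"key escapes store root: {key!r}")
        return path

    def put(self, key: str, src: Path | bytes) -> str:
        path = self._path(key)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(src if isinstance(src, bytes) else Path(src).read_bytes())
        return self.url(key)

    def get(self, key: str) -> bytes:
        return self._path(key).read_bytes()

    def url(self, key: str) -> str:
        return self._path(key).as_uri()  # file:///absolute/path/...

    def exists(self, key: str) -> bool:
        return self._path(key).is_file()
```

Because the Protocol is structural, LocalFsStore type-checks as an ArtifactStore without inheriting from it.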
The MinioArtifactStore client is imported lazily so PROTEA can be
installed without the minio package; the constructor raises a
clear ImportError pointing at the [storage] extra when the
dependency is missing.
get_artifact_store(settings) is the entry point used by every
operation that needs to write a blob. It reads
settings.storage_backend ("local" or "minio") and returns
the appropriate instance. If MinIO is selected but the endpoint or
credentials are incomplete, or the server is unreachable at
construction time, the factory logs a warning and returns a
LocalFsArtifactStore rooted at settings.storage_root (or
settings.artifacts_dir as a fallback). This prevents missing
optional infrastructure from crashing the stack in development.
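The fallback order can be sketched with injected constructors, which keeps the example free of the minio dependency. The function name choose_store and the settings attribute names for the MinIO endpoint and credentials (minio_endpoint, minio_access_key, minio_secret_key) are hypothetical; storage_backend, storage_root and artifacts_dir come from the text.

```python
from __future__ import annotations

import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any

log = logging.getLogger(__name__)


def choose_store(
    settings: Any,
    make_local: Callable[[Path], Any],
    make_minio: Callable[..., Any],
) -> Any:
    """Return the configured backend, degrading to local storage on any MinIO problem."""
    local_root = Path(getattr(settings, "storage_root", None) or settings.artifacts_dir)
    if getattr(settings, "storage_backend", "local") != "minio":
        return make_local(local_root)
    creds = tuple(
        getattr(settings, name, None)
        for name in ("minio_endpoint", "minio_access_key", "minio_secret_key")  # hypothetical names
    )
    if not all(creds):
        log.warning("MinIO selected but endpoint/credentials incomplete; using local store")
        return make_local(local_root)
    try:
        return make_minio(*creds)  # may raise ImportError or a connection error
    except Exception as exc:
        log.warning("MinIO unavailable (%s); using local store", exc)
        return make_local(local_root)
```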
Queue
The queue layer provides two classes: QueueConsumer and
OperationConsumer.
QueueConsumer reads a job UUID from a RabbitMQ queue and delegates to
BaseWorker.handle_job(). It is used for queues where every message
corresponds to a tracked Job row: protea.ping, protea.jobs, and
protea.embeddings. Before dispatching, QueueConsumer checks whether the
Job row is already in CANCELLED state and, if so, nacks the delivery
with requeue=False. This prevents pointless worker execution on stale
pre-queued messages and avoids a prefetch=1 deadlock on queues full of orphaned
cancellations (T-INFRA.NACK, PR #373).
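The pre-dispatch check can be sketched against a pika-style channel (basic_ack and basic_nack with delivery_tag and requeue keyword arguments, as pika's channels provide). The status lookup and job handler are injected callables, and the function name, the "CANCELLED" string, and acking after handle_job returns are assumptions.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import Any


def on_job_message(
    channel: Any,
    delivery_tag: int,
    body: bytes,
    load_status: Callable[[str], str | None],
    handle_job: Callable[[str], None],
) -> str:
    """Drop messages for cancelled jobs before any worker code runs."""
    job_id = body.decode("utf-8")
    if load_status(job_id) == "CANCELLED":
        # requeue=False discards the message instead of redelivering it forever
        channel.basic_nack(delivery_tag=delivery_tag, requeue=False)
        return "dropped"
    handle_job(job_id)
    channel.basic_ack(delivery_tag=delivery_tag)
    return "handled"
```

With prefetch=1 a consumer holds one unacknowledged message at a time, so discarding a stale delivery promptly is what lets the next message through.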
OperationConsumer reads a raw serialised operation payload from the
queue and executes it directly, without creating a Job row. It is used
for high-throughput batch queues (protea.embeddings.batch,
protea.embeddings.write, protea.predictions.batch,
protea.predictions.write) where creating thousands of child rows would
cause queue bloat. Progress is tracked at the parent job level only, via
the atomic progress_current increment.
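What makes the increment atomic is that the database computes the new value in a single UPDATE, so concurrent write workers cannot overwrite each other's increments the way a read-then-write in Python could. sqlite3 stands in for PostgreSQL here so the sketch runs anywhere; the table layout and function name are assumptions.

```python
import sqlite3


def increment_progress(conn: sqlite3.Connection, job_id: str, done: int) -> None:
    """Add `done` to progress_current in one UPDATE (no read-modify-write race)."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "UPDATE job SET progress_current = progress_current + ? WHERE id = ?",
            (done, job_id),
        )
```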
The publisher module provides publish_job() and publish_operation()
helpers. Both are called by BaseWorker after the DB commit (not before),
guaranteeing that workers always find the DB row before they try to claim it.
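The ordering can be sketched with sqlite3 standing in for PostgreSQL and a plain callable standing in for publish_job(): the transaction is committed before the id leaves the process, so a fast consumer never looks up a row that is not yet visible. The function name and table layout are hypothetical.

```python
import sqlite3
import uuid
from collections.abc import Callable


def create_and_publish(conn: sqlite3.Connection, publish: Callable[[str], None]) -> str:
    """Insert a QUEUED job, commit, and only then publish its id."""
    job_id = str(uuid.uuid4())
    with conn:  # the transaction is committed when this block exits cleanly
        conn.execute("INSERT INTO job (id, status) VALUES (?, 'QUEUED')", (job_id,))
    publish(job_id)  # a consumer reading this id can already see the row
    return job_id
```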
See also
Data Model: the conceptual model behind every ORM class above.
Workers: how BaseWorker and the consumer classes use the publisher and session helpers documented here.
ADR-005: Reusable RabbitMQ connections: why the publisher reuses one connection per thread.