Infrastructure

The protea.infrastructure package implements the persistence and messaging layer. It is the only package that imports SQLAlchemy, psycopg2, or aio-pika directly. All other layers interact with the database through the session factory and with the queue through the publisher interface.

Database engine

protea.infrastructure.database.engine creates the SQLAlchemy engine from the configured database URL. The engine is constructed once at application startup and shared across all session factories.

Settings

protea.infrastructure.settings loads configuration from protea/config/system.yaml and applies environment variable overrides. The two mandatory settings are db_url (PostgreSQL connection string) and amqp_url (RabbitMQ connection string). Both can be overridden at runtime via PROTEA_DB_URL and PROTEA_AMQP_URL environment variables, which takes precedence over the YAML file. This makes the same configuration file usable across local development, CI, and production deployments.

Session management

session_scope() is the single entry point for all database access in PROTEA. It is a context manager that commits on normal exit and rolls back on any exception, then always closes the session. Workers open and close sessions explicitly rather than relying on this context manager for long-lived operations, but it is used throughout the API layer and in tests.

The build_session_factory() function creates a SQLAlchemy sessionmaker bound to the given database URL. It is called once at application startup and stored on app.state.session_factory, keeping the router free of global state.

Telemetry

protea.infrastructure.telemetry configures OpenTelemetry tracing and Prometheus metrics collection. It instruments the FastAPI application, SQLAlchemy engine, and aio-pika AMQP client, emitting spans and counters for every request, query, and queue publish.

OpenTelemetry SDK boot for PROTEA (T5.1a + T5.1b).

This module is the single entry point for tracing in PROTEA. It wires a global opentelemetry.sdk.trace.TracerProvider configured from environment variables (per ADR D07) and instruments FastAPI, SQLAlchemy engines, and pika.

The OTel SDK is treated as optional at import time: if the libraries are not installed (e.g. minimal worker images), configure_telemetry() logs a single warning and returns the resolved config instead of raising. This keeps the existing developer workflow (poetry install) green until the F-OPS stack rolls out and is the pattern recommended by the OTel docs for SDK-optional applications.

T5.1a scope: SDK boot + env-driven exporter URL + FastAPI instrumentation. T5.1b scope: SQLAlchemy engine instrumentation + pika producer/consumer context propagation via the W3C traceparent header so spans stitch across HTTP -> queue -> worker boundaries.

Environment variables consumed

PROTEA_OTEL_ENABLED

Truthy values (1, true, yes, on) enable tracing. Default false so opting in is explicit and the developer workflow never blocks on a running collector.

PROTEA_OTEL_ENDPOINT

OTLP HTTP exporter endpoint (e.g. http://otel-collector:4318). When unset and tracing is enabled, the OTLP HTTP exporter falls back to its own default (http://localhost:4318).

PROTEA_OTEL_SERVICE_NAME

service.name resource attribute. Defaults to protea-api. Workers set this to protea-worker-<queue> at boot.

PROTEA_OTEL_SAMPLE_RATIO

ParentBased(TraceIdRatioBased(<ratio>)) sampler ratio. 0.0 disables sampling, 1.0 samples every trace. Default 1.0 (sampling is expected to be tuned via the collector once F-OPS sets up budgets).

class protea.infrastructure.telemetry.MetricRegistry(registry: Any, jobs_total: Any, job_duration_seconds: Any, embeddings_batch_seconds: Any, predictions_batch_seconds: Any, db_pool_in_use: Any, http_requests_total: Any, http_request_duration_seconds: Any, http_requests_in_flight: Any)

Bases: object

Bag of the five Prometheus collectors exposed by /v1/metrics.

The registry is intentionally a Parameter Object (not a module-level set of globals) so tests can build a throwaway one and the API can stash a single instance on app.state.metrics.

db_pool_in_use: Any
embeddings_batch_seconds: Any
http_request_duration_seconds: Any
http_requests_in_flight: Any
http_requests_total: Any
job_duration_seconds: Any
jobs_total: Any
predictions_batch_seconds: Any
registry: Any
class protea.infrastructure.telemetry.SdkBundle(TracerProvider: Any, Resource: Any, BatchSpanProcessor: Any, OTLPSpanExporter: Any, ParentBased: Any, TraceIdRatioBased: Any)

Bases: object

Bag of OTel SDK classes used by _build_provider().

Wrapped in a Parameter Object so _build_provider()’s signature stays under the project-wide 6-arg ceiling (see check_smells.py) while remaining testable with stand-ins.

BatchSpanProcessor: Any
OTLPSpanExporter: Any
ParentBased: Any
Resource: Any
TraceIdRatioBased: Any
TracerProvider: Any
class protea.infrastructure.telemetry.TelemetryConfig(enabled: bool, endpoint: str | None, service_name: str, sample_ratio: float)

Bases: object

Resolved telemetry settings.

Built by resolve_telemetry_config() from the environment so callers can introspect what the SDK boot will actually do without triggering the boot itself (handy for /health reporting and tests).

enabled: bool
endpoint: str | None
sample_ratio: float
service_name: str
protea.infrastructure.telemetry.build_metric_registry() MetricRegistry | None

Construct the five-metric registry, returning None when prometheus_client is not installed.

Mirrors the soft-fail pattern of configure_telemetry(): a minimal worker image that does not pull the observability extras keeps booting; the /v1/metrics router degrades to a 503 instead of crashing the whole API.

protea.infrastructure.telemetry.configure_telemetry(app: Any | None = None, *, config: TelemetryConfig | None = None, default_service_name: str = 'protea-api') TelemetryConfig

Boot the OTel SDK and (optionally) instrument a FastAPI app.

Idempotent: a second call with an already-configured global provider is a no-op (logged at DEBUG). Returns the resolved TelemetryConfig regardless of whether tracing was actually enabled, so callers can stash it on app.state and surface it via /health.

protea.infrastructure.telemetry.extract_trace_context(headers: dict[str, Any] | None) Any

Return an OTel context extracted from inbound AMQP headers.

The returned object is opaque (opentelemetry.context.Context); callers pass it to tracer.start_as_current_span(..., context=ctx). Returns None when the OTel API is not installed so callers can short-circuit to a no-op path.

protea.infrastructure.telemetry.get_tracer(name: str) Any

Return an OTel tracer for name or a no-op tracer when the API is not installed.

Keeps call sites (queue consumer, etc.) free of optional-import boilerplate: they unconditionally call get_tracer(__name__) and use the resulting object via the standard tracer API.

protea.infrastructure.telemetry.inject_trace_context(headers: dict[str, Any]) dict[str, Any]

Inject the current trace context into a mutable header mapping.

Used by the pika publisher to propagate traceparent (and tracestate) onto outbound AMQP messages so consumers can stitch spans back to the producing HTTP span. Returns the same mapping for convenience; mutates in place. When the OTel API is not installed or no context is active this is a no-op.

protea.infrastructure.telemetry.instrument_sqlalchemy_engine(engine: Any) None

Wrap a SQLAlchemy Engine with the OTel instrumentor.

Safe to call when telemetry is disabled or the instrumentor is not installed: both cases log at DEBUG / WARNING and return. Workers and the API boot call this from build_engine() so every engine created in the process emits db.* spans without each call site needing to know about OTel.

protea.infrastructure.telemetry.refresh_db_pool_gauge(metrics: MetricRegistry, engine: Any) None

Read the SQLAlchemy pool’s checked-out count and set the gauge.

Called from the /v1/metrics handler on every scrape so the gauge reflects the live pool state without requiring event-listener plumbing. engine is expected to be a SQLAlchemy Engine; we duck-type via engine.pool.checkedout() so tests can pass a minimal stub. Any AttributeError is swallowed (the pool may not expose the method on some dialects).

protea.infrastructure.telemetry.render_metrics(metrics: MetricRegistry) tuple[bytes, str]

Generate the Prometheus exposition payload for metrics.

Returns the body bytes + the canonical Content-Type header value so the router can return a Response without re-importing prometheus_client itself.

protea.infrastructure.telemetry.resolve_telemetry_config(env: dict[str, str] | None = None, *, default_service_name: str = 'protea-api') TelemetryConfig

Resolve telemetry settings from the environment.

env defaults to os.environ. default_service_name lets workers override the default protea-api value at boot.

Benchmark configuration

protea.infrastructure.benchmark_config loads the protea/config/benchmark.yaml file that drives the /benchmark router. It maps scoring-config display names, GO categories, and the baseline tag used in the Fmax comparison grid.

Loader for protea/config/benchmark.yaml.

Mirrors the load_settings pattern in settings.py but returns a typed dataclass specific to the benchmark matrix view. Consumed by the /benchmark/matrix router to avoid hardcoding stage taxonomy, labels, or GO-namespace constants.

class protea.infrastructure.benchmark_config.BenchmarkConfig(preferred_default_stages: 'tuple[str, ...]', baseline_scoring_name: 'str | None', hidden_stages: 'frozenset[str]', stage_labels: 'dict[str, str]', eval_set_labels: 'dict[str, str]', categories: 'tuple[str, ...]', aspects: 'tuple[str, ...]')

Bases: object

aspects: tuple[str, ...]
baseline_scoring_name: str | None
categories: tuple[str, ...]
eval_set_labels: dict[str, str]
hidden_stages: frozenset[str]
label_for_stage(name: str) str

Return the human-readable label for a stage.

Falls back to a Title-Cased version of the raw name if the YAML does not define an explicit label.

preferred_default_stages: tuple[str, ...]
stage_labels: dict[str, str]
protea.infrastructure.benchmark_config.load_benchmark_config(project_root: Path) BenchmarkConfig

Load protea/config/benchmark.yaml into a BenchmarkConfig.

Missing file → sane defaults (no hidden stages, no custom labels, categories = NK/LK/PK, aspects = BPO/MFO/CCO). This keeps the API functional even in fresh checkouts without the YAML.

ORM models

All models use SQLAlchemy 2.x declarative style with Mapped[] type annotations. The schema is managed by Alembic; migrations are generated via alembic revision --autogenerate and stored under alembic/versions/.

The ORM declarative base is defined in protea.infrastructure.orm.base. All models inherit from this base, which applies the default naming convention (snake-case table names, consistent constraint prefixes).

Job, JobEvent, and JobComment

Job is the central entity of the job queue. It implements a five-state machine (QUEUED RUNNING SUCCEEDED | FAILED, or QUEUED CANCELLED). The parent_job_id foreign key links batch child jobs to their coordinator parent. payload and meta are PostgreSQL JSONB columns, allowing arbitrary structured data without schema migrations for new operation types. progress_current and progress_total are updated atomically by write workers to drive the frontend progress bar. The narrative columns description / findings (Text, nullable) and tags (Text[], default empty) carry operator-supplied context and never affect dispatch (T3.9 / D11).

JobEvent is an append-only audit log: rows are written by the emit callback during execution and are never updated or deleted. The frontend renders them as a chronological event timeline.

JobComment is the human-authored note thread attached to a Job (T3.10 / D11). One row per comment with author (Text, nullable), body (Text, required), and created_at (Timestamptz). Foreign key to job(id) cascades on delete. Written via POST /jobs/{job_id}/comments and read chronologically via GET /jobs/{job_id}/comments.

Protein and Sequence

Sequence stores unique amino-acid strings, deduplicated by MD5 hash. Multiple Protein rows (canonical accessions and isoforms) may reference the same Sequence row, preventing redundant embedding computation for sequences that appear under different accessions.

Protein stores one row per UniProt accession, including isoforms (<canonical>-<n>). The canonical_accession field groups isoforms together, and the view-only relationship to ProteinUniProtMetadata is joined on this field rather than a foreign key, so metadata rows are not duplicated for each isoform.

GO Ontology

OntologySnapshot records one complete GO OBO release, versioned by the obo_version string from the OBO file header. The unique constraint on obo_version makes repeated imports idempotent. GOTerm stores one row per term per snapshot; GOTermRelationship stores the directed edges of the GO DAG with their relation types (is_a, part_of, regulates, etc.). The /annotations/snapshots/{id}/subgraph endpoint uses BFS over these edges to return ancestor subgraphs for a given set of GO term IDs.

InterPro Annotations

InterproAnnotation stores the domain signature results returned by the EBI InterProScan API for each protein. InterproGoMapping stores the static mapping from InterPro domain entries to their associated GO terms, as distributed by InterPro’s interpro2go file.

Annotation Sets

AnnotationSet groups a batch of protein GO annotations by source (goa or quickgo) and ontology snapshot version. This design allows side-by-side comparison of annotation sets from different sources or dates and ties every prediction result to a specific, versioned annotation input. ProteinGOAnnotation stores all GAF/QuickGO evidence fields verbatim: qualifier, evidence code, assigned-by, database reference, with/from, and annotation date.

Evaluation Sets

EvaluationSet stores the CAFA-style temporal holdout delta between two annotation sets (old → new). Contains summary statistics (NK/LK/PK protein and annotation counts) in a JSONB stats column. A DB-level UNIQUE(old_annotation_set_id, new_annotation_set_id) constraint (alembic revision b8e3f1a7c2d9) enforces that each (old, new) pair can have at most one EvaluationSet, making generate_evaluation_set idempotent at the schema layer. EvaluationResult stores the output of running cafaeval against a prediction set: per-namespace Fmax, precision, recall, τ, and coverage for NK, LK, and PK settings.

Embeddings

EmbeddingConfig defines a reproducible embedding recipe: model identifier, layer selection, pooling strategy, normalisation flags, and chunking parameters. Its UUID primary key is stable; changing any parameter creates a new configuration row. Both SequenceEmbedding rows and PredictionSet rows reference the same EmbeddingConfig, guaranteeing that query and reference embeddings are always comparable.

SequenceEmbedding stores a pgvector VECTOR for each (sequence, config, chunk) triple. When chunking is disabled the chunk index is 0 and the end index is NULL. pgvector is used for storage only; nearest- neighbour queries are performed in Python via protea.core.knn_search.

GO Prediction Features

GOPredictionFeatures stores the per-prediction feature vector used by the re-ranker as a separate table, normalised away from GOPrediction to keep the hot predictions table slim. Each row carries the full 56-column feature set produced by the feature-enrichment pipeline.

Predictions

PredictionSet is the result container for one run of predict_go_terms. It links the query set, embedding configuration, annotation set, and ontology snapshot used, making every prediction set fully reproducible. GOPrediction stores one row per (query protein, GO term, reference protein) triple. The 14 optional feature-engineering columns (alignment statistics and taxonomy fields) and 5 re-ranker aggregate features (vote_count, k_position, go_term_frequency, ref_annotation_density, neighbor_distance_std) are NULL unless the corresponding flags were set in the prediction payload.

Re-ranker Models

RerankerModel stores a trained LightGBM re-ranker for re-scoring GO term predictions. The booster itself can live either inline in the legacy model_data (Text, nullable) column or by reference in artifact_uri (String(512)) resolved through an ArtifactStore. New rows registered via scripts/register_reranker.py from a protea-reranker-lab run always use the artifact-backed path.

Provenance columns travel with every artifact-backed row: feature_schema_sha (String(16), load-bearing at inference time), embedding_config_id / ontology_snapshot_id (FKs, both SET NULL), producer_version (String(64)), producer_git_sha (String(40)) and spec_yaml (Text). metrics and feature_importance remain JSONB.

Scoring Configurations

ScoringConfig defines a set of feature weights and parameters for scoring GO predictions. Each config is a named, immutable recipe that can be applied to any prediction set to produce a composite score per prediction row.

Datasets

Dataset is the durable handle for a frozen re-ranker training dataset published to the artifact store. One row per export_research_dataset run that completes successfully; protea-reranker-lab’s pull_dataset.py resolves either a UUID or a human name against this table to fetch the exact train.parquet / eval.parquet / manifest.json triple that produced a given booster.

Storage is backend-agnostic: train_uri / eval_uri / manifest_uri are opaque URIs (file://… for the local backend, s3://bucket/key for MinIO) resolved through the ArtifactStore interface; callers never need to know which backend is active. Two content fingerprints provide drift detection: schema_sha (16-char) records the feature-set version (must match the booster’s feature_schema_sha at inference time) and manifest_sha (64-char) is the sha256 of the serialized manifest bytes. Provenance lives in producer_version / producer_git_sha so any registered booster can be traced back to the PROTEA HEAD that emitted its dataset.

API Keys

APIKey stores hashed API keys together with an optional name label and last_used_at timestamp. The raw key is never stored; only the SHA-256 hex digest is persisted. The is_active flag allows revocation without deletion.

Support Entries

SupportEntry stores community feedback: a thumbs-up with an optional comment. Used by the /support router.

Query Sets

QuerySet represents a user-uploaded FASTA dataset. QuerySetEntry stores one row per FASTA entry, preserving the original accession header and linking to the deduplicated Sequence row. If the amino-acid string already exists in the database, the existing Sequence row is reused, avoiding redundant embedding computation.

Visitor Events

VisitorEvent is the append-only log used by the Grafana “unique visitors” dashboard. One row is written per HTTP GET to a non-asset path. The schema deliberately omits IP addresses: each row stores only visitor_hash, the first 16 hex chars of sha256(daily_salt || client_ip) where daily_salt is a 32-byte random value held in process memory and rotated every calendar day. Once the day rolls over the salt is gone, so cross-day correlation of a visitor becomes cryptographically infeasible. This is the same no-PII model used by Plausible and Fathom.

Experiment Runs

ExperimentRun is the per-research-run narrative + provenance anchor introduced in T3.8 (Fase 4). A single row aggregates multiple Job / EvaluationResult / RerankerModel rows under one human name (unique). The narrative trio (description / hypothesis / findings) mirrors Job’s T3.9 D11 columns, and the JSONB config + provenance bags are designed to receive snapshots from protea.core.provenance.capture_provenance(). The ExperimentRunStatus enum is plannedrunningdone or abandoned; planned (rather than queued) reflects the draft-first lifecycle of a research run, and abandoned (rather than failed) covers stops without a hard error. Linkage to sibling rows lands in T4.7-T4.9.

Logging

protea.infrastructure.logging provides structured JSON logging via a custom JSONFormatter. The configure_logging() function sets up the root logger with either JSON or plain text output, used by worker processes and the API server.

Structured logging configuration for PROTEA.

Provides a JSON formatter using only the Python standard library and a configure_logging() helper that workers and the API can call at startup.

class protea.infrastructure.logging.JSONFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)

Bases: Formatter

Formats log records as single-line JSON objects.

Each line contains at least timestamp, level, logger, and message. Any extra fields attached to the record are merged into the top-level JSON object, making it easy to add structured context (e.g. logger.info("started", extra={"queue": "protea.jobs"})).

format(record: LogRecord) str

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

protea.infrastructure.logging.configure_logging(*, json: bool = True, level: str = 'INFO') None

Configure the root logger for the process.

Parameters:
  • json – When True (the default), use JSONFormatter so that every log line is a valid JSON object. When False, fall back to the plain-text format used during local development.

  • level – Root log level name (e.g. "INFO", "DEBUG").

Artifact storage

protea.infrastructure.storage defines the ArtifactStore Protocol and its two concrete backends. It is the single surface for writing and reading large produced blobs (re-ranker boosters, frozen datasets) and is kept strictly separate from the evaluation-artifacts directory consumed by run_cafa_evaluation.

ArtifactStore is a typing.Protocol with four methods:

  • put(key: str, src: Path | bytes) -> str: store a blob under key and return its URI.

  • get(key: str) -> bytes: fetch raw bytes stored at key.

  • url(key: str) -> str: return the backend-specific URI for key without performing I/O.

  • exists(key: str) -> bool: check whether key is present.

URIs are always persisted verbatim in the database so consumers can resolve them without knowing the concrete backend:

  • LocalFsArtifactStore emits file:///absolute/path/... URIs.

  • MinioArtifactStore emits s3://<bucket>/<key> URIs.

The MinioArtifactStore client is imported lazily so PROTEA can be installed without the minio package; the constructor raises a clear ImportError pointing at the [storage] extra when the dependency is missing.

get_artifact_store(settings) is the entry point used by every operation that needs to write a blob. It reads settings.storage_backend ("local" or "minio") and returns the appropriate instance. If MinIO is selected but the endpoint or credentials are incomplete, or the server is unreachable at construction time, the factory logs a warning and returns a LocalFsArtifactStore rooted at settings.storage_root (or settings.artifacts_dir as a fallback). This prevents missing optional infrastructure from crashing the stack in development.

Queue

The queue layer provides two classes: QueueConsumer and OperationConsumer.

QueueConsumer reads a job UUID from a RabbitMQ queue and delegates to BaseWorker.handle_job(). It is used for queues where every message corresponds to a tracked Job row: protea.ping, protea.jobs, and protea.embeddings. Before dispatching, QueueConsumer checks whether the Job row is already in CANCELLED state and, if so, nacks the delivery with requeue=False. This prevents pointless worker execution on stale pre-queued messages and avoids a prefetch=1 deadlock on queues full of orphaned cancellations (T-INFRA.NACK, PR #373).

OperationConsumer reads a raw serialised operation payload from the queue and executes it directly, without creating a Job row. It is used for high-throughput batch queues (protea.embeddings.batch, protea.embeddings.write, protea.predictions.batch, protea.predictions.write) where creating thousands of child rows would cause queue bloat. Progress is tracked at the parent job level only, via the atomic progress_current increment.

The publisher module provides publish_job() and publish_operation() helpers. Both are called by BaseWorker after the DB commit (not before), guaranteeing that workers always find the DB row before they try to claim it.

See also