diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fac7aec..101f626 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -102,6 +102,7 @@ jobs: -p cleanup-worker -p supply-worker -p oracle-worker + -p enrichment-worker # Assert every bootstrap the CDK app references actually built. A bare # "any bootstrap exists" glob would false-pass on the unrelated CLIs, diff --git a/infra/envs/production.json b/infra/envs/production.json index 4942abd..b785b6d 100644 --- a/infra/envs/production.json +++ b/infra/envs/production.json @@ -8,7 +8,8 @@ "assetSupply": "rate(1 hour)", "oracleWatcher": "rate(5 minutes)", "assetDiscovery": "rate(1 hour)", - "cleanup": "cron(0 3 * * ? *)" + "cleanup": "cron(0 3 * * ? *)", + "enrichment": "rate(1 hour)" }, "ledgerProcessor": { "memoryMb": 512, diff --git a/infra/src/lib/stacks/eventbridge-stack.ts b/infra/src/lib/stacks/eventbridge-stack.ts index e5b7976..45ab9b1 100644 --- a/infra/src/lib/stacks/eventbridge-stack.ts +++ b/infra/src/lib/stacks/eventbridge-stack.ts @@ -34,6 +34,11 @@ const SUPPLY_WORKER_ASSET_DIR = const ORACLE_WORKER_ASSET_DIR = process.env['ORACLE_WORKER_ASSET_DIR'] ?? '../target/lambda/oracle-worker'; +/** Cargo-lambda build output for the `enrichment-worker` binary (task 0026). */ +const ENRICHMENT_WORKER_ASSET_DIR = + process.env['ENRICHMENT_WORKER_ASSET_DIR'] ?? + '../target/lambda/enrichment-worker'; + export interface EventBridgeStackProps extends cdk.StackProps { readonly config: EnvironmentConfig; } @@ -58,10 +63,12 @@ export class EventBridgeStack extends cdk.Stack { public readonly oracleWatcherRule: events.Rule; public readonly assetDiscoveryRule: events.Rule; public readonly cleanupRule: events.Rule; + public readonly enrichmentRule: events.Rule; public readonly assetDiscoveryFunction: lambda.Function; public readonly cleanupFunction: lambda.Function; public readonly supplyFunction: lambda.Function; public readonly oracleFunction: lambda.Function; + public readonly enrichmentFunction: lambda.Function; constructor(scope: Construct, id: string, props: EventBridgeStackProps) { super(scope, id, props); @@ -96,6 +103,12 @@ export class EventBridgeStack extends cdk.Stack { schedule: events.Schedule.expression(schedules.cleanup), }); + this.enrichmentRule = new events.Rule(this, 'EnrichmentRule', { + ruleName: `prices-${env}-enrichment`, + description: `close_usd / volume_quote_usd enrichment of price_ohlcv_1m (${env})`, + schedule: events.Schedule.expression(schedules.enrichment), + }); + // ----------------------------------------------------------------- // Asset Discovery worker Lambda (task 0054) + its rule target. // No VPC (ADR 0007 §6); mTLS to ClickHouse + S3 read on BE's ledger @@ -270,6 +283,48 @@ export class EventBridgeStack extends cdk.Stack { value: this.oracleFunction.functionName, }); + // ----------------------------------------------------------------- + // Enrichment worker Lambda (task 0026) + its hourly cron target. CH-only + // (no S3, no Horizon, no VPC): reads price_ohlcv_1m + oracle_prices and + // re-inserts higher-`version` rows with close_usd / volume_quote_usd that + // the ReplacingMergeTree collapses on merge. Writes prices.* → the same + // `ingestion` mTLS identity the other writers use. Idempotency comes from + // the `FINAL WHERE … = 0` read filter, so concurrency is not pinned. + // ----------------------------------------------------------------- + this.enrichmentFunction = createWorkerLambda(this, { + config, + accountId, + mtlsSecretName: discoveryMtlsSecretName, + idPrefix: 'Enrichment', + name: 'enrichment', + assetDir: ENRICHMENT_WORKER_ASSET_DIR, + memorySize: 512, + // A bounded pass is MAX_BATCHES × BATCH_SIZE rows of set-based + // INSERT…SELECT; generous headroom under the hourly cadence (overflow + // just defers to the next run). + timeout: cdk.Duration.minutes(5), + secretsExtensionLayer, + chDomain, + rule: this.enrichmentRule, + environment: { + CLICKHOUSE_DATABASE: 'prices', + CLICKHOUSE_TABLE: 'price_ohlcv_1m', + // ORACLE_NAME / FORWARD_FILL_WINDOW_S / PIVOT_WINDOW_S / BATCH_SIZE / + // MAX_BATCHES unset → the binary's ChEnrichConfig defaults + // (reflector / 300 / 86400 / 10000 / 20). + }, + alarmDescription: + 'Enrichment Lambda invocation errors (close_usd / volume_quote_usd enrichment pass failed).', + alarmPeriod: cdk.Duration.hours(1), + }).function; + + new cdk.CfnOutput(this, 'EnrichmentRuleArn', { + value: this.enrichmentRule.ruleArn, + }); + new cdk.CfnOutput(this, 'EnrichmentFunctionName', { + value: this.enrichmentFunction.functionName, + }); + cdk.Tags.of(this).add('Project', 'stellar-prices-api'); cdk.Tags.of(this).add('ManagedBy', 'cdk'); cdk.Tags.of(this).add('Environment', env); diff --git a/infra/src/lib/types.ts b/infra/src/lib/types.ts index d5418fa..c89f6ba 100644 --- a/infra/src/lib/types.ts +++ b/infra/src/lib/types.ts @@ -65,6 +65,11 @@ export interface EnvironmentConfig { readonly assetDiscovery: string; /** Old-data partition drop (ALTER TABLE … DROP PARTITION). */ readonly cleanup: string; + /** + * volume_quote_usd / close_usd enrichment pass over price_ohlcv_1m + * (task 0026). Bounded-batch INSERT…SELECT into the ReplacingMergeTree. + */ + readonly enrichment: string; }; // Ledger Processor ingest (consumed by IngestStack — task 0038) @@ -166,6 +171,7 @@ export function validateConfig(config: EnvironmentConfig): void { 'oracleWatcher', 'assetDiscovery', 'cleanup', + 'enrichment', ] as const; for (const key of expectedKeys) { const value = schedules[key]; diff --git a/lore/1-tasks/active/0026_FEATURE_volume-quote-usd-enrichment-impl/README.md b/lore/1-tasks/active/0026_FEATURE_volume-quote-usd-enrichment-impl/README.md index 88c9019..2a5bcbf 100644 --- a/lore/1-tasks/active/0026_FEATURE_volume-quote-usd-enrichment-impl/README.md +++ b/lore/1-tasks/active/0026_FEATURE_volume-quote-usd-enrichment-impl/README.md @@ -118,6 +118,33 @@ history: backfill credibility check) and stay out of scope under the carried-in prepare-not-deploy constraint; the BE Form-B review is still open. Task stays `active`. + - date: 2026-06-29 + status: active + who: oski + note: > + **Option 1 — CDK + packaging (prepare-only).** Wired the enrichment + Lambda into the CDK app and matched the sibling-worker packaging, closing + the EventBridge/IAM ACs at the code+synth level (no deploy). Infra: + `EnrichmentRule` (`rate(1 hour)`) + worker `Function` + IAM role + error + alarm + log group in `eventbridge-stack.ts` via the shared + `createWorkerLambda`, mirroring oracle/cleanup/supply; `enrichment` added + to the schedule config type + validation + `production.json`. `cdk synth` + produces the full resource set (verified: rule rate(1 hour) → function; + env carries the mTLS contract CH_DOMAIN/MTLS_SECRET_NAME + CLICKHOUSE_*). + Crate: gated the Lambda bin behind a `lambda` feature (required-features, + lean default build) like the siblings, and rewrote the entrypoint to build + the mTLS client via `prices_clickhouse::mtls::client_from_lambda_env` + instead of a plain CLICKHOUSE_URL client (the prior fixture-prototype + Lambda mode is dropped; the prototype lives on as `enrichment-cli`). Added + `ChEnrichmentPass::with_client`; url-based `new()` kept for the integration + tests. CI: added `-p enrichment-worker` to the cargo-lambda build matrix. + Verified locally: default + `--features lambda` build clean, 24 unit + 2 + e2e pass, clippy/fmt clean, `cargo lambda build` produces the bootstrap, + infra lint/build/typecheck + `cdk synth` green. **Still deferred:** the + custom `EnrichmentRowsRemainingAtVolumeZero` metric + CloudWatch publish + + dashboard (Option 2 / task 0056), the one-shot historical mode (Option 2), + and the actual deploy + live backfill credibility check (Option 3 — lifts + prepare-not-deploy). Task stays `active`. --- # `volume_quote_usd` enrichment Lambda — implementation @@ -168,18 +195,26 @@ notes when made. Carried over from task 0024's design spec §7: -- [ ] EventBridge cron Lambda exists with the schema in §2 wired up. -- [ ] CDK + IAM matches §1.1 / §1.2. -- [ ] Re-running on already-enriched rows produces zero changes - (idempotency test). -- [ ] Rows with missing oracle stay at `volume_quote_usd = 0`, - `EnrichmentOracleMiss` metric increments. +- [x] EventBridge cron Lambda exists with the schema in §2 wired up. + — CDK authored in `eventbridge-stack.ts` (`EnrichmentRule` rate(1 hour) + → worker Function), `cdk synth` verified; live deploy still pending. +- [x] CDK + IAM matches §1.1 / §1.2. + — `createWorkerLambda` (IAM role + mTLS-secret read + SSM read + error + alarm + log group), arm64/PROVIDED_AL2023; synth-verified. Deploy pending. +- [x] Re-running on already-enriched rows produces zero changes + (idempotency test). — `ch_enrich_it.rs` vs prod-pinned CH 26.3.10.60. +- [x] Rows with missing oracle stay at `volume_quote_usd = 0`, + `EnrichmentOracleMiss` metric increments. — "no reference stays 0" half + verified by `ch_enrich_it.rs`; the metric-emission half is deferred with + the CloudWatch publish (Option 2 / task 0056). - [ ] After full SDEX backfill + a one-shot historical enrichment pass, `current_prices.volume_24h_usd` for at least 3 XLM-quoted assets reflects SDEX-sourced volume (>0 and credible against Horizon's historical aggregates). + — deploy-gated (needs live env + one-shot mode, Option 2/3). - [ ] CloudWatch metrics from spec §5 are emitted and visible in - the dashboard. + the dashboard. — metric publish + dashboard widgets deferred to + Option 2 / task 0056 (observability-stack is a scaffold). ## Future Work diff --git a/packages/enrichment-worker/Cargo.toml b/packages/enrichment-worker/Cargo.toml index 2c910ae..4f0fd60 100644 --- a/packages/enrichment-worker/Cargo.toml +++ b/packages/enrichment-worker/Cargo.toml @@ -8,14 +8,25 @@ description = "volume_quote_usd enrichment Lambda — local-only prototype (task name = "enrichment_worker" path = "src/lib.rs" +# EventBridge cron Lambda entrypoint (task 0026). Behind `lambda` so the +# default build/test exercises `ch_enrich` + the prototype CLI without the AWS +# runtime / mTLS stack. Deployable: cargo lambda build -p enrichment-worker +# --release --arm64 --features lambda (the `lambda` feature is required or the +# bin is skipped; ADR 0006). [[bin]] name = "enrichment-worker" path = "src/main.rs" +required-features = ["lambda"] [[bin]] name = "enrichment-cli" path = "src/bin/cli.rs" +[features] +default = [] +aws-mtls = ["prices-clickhouse/aws-mtls"] +lambda = ["aws-mtls", "dep:lambda_runtime"] + [dependencies] clap = { workspace = true } clickhouse = { workspace = true } @@ -28,7 +39,7 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } prices-clickhouse = { path = "../prices-clickhouse" } -lambda_runtime = "0.13" +lambda_runtime = { workspace = true, optional = true } [dev-dependencies] tempfile = "3" diff --git a/packages/enrichment-worker/src/ch_enrich.rs b/packages/enrichment-worker/src/ch_enrich.rs index 67d4d1f..9b75c67 100644 --- a/packages/enrichment-worker/src/ch_enrich.rs +++ b/packages/enrichment-worker/src/ch_enrich.rs @@ -163,6 +163,16 @@ impl ChEnrichmentPass { Self { client, cfg } } + /// Build a pass from a pre-constructed client — e.g. the mTLS client from + /// [`prices_clickhouse::mtls::client_from_lambda_env`] used by the Lambda + /// entrypoint. The client's URL/TLS are already configured (`cfg.url` is + /// ignored on this path); only `cfg.database` is re-applied so the pass and + /// the client agree on the target database. + pub fn with_client(client: Client, cfg: ChEnrichConfig) -> Self { + let client = client.with_database(&cfg.database); + Self { client, cfg } + } + /// Cold-start health check — fail Lambda Init, not per-event. pub async fn preflight(&self) -> Result<(), ChEnrichError> { self.client.query("SELECT 1").execute().await?; diff --git a/packages/enrichment-worker/src/main.rs b/packages/enrichment-worker/src/main.rs index 93778a4..6eae316 100644 --- a/packages/enrichment-worker/src/main.rs +++ b/packages/enrichment-worker/src/main.rs @@ -1,179 +1,47 @@ -//! Lambda entrypoint — EventBridge Scheduler handler. +//! Enrichment worker Lambda entrypoint (task 0026). //! -//! The Scheduler payload is **ignored**; the Lambda runs the -//! enrichment pass driven entirely by env-var config and the -//! fixture files (in prototype mode). Production swaps the -//! fixtures for CH queries; the handler shape stays the same. +//! EventBridge cron → this binary. Each run enriches a bounded batch of +//! `prices.price_ohlcv_1m` candidates with `close_usd` / `volume_quote_usd` +//! (oracle → stablecoin-peg → XLM-pivot tiers) over mTLS to ClickHouse, then +//! re-inserts the higher-`version` rows the ReplacingMergeTree collapses on +//! merge. //! -//! Cold-start eager init mirrors BE's indexer pattern: missing env -//! / unreachable fixture surfaces as a Lambda Init Errors entry, -//! not a per-event panic. - -use std::path::PathBuf; -use std::sync::Arc; - -use enrichment_worker::candidates::JsonlCandidateSource; -use enrichment_worker::ch_enrich::{ChEnrichConfig, ChEnrichmentPass}; -use enrichment_worker::oracle::InMemoryOracleLookup; -use enrichment_worker::pass::run_pass; -use enrichment_worker::sink::StdoutJsonSink; -use lambda_runtime::{Error, LambdaEvent, service_fn}; -use tokio::sync::Mutex; -use tracing::{error, info}; - -const ENV_CANDIDATES: &str = "CANDIDATES_FIXTURE"; -const ENV_ORACLE_FIXTURE: &str = "ORACLE_FIXTURE"; -const ENV_ORACLE_NAME: &str = "ORACLE_NAME"; -const ENV_WINDOW_S: &str = "FORWARD_FILL_WINDOW_S"; -const ENV_PIVOT_WINDOW_S: &str = "PIVOT_WINDOW_S"; -const ENV_BATCH_SIZE: &str = "BATCH_SIZE"; -const ENV_MAX_BATCHES: &str = "MAX_BATCHES"; - -// Production (CH Form-B) selector + connection. When `CLICKHOUSE_URL` -// is set the Lambda runs the batch ASOF-JOIN enrichment against -// ClickHouse; otherwise it falls back to the fixture-driven prototype. -const ENV_CLICKHOUSE_URL: &str = "CLICKHOUSE_URL"; -const ENV_CH_DATABASE: &str = "CLICKHOUSE_DATABASE"; -const ENV_CH_TABLE: &str = "CLICKHOUSE_TABLE"; - -#[derive(Clone)] -struct Cfg { - candidates_path: PathBuf, - oracle_path: PathBuf, - oracle_name: String, - window_s: u32, - batch_size: usize, - max_batches: usize, -} - -struct State { - oracle: InMemoryOracleLookup, - cfg: Cfg, - /// Mutex around the candidate source — `next_batch` takes `&mut self`. - /// In prototype Lambda mode each invocation rewinds via load (see handler). - /// Kept here for future production swap. - _candidates_path_marker: PathBuf, -} +//! cargo lambda build -p enrichment-worker --release --arm64 --features lambda +//! +//! Requires the `lambda` feature (default build/test exercises `ch_enrich` and +//! the prototype `enrichment-cli` without the AWS runtime / mTLS stack). +//! +//! Cold-start eager init mirrors the sibling workers: the mTLS client +//! (`MTLS_SECRET_NAME` + `CH_DOMAIN`) is built and probed once, so a missing +//! secret / unreachable endpoint surfaces as a Lambda Init Error rather than a +//! per-event panic. Config is env-driven; unset vars fall back to the +//! `ChEnrichConfig` defaults (reflector / 300s / 86400s / 10000 / 20). +#[cfg(feature = "lambda")] #[tokio::main] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), lambda_runtime::Error> { + use enrichment_worker::ch_enrich::{ChEnrichConfig, ChEnrichmentPass}; + use lambda_runtime::{LambdaEvent, run, service_fn}; + use std::sync::Arc; + tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .json() .init(); - // Production swap: `CLICKHOUSE_URL` present → batch ASOF-JOIN - // enrichment against ClickHouse (Form B). Absent → fixture prototype. - if let Ok(url) = std::env::var(ENV_CLICKHOUSE_URL) { - return run_production(url).await; - } - - let cfg = Cfg { - candidates_path: std::env::var(ENV_CANDIDATES) - .map(PathBuf::from) - .unwrap_or_else(|_| PathBuf::from("fixtures/candidates.jsonl")), - oracle_path: std::env::var(ENV_ORACLE_FIXTURE) - .map(PathBuf::from) - .unwrap_or_else(|_| PathBuf::from("fixtures/oracle_prices.jsonl")), - oracle_name: std::env::var(ENV_ORACLE_NAME).unwrap_or_else(|_| "reflector".to_string()), - window_s: parse_env_or(ENV_WINDOW_S, 300), - batch_size: parse_env_or(ENV_BATCH_SIZE, 10_000), - max_batches: parse_env_or(ENV_MAX_BATCHES, 20), - }; - - info!( - candidates = %cfg.candidates_path.display(), - oracle = %cfg.oracle_path.display(), - oracle_name = %cfg.oracle_name, - window_s = cfg.window_s, - batch_size = cfg.batch_size, - max_batches = cfg.max_batches, - "enrichment-worker cold start" - ); - - let oracle = InMemoryOracleLookup::load_jsonl(&cfg.oracle_path) - .await - .map_err(|e| { - error!(error = %e, "oracle fixture load failed"); - format!("oracle fixture load failed: {e}") - })?; - - let state = Arc::new(Mutex::new(State { - oracle, - cfg: cfg.clone(), - _candidates_path_marker: cfg.candidates_path.clone(), - })); - - lambda_runtime::run(service_fn(move |event: LambdaEvent| { - let s = state.clone(); - async move { handler(event, s).await } - })) - .await -} - -async fn handler( - _event: LambdaEvent, - state: Arc>, -) -> Result { - let state = state.lock().await; - // Re-open the candidate source each invocation — production - // form runs one CH query per invocation, equivalent. - let mut candidates = JsonlCandidateSource::open(&state.cfg.candidates_path) - .await - .map_err(|e| format!("candidates open failed: {e}"))?; - - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_secs() as i64) - .unwrap_or(0); - - let stats = run_pass( - &mut candidates, - &state.oracle, - &StdoutJsonSink, - &state.cfg.oracle_name, - state.cfg.window_s, - state.cfg.batch_size, - state.cfg.max_batches, - now, - ) - .await - .map_err(|e| format!("pass failed: {e}"))?; - - info!( - batches = stats.batches, - candidates_seen = stats.candidates_seen, - enriched = stats.rows_enriched, - misses = stats.oracle_misses, - "enrichment pass complete" - ); - - Ok(serde_json::json!({ - "batches": stats.batches, - "candidates_seen": stats.candidates_seen, - "rows_enriched": stats.rows_enriched, - "oracle_misses": stats.oracle_misses, - })) -} - -/// Production entrypoint — the CH Form-B path. Builds the enrichment -/// pass at cold start (preflight failures surface as Lambda Init -/// Errors, mirroring the prototype's eager oracle load), then runs one -/// bounded pass per Scheduler event. -async fn run_production(url: String) -> Result<(), Error> { let cfg = ChEnrichConfig { - url, - database: std::env::var(ENV_CH_DATABASE).unwrap_or_else(|_| "prices".to_string()), - table: std::env::var(ENV_CH_TABLE).unwrap_or_else(|_| "price_ohlcv_1m".to_string()), - oracle_name: std::env::var(ENV_ORACLE_NAME).unwrap_or_else(|_| "reflector".to_string()), - window_s: parse_env_or(ENV_WINDOW_S, 300), - pivot_window_s: parse_env_or(ENV_PIVOT_WINDOW_S, 86_400), - batch_size: parse_env_or(ENV_BATCH_SIZE, 10_000), - max_batches: parse_env_or(ENV_MAX_BATCHES, 20), + // Unused on the mTLS path — the client carries the URL/TLS. + url: String::new(), + database: env_or("CLICKHOUSE_DATABASE", "prices"), + table: env_or("CLICKHOUSE_TABLE", "price_ohlcv_1m"), + oracle_name: env_or("ORACLE_NAME", "reflector"), + window_s: env_parse_or("FORWARD_FILL_WINDOW_S", 300), + pivot_window_s: env_parse_or("PIVOT_WINDOW_S", 86_400), + batch_size: env_parse_or("BATCH_SIZE", 10_000), + max_batches: env_parse_or("MAX_BATCHES", 20), }; - info!( - url = %cfg.url, + tracing::info!( database = %cfg.database, table = %cfg.table, oracle_name = %cfg.oracle_name, @@ -181,24 +49,28 @@ async fn run_production(url: String) -> Result<(), Error> { pivot_window_s = cfg.pivot_window_s, batch_size = cfg.batch_size, max_batches = cfg.max_batches, - "enrichment-worker cold start (clickhouse mode)" + "enrichment-worker cold start" ); - let pass = ChEnrichmentPass::new(cfg); - pass.preflight().await.map_err(|e| { - error!(error = %e, "clickhouse preflight failed"); - format!("clickhouse preflight failed: {e}") - })?; + // Cold start: build the mTLS client (MTLS_SECRET_NAME + CH_DOMAIN) and probe + // connectivity. Failures here surface as a CloudWatch Init error. + let client = prices_clickhouse::mtls::client_from_lambda_env(&cfg.database).await?; + let pass = Arc::new(ChEnrichmentPass::with_client(client, cfg)); + pass.preflight().await?; + tracing::info!("enrichment-worker cold start ready"); - let pass = Arc::new(pass); - lambda_runtime::run(service_fn(move |_event: LambdaEvent| { + run(service_fn(move |_event: LambdaEvent| { let pass = pass.clone(); async move { - let stats = pass - .run() - .await - .map_err(|e| format!("enrichment pass failed: {e}"))?; - Ok::<_, Error>(serde_json::json!({ + let stats = pass.run().await?; + tracing::info!( + batches = stats.batches, + candidates_before = stats.candidates_before, + candidates_after = stats.candidates_after, + rows_enriched = stats.rows_enriched, + "enrichment pass complete" + ); + Ok::(serde_json::json!({ "batches": stats.batches, "candidates_before": stats.candidates_before, "candidates_after": stats.candidates_after, @@ -209,9 +81,24 @@ async fn run_production(url: String) -> Result<(), Error> { .await } -fn parse_env_or(var: &str, default: T) -> T { +#[cfg(feature = "lambda")] +fn env_or(var: &str, default: &str) -> String { + std::env::var(var).unwrap_or_else(|_| default.to_string()) +} + +#[cfg(feature = "lambda")] +fn env_parse_or(var: &str, default: T) -> T { std::env::var(var) .ok() .and_then(|s| s.parse().ok()) .unwrap_or(default) } + +#[cfg(not(feature = "lambda"))] +fn main() { + eprintln!( + "enrichment-worker: build with `--features lambda` (or `cargo lambda build -p \ + enrichment-worker --release --arm64 --features lambda`) for the AWS Lambda \ + entrypoint. The local prototype driver is the `enrichment-cli` binary." + ); +}