Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ jobs:
-p cleanup-worker
-p supply-worker
-p oracle-worker
-p enrichment-worker

# Assert every bootstrap the CDK app references actually built. A bare
# "any bootstrap exists" glob would false-pass on the unrelated CLIs,
Expand Down
3 changes: 2 additions & 1 deletion infra/envs/production.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"assetSupply": "rate(1 hour)",
"oracleWatcher": "rate(5 minutes)",
"assetDiscovery": "rate(1 hour)",
"cleanup": "cron(0 3 * * ? *)"
"cleanup": "cron(0 3 * * ? *)",
"enrichment": "rate(1 hour)"
},
"ledgerProcessor": {
"memoryMb": 512,
Expand Down
55 changes: 55 additions & 0 deletions infra/src/lib/stacks/eventbridge-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ const SUPPLY_WORKER_ASSET_DIR =
const ORACLE_WORKER_ASSET_DIR =
process.env['ORACLE_WORKER_ASSET_DIR'] ?? '../target/lambda/oracle-worker';

/** Cargo-lambda build output for the `enrichment-worker` binary (task 0026). */
const ENRICHMENT_WORKER_ASSET_DIR =
process.env['ENRICHMENT_WORKER_ASSET_DIR'] ??
'../target/lambda/enrichment-worker';

export interface EventBridgeStackProps extends cdk.StackProps {
readonly config: EnvironmentConfig;
}
Expand All @@ -58,10 +63,12 @@ export class EventBridgeStack extends cdk.Stack {
public readonly oracleWatcherRule: events.Rule;
public readonly assetDiscoveryRule: events.Rule;
public readonly cleanupRule: events.Rule;
public readonly enrichmentRule: events.Rule;
public readonly assetDiscoveryFunction: lambda.Function;
public readonly cleanupFunction: lambda.Function;
public readonly supplyFunction: lambda.Function;
public readonly oracleFunction: lambda.Function;
public readonly enrichmentFunction: lambda.Function;

constructor(scope: Construct, id: string, props: EventBridgeStackProps) {
super(scope, id, props);
Expand Down Expand Up @@ -96,6 +103,12 @@ export class EventBridgeStack extends cdk.Stack {
schedule: events.Schedule.expression(schedules.cleanup),
});

this.enrichmentRule = new events.Rule(this, 'EnrichmentRule', {
ruleName: `prices-${env}-enrichment`,
description: `close_usd / volume_quote_usd enrichment of price_ohlcv_1m (${env})`,
schedule: events.Schedule.expression(schedules.enrichment),
});

// -----------------------------------------------------------------
// Asset Discovery worker Lambda (task 0054) + its rule target.
// No VPC (ADR 0007 §6); mTLS to ClickHouse + S3 read on BE's ledger
Expand Down Expand Up @@ -270,6 +283,48 @@ export class EventBridgeStack extends cdk.Stack {
value: this.oracleFunction.functionName,
});

// -----------------------------------------------------------------
// Enrichment worker Lambda (task 0026) + its hourly cron target. CH-only
// (no S3, no Horizon, no VPC): reads price_ohlcv_1m + oracle_prices and
// re-inserts higher-`version` rows with close_usd / volume_quote_usd that
// the ReplacingMergeTree collapses on merge. Writes prices.* → the same
// `ingestion` mTLS identity the other writers use. Idempotency comes from
// the `FINAL WHERE … = 0` read filter, so concurrency is not pinned.
// -----------------------------------------------------------------
this.enrichmentFunction = createWorkerLambda(this, {
config,
accountId,
mtlsSecretName: discoveryMtlsSecretName,
idPrefix: 'Enrichment',
name: 'enrichment',
assetDir: ENRICHMENT_WORKER_ASSET_DIR,
memorySize: 512,
// A bounded pass is MAX_BATCHES × BATCH_SIZE rows of set-based
// INSERT…SELECT; generous headroom under the hourly cadence (overflow
// just defers to the next run).
timeout: cdk.Duration.minutes(5),
secretsExtensionLayer,
chDomain,
rule: this.enrichmentRule,
environment: {
CLICKHOUSE_DATABASE: 'prices',
CLICKHOUSE_TABLE: 'price_ohlcv_1m',
// ORACLE_NAME / FORWARD_FILL_WINDOW_S / PIVOT_WINDOW_S / BATCH_SIZE /
// MAX_BATCHES unset → the binary's ChEnrichConfig defaults
// (reflector / 300 / 86400 / 10000 / 20).
},
alarmDescription:
'Enrichment Lambda invocation errors (close_usd / volume_quote_usd enrichment pass failed).',
alarmPeriod: cdk.Duration.hours(1),
}).function;

new cdk.CfnOutput(this, 'EnrichmentRuleArn', {
value: this.enrichmentRule.ruleArn,
});
new cdk.CfnOutput(this, 'EnrichmentFunctionName', {
value: this.enrichmentFunction.functionName,
});

cdk.Tags.of(this).add('Project', 'stellar-prices-api');
cdk.Tags.of(this).add('ManagedBy', 'cdk');
cdk.Tags.of(this).add('Environment', env);
Expand Down
6 changes: 6 additions & 0 deletions infra/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ export interface EnvironmentConfig {
readonly assetDiscovery: string;
/** Old-data partition drop (ALTER TABLE … DROP PARTITION). */
readonly cleanup: string;
/**
* volume_quote_usd / close_usd enrichment pass over price_ohlcv_1m
* (task 0026). Bounded-batch INSERT…SELECT into the ReplacingMergeTree.
*/
readonly enrichment: string;
};

// Ledger Processor ingest (consumed by IngestStack — task 0038)
Expand Down Expand Up @@ -166,6 +171,7 @@ export function validateConfig(config: EnvironmentConfig): void {
'oracleWatcher',
'assetDiscovery',
'cleanup',
'enrichment',
] as const;
for (const key of expectedKeys) {
const value = schedules[key];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,33 @@ history:
backfill credibility check) and stay out of scope under the carried-in
prepare-not-deploy constraint; the BE Form-B review is still open. Task
stays `active`.
- date: 2026-06-29
status: active
who: oski
note: >
**Option 1 — CDK + packaging (prepare-only).** Wired the enrichment
Lambda into the CDK app and matched the sibling-worker packaging, closing
the EventBridge/IAM ACs at the code+synth level (no deploy). Infra:
`EnrichmentRule` (`rate(1 hour)`) + worker `Function` + IAM role + error
alarm + log group in `eventbridge-stack.ts` via the shared
`createWorkerLambda`, mirroring oracle/cleanup/supply; `enrichment` added
to the schedule config type + validation + `production.json`. `cdk synth`
produces the full resource set (verified: rule rate(1 hour) → function;
env carries the mTLS contract CH_DOMAIN/MTLS_SECRET_NAME + CLICKHOUSE_*).
Crate: gated the Lambda bin behind a `lambda` feature (required-features,
lean default build) like the siblings, and rewrote the entrypoint to build
the mTLS client via `prices_clickhouse::mtls::client_from_lambda_env`
instead of a plain CLICKHOUSE_URL client (the prior fixture-prototype
Lambda mode is dropped; the prototype lives on as `enrichment-cli`). Added
`ChEnrichmentPass::with_client`; url-based `new()` kept for the integration
tests. CI: added `-p enrichment-worker` to the cargo-lambda build matrix.
Verified locally: default + `--features lambda` build clean, 24 unit + 2
e2e pass, clippy/fmt clean, `cargo lambda build` produces the bootstrap,
infra lint/build/typecheck + `cdk synth` green. **Still deferred:** the
custom `EnrichmentRowsRemainingAtVolumeZero` metric + CloudWatch publish +
dashboard (Option 2 / task 0056), the one-shot historical mode (Option 2),
and the actual deploy + live backfill credibility check (Option 3 — lifts
prepare-not-deploy). Task stays `active`.
---

# `volume_quote_usd` enrichment Lambda — implementation
Expand Down Expand Up @@ -168,18 +195,26 @@ notes when made.

Carried over from task 0024's design spec §7:

- [ ] EventBridge cron Lambda exists with the schema in §2 wired up.
- [ ] CDK + IAM matches §1.1 / §1.2.
- [ ] Re-running on already-enriched rows produces zero changes
(idempotency test).
- [ ] Rows with missing oracle stay at `volume_quote_usd = 0`,
`EnrichmentOracleMiss` metric increments.
- [x] EventBridge cron Lambda exists with the schema in §2 wired up.
— CDK authored in `eventbridge-stack.ts` (`EnrichmentRule` rate(1 hour)
→ worker Function), `cdk synth` verified; live deploy still pending.
- [x] CDK + IAM matches §1.1 / §1.2.
— `createWorkerLambda` (IAM role + mTLS-secret read + SSM read + error
alarm + log group), arm64/PROVIDED_AL2023; synth-verified. Deploy pending.
- [x] Re-running on already-enriched rows produces zero changes
(idempotency test). — `ch_enrich_it.rs` vs prod-pinned CH 26.3.10.60.
- [x] Rows with missing oracle stay at `volume_quote_usd = 0`,
`EnrichmentOracleMiss` metric increments. — "no reference stays 0" half
verified by `ch_enrich_it.rs`; the metric-emission half is deferred with
the CloudWatch publish (Option 2 / task 0056).
- [ ] After full SDEX backfill + a one-shot historical enrichment
pass, `current_prices.volume_24h_usd` for at least 3
XLM-quoted assets reflects SDEX-sourced volume (>0 and
credible against Horizon's historical aggregates).
— deploy-gated (needs live env + one-shot mode, Option 2/3).
- [ ] CloudWatch metrics from spec §5 are emitted and visible in
the dashboard.
the dashboard. — metric publish + dashboard widgets deferred to
Option 2 / task 0056 (observability-stack is a scaffold).

## Future Work

Expand Down
13 changes: 12 additions & 1 deletion packages/enrichment-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,25 @@ description = "volume_quote_usd enrichment Lambda — local-only prototype (task
name = "enrichment_worker"
path = "src/lib.rs"

# EventBridge cron Lambda entrypoint (task 0026). Behind `lambda` so the
# default build/test exercises `ch_enrich` + the prototype CLI without the AWS
# runtime / mTLS stack. Deployable: cargo lambda build -p enrichment-worker
# --release --arm64 --features lambda (the `lambda` feature is required or the
# bin is skipped; ADR 0006).
[[bin]]
name = "enrichment-worker"
path = "src/main.rs"
required-features = ["lambda"]

[[bin]]
name = "enrichment-cli"
path = "src/bin/cli.rs"

[features]
default = []
aws-mtls = ["prices-clickhouse/aws-mtls"]
lambda = ["aws-mtls", "dep:lambda_runtime"]

[dependencies]
clap = { workspace = true }
clickhouse = { workspace = true }
Expand All @@ -28,7 +39,7 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true }
prices-clickhouse = { path = "../prices-clickhouse" }

lambda_runtime = "0.13"
lambda_runtime = { workspace = true, optional = true }

[dev-dependencies]
tempfile = "3"
10 changes: 10 additions & 0 deletions packages/enrichment-worker/src/ch_enrich.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ impl ChEnrichmentPass {
Self { client, cfg }
}

/// Build a pass from a pre-constructed client — e.g. the mTLS client from
/// [`prices_clickhouse::mtls::client_from_lambda_env`] used by the Lambda
/// entrypoint. The client's URL/TLS are already configured (`cfg.url` is
/// ignored on this path); only `cfg.database` is re-applied so the pass and
/// the client agree on the target database.
pub fn with_client(client: Client, cfg: ChEnrichConfig) -> Self {
let client = client.with_database(&cfg.database);
Self { client, cfg }
}

/// Cold-start health check — fail Lambda Init, not per-event.
pub async fn preflight(&self) -> Result<(), ChEnrichError> {
self.client.query("SELECT 1").execute().await?;
Expand Down
Loading
Loading