From 98fd557652c8b96c691de77289f200927c7a4b19 Mon Sep 17 00:00:00 2001 From: karczuRF Date: Fri, 26 Jun 2026 13:10:32 +0200 Subject: [PATCH 1/3] fix(lore-0059): qualify timestamp in rollup argMin/argMax + full-chain test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The shipped rollups.sql and preroll.sql alias the bucket key `toStartOfInterval(timestamp, …) AS timestamp`, which shadows the source `timestamp` column. The bare `timestamp` inside argMin(open)/argMax(close)/ argMax(close_usd) then resolves to the constant bucket-start, so open/close/ close_usd tie-break to an arbitrary row instead of the true first/last by time (volumes, high, low were unaffected). Qualify the source column via FROM … AS t. Add tests/rollup_chain_it.rs: a full _1m → … → _1M integration test that drives the real refreshable-MV chain end-to-end and proves enrichment re-inserts propagate with no under/double-count and a winning version at every grain, plus a preroll.sql full-range pass. The fix is what makes the OHLC assertions green. current.sql was already correct (FROM … AS c + c.timestamp). Schema-overview §3.2 reference DDL updated to match. --- .../database-schema-overview.md | 21 +- packages/prices-clickhouse/schema/preroll.sql | 66 ++-- packages/prices-clickhouse/schema/rollups.sql | 79 +++-- .../tests/rollup_chain_it.rs | 324 ++++++++++++++++++ 4 files changed, 418 insertions(+), 72 deletions(-) create mode 100644 packages/prices-clickhouse/tests/rollup_chain_it.rs diff --git a/docs/database-schema/database-schema-overview.md b/docs/database-schema/database-schema-overview.md index 9282070..6bceef9 100644 --- a/docs/database-schema/database-schema-overview.md +++ b/docs/database-schema/database-schema-overview.md @@ -491,22 +491,22 @@ CREATE MATERIALIZED VIEW prices.mv_ohlcv_1m_to_15m REFRESH EVERY 1 MINUTE -- coarser grains refresh less often TO prices.price_ohlcv_15m AS SELECT - toStartOfInterval(timestamp, INTERVAL 15 MINUTE) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 15 MINUTE) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, - max(high) AS high, + argMin(open, t.timestamp) AS open, -- qualified: the AS-timestamp + max(high) AS high, -- alias shadows the source column min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, volume_quote_usd / nullIf(volume_base, 0) AS vwap, -- ref aliases, never re-sum(…) sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_1m FINAL -- post-dedup, post-enrichment -WHERE timestamp >= now() - INTERVAL 2 HOUR -- bounded re-scan; widen for coarse grains +FROM prices.price_ohlcv_1m AS t FINAL -- post-dedup, post-enrichment +WHERE t.timestamp >= now() - INTERVAL 2 HOUR -- bounded re-scan; widen for coarse grains GROUP BY timestamp, asset_id, quote_asset_id, source; -- Repeat for 15m → 1h, 1h → 4h, 4h → 1d, 1d → 1w, 1w → 1M — each FROM the @@ -525,6 +525,15 @@ volume_base, 0)`), never `sum(…)/sum(…)` — re-summing an aliased column ne an early minute leaves the bucket max unchanged, tying the stale and corrected rollup rows; project a strictly-increasing version (`sum(version)` or a refresh epoch) there. +- **Qualify the bucket-time argument.** The bucket key is aliased `AS timestamp` + to land in the target's `timestamp` column, but that alias **shadows** the + source `timestamp` column. `argMin(open, …)` / `argMax(close, …)` / + `argMax(close_usd, …)` must therefore reference the **qualified** source column + `t.timestamp` (`FROM … AS t`). With the bare `timestamp` they read the + constant bucket-start, so open/close/close_usd tie-break to an arbitrary row in + the bucket instead of the true first/last by time. The 0059 full-chain + integration test (`prices-clickhouse/tests/rollup_chain_it.rs`) caught this in + the as-shipped `rollups.sql` / `preroll.sql`; both are fixed. Refreshable MVs require ClickHouse ≥ 23.12; the exact mechanism (refreshable MV vs. scheduled re-aggregate) is finalised in task **0051** against the Hetzner diff --git a/packages/prices-clickhouse/schema/preroll.sql b/packages/prices-clickhouse/schema/preroll.sql index f09d581..9f9281b 100644 --- a/packages/prices-clickhouse/schema/preroll.sql +++ b/packages/prices-clickhouse/schema/preroll.sql @@ -10,111 +10,117 @@ -- splits on `;`). Re-runnable: ReplacingMergeTree(version) collapses the -- duplicate-PK rows a second run would add. For a clean measurement, TRUNCATE -- the coarse tables first (the runbook does this). +-- +-- Correctness (task 0059): the bucket key is aliased `AS timestamp`, which +-- SHADOWS the source `timestamp` column. argMin/argMax must reference the +-- QUALIFIED source column `t.timestamp` (FROM … AS t); the bare `timestamp` +-- would resolve to the constant bucket-start alias and tie-break open / close / +-- close_usd to an arbitrary row instead of the true first / last by time. INSERT INTO prices.price_ohlcv_15m SELECT - toStartOfInterval(timestamp, INTERVAL 15 MINUTE) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 15 MINUTE) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, + argMin(open, t.timestamp) AS open, max(high) AS high, min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, - argMax(close_usd, timestamp) AS close_usd, + argMax(close_usd, t.timestamp) AS close_usd, volume_quote / nullIf(volume_base, 0) AS vwap, sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_1m FINAL +FROM prices.price_ohlcv_1m AS t FINAL GROUP BY timestamp, asset_id, quote_asset_id, source; INSERT INTO prices.price_ohlcv_1h SELECT - toStartOfInterval(timestamp, INTERVAL 1 HOUR) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 1 HOUR) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, + argMin(open, t.timestamp) AS open, max(high) AS high, min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, - argMax(close_usd, timestamp) AS close_usd, + argMax(close_usd, t.timestamp) AS close_usd, volume_quote / nullIf(volume_base, 0) AS vwap, sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_15m FINAL +FROM prices.price_ohlcv_15m AS t FINAL GROUP BY timestamp, asset_id, quote_asset_id, source; INSERT INTO prices.price_ohlcv_4h SELECT - toStartOfInterval(timestamp, INTERVAL 4 HOUR) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 4 HOUR) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, + argMin(open, t.timestamp) AS open, max(high) AS high, min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, - argMax(close_usd, timestamp) AS close_usd, + argMax(close_usd, t.timestamp) AS close_usd, volume_quote / nullIf(volume_base, 0) AS vwap, sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_1h FINAL +FROM prices.price_ohlcv_1h AS t FINAL GROUP BY timestamp, asset_id, quote_asset_id, source; INSERT INTO prices.price_ohlcv_1d SELECT - toStartOfInterval(timestamp, INTERVAL 1 DAY) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 1 DAY) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, + argMin(open, t.timestamp) AS open, max(high) AS high, min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, - argMax(close_usd, timestamp) AS close_usd, + argMax(close_usd, t.timestamp) AS close_usd, volume_quote / nullIf(volume_base, 0) AS vwap, sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_4h FINAL +FROM prices.price_ohlcv_4h AS t FINAL GROUP BY timestamp, asset_id, quote_asset_id, source; INSERT INTO prices.price_ohlcv_1w SELECT - toStartOfInterval(timestamp, INTERVAL 1 WEEK) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 1 WEEK) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, + argMin(open, t.timestamp) AS open, max(high) AS high, min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, - argMax(close_usd, timestamp) AS close_usd, + argMax(close_usd, t.timestamp) AS close_usd, volume_quote / nullIf(volume_base, 0) AS vwap, sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_1d FINAL +FROM prices.price_ohlcv_1d AS t FINAL GROUP BY timestamp, asset_id, quote_asset_id, source; INSERT INTO prices.price_ohlcv_1M SELECT - toStartOfInterval(timestamp, INTERVAL 1 MONTH) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 1 MONTH) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, + argMin(open, t.timestamp) AS open, max(high) AS high, min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, - argMax(close_usd, timestamp) AS close_usd, + argMax(close_usd, t.timestamp) AS close_usd, volume_quote / nullIf(volume_base, 0) AS vwap, sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_1w FINAL +FROM prices.price_ohlcv_1w AS t FINAL GROUP BY timestamp, asset_id, quote_asset_id, source; diff --git a/packages/prices-clickhouse/schema/rollups.sql b/packages/prices-clickhouse/schema/rollups.sql index 7f6ae34..09ffd32 100644 --- a/packages/prices-clickhouse/schema/rollups.sql +++ b/packages/prices-clickhouse/schema/rollups.sql @@ -15,129 +15,136 @@ -- - version = max(version) is correct for a TRUE refreshable MV (atomic -- target replace). If a CH build forces scheduled INSERT…SELECT into a -- ReplacingMergeTree instead, project a strictly-increasing version. +-- - The bucket key is aliased `AS timestamp` to match the target column, but +-- that alias SHADOWS the source `timestamp` column. argMin/argMax must +-- therefore reference the QUALIFIED source column `t.timestamp` (FROM … AS +-- t) — using the bare `timestamp` resolves to the bucket-start alias, which +-- is constant within a bucket, so open/close/close_usd would tie-break to an +-- arbitrary row instead of the true first/last by time (task 0059 full-chain +-- integration test). The WHERE window is likewise qualified `t.timestamp`. CREATE MATERIALIZED VIEW IF NOT EXISTS prices.mv_ohlcv_1m_to_15m REFRESH EVERY 1 MINUTE TO prices.price_ohlcv_15m AS SELECT - toStartOfInterval(timestamp, INTERVAL 15 MINUTE) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 15 MINUTE) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, + argMin(open, t.timestamp) AS open, max(high) AS high, min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, - argMax(close_usd, timestamp) AS close_usd, + argMax(close_usd, t.timestamp) AS close_usd, volume_quote / nullIf(volume_base, 0) AS vwap, sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_1m FINAL -WHERE timestamp >= now() - INTERVAL 2 HOUR +FROM prices.price_ohlcv_1m AS t FINAL +WHERE t.timestamp >= now() - INTERVAL 2 HOUR GROUP BY timestamp, asset_id, quote_asset_id, source; CREATE MATERIALIZED VIEW IF NOT EXISTS prices.mv_ohlcv_15m_to_1h REFRESH EVERY 15 MINUTE TO prices.price_ohlcv_1h AS SELECT - toStartOfInterval(timestamp, INTERVAL 1 HOUR) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 1 HOUR) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, + argMin(open, t.timestamp) AS open, max(high) AS high, min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, - argMax(close_usd, timestamp) AS close_usd, + argMax(close_usd, t.timestamp) AS close_usd, volume_quote / nullIf(volume_base, 0) AS vwap, sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_15m FINAL -WHERE timestamp >= now() - INTERVAL 8 HOUR +FROM prices.price_ohlcv_15m AS t FINAL +WHERE t.timestamp >= now() - INTERVAL 8 HOUR GROUP BY timestamp, asset_id, quote_asset_id, source; CREATE MATERIALIZED VIEW IF NOT EXISTS prices.mv_ohlcv_1h_to_4h REFRESH EVERY 1 HOUR TO prices.price_ohlcv_4h AS SELECT - toStartOfInterval(timestamp, INTERVAL 4 HOUR) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 4 HOUR) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, + argMin(open, t.timestamp) AS open, max(high) AS high, min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, - argMax(close_usd, timestamp) AS close_usd, + argMax(close_usd, t.timestamp) AS close_usd, volume_quote / nullIf(volume_base, 0) AS vwap, sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_1h FINAL -WHERE timestamp >= now() - INTERVAL 1 DAY +FROM prices.price_ohlcv_1h AS t FINAL +WHERE t.timestamp >= now() - INTERVAL 1 DAY GROUP BY timestamp, asset_id, quote_asset_id, source; CREATE MATERIALIZED VIEW IF NOT EXISTS prices.mv_ohlcv_4h_to_1d REFRESH EVERY 4 HOUR TO prices.price_ohlcv_1d AS SELECT - toStartOfInterval(timestamp, INTERVAL 1 DAY) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 1 DAY) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, + argMin(open, t.timestamp) AS open, max(high) AS high, min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, - argMax(close_usd, timestamp) AS close_usd, + argMax(close_usd, t.timestamp) AS close_usd, volume_quote / nullIf(volume_base, 0) AS vwap, sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_4h FINAL -WHERE timestamp >= now() - INTERVAL 7 DAY +FROM prices.price_ohlcv_4h AS t FINAL +WHERE t.timestamp >= now() - INTERVAL 7 DAY GROUP BY timestamp, asset_id, quote_asset_id, source; CREATE MATERIALIZED VIEW IF NOT EXISTS prices.mv_ohlcv_1d_to_1w REFRESH EVERY 1 DAY TO prices.price_ohlcv_1w AS SELECT - toStartOfInterval(timestamp, INTERVAL 1 WEEK) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 1 WEEK) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, + argMin(open, t.timestamp) AS open, max(high) AS high, min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, - argMax(close_usd, timestamp) AS close_usd, + argMax(close_usd, t.timestamp) AS close_usd, volume_quote / nullIf(volume_base, 0) AS vwap, sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_1d FINAL -WHERE timestamp >= now() - INTERVAL 60 DAY +FROM prices.price_ohlcv_1d AS t FINAL +WHERE t.timestamp >= now() - INTERVAL 60 DAY GROUP BY timestamp, asset_id, quote_asset_id, source; CREATE MATERIALIZED VIEW IF NOT EXISTS prices.mv_ohlcv_1w_to_1M REFRESH EVERY 1 DAY TO prices.price_ohlcv_1M AS SELECT - toStartOfInterval(timestamp, INTERVAL 1 MONTH) AS timestamp, + toStartOfInterval(t.timestamp, INTERVAL 1 MONTH) AS timestamp, asset_id, quote_asset_id, source, - argMin(open, timestamp) AS open, + argMin(open, t.timestamp) AS open, max(high) AS high, min(low) AS low, - argMax(close, timestamp) AS close, + argMax(close, t.timestamp) AS close, sum(volume_base) AS volume_base, sum(volume_quote) AS volume_quote, sum(volume_quote_usd) AS volume_quote_usd, - argMax(close_usd, timestamp) AS close_usd, + argMax(close_usd, t.timestamp) AS close_usd, volume_quote / nullIf(volume_base, 0) AS vwap, sum(trade_count) AS trade_count, max(version) AS version -FROM prices.price_ohlcv_1w FINAL -WHERE timestamp >= now() - INTERVAL 400 DAY +FROM prices.price_ohlcv_1w AS t FINAL +WHERE t.timestamp >= now() - INTERVAL 400 DAY GROUP BY timestamp, asset_id, quote_asset_id, source; diff --git a/packages/prices-clickhouse/tests/rollup_chain_it.rs b/packages/prices-clickhouse/tests/rollup_chain_it.rs new file mode 100644 index 0000000..08f8d2b --- /dev/null +++ b/packages/prices-clickhouse/tests/rollup_chain_it.rs @@ -0,0 +1,324 @@ +//! Full-chain rollup version-propagation integration test (task 0059). +//! +//! docker compose up -d clickhouse +//! cargo test -p prices-clickhouse --test rollup_chain_it -- --ignored +//! +//! Exercises the REAL shipped refreshable-MV chain (`schema/rollups.sql`, +//! landed by task 0051) end-to-end across every granularity `_1m → _15m → _1h +//! → _4h → _1d → _1w → _1M`, and proves the two correctness properties task +//! 0059 was opened to verify against the production DDL: +//! +//! AC#1/AC#2 — an enrichment re-INSERT into `_1m` (`volume_quote_usd` filled, +//! `version` bumped) propagates to EVERY rolled granularity after a refresh, +//! with NO under-count and NO double-count of the summed volumes, and the +//! re-aggregated row WINS at every grain. +//! +//! The shipped chain is a TRUE refreshable MV in *replace* mode (atomic target +//! swap) re-aggregating from the previous grain `FINAL` — so `max(version)` is +//! a sufficient projection (the swap discards the stale row; there is no +//! ReplacingMergeTree version tie to lose). This test pins that behaviour. +//! +//! Owns an isolated scratch database and drops it at the end. The 0059 G-note +//! proof (`lore/.../proof/`) only covered the `_1m → _15m` hop by hand; this is +//! the full-chain automation that closes AC#3. + +use clickhouse::Client; +use std::time::Duration; + +fn ch_url() -> String { + std::env::var("CLICKHOUSE_URL").unwrap_or_else(|_| "http://localhost:8123".to_string()) +} + +/// Rewrite the `prices.*` schema onto an isolated scratch database name +/// (same trick as `views_it.rs`), so the test never touches the real `prices` +/// tables and can be dropped wholesale at the end. +fn rewrite(sql: &str, db: &str) -> String { + sql.replace("prices.", &format!("{db}.")) + .replace("IF NOT EXISTS prices", &format!("IF NOT EXISTS {db}")) +} + +/// One 15-minute bucket of three per-minute `_1m` rows for a single +/// `(asset 1, quote 2, sdex)` series, all anchored to the PREVIOUS completed +/// 15-minute bucket (12–14 min before the current boundary) so they: +/// - share one `_15m`/`_1h`/.../`_1M` bucket at every grain, and +/// - sit comfortably inside every rollup `WHERE timestamp >= now() - …` +/// window (the tightest is `_15m`'s 2 HOUR). +/// +/// `vqusd` is the per-row `volume_quote_usd` (0 = un-enriched, >0 = enriched); +/// `version` is the `ReplacingMergeTree(version)` discriminator. +fn insert_bucket(db: &str, vqusd: &str, version: u64) -> String { + let b = "toStartOfInterval(now(), INTERVAL 15 MINUTE)"; + format!( + "INSERT INTO {db}.price_ohlcv_1m \ + (timestamp, asset_id, quote_asset_id, source, open, high, low, close, \ + volume_base, volume_quote, volume_quote_usd, close_usd, vwap, trade_count, version) VALUES \ + ({b} - INTERVAL 14 MINUTE, 1,2,'sdex', 1.00,1.50,0.90,1.10, 10,50,{vqusd},0,1.00,1,{version}), \ + ({b} - INTERVAL 13 MINUTE, 1,2,'sdex', 1.10,1.60,1.00,1.20, 10,50,{vqusd},0,1.10,1,{version}), \ + ({b} - INTERVAL 12 MINUTE, 1,2,'sdex', 1.20,1.40,0.80,1.30, 10,50,{vqusd},0,1.20,1,{version})" + ) +} + +/// The chain in dependency order: each MV re-aggregates the previous grain's +/// target `FINAL`, so they must be refreshed front-to-back. +const CHAIN: &[(&str, &str)] = &[ + ("mv_ohlcv_1m_to_15m", "price_ohlcv_15m"), + ("mv_ohlcv_15m_to_1h", "price_ohlcv_1h"), + ("mv_ohlcv_1h_to_4h", "price_ohlcv_4h"), + ("mv_ohlcv_4h_to_1d", "price_ohlcv_1d"), + ("mv_ohlcv_1d_to_1w", "price_ohlcv_1w"), + ("mv_ohlcv_1w_to_1M", "price_ohlcv_1M"), +]; + +/// Trigger an immediate refresh of one MV and block until its target reflects +/// the expected value of `metric_expr` (an aggregate over the target `FINAL`). +/// Deterministic stand-in for waiting on the `REFRESH EVERY` schedule. +async fn refresh_until( + client: &Client, + db: &str, + mv: &str, + target: &str, + metric_expr: &str, + want: f64, +) { + client + .query(&format!("SYSTEM REFRESH VIEW {db}.{mv}")) + .execute() + .await + .unwrap_or_else(|e| panic!("refresh {mv}: {e}")); + + for _ in 0..40 { + let got: f64 = client + .query(&format!( + "SELECT toFloat64({metric_expr}) FROM {db}.{target} FINAL" + )) + .fetch_one() + .await + .unwrap_or_else(|e| panic!("poll {target}: {e}")); + if (got - want).abs() < 1e-6 { + return; + } + tokio::time::sleep(Duration::from_millis(250)).await; + } + panic!("{target}: {metric_expr} never reached {want} after refresh of {mv}"); +} + +/// Drive the whole chain front-to-back, waiting on `metric_expr == want` at +/// each grain before refreshing the next (the coarser MV reads this grain +/// `FINAL`, so it must be settled first). +async fn drive_chain(client: &Client, db: &str, metric_expr: &str, want: f64) { + for (mv, target) in CHAIN { + refresh_until(client, db, mv, target, metric_expr, want).await; + } +} + +/// Assert the single rolled bucket at `target`. The bucket is fixed across the +/// test (3 minutes of `volume_base = 10`, `volume_quote = 50`, `trade_count = +/// 1`), so `volume_base = 30` / `volume_quote = 150` / `trade_count = 3` and the +/// OHLC corners are invariants — only `volume_quote_usd` and the projected +/// `version` change with enrichment, so those are the parameters. One row per +/// grain (FINAL), so every property is checked against the post-dedup winner. +async fn assert_bucket( + client: &Client, + db: &str, + target: &str, + want_vqusd: f64, + want_version: u64, +) { + let n: u64 = client + .query(&format!("SELECT count() FROM {db}.{target} FINAL")) + .fetch_one() + .await + .unwrap_or_else(|e| panic!("count {target}: {e}")); + assert_eq!( + n, 1, + "{target}: expected exactly one rolled bucket, got {n}" + ); + + let (vbase, vquote, vqusd, open, high, low, close, tc, version): ( + f64, + f64, + f64, + f64, + f64, + f64, + f64, + u32, + u64, + ) = client + .query(&format!( + "SELECT toFloat64(volume_base), toFloat64(volume_quote), toFloat64(volume_quote_usd), \ + toFloat64(open), toFloat64(high), toFloat64(low), toFloat64(close), \ + trade_count, version \ + FROM {db}.{target} FINAL" + )) + .fetch_one() + .await + .unwrap_or_else(|e| panic!("row {target}: {e}")); + + let approx = |a: f64, b: f64, what: &str| { + assert!( + (a - b).abs() < 1e-6, + "{target}: {what} expected {b}, got {a}" + ); + }; + // Volumes: sum of the whole bucket — proves no multi-row under-count and, + // post-enrichment, no double-count. + approx(vbase, 30.0, "volume_base"); + approx(vquote, 150.0, "volume_quote"); + approx(vqusd, want_vqusd, "volume_quote_usd"); + // OHLC: argMin(open)/max(high)/min(low)/argMax(close) over the 3 minutes, + // identical at every grain because they share one bucket. + approx(open, 1.00, "open (argMin by ts)"); + approx(high, 1.60, "high (max)"); + approx(low, 0.80, "low (min)"); + approx(close, 1.30, "close (argMax by ts)"); + assert_eq!(tc, 3, "{target}: trade_count"); + assert_eq!(version, want_version, "{target}: projected version"); +} + +#[tokio::test] +#[ignore = "requires a local ClickHouse (docker compose up -d clickhouse)"] +async fn enrichment_propagates_through_full_rollup_chain() { + let db = "it_rollup_chain"; + let admin = Client::default().with_url(ch_url()); + + // Fresh scratch DB with the real init schema (base tables) ... + admin + .query(&format!("DROP DATABASE IF EXISTS {db}")) + .execute() + .await + .unwrap(); + admin + .query(&format!("CREATE DATABASE {db}")) + .execute() + .await + .unwrap(); + prices_clickhouse::apply_sql(&admin, &rewrite(prices_clickhouse::INIT_SQL, db)) + .await + .expect("apply init schema"); + + // ... and the REAL production rollup chain (needs the experimental flag on + // builds where refreshable MVs are still gated, matching current_mv_it.rs). + let mv_client = admin + .clone() + .with_option("allow_experimental_refreshable_materialized_view", "1"); + prices_clickhouse::apply_sql(&mv_client, &rewrite(prices_clickhouse::ROLLUPS_SQL, db)) + .await + .expect("create rollup MV chain"); + + // ---- Phase 1: un-enriched (volume_quote_usd = 0), roll up the full chain. + admin + .query(&insert_bucket(db, "0", 1)) + .execute() + .await + .expect("insert un-enriched _1m bucket"); + + // Sanity: _1m FINAL already holds the full bucket (3 rows summed). + let one_min_vbase: f64 = admin + .query(&format!( + "SELECT toFloat64(sum(volume_base)) FROM {db}.price_ohlcv_1m FINAL" + )) + .fetch_one() + .await + .unwrap(); + assert!( + (one_min_vbase - 30.0).abs() < 1e-6, + "_1m FINAL volume_base should be 30, got {one_min_vbase}" + ); + + drive_chain(&admin, db, "sum(volume_base)", 30.0).await; + + // Every grain reflects the full summed bucket; USD volume still 0 pre-enrich. + for (_, target) in CHAIN { + assert_bucket(&admin, db, target, 0.0, 1).await; + } + + // ---- Phase 2: enrichment re-INSERT — fill volume_quote_usd, bump version. + admin + .query(&insert_bucket(db, "100", 2)) + .execute() + .await + .expect("insert enriched _1m bucket"); + + // _1m FINAL dedups to the enriched 3 rows: USD volume = 300, base STILL 30. + let (m_vbase, m_vqusd): (f64, f64) = admin + .query(&format!( + "SELECT toFloat64(sum(volume_base)), toFloat64(sum(volume_quote_usd)) \ + FROM {db}.price_ohlcv_1m FINAL" + )) + .fetch_one() + .await + .unwrap(); + assert!( + (m_vbase - 30.0).abs() < 1e-6, + "_1m FINAL volume_base must stay 30 after enrichment (no double-count), got {m_vbase}" + ); + assert!( + (m_vqusd - 300.0).abs() < 1e-6, + "_1m FINAL volume_quote_usd should be 300 after enrichment, got {m_vqusd}" + ); + + drive_chain(&admin, db, "sum(volume_quote_usd)", 300.0).await; + + // The enriched value wins at EVERY grain, volumes are NOT double-counted + // (volume_base still 30, not 60), and the projected version advanced 1 → 2. + for (_, target) in CHAIN { + assert_bucket(&admin, db, target, 300.0, 2).await; + } + + admin + .query(&format!("DROP DATABASE {db}")) + .execute() + .await + .unwrap(); +} + +/// The full-range `preroll.sql` path (backfill / sizing — task 0060) shares the +/// rollup SELECT and therefore the same `argMin/argMax` correctness contract. +/// One deterministic pass over a multi-row bucket must produce the true +/// first-open / last-close at EVERY grain (the bug task 0059's full-chain test +/// surfaced: the `AS timestamp` bucket alias shadowing the source column). +#[tokio::test] +#[ignore = "requires a local ClickHouse (docker compose up -d clickhouse)"] +async fn preroll_reaggregates_full_chain_ohlc_correctly() { + let db = "it_preroll_chain"; + let admin = Client::default().with_url(ch_url()); + + admin + .query(&format!("DROP DATABASE IF EXISTS {db}")) + .execute() + .await + .unwrap(); + admin + .query(&format!("CREATE DATABASE {db}")) + .execute() + .await + .unwrap(); + prices_clickhouse::apply_sql(&admin, &rewrite(prices_clickhouse::INIT_SQL, db)) + .await + .expect("apply init schema"); + + admin + .query(&insert_bucket(db, "100", 1)) + .execute() + .await + .expect("insert _1m bucket"); + + // preroll is a plain front-to-back INSERT…SELECT chain (no refresh): one + // apply populates _15m … _1M from _1m FINAL. + prices_clickhouse::apply_sql(&admin, &rewrite(prices_clickhouse::PREROLL_SQL, db)) + .await + .expect("run preroll chain"); + + for (_, target) in CHAIN { + // open=1.00 (first minute), close=1.30 (last) — the argMin/argMax-by-time + // properties; volumes summed across the 3 minutes, version carried. + assert_bucket(&admin, db, target, 300.0, 1).await; + } + + admin + .query(&format!("DROP DATABASE {db}")) + .execute() + .await + .unwrap(); +} From 3c2889161701b01aa52df879f9cc97604fefa6b7 Mon Sep 17 00:00:00 2001 From: karczuRF Date: Fri, 26 Jun 2026 13:10:43 +0200 Subject: [PATCH 2/3] docs(lore-0059): record full-chain verification + spawn 0071 Mark 0059's remaining ACs proven against the real 0051 DDL, document the argMin/argMax timestamp-shadowing finding (Emerged), and spawn 0071 to re-apply the corrected MV/preroll DDL to the live ch-prod-01 cluster (the buggy DDL was applied under 0051). --- .../README.md | 93 +++++++++++++++++-- ...ly-corrected-rollup-mv-ddl-to-live-prod.md | 70 ++++++++++++++ 2 files changed, 153 insertions(+), 10 deletions(-) create mode 100644 lore/1-tasks/backlog/0071_BUG_reapply-corrected-rollup-mv-ddl-to-live-prod.md diff --git a/lore/1-tasks/active/0059_FEATURE_mv-rollup-version-propagation-enriched-reinserts/README.md b/lore/1-tasks/active/0059_FEATURE_mv-rollup-version-propagation-enriched-reinserts/README.md index 7db723b..64e8ad2 100644 --- a/lore/1-tasks/active/0059_FEATURE_mv-rollup-version-propagation-enriched-reinserts/README.md +++ b/lore/1-tasks/active/0059_FEATURE_mv-rollup-version-propagation-enriched-reinserts/README.md @@ -93,6 +93,25 @@ history: this task verifies now exists. Moving back to active to land the remaining ACs: the full _15m…_1M chain integration test + extending the proof harness against the real rollups.sql DDL. + - date: 2026-06-26 + status: active + who: oski + note: > + **Remaining ACs delivered + a real bug fixed.** Authored + packages/prices-clickhouse/tests/rollup_chain_it.rs — a full-chain + (_1m → _15m → … → _1M) integration test driving the REAL shipped + rollups.sql, plus a preroll.sql full-range pass; both green vs docker + CH 25.6. The test surfaced a correctness bug in the as-shipped + rollups.sql AND preroll.sql: `toStartOfInterval(timestamp,…) AS + timestamp` shadows the source column, so argMin(open)/argMax(close)/ + argMax(close_usd) tie-break to an arbitrary row (volumes/high/low were + fine). Fixed both (FROM … AS t + qualified t.timestamp) and the + schema-overview §3.2 reference DDL; current.sql was already correct. + The buggy DDL is live on ch-prod-01 (applied under 0051) → spawned + 0071 to re-apply. All four ACs now [x]. Verified the shipped + max(version) projection is correct for the true-refreshable replace-mode + chain that 0051 actually shipped (differs from the G-note's APPEND/ + sum(version) lock-in). --- # MV rollup-chain version propagation under enriched `_1m` re-inserts @@ -172,14 +191,68 @@ contract; also implies `_1m` retention ≥ widest rollup refresh window. ## Acceptance Criteria -- [~] MV chain projects a `version` that lets an enriched `_1m` - re-insert win at every rolled-up granularity — **semantics decided + - proven** (`sum(version)`/refresh-epoch, not `max(version)`); production - DDL lands in 0051 -- [~] No double-count / under-count of `volume_quote_usd` in `_15m … _1M` - after an enrichment pass — **proven on `_1m → _15m`** (re-aggregate from - `_1m FINAL`); full chain `_15m … _1M` verified once 0051 lands the DDL -- [~] Integration test covering write → roll up → enrich → assert across - all granularities (`FINAL`) — **proof harness exists** (`proof/`, 1 hop); - extend to all grains against the real 0051 DDL +- [x] MV chain projects a `version` that lets an enriched `_1m` + re-insert win at every rolled-up granularity — **proven against the real + 0051 DDL** across the full `_1m → _15m → … → _1M` chain + (`tests/rollup_chain_it.rs`): the shipped chain is a *true* refreshable MV + in replace mode (atomic target swap), so `max(version)` is sufficient and + the enriched row wins at every grain (version advances 1 → 2). +- [x] No double-count / under-count of `volume_quote_usd` in `_15m … _1M` + after an enrichment pass — **proven on the full chain**: `volume_base` + stays 30 (no double-count) and `volume_quote_usd` propagates 0 → 300 at + every grain after the enrichment re-insert + refresh. +- [x] Integration test covering write → roll up → enrich → assert across + all granularities (`FINAL`) — **`tests/rollup_chain_it.rs`** drives the + real `rollups.sql` chain end-to-end (+ a `preroll.sql` full-range pass), + green against docker CH 25.6. - [x] 0026 G-note dependency note resolved / cross-linked + +## Implementation Notes + +Full-chain integration test landed at +`packages/prices-clickhouse/tests/rollup_chain_it.rs` (two `#[ignore]` tests, +scratch-DB isolated like `views_it.rs`, driven deterministically via +`SYSTEM REFRESH VIEW` + poll like `current_mv_it.rs`). It applies the **real** +shipped `INIT_SQL` + `ROLLUPS_SQL`, rolls a 3-minute bucket up all six grains, +then enrichment-re-inserts (`version+1`, `volume_quote_usd` filled) and +re-drives the chain, asserting OHLCV + version at every grain `FINAL`. + +**Bug found and fixed (see Design Decisions → Emerged).** The as-shipped +`rollups.sql` / `preroll.sql` mis-computed `open`/`close`/`close_usd`. Fixed in +both schema files + the schema-overview §3.2 reference DDL. The buggy DDL is +live on `ch-prod-01` (applied under 0051) → re-apply spawned as **0071**. + +## Design Decisions + +### Emerged + +1. **`AS timestamp` bucket alias shadowed the source column (correctness bug).** + `toStartOfInterval(timestamp, …) AS timestamp` makes the bare `timestamp` + inside `argMin(open, …)` / `argMax(close, …)` / `argMax(close_usd, …)` + resolve to the *bucket-start* (constant within a bucket), so O/C/close_usd + tie-break to an arbitrary row instead of the true first/last by time. Volumes + (`sum`), `high`/`low` are unaffected. The 0059 desk proof only checked + volumes, so it never surfaced this. **Fix:** `FROM … AS t` + qualified + `t.timestamp` in `rollups.sql` and `preroll.sql`. `current.sql` was already + correct (it uses `AS c` + `c.timestamp`). + +2. **`max(version)` accepted (not `sum(version)`).** The G-note locked in + "APPEND + `sum(version)`", but 0051 actually shipped a *true* refreshable MV + in **replace** mode (atomic target swap) + bounded window, with `preroll.sql` + as the separate full-range historical path. Under atomic replace there is no + `ReplacingMergeTree` version tie to lose, so the shipped `max(version)` is + correct — the integration test confirms the enriched row wins at every grain. + The test asserts the shipped semantics rather than re-litigating the G-note. + +## Issues Encountered + +- **MV → target column mapping is positional, not by-name** (verified with a + swapped-alias `INSERT … SELECT` probe), so qualifying the source column while + keeping the output column aliased `timestamp` is safe. +- `prefer_column_name_to_alias=1` is **not** a viable fix — it makes `GROUP BY + timestamp` bind to the raw column, shattering the bucket into per-minute rows. + +## Future Work + +- **0071** (spawned) — re-apply the corrected `rollups.sql` / `preroll.sql` to + the live `ch-prod-01` cluster (the buggy DDL is already deployed under 0051). diff --git a/lore/1-tasks/backlog/0071_BUG_reapply-corrected-rollup-mv-ddl-to-live-prod.md b/lore/1-tasks/backlog/0071_BUG_reapply-corrected-rollup-mv-ddl-to-live-prod.md new file mode 100644 index 0000000..552e420 --- /dev/null +++ b/lore/1-tasks/backlog/0071_BUG_reapply-corrected-rollup-mv-ddl-to-live-prod.md @@ -0,0 +1,70 @@ +--- +id: "0071" +title: "Re-apply corrected rollup/preroll DDL to live ch-prod-01 (argMin/argMax timestamp-shadowing fix)" +type: BUG +status: backlog +related_adr: ["0007"] +related_tasks: ["0059", "0051"] +tags: [layer-database, priority-high, effort-small, clickhouse, materialized-views, rollups, operations] +links: + - "../../../packages/prices-clickhouse/schema/rollups.sql" + - "../../../packages/prices-clickhouse/schema/preroll.sql" +history: + - date: 2026-06-26 + status: backlog + who: oski + note: > + Spawned from 0059. The 0059 full-chain integration test surfaced an + OHLC-correctness bug in the as-shipped rollups.sql / preroll.sql: the + `toStartOfInterval(timestamp, …) AS timestamp` bucket alias shadows the + source `timestamp` column, so argMin(open)/argMax(close)/argMax(close_usd) + tie-break to an arbitrary row instead of the true first/last by time. The + schema files are fixed (FROM … AS t + qualified t.timestamp), but the + BUGGY DDL was already applied LIVE on ch-prod-01 under 0051 (2026-06-22). + The six refreshable MVs in prices.* must be re-created from the corrected + rollups.sql. Deferred to a deploy-capable session (prepare-not-deploy). +--- + +# Re-apply corrected rollup/preroll DDL to live ch-prod-01 + +## Summary + +The live `prices.*` refreshable-MV rollup chain on `ch-prod-01` was created +from a version of `schema/rollups.sql` that mis-computes `open` / `close` / +`close_usd` (the argMin/argMax-by-time aggregates). Task 0059 fixed the schema +files; this task re-applies the fix to the running cluster. + +## Context + +The bug (task 0059, full-chain integration test `rollup_chain_it.rs`): the +bucket key `toStartOfInterval(timestamp, …) AS timestamp` **shadows** the source +`timestamp` column, so `argMin(open, timestamp)` / `argMax(close, timestamp)` / +`argMax(close_usd, timestamp)` evaluate against the constant bucket-start and +tie-break to an arbitrary row. Volumes (`sum`), `high` (`max`), `low` (`min`) +are unaffected — only O/C/close_usd are wrong. Fixed by `FROM … AS t` + +qualified `t.timestamp`. `current.sql` was already correct (it uses `AS c` + +`c.timestamp`); only `rollups.sql` and `preroll.sql` were affected. + +Live apply provenance: task 0051 `notes/G-live-schema-state.md` (Route A, +`ssh … docker exec app-clickhouse-1 clickhouse-client --multiquery`, CH 26.3.10, +loopback `default` admin). + +## Implementation + +- `DROP VIEW` the six `prices.mv_ohlcv_*` MVs on `ch-prod-01`, then re-create + them from the corrected `schema/rollups.sql` (loopback `default` admin, same + Route A path as 0051). Dropping an MV does **not** touch its target table, so + no rollup rows are lost. +- Recompute any already-mis-rolled buckets: TRUNCATE + re-run the corrected + `preroll.sql` over the backfilled range, or let the bounded-window MVs + overwrite the recent window on their next refresh (replace mode). Decide based + on how much coarse data has accrued since 0051's live apply. +- Verify post-apply: spot-check that `open`/`close` at `_15m`+ match the true + first/last `_1m` close in a bucket (the assertion the integration test makes). + +## Acceptance Criteria + +- [ ] Six `prices.mv_ohlcv_*` MVs on ch-prod-01 re-created from corrected DDL +- [ ] Mis-rolled historical buckets recomputed (preroll or window refresh) +- [ ] Live spot-check confirms correct argMin-open / argMax-close at ≥ `_15m` +- [ ] Provenance appended to 0051 `notes/G-live-schema-state.md` (or a 0071 note) From b5a9f409957962a2bd682bf7f0cb5ecd04b34640 Mon Sep 17 00:00:00 2001 From: karczuRF Date: Fri, 26 Jun 2026 14:42:46 +0200 Subject: [PATCH 3/3] test(lore-0059): pin CH to prod version + deterministic rollup test anchor The full-chain rollup test ran against docker CH 25.6 while ch-prod-01 runs 26.3.10.60 (25.x -> 26.x gap), so "green locally" did not cover the engine the shipped DDL actually runs on. Pin docker-compose to the exact prod version 26.3.10.60; full prices-clickhouse suite re-verified green. Also make insert_bucket deterministic: it re-evaluated toStartOfInterval(now()) per INSERT, so a wall-clock 15-minute boundary crossing between the un-enriched and enriched batches anchored them to different buckets, breaking the ReplacingMergeTree dedup and the single-bucket assertions. bucket_anchor() now freezes the boundary once as an integer epoch (toUInt64(toUnixTimestamp(...)) -> toDateTime()), reused across every INSERT and robust across CH versions/timezones. --- docker-compose.yml | 6 ++- .../README.md | 28 +++++++++++ .../tests/rollup_chain_it.rs | 46 ++++++++++++++++--- 3 files changed, 73 insertions(+), 7 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index f816166..5bd1559 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,10 @@ services: clickhouse: - image: clickhouse/clickhouse-server:25.6 + # Pinned to the EXACT version running on ch-prod-01 (Hetzner) so local/CI + # tests exercise the same engine as production — refreshable-MV semantics, + # alias resolution and date/time functions can differ across CH releases. + # Before changing this, re-check the live `SELECT version()` on ch-prod-01. + image: clickhouse/clickhouse-server:26.3.10.60 ports: - '8123:8123' - '9000:9000' diff --git a/lore/1-tasks/active/0059_FEATURE_mv-rollup-version-propagation-enriched-reinserts/README.md b/lore/1-tasks/active/0059_FEATURE_mv-rollup-version-propagation-enriched-reinserts/README.md index 64e8ad2..2d4c9b3 100644 --- a/lore/1-tasks/active/0059_FEATURE_mv-rollup-version-propagation-enriched-reinserts/README.md +++ b/lore/1-tasks/active/0059_FEATURE_mv-rollup-version-propagation-enriched-reinserts/README.md @@ -244,6 +244,34 @@ live on `ch-prod-01` (applied under 0051) → re-apply spawned as **0071**. correct — the integration test confirms the enriched row wins at every grain. The test asserts the shipped semantics rather than re-litigating the G-note. +3. **Pin the local/CI ClickHouse to the EXACT production version.** Code review + flagged that the new integration test was being validated against + `docker-compose.yml`'s `clickhouse-server:25.6` (`25.6.13.41`), while + `ch-prod-01` runs **`26.3.10.60`** (per the 0051 G-live-schema-state note and + 0063 provisioning verification) — a 25.x → 26.x major gap. Refreshable-MV + semantics, SQL alias resolution and date/time functions can all differ across + CH releases, so "green locally" was not evidence for the engine the DDL + actually runs on. **Decision:** pin `docker-compose.yml` to the exact prod + version `clickhouse/clickhouse-server:26.3.10.60` and re-run the full + `prices-clickhouse` suite against it (rollup_chain, current_mv, views — all + green). Policy going forward: local/CI must match the live `ch-prod-01` + version; re-check `SELECT version()` on prod before bumping the pin. (This + strengthens — but does not replace — the live re-verify owed by **0071** on + the production cluster itself.) + +4. **Flaky test anchor made deterministic + version-robust.** The first cut of + `insert_bucket` embedded `toStartOfInterval(now(), …)`, re-evaluated + server-side at *each* INSERT; the un-enriched and enriched batches are + separate INSERTs with the whole chain-drive between them, so a wall-clock + 15-minute boundary crossing would anchor them to different buckets → no + `ReplacingMergeTree` dedup → `_1m FINAL` keeps all 6 rows and the + single-bucket / no-double-count assertions fail. **Fix:** `bucket_anchor()` + fetches the boundary **once** as a `toUInt64(toUnixTimestamp(…))` integer + epoch and rebuilds it as `toDateTime()`, reused for every INSERT. The epoch + round-trip (not `formatDateTime`, whose `%i`/`%M` specifiers are a portability + liability across CH versions) keeps it correct regardless of server version or + timezone. + ## Issues Encountered - **MV → target column mapping is positional, not by-name** (verified with a diff --git a/packages/prices-clickhouse/tests/rollup_chain_it.rs b/packages/prices-clickhouse/tests/rollup_chain_it.rs index 08f8d2b..481eaee 100644 --- a/packages/prices-clickhouse/tests/rollup_chain_it.rs +++ b/packages/prices-clickhouse/tests/rollup_chain_it.rs @@ -37,17 +37,44 @@ fn rewrite(sql: &str, db: &str) -> String { .replace("IF NOT EXISTS prices", &format!("IF NOT EXISTS {db}")) } +/// Fetch a single fixed 15-minute bucket boundary from the server, returned as +/// a `toDateTime('…')` SQL literal. Computed ONCE per test and reused for every +/// INSERT (see [`insert_bucket`]) so the un-enriched and enriched batches share +/// identical row timestamps — i.e. identical PKs. If each INSERT re-evaluated +/// `toStartOfInterval(now(), …)` instead, a wall-clock 15-minute boundary +/// crossing between the two batches (the chain drive in between can take tens of +/// seconds) would anchor them to different buckets: the enrichment re-INSERT +/// would no longer dedup against the original under `ReplacingMergeTree(version)`, +/// leaving `_1m FINAL` with all 6 rows (volume_base 60, two `_15m` buckets) and +/// breaking the single-bucket / no-double-count assertions. +async fn bucket_anchor(client: &Client) -> String { + // Freeze the boundary as an integer epoch and rebuild it with `toDateTime`. + // Round-tripping through a Unix timestamp (not `formatDateTime`/string + // parsing) keeps this independent of format-specifier and timezone + // behaviour, which need not be identical between the local/CI ClickHouse and + // the older/newer server version running on the cluster. + let epoch: u64 = client + .query("SELECT toUInt64(toUnixTimestamp(toStartOfInterval(now(), INTERVAL 15 MINUTE)))") + .fetch_one() + .await + .expect("fetch bucket anchor"); + format!("toDateTime({epoch})") +} + /// One 15-minute bucket of three per-minute `_1m` rows for a single /// `(asset 1, quote 2, sdex)` series, all anchored to the PREVIOUS completed -/// 15-minute bucket (12–14 min before the current boundary) so they: +/// 15-minute bucket (12–14 min before `anchor`, a fixed boundary from +/// [`bucket_anchor`]) so they: /// - share one `_15m`/`_1h`/.../`_1M` bucket at every grain, and /// - sit comfortably inside every rollup `WHERE timestamp >= now() - …` /// window (the tightest is `_15m`'s 2 HOUR). /// +/// `anchor` is reused across every INSERT in a test so enrichment re-INSERTs +/// share the original rows' timestamps (PKs) — see [`bucket_anchor`]. /// `vqusd` is the per-row `volume_quote_usd` (0 = un-enriched, >0 = enriched); /// `version` is the `ReplacingMergeTree(version)` discriminator. -fn insert_bucket(db: &str, vqusd: &str, version: u64) -> String { - let b = "toStartOfInterval(now(), INTERVAL 15 MINUTE)"; +fn insert_bucket(db: &str, anchor: &str, vqusd: &str, version: u64) -> String { + let b = anchor; format!( "INSERT INTO {db}.price_ohlcv_1m \ (timestamp, asset_id, quote_asset_id, source, open, high, low, close, \ @@ -206,9 +233,14 @@ async fn enrichment_propagates_through_full_rollup_chain() { .await .expect("create rollup MV chain"); + // One fixed bucket boundary, reused by BOTH the un-enriched and the enriched + // INSERT so the enrichment re-INSERT dedups against the original (same PKs) + // even if wall-clock crosses a 15-minute boundary mid-test. + let anchor = bucket_anchor(&admin).await; + // ---- Phase 1: un-enriched (volume_quote_usd = 0), roll up the full chain. admin - .query(&insert_bucket(db, "0", 1)) + .query(&insert_bucket(db, &anchor, "0", 1)) .execute() .await .expect("insert un-enriched _1m bucket"); @@ -234,8 +266,9 @@ async fn enrichment_propagates_through_full_rollup_chain() { } // ---- Phase 2: enrichment re-INSERT — fill volume_quote_usd, bump version. + // Same `anchor` as Phase 1 → same timestamps → ReplacingMergeTree dedup. admin - .query(&insert_bucket(db, "100", 2)) + .query(&insert_bucket(db, &anchor, "100", 2)) .execute() .await .expect("insert enriched _1m bucket"); @@ -298,8 +331,9 @@ async fn preroll_reaggregates_full_chain_ohlc_correctly() { .await .expect("apply init schema"); + let anchor = bucket_anchor(&admin).await; admin - .query(&insert_bucket(db, "100", 1)) + .query(&insert_bucket(db, &anchor, "100", 1)) .execute() .await .expect("insert _1m bucket");