From 71e300e16d27434a40ddecd8c2da25f35f9e4b5a Mon Sep 17 00:00:00 2001 From: Fraser Hutchison <190532+Fraser999@users.noreply.github.com> Date: Fri, 22 May 2026 11:35:47 +0100 Subject: [PATCH] persist journal hashes to hot storage --- Cargo.toml | 9 + crates/block-processor/src/v1/processor.rs | 12 +- crates/node-tests/Cargo.toml | 1 + crates/node-tests/tests/basic.rs | 67 +++- crates/node-tests/tests/db.rs | 59 +++- crates/node-tests/tests/multiple-blocks.rs | 6 - crates/node-tests/tests/reorg.rs | 65 ++-- crates/node/src/node.rs | 359 +++++++++++++++------ 8 files changed, 431 insertions(+), 147 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c74529c7..989dfa52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,3 +126,12 @@ url = "2.5.4" # Test Utils tempfile = "3.17.0" + +[patch.crates-io] +signet-storage = { git = "https://github.com/init4tech/storage.git", branch = "main" } +signet-cold = { git = "https://github.com/init4tech/storage.git", branch = "main" } +signet-cold-sql = { git = "https://github.com/init4tech/storage.git", branch = "main" } +signet-hot = { git = "https://github.com/init4tech/storage.git", branch = "main" } +signet-hot-mdbx = { git = "https://github.com/init4tech/storage.git", branch = "main" } +signet-cold-mdbx = { git = "https://github.com/init4tech/storage.git", branch = "main" } +signet-storage-types = { git = "https://github.com/init4tech/storage.git", branch = "main" } diff --git a/crates/block-processor/src/v1/processor.rs b/crates/block-processor/src/v1/processor.rs index 30f89cc7..21e8cad7 100644 --- a/crates/block-processor/src/v1/processor.rs +++ b/crates/block-processor/src/v1/processor.rs @@ -259,12 +259,12 @@ where // `Sealed
`; unseal the `SignetHeaderV1` wrapper before // handing it off. ExecutedBlockBuilder::new() - .header(header.into_inner()) - .bundle(bundle) - .transactions(transactions) - .receipts(receipts) - .signet_events(signet_events) - .zenith_header(zenith_header) + .with_header(header.into_inner()) + .with_bundle(bundle) + .with_transactions(transactions) + .with_receipts(receipts) + .with_signet_events(signet_events) + .with_zenith_header(zenith_header) .build() .wrap_err("failed to build ExecutedBlock") } diff --git a/crates/node-tests/Cargo.toml b/crates/node-tests/Cargo.toml index 9777d03a..3f65228c 100644 --- a/crates/node-tests/Cargo.toml +++ b/crates/node-tests/Cargo.toml @@ -39,3 +39,4 @@ tracing-subscriber.workspace = true [dev-dependencies] serde_json.workspace = true serial_test = "3.2.0" +signet-journal.workspace = true diff --git a/crates/node-tests/tests/basic.rs b/crates/node-tests/tests/basic.rs index 30065185..d7736768 100644 --- a/crates/node-tests/tests/basic.rs +++ b/crates/node-tests/tests/basic.rs @@ -9,10 +9,14 @@ use alloy::{ rpc::types::eth::{AccessList, AccessListItem, TransactionRequest}, signers::Signer, }; +use core::{sync::atomic::Ordering, time::Duration}; use serial_test::serial; use signet_constants::{KnownChains, RollupPermitted}; use signet_genesis::GenesisSpec; -use signet_node_tests::{HostBlockSpec, run_test, utils::adjust_usd_decimals}; +use signet_node_tests::{ + HostBlockSpec, NotificationWithSidecars, SignetTestContext, run_test, + utils::adjust_usd_decimals, +}; const SOME_USER: Address = Address::repeat_byte(0x39); @@ -43,6 +47,17 @@ async fn test_simple_enter() { #[tokio::test] async fn test_basic_reorg() { run_test(|ctx| async move { + // Reorg to height 0 is unsupported (the journal chain's ring buffer never stores + // genesis); `on_host_revert` bails when storage would be wiped to 0. Process an + // unrelated warmup block at height 1 first so the revert only takes us back to + // height 1, not height 0. + let warmup = HostBlockSpec::new(ctx.constants()).enter_token( + Address::repeat_byte(0x40), + 1, + ctx.constants().host().tokens().usdc(), + ); + ctx.process_block(warmup).await.unwrap(); + let mut bal = ctx.track_balance(SOME_USER, Some("user")); let enter_amnt = 31999; @@ -61,10 +76,60 @@ async fn test_basic_reorg() { ctx.revert_block(block).await.unwrap(); bal.assert_decrease_exact(change); + + // Process a fresh block on top of the surviving warmup. This exercises the post-revert + // journal-hash continuity path: `previous_journal_hash` must read the persisted hash + // from storage at height 1 so the chain can validate the replacement journal at + // height 2 as a `Reorg`. If persistence is broken, this would fail with + // `PreviousHashMismatch` or `ReorgParentEvicted`. + let replacement_amnt = 12345; + let replacement = HostBlockSpec::new(ctx.constants()).enter_token( + SOME_USER, + replacement_amnt, + ctx.constants().host().tokens().usdc(), + ); + ctx.process_block(replacement).await.unwrap(); + bal.assert_increase_exact(adjust_usd_decimals(replacement_amnt, 6)); }) .await; } +// Run directly (not via `run_test`) because the node task is expected to terminate with an +// error and `run_test`'s wrapper would convert that into a test failure. `ctx.revert_block` +// also isn't usable here - it polls the RPC after the revert, but the bail tears the RPC +// down before that poll can complete. +#[serial] +#[tokio::test] +async fn test_revert_to_genesis_bails() { + let (ctx, signet) = SignetTestContext::new().await; + + // Process a single block so the journal chain's ring buffer holds a tip but does not yet + // contain an anchor at height 0. + let block = HostBlockSpec::new(ctx.constants()).enter_token( + SOME_USER, + 1, + ctx.constants().host().tokens().usdc(), + ); + let for_revert = block.clone(); + ctx.process_block(block).await.unwrap(); + + // Send the revert directly so we don't depend on RPC liveness after the bail. Storage + // drains to 0, tags rewind, reorg fires, then the node bails because the chain cannot + // anchor the next post-revert journal at `target == 0`. Mirror `revert_block`'s height + // bookkeeping: `fetch_sub` returns the pre-decrement height (= the block being reverted) + // and rewinds `ctx.height` so any post-bail inspection sees a consistent value. + for_revert.set_block_number(ctx.height.fetch_sub(1, Ordering::SeqCst)); + ctx.send_notification(NotificationWithSidecars::revert_single_block(for_revert)).await; + + let join_result = tokio::time::timeout(Duration::from_secs(10), signet) + .await + .expect("node did not bail within 10s") + .expect("node task panicked"); + let error = join_result.expect_err("expected the node to bail after revert-to-genesis"); + let rendered = format!("{error:#}"); + assert!(rendered.contains("ring buffer no longer holds"), "unexpected error: {rendered}"); +} + #[serial] #[tokio::test] async fn test_genesis_allocs() { diff --git a/crates/node-tests/tests/db.rs b/crates/node-tests/tests/db.rs index a26824ec..46b01159 100644 --- a/crates/node-tests/tests/db.rs +++ b/crates/node-tests/tests/db.rs @@ -1,12 +1,14 @@ +use alloy::primitives::Address; use serial_test::serial; use signet_cold::mem::MemColdBackend; use signet_hot::{ db::{HotDbRead, UnsafeDbWrite}, mem::MemKv, }; +use signet_journal::GENESIS_JOURNAL_HASH; use signet_node::SignetNodeBuilder; use signet_node_config::test_utils::test_config; -use signet_node_tests::TestHostNotifier; +use signet_node_tests::{HostBlockSpec, TestHostNotifier, run_test}; use signet_rpc::{ServeConfig, StorageRpcConfig}; use signet_storage::{CancellationToken, HistoryRead, HistoryWrite, HotKv, UnifiedStorage}; use std::sync::Arc; @@ -68,5 +70,60 @@ async fn test_genesis() { assert_eq!(header.parent_hash, zero_hash); assert_eq!(header.base_fee_per_gas, Some(0x3b9aca00)); + // Genesis is loaded outside the producer's journal-emit path, so the `JournalHashes` table + // has no entry at height 0 - `previous_journal_hash` falls back to `GENESIS_JOURNAL_HASH`. + assert_eq!(reader.get_journal_hash(0).unwrap(), None); + cancel_token.cancel(); } + +#[serial] +#[tokio::test] +async fn test_journal_hash_persisted_after_process_block() { + run_test(|ctx| async move { + // Sanity: nothing persisted at genesis. + let reader = ctx.storage.reader().unwrap(); + assert_eq!(reader.get_journal_hash(0).unwrap(), None); + assert_eq!(reader.get_journal_hash(1).unwrap(), None); + drop(reader); + + let block = HostBlockSpec::new(ctx.constants()).enter_token( + Address::repeat_byte(0x77), + 100, + ctx.constants().host().tokens().usdc(), + ); + ctx.process_block(block).await.unwrap(); + + // After processing block 1, the producer's encoded journal hash must be persisted in + // hot storage so a restart can seed `previous_journal_hash` from it. + let hash_1 = { + let reader = ctx.storage.reader().unwrap(); + let hash = reader + .get_journal_hash(1) + .unwrap() + .expect("journal hash for block 1 was not persisted"); + assert_ne!( + hash, GENESIS_JOURNAL_HASH, + "first journal hash must not equal genesis sentinel" + ); + hash + }; + + // Processing a second block should chain off the first - persistence at block 2 must + // also succeed, and the two hashes must differ. + let block = HostBlockSpec::new(ctx.constants()).enter_token( + Address::repeat_byte(0x77), + 200, + ctx.constants().host().tokens().usdc(), + ); + ctx.process_block(block).await.unwrap(); + + let reader = ctx.storage.reader().unwrap(); + let hash_2 = reader + .get_journal_hash(2) + .unwrap() + .expect("journal hash for block 2 was not persisted"); + assert_ne!(hash_1, hash_2); + }) + .await; +} diff --git a/crates/node-tests/tests/multiple-blocks.rs b/crates/node-tests/tests/multiple-blocks.rs index c394e196..f711a83b 100644 --- a/crates/node-tests/tests/multiple-blocks.rs +++ b/crates/node-tests/tests/multiple-blocks.rs @@ -199,9 +199,6 @@ async fn test_write_account_histories_with_empty_block() { #[serial] #[tokio::test] -#[ignore = "ENG-2017: needs producer-side journal-hash persistence to seed \ - previous_journal_hash on revert; without it the chain rejects the \ - first post-revert journal with PreviousHashMismatch."] async fn test_write_account_histories_with_reorg_and_empty_blocks() { run_test(|ctx| async move { let ctx = setup_accounts_history(ctx).await; @@ -415,9 +412,6 @@ async fn test_historical_state_provider_with_empty_blocks() { #[serial] #[tokio::test] -#[ignore = "ENG-2017: needs producer-side journal-hash persistence to seed \ - previous_journal_hash on revert; without it the chain rejects the \ - first post-revert journal with PreviousHashMismatch."] async fn test_historical_state_provider_with_reorg() { run_test(|ctx| async move { let ctx = setup_accounts_history(ctx).await; diff --git a/crates/node-tests/tests/reorg.rs b/crates/node-tests/tests/reorg.rs index 7639a183..6fcd9dca 100644 --- a/crates/node-tests/tests/reorg.rs +++ b/crates/node-tests/tests/reorg.rs @@ -39,18 +39,16 @@ async fn process_increment(ctx: &SignetTestContext, contract_address: Address) - #[serial] #[tokio::test] -#[ignore = "ENG-2017: needs producer-side journal-hash persistence to seed \ - previous_journal_hash on revert; without it the chain rejects the \ - first post-revert journal with PreviousHashMismatch."] async fn test_block_tags_reorg() { run_test(|ctx| async move { - // Process two blocks via enter events. + // Reorg to height 0 is unsupported (the journal chain's ring buffer never stores + // genesis); `on_host_revert` bails when storage would be wiped to 0, so this test + // only reverts down to height 1. let block1 = HostBlockSpec::new(ctx.constants()).enter_token( SOME_USER, 1000, ctx.constants().host().tokens().usdc(), ); - let block1_clone = block1.clone(); ctx.process_block(block1).await.unwrap(); let block2 = HostBlockSpec::new(ctx.constants()).enter_token( @@ -61,35 +59,41 @@ async fn test_block_tags_reorg() { let block2_clone = block2.clone(); ctx.process_block(block2).await.unwrap(); - assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 2); + let block3 = HostBlockSpec::new(ctx.constants()).enter_token( + SOME_USER, + 3000, + ctx.constants().host().tokens().usdc(), + ); + let block3_clone = block3.clone(); + ctx.process_block(block3).await.unwrap(); - // Revert block 2. + assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 3); + + // Revert block 3, then block 2; stop above height 1 (see comment above). + ctx.revert_block(block3_clone).await.unwrap(); + assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 2); ctx.revert_block(block2_clone).await.unwrap(); assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 1); - // Revert block 1. - ctx.revert_block(block1_clone).await.unwrap(); - assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 0); - - // Rebuild two new blocks. - let new_block1 = HostBlockSpec::new(ctx.constants()).enter_token( + // Rebuild two new blocks on top of the surviving block 1. + let new_block2 = HostBlockSpec::new(ctx.constants()).enter_token( SOME_USER, 500, ctx.constants().host().tokens().usdc(), ); - ctx.process_block(new_block1).await.unwrap(); - assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 1); + ctx.process_block(new_block2).await.unwrap(); + assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 2); - let new_block2 = HostBlockSpec::new(ctx.constants()).enter_token( + let new_block3 = HostBlockSpec::new(ctx.constants()).enter_token( SOME_USER, 600, ctx.constants().host().tokens().usdc(), ); - ctx.process_block(new_block2).await.unwrap(); - assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 2); + ctx.process_block(new_block3).await.unwrap(); + assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 3); - // Verify the new block 2 is accessible. - let block = ctx.alloy_provider.get_block_by_number(2.into()).await.unwrap(); + // Verify the new block 3 is accessible. + let block = ctx.alloy_provider.get_block_by_number(3.into()).await.unwrap(); assert!(block.is_some()); }) .await; @@ -101,9 +105,6 @@ async fn test_block_tags_reorg() { #[serial] #[tokio::test] -#[ignore = "ENG-2017: needs producer-side journal-hash persistence to seed \ - previous_journal_hash on revert; without it the chain rejects the \ - first post-revert journal with PreviousHashMismatch."] async fn test_block_filter_reorg() { rpc_test(|ctx, contract| async move { // Install a block filter (starts after block 1, where contract was deployed). @@ -148,9 +149,6 @@ async fn test_block_filter_reorg() { #[serial] #[tokio::test] -#[ignore = "ENG-2017: needs producer-side journal-hash persistence to seed \ - previous_journal_hash on revert; without it the chain rejects the \ - first post-revert journal with PreviousHashMismatch."] async fn test_log_filter_reorg() { rpc_test(|ctx, contract| async move { // Install a log filter on the Counter address. @@ -201,9 +199,6 @@ async fn test_log_filter_reorg() { #[serial] #[tokio::test] -#[ignore = "ENG-2017: needs producer-side journal-hash persistence to seed \ - previous_journal_hash on revert; without it the chain rejects the \ - first post-revert journal with PreviousHashMismatch."] async fn test_block_subscription_reorg() { rpc_test(|ctx, contract| async move { let mut sub = ctx.alloy_provider.subscribe_blocks().await.unwrap(); @@ -236,9 +231,6 @@ async fn test_block_subscription_reorg() { #[serial] #[tokio::test] -#[ignore = "ENG-2017: needs producer-side journal-hash persistence to seed \ - previous_journal_hash on revert; without it the chain rejects the \ - first post-revert journal with PreviousHashMismatch."] async fn test_log_subscription_reorg() { rpc_test(|ctx, contract| async move { let mut sub = ctx @@ -405,9 +397,6 @@ async fn test_no_regression_filters_and_subscriptions() { #[serial] #[tokio::test] -#[ignore = "ENG-2017: needs producer-side journal-hash persistence to seed \ - previous_journal_hash on revert; without it the chain rejects the \ - first post-revert journal with PreviousHashMismatch."] async fn test_multi_block_reorg_log_filter() { rpc_test(|ctx, contract| async move { let addr = *contract.address(); @@ -463,9 +452,6 @@ async fn test_multi_block_reorg_log_filter() { #[serial] #[tokio::test] -#[ignore = "ENG-2017: needs producer-side journal-hash persistence to seed \ - previous_journal_hash on revert; without it the chain rejects the \ - first post-revert journal with PreviousHashMismatch."] async fn test_multi_block_reorg_log_subscription() { rpc_test(|ctx, contract| async move { let addr = *contract.address(); @@ -515,9 +501,6 @@ async fn test_multi_block_reorg_log_subscription() { #[serial] #[tokio::test] -#[ignore = "ENG-2017: needs producer-side journal-hash persistence to seed \ - previous_journal_hash on revert; without it the chain rejects the \ - first post-revert journal with PreviousHashMismatch."] async fn test_multiple_reorgs_between_polls() { rpc_test(|ctx, contract| async move { let addr = *contract.address(); diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 43cc4394..c88e6140 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -20,18 +20,16 @@ use signet_rpc::{ ChainNotifier, NewBlockNotification, RemovedBlock, ReorgNotification, RpcServerGuard, ServeConfig, StorageRpcConfig, }; -use signet_storage::{DrainedBlock, ExecutedBlock, HistoryRead, HotKv, HotKvRead, UnifiedStorage}; -use signet_types::{PairedHeights, constants::SignetSystemConstants}; -use std::{ - borrow::Cow, - fmt, - sync::{Arc, Mutex}, +use signet_storage::{ + DrainedBlock, ExecutedBlock, HistoryRead, HotDbRead, HotKv, HotKvRead, UnifiedStorage, }; +use signet_types::{PairedHeights, constants::SignetSystemConstants}; +use std::{borrow::Cow, fmt, sync::Arc}; use tokio::{ sync::{mpsc, watch}, task::{JoinError, JoinHandle}, }; -use tracing::{debug, error, info, instrument}; +use tracing::{debug, error, info, instrument, warn}; use trevm::{ journal::{BundleStateIndex, JournalEncode}, revm::database::DBErrorMarker, @@ -91,17 +89,6 @@ where /// and lets the journal chain's ingestion task drain and exit cleanly. journal_sender: mpsc::Sender, - /// Hash of the previously emitted journal. Used as the `previous_hash` - /// field in the next block's `JournalMeta`. Seeded with - /// [`GENESIS_JOURNAL_HASH`] at startup. - // TODO(ENG-2017): persist this alongside the block in storage so it - // survives restarts, can be rewound on host revert, and is recoverable - // after mid-block failures between `encode_journal` and - // `emit_journal` / `append_blocks`. Until then the rolling hash is - // in-memory only and the chain will reject the first journal after - // any restart or revert with `PreviousHashMismatch`. - journal_previous_hash: Mutex, - /// Join handle for the journal chain's ingestion task. journal_task: Option>>, } @@ -175,7 +162,6 @@ where rpc_config, journal_chain_handle, journal_sender, - journal_previous_hash: Mutex::new(GENESIS_JOURNAL_HASH), journal_task: Some(journal_task), }; Ok((this, receiver)) @@ -270,7 +256,11 @@ where let Some(notification) = notification else { break Ok(()) }; match notification.wrap_err("error in host notifications stream") { Ok(notification) => { - if let Err(error) = self.process_notification(¬ification).await { + if let Err(error) = self + .on_notification(¬ification) + .await + .wrap_err("error while processing notification") + { break Err(error); } } @@ -291,30 +281,15 @@ where main_result.and(journal_result) } - /// Run [`Self::on_notification`] and, if state changed, refresh the - /// block tags from the notification's safe / finalized heights. - async fn process_notification( - &self, - notification: &HostNotification, - ) -> eyre::Result<()> { - let changed = self - .on_notification(notification) - .await - .wrap_err("error while processing notification")?; - if !changed { - return Ok(()); - } - let ru_height = self.last_rollup_block()?; - self.update_block_tags( - ru_height, - notification.safe_block_number, - notification.finalized_block_number, - ) - } - /// Runs on any notification received from the host. /// - /// Returns `true` if any rollup state changed. + /// Drives the full per-notification pipeline: revert (if any), committed chain (if any), + /// status-channel refresh, and the safe / finalized tag and `FinishedHeight` update. When + /// the revert step requests a shutdown - e.g. because the journal chain's ring buffer no + /// longer holds the post-revert tip - the local tag refresh and the host-bound + /// `FinishedHeight` still run so reth can prune to the post-revert finalized height before + /// the bail error propagates out of the main loop. A tag-update failure on the shutdown + /// path is logged but does not override the shutdown error. #[instrument(parent = None, skip_all, fields( reverted = notification.revert_range().map(|r| r.len()).unwrap_or_default(), committed = notification.committed_chain().map(|c| c.len()).unwrap_or_default(), @@ -322,18 +297,26 @@ where pub async fn on_notification( &self, notification: &HostNotification, - ) -> eyre::Result { + ) -> eyre::Result<()> { metrics::record_notification_received(notification); let mut changed = false; + let mut shutdown: Option = None; // NB: REVERTS MUST RUN FIRST if let Some(range) = notification.revert_range() { - changed |= + let outcome = self.on_host_revert(range).await.wrap_err("error encountered during revert")?; + changed |= outcome.changed; + shutdown = outcome.shutdown; } - if let Some(chain) = notification.committed_chain() { + // Skip committed-chain processing when a shutdown is pending: storage has been drained + // to a point the in-process journal chain can no longer anchor, so emitting a fresh + // journal would just fail validation downstream. + if shutdown.is_none() + && let Some(chain) = notification.committed_chain() + { changed |= self .process_committed_chain(chain) .await @@ -341,11 +324,33 @@ where } if changed { - self.update_status_channel()?; + let tag_result = self + .update_status_channel() + .and_then(|()| self.last_rollup_block()) + .and_then(|ru_height| { + self.update_block_tags( + ru_height, + notification.safe_block_number, + notification.finalized_block_number, + ) + }); + match (tag_result, shutdown.is_some()) { + (Err(tag_error), true) => { + // Shutdown error is the root cause; log the secondary failure so it's + // not lost, then let the shutdown error propagate below. + error!(error = ?tag_error, "tag refresh failed during shutdown bail"); + } + (Err(tag_error), false) => return Err(tag_error), + (Ok(()), _) => {} + } } metrics::record_notification_processed(notification); - Ok(changed) + + match shutdown { + Some(error) => Err(error), + None => Ok(()), + } } /// Process a committed chain by extracting and executing blocks. @@ -375,40 +380,78 @@ where self.blob_cacher.clone(), ); let executed = processor.process_block(block_extracts).await?; - // TODO(ENG-2017): this encode → notify → append → emit ordering is - // not crash-safe: if `append_blocks` or `emit_journal` fails after - // `encode_journal` has advanced `journal_previous_hash` (and after - // `notify_new_block` has broadcast), the rolling hash will be - // ahead of what's persisted. The hash needs to be persisted - // alongside the block; see `journal_previous_hash`. - let journal_bytes = self.encode_journal(block_extracts.host_block.number(), &executed); - self.notify_new_block(&executed); - self.storage.append_blocks(vec![executed]).await?; + let previous_hash = self.previous_journal_hash()?; + let (executed, journal_bytes) = + encode_journal(previous_hash, block_extracts.host_block.number(), executed); + // Order: emit -> append -> notify. + // + // `emit` first so any post-emit failure (append error, hard crash) leaves storage + // at N-1; the host re-delivers N on restart and the producer re-emits + // deterministically against a freshly-built chain (`tip = None`), avoiding the + // permanent "block persisted, journal lost" gap that the reverse order leaves + // behind. `notify` last so `eth_subscribe("newHeads")` clients querying storage + // immediately after the broadcast see block N already indexed; `send_new_block` + // only errors when there are no subscribers, which is safe to ignore. + // + // `/journal` consumers may see the journal before storage has indexed the block, + // but they reconstruct state from the journal itself, not from storage RPC, so + // they are unaffected by that ordering. + // + // The broadcast payload is built before `append_blocks` consumes `executed`. + let notification = NewBlockNotification { + header: executed.header.inner().clone(), + transactions: executed.transactions.iter().map(|tx| tx.inner().clone()).collect(), + receipts: executed.receipts.clone(), + }; self.emit_journal(journal_bytes).await?; + self.storage.append_blocks(vec![executed]).await?; + let _ = self.chain.send_new_block(notification); processed = true; } Ok(processed) } - /// Build the serialized journal for a freshly executed block and update - /// the rolling `previous_journal_hash` state. Returns the encoded wire - /// bytes ready to send into the journal chain. + /// Read the rolling `previous_journal_hash` for the next produced block from storage. + /// + /// Returns [`GENESIS_JOURNAL_HASH`] when the database is empty or only contains the genesis + /// block, and also when the storage tip has no recorded journal hash - the persistence-off + /// startup path of `DESIGN.md` §5.5, also covering an upgrade from a pre-`JournalHashes` + /// build. In that fallback the next emit presents as the initial journal of a fresh chain; + /// downstream `/journal` consumers with cached checkpoints will fail validation and must + /// re-bootstrap. /// - /// The hash that becomes the next block's `previous_journal_hash` is the - /// keccak256 of the full encoded `Journal::V1` (version tag included), - /// matching the hash the journal chain computes on ingest. - #[instrument(skip(self, executed), fields(ru_height = executed.header.number()))] - fn encode_journal(&self, host_height: u64, executed: &ExecutedBlock) -> Bytes { - let previous_hash = - *self.journal_previous_hash.lock().expect("journal previous hash lock poisoned"); - let host_journal = HostJournal::new( - JournalMeta::new(host_height, previous_hash, Cow::Borrowed(executed.header.inner())), - BundleStateIndex::from(&executed.bundle), + /// The fallback is only sound while the in-process journal chain is itself fresh + /// (`tip = None`); if the chain already holds a tip, emitting with `previous_hash = + /// GENESIS_JOURNAL_HASH` would be rejected as `PreviousHashMismatch` and the journal task + /// would exit with a generic error. That combination indicates storage corruption or a + /// block appended without going through [`encode_journal`], so surface the real cause here. + fn previous_journal_hash(&self) -> eyre::Result { + let reader = self.storage.reader()?; + let storage_tip = reader.last_block_number()?.unwrap_or(0); + + if storage_tip == 0 { + return Ok(GENESIS_JOURNAL_HASH); + } + + if let Some(hash) = reader.get_journal_hash(storage_tip)? { + return Ok(hash); + } + + if self.journal_chain_handle.tip().is_some() { + return Err(eyre!( + "storage tip {storage_tip} has no recorded journal hash but the in-process \ + journal chain already holds a tip; emitting a fresh-chain initial here would \ + be rejected as `PreviousHashMismatch`. This indicates storage corruption or a \ + block appended without going through `encode_journal`." + )); + } + + warn!( + storage_tip, + "no journal hash recorded for storage tip; presenting next journal as a \ + fresh-chain initial. Downstream `/journal` consumers must re-bootstrap." ); - let encoded: Bytes = Journal::V1(host_journal).encoded().into(); - *self.journal_previous_hash.lock().expect("journal previous hash lock poisoned") = - keccak256(&encoded); - encoded + Ok(GENESIS_JOURNAL_HASH) } /// Push the encoded journal bytes into the journal chain. Awaits if @@ -424,17 +467,6 @@ where .map_err(|_| eyre!("journal chain ingestion task exited unexpectedly")) } - /// Send a new block notification on the broadcast channel. - fn notify_new_block(&self, block: &ExecutedBlock) { - let notif = NewBlockNotification { - header: block.header.inner().clone(), - transactions: block.transactions.iter().map(|tx| tx.inner().clone()).collect(), - receipts: block.receipts.clone(), - }; - // Ignore send errors — no subscribers is fine. - let _ = self.chain.send_new_block(notif); - } - /// Send a reorg notification on the broadcast channel. fn notify_reorg(&self, drained: Vec, common_ancestor: u64) { let removed_blocks = drained @@ -520,24 +552,22 @@ where /// Called when the host chain has reverted a block or set of blocks. /// - /// Returns `true` if any rollup state was unwound. + /// Returns a [`RevertOutcome`] describing whether any rollup state was unwound and whether + /// the caller must shut the node down after running its post-notification work. /// /// # Errors /// /// Returns an error if the revert range is inconsistent with stored /// state — i.e. the range tip does not cover the node's current /// rollup tip. - #[instrument(skip_all, fields( - first = range.first(), - tip = range.tip(), - ))] - pub async fn on_host_revert(&self, range: RevertRange) -> eyre::Result { + #[instrument(skip_all, fields(first = range.first(), tip = range.tip()))] + async fn on_host_revert(&self, range: RevertRange) -> eyre::Result { let tip = range.tip(); let first = range.first(); // If the end is before the RU genesis, nothing to do. if tip <= self.constants.host_deploy_height() { - return Ok(false); + return Ok(RevertOutcome::unchanged()); } // Validate that the revert range is consistent with our stored @@ -561,6 +591,14 @@ where .unwrap_or_default() .saturating_sub(1); + let chain_tip = self.journal_chain_handle.tip(); + let shutdown_for_chain_reset = revert_forces_shutdown( + target, + rollup_tip, + chain_tip.map(|checkpoint| checkpoint.height), + self.journal_chain_handle.contains(target), + ); + let drained = self.storage.drain_above(target).await?; // Immediately cap block tags to the common ancestor so that @@ -576,7 +614,35 @@ where self.notify_reorg(drained, target); } - Ok(true) + let shutdown = shutdown_for_chain_reset.then(|| { + eyre!( + "rollup reverted to height {target} but the in-process journal chain's ring \ + buffer no longer holds that height (either it is genesis, or it has been \ + evicted by ring-buffer rotation); the chain cannot validate the post-revert \ + reorg replacement. Restart the node to rebuild the chain in lockstep with \ + storage." + ) + }); + + Ok(RevertOutcome { changed: true, shutdown }) + } +} + +/// Outcome of [`SignetNode::on_host_revert`]. +#[derive(Debug)] +struct RevertOutcome { + /// Whether any rollup state was unwound. + changed: bool, + /// Set when the in-process journal chain can no longer validate the post-revert tip and + /// the node must shut down. Storage and tags have already been drained; the caller is + /// responsible for running the per-notification tag refresh before propagating this. + shutdown: Option, +} + +impl RevertOutcome { + /// A revert that performed no work and requires no shutdown. + const fn unchanged() -> Self { + Self { changed: false, shutdown: None } } } @@ -629,3 +695,112 @@ fn build_journal_chain(config: &JournalConfig) -> eyre::Result (ExecutedBlock, Bytes) { + let host_journal = HostJournal::new( + JournalMeta::new(host_height, previous_hash, Cow::Borrowed(executed.header.inner())), + BundleStateIndex::from(&executed.bundle), + ); + let encoded: Bytes = Journal::V1(host_journal).encoded().into(); + executed.journal_hash = Some(keccak256(&encoded)); + (executed, encoded) +} + +/// Decide whether a host revert that rewinds the rollup to `target` must shut the node down +/// because the in-process journal chain can no longer anchor the post-revert replacement at +/// `target + 1`. +/// +/// Inputs are the revert `target`, the node's current stored `rollup_tip`, the journal +/// chain's tip height (`chain_tip_height`, `None` when the chain is fresh), and whether its +/// ring buffer still holds `target` (`chain_contains_target`). +/// +/// Two situations force a shutdown: +/// +/// * `target == 0`: genesis is never stored in the ring buffer. The producer has already +/// emitted journals for every block being reverted, so the chain holds - or, once it +/// drains its queued journals, will hold - a tip `>= 1` that it cannot rewind past +/// genesis. This is independent of how far the chain's asynchronous ingestion has +/// progressed, so it must NOT be gated on the live tip: doing so races ingestion and +/// makes the bail non-deterministic. The `rollup_tip > 0` guard keeps a no-op revert of +/// a genesis-only chain (nothing emitted, tip never set) from spuriously bailing. +/// * `target > 0` but the journal for `target` has been evicted by ring-buffer rotation +/// (`chain_tip_height >= target && !chain_contains_target`). +/// +/// A tip *behind* a non-zero `target` is fine: `emit` precedes `append`, so the missing +/// journals are queued ahead of the chain and will be ingested before any replacement +/// arrives. +const fn revert_forces_shutdown( + target: u64, + rollup_tip: u64, + chain_tip_height: Option, + chain_contains_target: bool, +) -> bool { + if target == 0 { + rollup_tip > 0 + } else { + match chain_tip_height { + Some(tip_height) => tip_height >= target && !chain_contains_target, + None => false, + } + } +} + +#[cfg(test)] +mod tests { + use super::revert_forces_shutdown; + + // Revert-to-genesis of a non-empty chain must bail regardless of how far the journal + // chain's asynchronous ingestion has progressed. The `None` case is the exact boundary + // that previously raced ingestion and made `test_revert_to_genesis_bails` flaky: the + // revert arrived before the chain ingested the only block's journal, the bail was gated + // on the (still unset) live tip, and the node failed to shut down. + #[test] + fn revert_to_genesis_bails_independent_of_ingestion() { + // Chain has not yet ingested the journal (tip unset) - must still bail. + assert!(revert_forces_shutdown(0, 1, None, false)); + // Chain has ingested the journal (tip set) - bails for the same reason. `contains` is + // irrelevant at genesis, so it must not change the outcome either way. + assert!(revert_forces_shutdown(0, 1, Some(1), false)); + assert!(revert_forces_shutdown(0, 5, Some(5), true)); + } + + // A revert that targets genesis on a chain that only ever held genesis is a no-op: nothing + // was emitted, so there is no replacement to anchor and the node must not bail. + #[test] + fn revert_to_genesis_of_empty_chain_does_not_bail() { + assert!(!revert_forces_shutdown(0, 0, None, false)); + } + + // For `target > 0`, a chain that has not yet caught up to `target` (or sits below it) is + // fine: the missing journals are queued ahead of the chain and will be ingested before any + // replacement arrives. + #[test] + fn revert_above_genesis_with_chain_behind_does_not_bail() { + assert!(!revert_forces_shutdown(2, 3, None, false)); + assert!(!revert_forces_shutdown(2, 3, Some(1), false)); + } + + // For `target > 0`, the chain can anchor the replacement as long as its ring buffer still + // holds `target`, even after it has advanced past it. + #[test] + fn revert_above_genesis_with_target_retained_does_not_bail() { + assert!(!revert_forces_shutdown(2, 3, Some(3), true)); + assert!(!revert_forces_shutdown(2, 2, Some(2), true)); + } + + // For `target > 0`, only an evicted `target` (advanced past it, ring buffer no longer holds + // it) is fatal: the chain cannot anchor the replacement and the node must bail. + #[test] + fn revert_above_genesis_with_target_evicted_bails() { + assert!(revert_forces_shutdown(2, 3, Some(3), false)); + } +}