diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 3bfd9367..16c7c7f1 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -55,6 +55,48 @@ pub use ethlambda_types::block::MAX_ATTESTATIONS_DATA; /// /// See: leanSpec PR #682. pub const GOSSIP_DISPARITY_INTERVALS: u64 = 1; +/// Local head lag beyond which the node is considered to be syncing. +/// +/// See: leanSpec PR #708. +const SYNC_LAG_THRESHOLD: u64 = 4; +/// Freshest-known block lag beyond which the network is considered stalled. +/// +/// During a network-wide stall the node remains synced so validators can help +/// the chain recover. +const NETWORK_STALL_THRESHOLD: u64 = 8; +/// Recovery band that prevents the sync status from flapping near the threshold. +const SYNC_HYSTERESIS_BAND: u64 = 2; + +#[derive(Default)] +struct SyncStatusTracker { + syncing: bool, +} + +impl SyncStatusTracker { + fn update( + &mut self, + current_slot: u64, + head_slot: u64, + max_seen_slot: u64, + ) -> metrics::SyncStatus { + let head_lag = current_slot.saturating_sub(head_slot); + let network_lag = current_slot.saturating_sub(max_seen_slot); + + if network_lag > NETWORK_STALL_THRESHOLD { + self.syncing = false; + } else if self.syncing { + self.syncing = head_lag > SYNC_LAG_THRESHOLD.saturating_sub(SYNC_HYSTERESIS_BAND); + } else { + self.syncing = head_lag > SYNC_LAG_THRESHOLD; + } + + if self.syncing { + metrics::SyncStatus::Syncing + } else { + metrics::SyncStatus::Synced + } + } +} /// Milliseconds until the next interval boundary, measured relative to genesis. fn ms_until_next_interval(now_ms: u64, genesis_time_ms: u64) -> u64 { @@ -104,6 +146,7 @@ impl BlockChain { last_tick_instant: None, attestation_committee_count, pre_merge_coverage: None, + sync_status: SyncStatusTracker::default(), } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -168,6 +211,9 @@ pub struct BlockChainServer { /// single-threaded message loop, so no synchronization is needed. /// Observability-only. pre_merge_coverage: Option, + + /// Stateful sync heuristic used by `lean_node_sync_status`. + sync_status: SyncStatusTracker, } impl BlockChainServer { @@ -215,6 +261,7 @@ impl BlockChainServer { // Update current slot metric metrics::update_current_slot(slot); + self.update_sync_status(slot); // Snapshot the aggregator flag once per tick so all read sites within // the tick see a consistent value even if the admin API toggles it @@ -840,6 +887,15 @@ impl BlockChainServer { let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation) .inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation")); } + + fn update_sync_status(&mut self, current_slot: u64) { + let head_slot = self.store.head_slot(); + let max_seen_slot = self.store.max_live_chain_slot().unwrap_or(head_slot); + let status = self + .sync_status + .update(current_slot, head_slot, max_seen_slot); + metrics::set_node_sync_status(status); + } } // Protocol trait for internal messages only (tick scheduling). @@ -997,3 +1053,63 @@ impl Handler for BlockChainServer { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sync_status_allows_lag_through_threshold() { + let mut tracker = SyncStatusTracker::default(); + + for lag in 0..=SYNC_LAG_THRESHOLD { + assert_eq!( + tracker.update(10 + lag, 10, 10 + lag), + metrics::SyncStatus::Synced + ); + } + } + + #[test] + fn sync_status_detects_local_lag_when_fresh_blocks_are_known() { + let mut tracker = SyncStatusTracker::default(); + let current_slot = 10 + SYNC_LAG_THRESHOLD + 1; + + assert_eq!( + tracker.update(current_slot, 10, current_slot), + metrics::SyncStatus::Syncing + ); + } + + #[test] + fn sync_status_treats_stale_known_blocks_as_network_stall() { + let mut tracker = SyncStatusTracker::default(); + + assert_eq!(tracker.update(100, 0, 0), metrics::SyncStatus::Synced); + } + + #[test] + fn sync_status_hysteresis_prevents_flapping() { + let mut tracker = SyncStatusTracker::default(); + + assert_eq!(tracker.update(15, 10, 15), metrics::SyncStatus::Syncing); + assert_eq!(tracker.update(15, 11, 15), metrics::SyncStatus::Syncing); + assert_eq!(tracker.update(15, 10, 15), metrics::SyncStatus::Syncing); + assert_eq!(tracker.update(15, 13, 15), metrics::SyncStatus::Synced); + } + + #[test] + fn network_stall_reopens_sync_status() { + let mut tracker = SyncStatusTracker::default(); + + assert_eq!(tracker.update(20, 0, 20), metrics::SyncStatus::Syncing); + assert_eq!(tracker.update(30, 0, 20), metrics::SyncStatus::Synced); + } + + #[test] + fn future_head_saturates_lag_at_zero() { + let mut tracker = SyncStatusTracker::default(); + + assert_eq!(tracker.update(15, 20, 20), metrics::SyncStatus::Synced); + } +} diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index facb4097..28e510f5 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -491,6 +491,7 @@ static LEAN_BLOCK_PROPOSAL_AGGREGATES_SELECTED: std::sync::LazyLock = // --- Sync Status --- /// Node synchronization status. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum SyncStatus { Idle, Syncing, diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index c0d3079a..fb06021f 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -827,6 +827,16 @@ impl Store { .collect() } + /// Return the highest slot in the live chain. + pub fn max_live_chain_slot(&self) -> Option { + let view = self.backend.begin_read().expect("read view"); + view.prefix_iterator(Table::LiveChain, &[]) + .expect("iterator") + .filter_map(Result::ok) + .map(|(key, _)| decode_live_chain_key(&key).0) + .max() + } + /// Get all known block roots as HashSet. /// /// Useful for checking block existence without deserializing.