Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,48 @@ pub use ethlambda_types::block::MAX_ATTESTATIONS_DATA;
///
/// See: leanSpec PR #682.
pub const GOSSIP_DISPARITY_INTERVALS: u64 = 1;
/// Local head lag beyond which the node is considered to be syncing.
///
/// See: leanSpec PR #708.
const SYNC_LAG_THRESHOLD: u64 = 4;
/// Freshest-known block lag beyond which the network is considered stalled.
///
/// During a network-wide stall the node remains synced so validators can help
/// the chain recover.
const NETWORK_STALL_THRESHOLD: u64 = 8;
/// Recovery band that prevents the sync status from flapping near the threshold.
const SYNC_HYSTERESIS_BAND: u64 = 2;

#[derive(Default)]
struct SyncStatusTracker {
syncing: bool,
}

impl SyncStatusTracker {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this to another module?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean the impl SyncStatusTracker?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the whole SyncStatusTracker and related constants. Also the tests. I want to make this file lighter

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will open a PR for this to fix this.

fn update(
&mut self,
current_slot: u64,
head_slot: u64,
max_seen_slot: u64,
) -> metrics::SyncStatus {
let head_lag = current_slot.saturating_sub(head_slot);
let network_lag = current_slot.saturating_sub(max_seen_slot);

if network_lag > NETWORK_STALL_THRESHOLD {
self.syncing = false;
} else if self.syncing {
self.syncing = head_lag > SYNC_LAG_THRESHOLD.saturating_sub(SYNC_HYSTERESIS_BAND);
} else {
self.syncing = head_lag > SYNC_LAG_THRESHOLD;
}

if self.syncing {
metrics::SyncStatus::Syncing
} else {
metrics::SyncStatus::Synced
}
}
}

/// Milliseconds until the next interval boundary, measured relative to genesis.
fn ms_until_next_interval(now_ms: u64, genesis_time_ms: u64) -> u64 {
Expand Down Expand Up @@ -104,6 +146,7 @@ impl BlockChain {
last_tick_instant: None,
attestation_committee_count,
pre_merge_coverage: None,
sync_status: SyncStatusTracker::default(),
}
.start();
let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time))
Expand Down Expand Up @@ -168,6 +211,9 @@ pub struct BlockChainServer {
/// single-threaded message loop, so no synchronization is needed.
/// Observability-only.
pre_merge_coverage: Option<coverage::CoverageSnapshot>,

/// Stateful sync heuristic used by `lean_node_sync_status`.
sync_status: SyncStatusTracker,
}

impl BlockChainServer {
Expand Down Expand Up @@ -215,6 +261,7 @@ impl BlockChainServer {

// Update current slot metric
metrics::update_current_slot(slot);
self.update_sync_status(slot);

// Snapshot the aggregator flag once per tick so all read sites within
// the tick see a consistent value even if the admin API toggles it
Expand Down Expand Up @@ -840,6 +887,15 @@ impl BlockChainServer {
let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation)
.inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation"));
}

fn update_sync_status(&mut self, current_slot: u64) {
let head_slot = self.store.head_slot();
let max_seen_slot = self.store.max_live_chain_slot().unwrap_or(head_slot);
let status = self
.sync_status
.update(current_slot, head_slot, max_seen_slot);
metrics::set_node_sync_status(status);
}
}

// Protocol trait for internal messages only (tick scheduling).
Expand Down Expand Up @@ -997,3 +1053,63 @@ impl Handler<AggregationDeadline> for BlockChainServer {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn sync_status_allows_lag_through_threshold() {
let mut tracker = SyncStatusTracker::default();

for lag in 0..=SYNC_LAG_THRESHOLD {
assert_eq!(
tracker.update(10 + lag, 10, 10 + lag),
metrics::SyncStatus::Synced
);
}
}

#[test]
fn sync_status_detects_local_lag_when_fresh_blocks_are_known() {
let mut tracker = SyncStatusTracker::default();
let current_slot = 10 + SYNC_LAG_THRESHOLD + 1;

assert_eq!(
tracker.update(current_slot, 10, current_slot),
metrics::SyncStatus::Syncing
);
}

#[test]
fn sync_status_treats_stale_known_blocks_as_network_stall() {
let mut tracker = SyncStatusTracker::default();

assert_eq!(tracker.update(100, 0, 0), metrics::SyncStatus::Synced);
}

#[test]
fn sync_status_hysteresis_prevents_flapping() {
let mut tracker = SyncStatusTracker::default();

assert_eq!(tracker.update(15, 10, 15), metrics::SyncStatus::Syncing);
assert_eq!(tracker.update(15, 11, 15), metrics::SyncStatus::Syncing);
assert_eq!(tracker.update(15, 10, 15), metrics::SyncStatus::Syncing);
assert_eq!(tracker.update(15, 13, 15), metrics::SyncStatus::Synced);
}

#[test]
fn network_stall_reopens_sync_status() {
let mut tracker = SyncStatusTracker::default();

assert_eq!(tracker.update(20, 0, 20), metrics::SyncStatus::Syncing);
assert_eq!(tracker.update(30, 0, 20), metrics::SyncStatus::Synced);
}

#[test]
fn future_head_saturates_lag_at_zero() {
let mut tracker = SyncStatusTracker::default();

assert_eq!(tracker.update(15, 20, 20), metrics::SyncStatus::Synced);
}
}
1 change: 1 addition & 0 deletions crates/blockchain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ static LEAN_BLOCK_PROPOSAL_AGGREGATES_SELECTED: std::sync::LazyLock<Histogram> =
// --- Sync Status ---

/// Node synchronization status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStatus {
Idle,
Syncing,
Expand Down
10 changes: 10 additions & 0 deletions crates/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,16 @@ impl Store {
.collect()
}

/// Return the highest slot in the live chain.
pub fn max_live_chain_slot(&self) -> Option<u64> {
let view = self.backend.begin_read().expect("read view");
view.prefix_iterator(Table::LiveChain, &[])
.expect("iterator")
.filter_map(Result::ok)
.map(|(key, _)| decode_live_chain_key(&key).0)
.max()
}

/// Get all known block roots as HashSet.
///
/// Useful for checking block existence without deserializing.
Expand Down