Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions crates/cold/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,20 @@ impl BlockData {

impl From<ExecutedBlock> for BlockData {
fn from(block: ExecutedBlock) -> Self {
Self::new(
block.header,
block.transactions,
block.receipts,
block.signet_events,
block.zenith_header,
)
// Destructure so adding a new `ExecutedBlock` field is a compile
// error here until the author decides whether it belongs in cold.
// `bundle` and `journal_hash` are hot-only: they live in
// `PlainAccountState`/`PlainStorageState` and `JournalHashes`.
let ExecutedBlock {
header,
transactions,
receipts,
signet_events,
zenith_header,
bundle: _,
journal_hash: _,
} = block;
Self::new(header, transactions, receipts, signet_events, zenith_header)
}
}

Expand Down
20 changes: 20 additions & 0 deletions crates/hot-mdbx/src/db_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ impl FixedSizeInfo {
_ => None,
}
}

/// Canonical mapping from the `(dual_key, fixed_val)` size hints accepted
/// by [`queue_raw_create`] to the [`FixedSizeInfo`] implied by them. Used
/// when persisting FSI for a newly-created table.
///
/// [`queue_raw_create`]: signet_hot::model::HotKvWrite::queue_raw_create
pub(crate) const fn from_create_args(
dual_key: Option<usize>,
fixed_val: Option<usize>,
) -> Self {
match (dual_key, fixed_val) {
(Some(key2_size), Some(value_size)) => {
Self::DupFixed { key2_size, total_size: key2_size + value_size }
}
(Some(key2_size), None) => Self::DupSort { key2_size },
(None, _) => Self::None,
}
}
}

impl ValSer for FixedSizeInfo {
Expand Down Expand Up @@ -208,6 +226,7 @@ mod tests {
("TableG", FixedSizeInfo::None),
("TableH", FixedSizeInfo::None),
("TableI", FixedSizeInfo::None),
("TableJ", FixedSizeInfo::None),
];
let cache = FsiCache::new(known);

Expand All @@ -233,6 +252,7 @@ mod tests {
("T7", FixedSizeInfo::None),
("T8", FixedSizeInfo::None),
("T9", FixedSizeInfo::None),
("T10", FixedSizeInfo::None),
];
let cache = FsiCache::new(known);

Expand Down
29 changes: 8 additions & 21 deletions crates/hot-mdbx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,25 +80,9 @@ mod utils;

use signet_hot::{
model::{HotKv, HotKvError, HotKvWrite},
tables::{
AccountChangeSets, AccountsHistory, Bytecodes, HeaderNumbers, Headers, NUM_TABLES,
PlainAccountState, PlainStorageState, StorageChangeSets, StorageHistory, Table,
},
tables::{NUM_TABLES, STANDARD_TABLES},
};

/// The known table names, used to pre-populate the FSI cache at open time.
const KNOWN_TABLE_NAMES: [&str; NUM_TABLES] = [
Headers::NAME,
HeaderNumbers::NAME,
Bytecodes::NAME,
PlainAccountState::NAME,
PlainStorageState::NAME,
AccountsHistory::NAME,
AccountChangeSets::NAME,
StorageHistory::NAME,
StorageChangeSets::NAME,
];

/// 1 KB in bytes
pub const KILOBYTE: usize = 1024;
/// 1 MB in bytes
Expand Down Expand Up @@ -458,14 +442,17 @@ fn create_tables_and_populate_cache(env: &Environment) -> Result<FsiCache, MdbxE
Ok(FsiCache::new(known))
}

/// Read FSI entries for all known tables from the metadata table.
/// Read FSI entries for all standard tables from the metadata table.
///
/// Iterates [`STANDARD_TABLES`] and reads each table's on-disk FSI.
fn read_known_fsi<K: signet_libmdbx::TransactionKind>(
tx: &Tx<K>,
) -> Result<[(&'static str, FixedSizeInfo); NUM_TABLES], MdbxError> {
let mut known = [("", FixedSizeInfo::None); NUM_TABLES];
for (i, &name) in KNOWN_TABLE_NAMES.iter().enumerate() {
known[i] = (name, tx.read_fsi_from_table(name)?);
}
STANDARD_TABLES.iter().enumerate().try_for_each(|(index, table)| -> Result<(), MdbxError> {
known[index] = (table.name, tx.read_fsi_from_table(table.name)?);
Ok(())
})?;
Ok(known)
}

Expand Down
1 change: 1 addition & 0 deletions crates/hot-mdbx/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ mod tests {
use signet_hot::{
KeySer, MAX_KEY_SIZE, ValSer,
conformance::{conformance, test_unwind_conformance},
db::UnsafeDbWrite,
model::{
DualKeyTraverse, DualTableTraverse, HotKv, HotKvRead, HotKvWrite, TableTraverse,
TableTraverseMut,
Expand Down
20 changes: 6 additions & 14 deletions crates/hot-mdbx/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,22 +288,14 @@ macro_rules! impl_hot_kv_write {
dual_key: Option<usize>,
fixed_val: Option<usize>,
) -> Result<(), Self::Error> {
let mut flags = signet_libmdbx::DatabaseFlags::default();

let mut fsi = FixedSizeInfo::None;
let fsi = FixedSizeInfo::from_create_args(dual_key, fixed_val);

if let Some(key2_size) = dual_key {
let mut flags = signet_libmdbx::DatabaseFlags::default();
if fsi.is_dupsort() {
flags.set(signet_libmdbx::DatabaseFlags::DUP_SORT, true);
if let Some(value_size) = fixed_val {
flags.set(signet_libmdbx::DatabaseFlags::DUP_FIXED, true);
fsi = FixedSizeInfo::DupFixed {
key2_size,
total_size: key2_size + value_size,
};
} else {
// DUPSORT without DUP_FIXED - variable value size
fsi = FixedSizeInfo::DupSort { key2_size };
}
}
if fsi.is_dup_fixed() {
flags.set(signet_libmdbx::DatabaseFlags::DUP_FIXED, true);
}

self.inner.create_db(Some(table), flags)?;
Expand Down
1 change: 1 addition & 0 deletions crates/hot/src/conformance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub fn conformance<T: HotKv>(hot_kv: &T) {
test_storage_history(hot_kv);
test_account_changes(hot_kv);
test_storage_changes(hot_kv);
test_journal_hash_roundtrip(hot_kv);
test_missing_reads(hot_kv);
test_cursor_iter_k2(hot_kv);
test_cursor_iter_k2_single(hot_kv);
Expand Down
39 changes: 39 additions & 0 deletions crates/hot/src/conformance/roundtrip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,42 @@ pub fn test_storage_changes<T: HotKv>(hot_kv: &T) {
}
}

/// Test writing, reading, and overwriting journal hashes.
pub fn test_journal_hash_roundtrip<T: HotKv>(hot_kv: &T) {
let hash_a = b256!("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
let hash_b = b256!("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
let hash_c = b256!("0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc");

// Write hashes at two block numbers.
{
let writer = hot_kv.writer().unwrap();
writer.put_journal_hash(7, &hash_a).unwrap();
writer.put_journal_hash(8, &hash_b).unwrap();
writer.commit().unwrap();
}

// Read back.
{
let reader = hot_kv.reader().unwrap();
assert_eq!(reader.get_journal_hash(7).unwrap(), Some(hash_a));
assert_eq!(reader.get_journal_hash(8).unwrap(), Some(hash_b));
assert_eq!(reader.get_journal_hash(9).unwrap(), None);
}

// Overwrite block 7 - producers retrying a block must be able to replace
// the previous entry.
{
let writer = hot_kv.writer().unwrap();
writer.put_journal_hash(7, &hash_c).unwrap();
writer.commit().unwrap();
}
{
let reader = hot_kv.reader().unwrap();
assert_eq!(reader.get_journal_hash(7).unwrap(), Some(hash_c));
assert_eq!(reader.get_journal_hash(8).unwrap(), Some(hash_b));
}
}

/// Test that missing reads return None
pub fn test_missing_reads<T: HotKv>(hot_kv: &T) {
let missing_addr = address!("0x9999999999999999999999999999999999999999");
Expand Down Expand Up @@ -288,4 +324,7 @@ pub fn test_missing_reads<T: HotKv>(hot_kv: &T) {

// Missing storage change
assert!(reader.get_storage_change(999999, &missing_addr, &missing_slot).unwrap().is_none());

// Missing journal hash
assert!(reader.get_journal_hash(999999).unwrap().is_none());
}
25 changes: 23 additions & 2 deletions crates/hot/src/conformance/unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ pub fn make_account_info(nonce: u64, balance: U256, code_hash: Option<B256>) ->
/// - Headers and header number mappings
/// - Account and storage change sets
/// - Account and storage history indices
/// - Journal hashes
pub fn test_unwind_conformance<Kv: HotKv>(store_a: &Kv, store_b: &Kv) {
// Test addresses
let addr1 = address!("0x1111111111111111111111111111111111111111");
Expand Down Expand Up @@ -412,10 +413,21 @@ pub fn test_unwind_conformance<Kv: HotKv>(store_a: &Kv, store_b: &Kv) {
blocks.push((sealed, bundle));
}

// Store A: Append all 5 blocks, then unwind to block 1
// Journal hashes, one per block; verifies unwind_above also drops these.
let journal_hashes: Vec<B256> = (0..blocks.len())
.map(|index| {
let byte = u8::try_from(index + 1).expect("test block count fits in u8");
B256::from([byte; 32])
})
.collect();

// Store A: Append all 5 blocks plus journal hashes, then unwind to block 1
{
let writer = store_a.writer().unwrap();
writer.append_blocks(blocks.iter().map(|(h, b)| (h, b))).unwrap();
for ((header, _), hash) in blocks.iter().zip(&journal_hashes) {
writer.put_journal_hash(header.number, hash).unwrap();
}
writer.commit().unwrap();
}
{
Expand All @@ -424,10 +436,13 @@ pub fn test_unwind_conformance<Kv: HotKv>(store_a: &Kv, store_b: &Kv) {
writer.commit().unwrap();
}

// Store B: Append only blocks 0, 1
// Store B: Append only blocks 0, 1 plus their journal hashes
{
let writer = store_b.writer().unwrap();
writer.append_blocks(blocks[0..2].iter().map(|(h, b)| (h, b))).unwrap();
for ((header, _), hash) in blocks[0..2].iter().zip(&journal_hashes[0..2]) {
writer.put_journal_hash(header.number, hash).unwrap();
}
writer.commit().unwrap();
}

Expand All @@ -454,6 +469,12 @@ pub fn test_unwind_conformance<Kv: HotKv>(store_a: &Kv, store_b: &Kv) {
collect_single_table::<tables::PlainAccountState, _>(&reader_b),
);

assert_single_tables_equal::<tables::JournalHashes>(
"JournalHashes",
collect_single_table::<tables::JournalHashes, _>(&reader_a),
collect_single_table::<tables::JournalHashes, _>(&reader_b),
);

// Note: Bytecodes are not removed on unwind (they're content-addressed),
// so store_a may have more bytecodes than store_b. We skip this comparison.
// assert_single_tables_equal::<tables::Bytecodes>(...)
Expand Down
91 changes: 90 additions & 1 deletion crates/hot/src/db/consistent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,22 @@ pub trait HistoryWrite: UnsafeDbWrite + UnsafeHistoryWrite {
/// - Headers and header number mappings
/// - Account and storage change sets
/// - Account and storage history indices
/// - Journal hashes
fn unwind_above(&self, block: BlockNumber) -> Result<(), HistoryError<Self::Error>> {
let first_block = block + 1;
// Nothing can sit above `u64::MAX`; bail out before any of the
// `block + 1` arithmetic below has a chance to overflow.
let Some(first_block) = block.checked_add(1) else {
return Ok(());
};

// Clean journal hashes independently of the Headers table. Direct
// callers of `put_journal_hash` are not forced to pair writes with a
// header, so the upper bound here is `u64::MAX` rather than
// `last_block_number()`. The range delete is a no-op when the table
// has no entries above `block`.
self.traverse_mut::<tables::JournalHashes>()?
.delete_range_inclusive(first_block..=u64::MAX)?;

let Some(last_block) = self.last_block_number()? else {
return Ok(());
};
Expand Down Expand Up @@ -345,3 +359,78 @@ pub trait HistoryWrite: UnsafeDbWrite + UnsafeHistoryWrite {
}

impl<T> HistoryWrite for T where T: UnsafeDbWrite + UnsafeHistoryWrite {}

#[cfg(test)]
mod tests {
use super::HistoryWrite;
use crate::{
db::{HistoryRead, HotDbRead, UnsafeDbWrite, UnsafeHistoryWrite},
mem::MemKv,
model::HotKv,
};
use alloy::{
consensus::{Header, Sealable},
primitives::{Address, B256, U256},
};
use signet_storage_types::Account;

/// Regression: `unwind_above(u64::MAX)` must be a no-op rather than
/// overflowing `block + 1` and wiping data in `JournalHashes`,
/// `Headers`/`HeaderNumbers`, and the change-set tables (which the
/// wrapped `first_block = 0` would have caused to be range-deleted).
#[test]
fn unwind_above_u64_max_is_noop() {
let store = MemKv::new();
let hash = B256::with_last_byte(0x42);
let header = Header { number: 7, ..Default::default() }.seal_slow();
let header_hash = header.hash();
let address = Address::with_last_byte(0xab);
let account = Account { nonce: 1, balance: U256::from(99), bytecode_hash: None };

let writer = store.writer().unwrap();
writer.put_header(&header).unwrap();
writer.write_account_prestate(7, address, &account).unwrap();
writer.put_journal_hash(7, &hash).unwrap();
writer.commit().unwrap();

let writer = store.writer().unwrap();
writer.unwind_above(u64::MAX).unwrap();
writer.commit().unwrap();

let reader = store.reader().unwrap();
assert_eq!(reader.get_journal_hash(7).unwrap(), Some(hash));
assert_eq!(reader.get_header(7).unwrap().expect("header survives").number, 7);
assert_eq!(reader.get_header_number(&header_hash).unwrap(), Some(7));
assert_eq!(reader.get_account_change(7, &address).unwrap(), Some(account));
}

/// Boundary: `unwind_above(u64::MAX - 1)` must delete every table's
/// entry at `u64::MAX`. Exercises the inclusive upper bound of the range
/// delete on `JournalHashes`, `Headers`/`HeaderNumbers`, and the change
/// sets at the extreme.
#[test]
fn unwind_above_below_u64_max_deletes_max_entry() {
let store = MemKv::new();
let hash = B256::with_last_byte(0xab);
let header = Header { number: u64::MAX, ..Default::default() }.seal_slow();
let header_hash = header.hash();
let address = Address::with_last_byte(0xcd);
let account = Account { nonce: 3, balance: U256::from(7), bytecode_hash: None };

let writer = store.writer().unwrap();
writer.put_header(&header).unwrap();
writer.write_account_prestate(u64::MAX, address, &account).unwrap();
writer.put_journal_hash(u64::MAX, &hash).unwrap();
writer.commit().unwrap();

let writer = store.writer().unwrap();
writer.unwind_above(u64::MAX - 1).unwrap();
writer.commit().unwrap();

let reader = store.reader().unwrap();
assert!(reader.get_journal_hash(u64::MAX).unwrap().is_none());
assert!(reader.get_header(u64::MAX).unwrap().is_none());
assert!(reader.get_header_number(&header_hash).unwrap().is_none());
assert!(reader.get_account_change(u64::MAX, &address).unwrap().is_none());
}
}
5 changes: 5 additions & 0 deletions crates/hot/src/db/inconsistent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ pub trait UnsafeDbWrite: HotKvWrite + super::sealed::Sealed {
self.queue_delete::<tables::HeaderNumbers>(hash)
}

/// Write the keccak256 of the wire-encoded `Journal::V1` bytes for a block.
fn put_journal_hash(&self, number: u64, hash: &B256) -> Result<(), Self::Error> {
self.queue_put::<tables::JournalHashes>(&number, hash)
}

/// Commit the write transaction.
fn commit(self) -> Result<(), Self::Error>
where
Expand Down
Loading