From 111145233c874e17437eca70e006a393c4cd8d41 Mon Sep 17 00:00:00 2001 From: bdchatham Date: Mon, 15 Jun 2026 14:59:32 -0700 Subject: [PATCH 1/6] =?UTF-8?q?feat(stats):=20block-indexed=20tx=E2=86=92i?= =?UTF-8?q?nclusion=20tracker=20(PLT-459)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Build the submit→inclusion correlation that did not exist: a new InclusionTracker indexes txHash→inclusion per block (one BlockByNumber per block, O(blocks) not O(txs)), matches against a bounded in-flight registry the worker populates at send-completion, and stamps LoadTx.InclusionTime from the including block's header-arrival wall clock (single-clock with IntendedSendTime; not header.Time, not fetch time). Retires the lossy per-tx receipt polling (watchTransactions/ waitForReceipt/sentTxs) and the coordinated-omission-contaminated receiptLatency metric; --track-receipts now enables the tracker. Conservation (asserting test): registered == included + expired + inflight_at_shutdown, with registered ⊆ succeeded; dropped-at-cap and WS-gap misses are surfaced (counters), never leaked. No backfill — WS gaps degrade conservatively. Bounded+reaped map, -race clean. 
Co-Authored-By: Claude Opus 4.8 --- main.go | 38 +++- sender/doc.go | 29 ++- sender/metrics.go | 6 - sender/scheduler_realworker_test.go | 4 +- sender/sharded_sender.go | 9 +- sender/worker.go | 96 ++-------- sender/worker_test.go | 78 ++++++++ stats/inclusion_tracker.go | 243 +++++++++++++++++++++++++ stats/inclusion_tracker_test.go | 268 ++++++++++++++++++++++++++++ stats/metrics.go | 61 +++++++ stats/run_summary.go | 19 ++ types/scenario.go | 10 +- 12 files changed, 764 insertions(+), 97 deletions(-) create mode 100644 stats/inclusion_tracker.go create mode 100644 stats/inclusion_tracker_test.go diff --git a/main.go b/main.go index 5baa309..7624d79 100644 --- a/main.go +++ b/main.go @@ -34,6 +34,11 @@ var ( configFile string ) +// inclusionReapAfter bounds how long an un-included tx stays in the inclusion +// registry before it is reaped as expired. A calibration knob: too short +// undercounts slow inclusions, too long inflates the in-flight map. +const inclusionReapAfter = 30 * time.Second + var rootCmd = &cobra.Command{ Use: "seiload", Short: "Sei Chain Load Test v2", @@ -208,6 +213,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { logger := stats.NewLogger(collector, cfg.Settings.StatsInterval.ToDuration(), cfg.Settings.ReportPath, cfg.Settings.Debug) var ramper *sender.Ramper var dispatcher *sender.Dispatcher + var inclusionTracker *stats.InclusionTracker err = service.Run(ctx, func(ctx context.Context, s service.Scope) error { // Create the generator from the config struct @@ -258,8 +264,26 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { }) } + // The --track-receipts flag now enables the block-indexed inclusion + // tracker (the lossy per-tx receipt path is retired). maxInflight is sized + // off the open-loop --max-in-flight so the registry comfortably holds the + // admitted in-flight set; the ×4 headroom absorbs inclusion lag. 
+ inclusion := utils.None[*stats.InclusionTracker]() + if len(cfg.Endpoints) > 0 && cfg.Settings.TrackReceipts { + const maxInflightMultiple = 4 + inclusionTracker = stats.NewInclusionTracker( + cfg.SeiChainID, + inclusionReapAfter, + cfg.Settings.MaxInFlight*maxInflightMultiple, + ) + inclusion = utils.Some(inclusionTracker) + s.SpawnBgNamed("inclusion tracker", func() error { + return inclusionTracker.Run(ctx, cfg.Endpoints[0]) + }) + } + // Create the sender from the config struct - snd, err := sender.NewShardedSender(cfg, sharedLimiter, collector) + snd, err := sender.NewShardedSender(cfg, sharedLimiter, collector, inclusion) if err != nil { return fmt.Errorf("failed to create sender: %w", err) } @@ -386,6 +410,18 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { log.Printf("⚠️ Open-loop %d txs failed to send (admitted but errored; not lost)", summary.Failed) } } + // Read AFTER service.Run returns: both workers and the tracker have joined, + // so inflightAtShutdown is final and the conservation identity holds. 
+ if inclusionTracker != nil { + incl := inclusionTracker.Summary() + summary.InclusionTracked = true + summary.Included = incl.Included + summary.Expired = incl.Expired + summary.DroppedAtCap = incl.DroppedAtCap + summary.InflightAtShutdown = incl.InflightAtShutdown + log.Printf("📦 Inclusion: included=%d expired=%d dropped_at_cap=%d inflight_at_shutdown=%d", + incl.Included, incl.Expired, incl.DroppedAtCap, incl.InflightAtShutdown) + } collector.EmitRunSummary(ctx, summary) if d := cfg.Settings.PostSummaryFlushDelay.ToDuration(); d > 0 { log.Printf("⏳ Holding pod for post-summary scrape window (%s)...", d) diff --git a/sender/doc.go b/sender/doc.go index cf55988..8555b0d 100644 --- a/sender/doc.go +++ b/sender/doc.go @@ -4,8 +4,9 @@ // the [Dispatcher] times their arrival and hands each off to a [TxSender]; the // [ShardedSender] routes each tx to one of N per-endpoint [Worker]s by shard; // the worker's send loop stamps the attempt and calls the go-ethereum client -// (eth_sendRawTransaction). Receipts, when tracked, are -// polled on a separate worker loop. A shared [golang.org/x/time/rate.Limiter] is +// (eth_sendRawTransaction). Inclusion, when tracked, is observed by the +// block-indexed [stats.InclusionTracker] (see Inclusion stage below), not by +// per-tx receipt polling. A shared [golang.org/x/time/rate.Limiter] is // the single rate authority for the whole pipeline; the [Ramper] drives its // limit up or down via SetLimit. // @@ -84,6 +85,30 @@ // run's arrival model, not of any per-tx field. Latency and schedule-lag consumers // must gate on the run-level arrival model. // +// # Inclusion stage +// +// When enabled (--track-receipts), the worker hands each successful send to the +// [stats.InclusionTracker] at send-completion (after OnComplete, only on a nil +// send error). 
The tracker subscribes to new heads, fetches each arriving +// block's body once (O(blocks), not O(txs)), and stamps InclusionTime on every +// matched in-flight tx with the block's header-ARRIVAL time. Un-included txs are +// reaped as expired after reapAfter. +// +// Conservation. registered == included + expired + inflight_at_shutdown, and +// registered ⊆ succeeded (only successful sends are registered). The inclusion +// denominator is succeeded (txs_accepted), never a minted "registered" series; +// dropped_at_cap txs are excluded from it. inflight_at_shutdown is read only +// after both the workers and the tracker have joined. +// +// Accepted boundaries. (1) WS gaps degrade conservatively: a missed head is +// counted (block_gaps) but never backfilled, so its txs reap as expired — +// an undercount of inclusions, never a miscount. (2) Reorgs use +// first-observation-wins (stamp + delete); the inclusion-time error is bounded +// by reorg_depth × block_time, with no canonical reconciliation. (3) A single +// fetch endpoint (Endpoints[0], shared with the block collector) adds a small +// read load. (4) InclusionTime is the header-arrival wall clock, not fetch +// completion and not header.Time. +// // Detection and baseline. schedule_lag (AttemptedSendTime minus IntendedSendTime) // is the primary coordinated-omission gate: it shows when sends fall behind the // arrival schedule even before any tx is shed. 
The drop count measures only diff --git a/sender/metrics.go b/sender/metrics.go index 93888fe..0fe8fd5 100644 --- a/sender/metrics.go +++ b/sender/metrics.go @@ -23,12 +23,6 @@ var ( metric.WithUnit("s"), metric.WithExplicitBucketBoundaries(0.1, 0.2, 0.3, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0, 20.0))) - receiptLatency = must(meter.Float64Histogram( - "receipt_latency", - metric.WithDescription("Latency from transaction submission to receipt confirmation in seconds"), - metric.WithUnit("s"), - metric.WithExplicitBucketBoundaries(0.1, 0.2, 0.3, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0, 20.0))) - txsAccepted = must(meter.Int64Counter( "txs_accepted", metric.WithDescription("Transactions successfully submitted to an endpoint"), diff --git a/sender/scheduler_realworker_test.go b/sender/scheduler_realworker_test.go index 0f5e99c..63ac9a2 100644 --- a/sender/scheduler_realworker_test.go +++ b/sender/scheduler_realworker_test.go @@ -217,8 +217,8 @@ func (g *signedTxGenerator) issuedCount() int { // newRealWorker builds the production Worker against the given endpoint, in the // open-loop configuration (SkipRateLimit=true so the scheduler owns the clock, -// TrackReceipts=false so watchTransactions returns immediately and we exercise -// only the send path). It is the real TxSender the scheduler drives. +// no inclusion tracker so we exercise only the send path). It is the real +// TxSender the scheduler drives. 
func newRealWorker(endpoint string, tasks, buffer int) *Worker { return NewWorker(&WorkerConfig{ ID: 0, diff --git a/sender/sharded_sender.go b/sender/sharded_sender.go index 0342d4f..dafdc97 100644 --- a/sender/sharded_sender.go +++ b/sender/sharded_sender.go @@ -9,6 +9,7 @@ import ( "github.com/sei-protocol/sei-load/config" "github.com/sei-protocol/sei-load/stats" "github.com/sei-protocol/sei-load/types" + "github.com/sei-protocol/sei-load/utils" "github.com/sei-protocol/sei-load/utils/service" ) @@ -17,8 +18,10 @@ type ShardedSender struct { workers []*Worker } -// NewShardedSender creates a new sharded sender with workers for each endpoint -func NewShardedSender(cfg *config.LoadConfig, limiter *rate.Limiter, collector *stats.Collector) (*ShardedSender, error) { +// NewShardedSender creates a new sharded sender with workers for each endpoint. +// inclusion, when present, is shared across all workers so each routes its +// successful sends to the one tracker. +func NewShardedSender(cfg *config.LoadConfig, limiter *rate.Limiter, collector *stats.Collector, inclusion utils.Option[*stats.InclusionTracker]) (*ShardedSender, error) { if len(cfg.Endpoints) == 0 { return nil, fmt.Errorf("no endpoints configured") } @@ -36,11 +39,11 @@ func NewShardedSender(cfg *config.LoadConfig, limiter *rate.Limiter, collector * BufferSize: cfg.Settings.BufferSize, Tasks: cfg.Settings.TasksPerEndpoint, DryRun: cfg.Settings.DryRun, - TrackReceipts: cfg.Settings.TrackReceipts, Debug: cfg.Settings.Debug, Collector: collector, Limiter: limiter, SkipRateLimit: skipRateLimit, + Inclusion: inclusion, }) } diff --git a/sender/worker.go b/sender/worker.go index f3ece7c..7fdb24d 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -2,7 +2,6 @@ package sender import ( "context" - "errors" "fmt" "log" "net" @@ -10,7 +9,6 @@ import ( "net/url" "time" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" 
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" @@ -36,20 +34,21 @@ type WorkerConfig struct { Tasks int DryRun bool Debug bool - TrackReceipts bool Collector *stats.Collector Limiter *rate.Limiter // Shared rate authority; nil disables gating. // SkipRateLimit opts a worker out of limiter gating. Zero value (false) is the // safe default (gate when Limiter is set); set true only in open-loop, where // the scheduler owns the clock (see doc.go). SkipRateLimit bool + // Inclusion, when present, receives each successful send at send-completion so + // the tracker can stamp InclusionTime (see doc.go). None disables tracking. + Inclusion utils.Option[*stats.InclusionTracker] } // Worker handles sending transactions to a specific endpoint type Worker struct { - cfg *WorkerConfig - txChan chan *types.LoadTx - sentTxs chan *types.LoadTx + cfg *WorkerConfig + txChan chan *types.LoadTx } // HttpClientOption configures the Transport used by newHttpClient. @@ -124,9 +123,8 @@ func newRPCClient(ctx context.Context, endpoint string, opts ...HttpClientOption // NewWorker creates a new worker for a specific endpoint func NewWorker(cfg *WorkerConfig) *Worker { w := &Worker{ - cfg: cfg, - txChan: make(chan *types.LoadTx, cfg.BufferSize), - sentTxs: make(chan *types.LoadTx, cfg.BufferSize), + cfg: cfg, + txChan: make(chan *types.LoadTx, cfg.BufferSize), } meterWorkerQueueLength(w) return w @@ -144,7 +142,7 @@ func (w *Worker) Run(ctx context.Context) error { for range w.cfg.Tasks { s.Spawn(func() error { return w.runTxSender(ctx, client) }) } - return w.watchTransactions(ctx, client) + return nil }) } @@ -153,73 +151,6 @@ func (w *Worker) Send(ctx context.Context, tx *types.LoadTx) error { return utils.Send(ctx, w.txChan, tx) } -func (w *Worker) watchTransactions(ctx context.Context, eth *ethclient.Client) error { - if w.cfg.DryRun || !w.cfg.TrackReceipts { - return nil - } - for ctx.Err() == nil { - tx, err := utils.Recv(ctx, w.sentTxs) - if err != nil { - return err 
- } - // Cancel per-iteration; defer would leak contexts under sustained load. - waitCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - if err := w.waitForReceipt(waitCtx, eth, tx); err != nil { - log.Printf("❌ %v", err) - } - cancel() - } - return ctx.Err() -} - -func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx *types.LoadTx) (_err error) { - ctx, span := tracer.Start(ctx, "sender.check_receipt", trace.WithAttributes( - attribute.String("seiload.scenario", tx.Scenario.Name), - attribute.String("seiload.endpoint", w.cfg.Endpoint), - attribute.Int("seiload.worker_id", w.cfg.ID), - attribute.String("seiload.chain_id", w.cfg.SeiChainID), - )) - defer func(start time.Time) { - if _err != nil { - span.RecordError(_err) - } - span.End() - // Record inside the span ctx so exemplars link to the trace. - // worker_id stays off the histogram (cardinality); available via span. - receiptLatency.Record(ctx, time.Since(start).Seconds(), - metric.WithAttributes( - attribute.String("scenario", tx.Scenario.Name), - attribute.String("endpoint", w.cfg.Endpoint), - attribute.String("chain_id", w.cfg.SeiChainID), - statusAttrFromError(_err)), - ) - }(time.Now()) - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - for ctx.Err() == nil { - if _, err := utils.Recv(ctx, ticker.C); err != nil { - return fmt.Errorf("timeout waiting for receipt for tx %s", tx.EthTx.Hash().Hex()) - } - receipt, err := eth.TransactionReceipt(ctx, tx.EthTx.Hash()) - if err != nil { - if errors.Is(err, ethereum.NotFound) { - continue - } - log.Printf("❌ error getting receipt for tx %s: %v", tx.EthTx.Hash().Hex(), err) - continue - } - // Receipt found - log status and return - if receipt.Status != 1 { - return fmt.Errorf("tx %s failed", tx.EthTx.Hash().Hex()) - } - if w.cfg.Debug { - log.Printf("✅ tx %s, %s, gas=%d succeeded\n", tx.Scenario.Name, tx.EthTx.Hash().Hex(), receipt.GasUsed) - } - return nil - } - return ctx.Err() -} - // runTxSender is the 
main worker loop that processes transactions func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) error { for ctx.Err() == nil { @@ -245,6 +176,12 @@ func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) erro tx.OnComplete(err) } w.cfg.Collector.RecordTransaction(tx.Scenario.Name, w.cfg.Endpoint, time.Since(startTime), err == nil) + // Register at send-completion, only on success: registered ⊆ succeeded. + if err == nil { + if t, ok := w.cfg.Inclusion.Get(); ok { + t.Register(tx) + } + } if err != nil { log.Printf("%v", err) } @@ -265,7 +202,8 @@ func (w *Worker) sendTransaction(ctx context.Context, client *ethclient.Client, span.RecordError(_err) } span.End() - // See receiptLatency above re: span-context recording + no worker_id. + // Record inside the span ctx so exemplars link to the trace; worker_id + // stays off the histogram (cardinality), available via the span. sendLatency.Record(ctx, time.Since(start).Seconds(), metric.WithAttributes( attribute.String("scenario", tx.Scenario.Name), @@ -291,8 +229,6 @@ func (w *Worker) sendTransaction(ctx context.Context, client *ethclient.Client, attribute.String("endpoint", w.cfg.Endpoint), attribute.String("scenario", tx.Scenario.Name), )) - - utils.SendOrDrop(w.sentTxs, tx) // non-blocking handoff to receipt poller return nil } diff --git a/sender/worker_test.go b/sender/worker_test.go index 2bc6cf6..c9b4e94 100644 --- a/sender/worker_test.go +++ b/sender/worker_test.go @@ -2,18 +2,23 @@ package sender import ( "context" + "math/big" "net/http" "net/http/httptest" "strings" + "sync/atomic" "testing" "time" + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" "github.com/stretchr/testify/require" "golang.org/x/time/rate" "github.com/sei-protocol/sei-load/stats" "github.com/sei-protocol/sei-load/types" + "github.com/sei-protocol/sei-load/utils" ) // drainWorkerWithLimiter runs runTxSender 
(DryRun: no RPC) over txCount queued @@ -73,6 +78,79 @@ func TestRunTxSender_SkipRateLimitBypassesLimiter(t *testing.T) { "SkipRateLimit must bypass the limiter") } +// dryRunTx builds a minimal LoadTx with a real eth tx so EthTx.Hash() works. +func dryRunTx(nonce uint64) *types.LoadTx { + eth := ethtypes.NewTx(&ethtypes.LegacyTx{ + Nonce: nonce, GasPrice: big.NewInt(1), Gas: 21000, + To: &common.Address{}, Value: big.NewInt(0), + }) + return &types.LoadTx{EthTx: eth, Scenario: &types.TxScenario{Name: "incl"}} +} + +// inflightCount reads the tracker's registry size via its Summary (read after a +// drain, so inflight is the registered-minus-terminal count). +func inflightCount(tr *stats.InclusionTracker) uint64 { + return tr.Summary().InflightAtShutdown +} + +// TestRunTxSender_RegistersSuccessfulSend asserts the inclusion hand-off: +// a successful (DryRun) send registers the tx with the tracker, and Register +// runs strictly AFTER OnComplete (the permit-release ordering in doc.go). +func TestRunTxSender_RegistersSuccessfulSend(t *testing.T) { + tracker := stats.NewInclusionTracker("test-chain", time.Hour, 100) + collector := stats.NewCollector() + w := NewWorker(&WorkerConfig{ + ID: 0, Endpoint: "dryrun", BufferSize: 4, Tasks: 1, DryRun: true, + Collector: collector, SkipRateLimit: true, + Inclusion: utils.Some(tracker), + }) + + // Single tx so the registry starts empty: at OnComplete time inflight must + // still be 0, proving Register runs strictly after OnComplete.
+ var inflightAtComplete atomic.Int64 + tx := dryRunTx(0) + tx.OnComplete = func(error) { + inflightAtComplete.Store(int64(inflightCount(tracker))) + } + w.txChan <- tx + + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + go func() { + for collector.GetStats().TotalTxs < 1 { + time.Sleep(time.Millisecond) + } + cancel() + }() + _ = w.runTxSender(ctx, nil) + + require.Equal(t, int64(0), inflightAtComplete.Load(), + "Register must fire after OnComplete (registry empty at OnComplete time)") + require.Equal(t, uint64(1), inflightCount(tracker), + "a successful send registers exactly once") +} + +// TestRunTxSender_NoInclusionTracker confirms a None tracker is a safe no-op. +func TestRunTxSender_NoInclusionTracker(t *testing.T) { + collector := stats.NewCollector() + w := NewWorker(&WorkerConfig{ + ID: 0, Endpoint: "dryrun", BufferSize: 2, Tasks: 1, DryRun: true, + Collector: collector, SkipRateLimit: true, + Inclusion: utils.None[*stats.InclusionTracker](), + }) + w.txChan <- dryRunTx(0) + + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + go func() { + for collector.GetStats().TotalTxs < 1 { + time.Sleep(time.Millisecond) + } + cancel() + }() + require.NotPanics(t, func() { _ = w.runTxSender(ctx, nil) }) +} + func TestNewHttpTransport_Defaults(t *testing.T) { tr := newHttpTransport() diff --git a/stats/inclusion_tracker.go b/stats/inclusion_tracker.go new file mode 100644 index 0000000..590d8c1 --- /dev/null +++ b/stats/inclusion_tracker.go @@ -0,0 +1,243 @@ +package stats + +import ( + "context" + "fmt" + "log" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/sei-protocol/sei-load/types" + "github.com/sei-protocol/sei-load/utils" + "github.com/sei-protocol/sei-load/utils/service" +) + +// 
blockSource yields the tx hashes of a single block by number. Consumer-side +// interface so tests can drive matching without a live chain. +type blockSource interface { + BlockTxHashes(ctx context.Context, n uint64) ([]common.Hash, error) +} + +// ethBlockSource is the production blockSource backed by an ethclient. +type ethBlockSource struct{ client *ethclient.Client } + +func (s ethBlockSource) BlockTxHashes(ctx context.Context, n uint64) ([]common.Hash, error) { + block, err := s.client.BlockByNumber(ctx, new(big.Int).SetUint64(n)) + if err != nil { + return nil, err + } + txs := block.Transactions() + hashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + hashes[i] = tx.Hash() + } + return hashes, nil +} + +type entry struct { + tx *types.LoadTx + registeredAt time.Time +} + +type inclusionState struct { + inflight map[common.Hash]*entry + included uint64 + expired uint64 + droppedAtCap uint64 + inflightAtShutdown uint64 +} + +// InclusionTracker matches arriving blocks against in-flight txs to stamp +// InclusionTime. Conservation: registered == included + expired + +// inflight_at_shutdown, and registered ⊆ succeeded (see sender/doc.go). +type InclusionTracker struct { + seiChainID string + reapAfter time.Duration + maxInflight int + source blockSource + state utils.Mutex[*inclusionState] +} + +// NewInclusionTracker builds a tracker bounded at maxInflight in-flight txs that +// reaps un-included txs after reapAfter. The block source is the production +// ethclient impl; tests inject via newInclusionTrackerWithSource. 
+func NewInclusionTracker(seiChainID string, reapAfter time.Duration, maxInflight int) *InclusionTracker { + t := &InclusionTracker{ + seiChainID: seiChainID, + reapAfter: reapAfter, + maxInflight: maxInflight, + state: utils.NewMutex(&inclusionState{ + inflight: make(map[common.Hash]*entry), + }), + } + meterInclusionInflight(t) + return t +} + +func newInclusionTrackerWithSource(t *InclusionTracker, source blockSource) *InclusionTracker { + t.source = source + return t +} + +// Register hands ownership of tx's InclusionTime to the tracker. Caller must +// invoke it only for successful sends, at send-completion (see worker.go), so +// registered ⊆ succeeded holds. At cap the tx is dropped and counted. +func (t *InclusionTracker) Register(tx *types.LoadTx) { + hash := tx.EthTx.Hash() + for s := range t.state.Lock() { + // Cap check and insert share one critical section: race-free admission. + if len(s.inflight) >= t.maxInflight { + s.droppedAtCap++ + inclusionOutcome.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("chain_id", t.seiChainID), + attribute.String("outcome", "dropped_at_cap"), + )) + return + } + s.inflight[hash] = &entry{tx: tx, registeredAt: time.Now()} + } +} + +// Run subscribes to new heads and matches each arriving block once. 
+func (t *InclusionTracker) Run(ctx context.Context, firstEndpoint string) error { + wsEndpoint := utils.GetWSEndpoint(firstEndpoint) + if t.source == nil { + client, err := ethclient.Dial(firstEndpoint) + if err != nil { + return fmt.Errorf("inclusion tracker: dial %s: %w", firstEndpoint, err) + } + defer client.Close() + t.source = ethBlockSource{client: client} + } + return service.Run(ctx, func(ctx context.Context, s service.Scope) error { + client, err := ethclient.Dial(wsEndpoint) + if err != nil { + return fmt.Errorf("inclusion tracker: connect WebSocket %s: %w", wsEndpoint, err) + } + headers := make(chan *ethtypes.Header) + sub, err := client.SubscribeNewHead(ctx, headers) + if err != nil { + return fmt.Errorf("inclusion tracker: subscribe new heads: %w", err) + } + defer sub.Unsubscribe() + s.SpawnBg(func() error { + subErr, err := utils.Recv(ctx, sub.Err()) + if err != nil { + return err + } + return subErr + }) + s.Spawn(func() error { return t.reapLoop(ctx) }) + + var lastSeen uint64 // 0 = unset; first head seeds it (no backfill). + for ctx.Err() == nil { + header, err := utils.Recv(ctx, headers) + if err != nil { + return err + } + lastSeen = t.processHead(ctx, header.Number.Uint64(), time.Now(), lastSeen) + } + return ctx.Err() + }) +} + +// processHead handles one arriving head: counts any gap (no backfill), matches +// the block, and returns the new lastSeen. lastSeen==0 seeds on the first head. +func (t *InclusionTracker) processHead(ctx context.Context, num uint64, arrival time.Time, lastSeen uint64) uint64 { + if lastSeen != 0 && num > lastSeen+1 { + inclusionBlockGaps.Add(ctx, int64(num-lastSeen-1), metric.WithAttributes( + attribute.String("chain_id", t.seiChainID))) + } + t.matchBlock(ctx, num, arrival) + return num +} + +// matchBlock fetches block num once and stamps every in-flight tx it includes +// with the header-arrival time. 
+func (t *InclusionTracker) matchBlock(ctx context.Context, num uint64, arrival time.Time) { + // Explicit per-iteration cancel (not deferred-in-loop): bound the fetch. + fetchCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + hashes, err := t.source.BlockTxHashes(fetchCtx, num) + cancel() + if err != nil { + log.Printf("inclusion tracker: fetch block %d: %v", num, err) + return + } + for s := range t.state.Lock() { + for _, h := range hashes { + e, ok := s.inflight[h] + if !ok { + continue + } + // Single writer of InclusionTime, under the lock; first observation + // wins (delete-on-touch) — see reorg note in sender/doc.go. + e.tx.InclusionTime = arrival + delete(s.inflight, h) + s.included++ + inclusionLatency.Record(ctx, arrival.Sub(e.tx.IntendedSendTime).Seconds(), + metric.WithAttributes(attribute.String("chain_id", t.seiChainID))) + } + } +} + +func (t *InclusionTracker) reapLoop(ctx context.Context) error { + ticker := time.NewTicker(t.reapAfter) + defer ticker.Stop() + for ctx.Err() == nil { + if _, err := utils.Recv(ctx, ticker.C); err != nil { + log.Printf("inclusion tracker: reap loop: %v", err) + continue + } + t.reap() + } + return ctx.Err() +} + +// reap evicts txs in-flight longer than reapAfter as expired. Delete-on-touch +// under the lock races safely against matchBlock: whoever holds the lock first +// wins, no double count. +func (t *InclusionTracker) reap() { + cutoff := time.Now().Add(-t.reapAfter) + for s := range t.state.Lock() { + for h, e := range s.inflight { + if e.registeredAt.After(cutoff) { + continue + } + delete(s.inflight, h) + s.expired++ + inclusionOutcome.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("chain_id", t.seiChainID), + attribute.String("outcome", "expired"), + )) + } + } +} + +// InclusionSummary is the conservation tally. Read only after both workers and +// the tracker have joined, so inflightAtShutdown is final. 
+type InclusionSummary struct { + Included uint64 + Expired uint64 + DroppedAtCap uint64 + InflightAtShutdown uint64 +} + +// Summary snapshots the final tally; call once at shutdown after joins. +func (t *InclusionTracker) Summary() InclusionSummary { + for s := range t.state.Lock() { + s.inflightAtShutdown = uint64(len(s.inflight)) + return InclusionSummary{ + Included: s.included, + Expired: s.expired, + DroppedAtCap: s.droppedAtCap, + InflightAtShutdown: s.inflightAtShutdown, + } + } + panic("unreachable") +} diff --git a/stats/inclusion_tracker_test.go b/stats/inclusion_tracker_test.go new file mode 100644 index 0000000..a9ee82e --- /dev/null +++ b/stats/inclusion_tracker_test.go @@ -0,0 +1,268 @@ +package stats + +import ( + "context" + "math/big" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + + "github.com/sei-protocol/sei-load/types" +) + +// MockBlockSource is a deterministic blockSource for tests. Setter style mirrors +// MockBlockStats: SetBlock seeds a block's tx hashes; fetches are counted. +type MockBlockSource struct { + mu sync.Mutex + blocks map[uint64][]common.Hash + fetches atomic.Int64 + fetchErr error +} + +func NewMockBlockSource() *MockBlockSource { + return &MockBlockSource{blocks: make(map[uint64][]common.Hash)} +} + +func (m *MockBlockSource) SetBlock(n uint64, hashes ...common.Hash) *MockBlockSource { + m.mu.Lock() + defer m.mu.Unlock() + m.blocks[n] = hashes + return m +} + +func (m *MockBlockSource) BlockTxHashes(_ context.Context, n uint64) ([]common.Hash, error) { + m.fetches.Add(1) + if m.fetchErr != nil { + return nil, m.fetchErr + } + m.mu.Lock() + defer m.mu.Unlock() + return m.blocks[n], nil +} + +func (m *MockBlockSource) FetchCount() int64 { return m.fetches.Load() } + +// newTestTracker builds a tracker wired to a mock source, skipping the live dial. 
+func newTestTracker(t *testing.T, reapAfter time.Duration, maxInflight int, src blockSource) *InclusionTracker { + t.Helper() + return newInclusionTrackerWithSource( + NewInclusionTracker("test-chain", reapAfter, maxInflight), src) +} + +// loadTx builds a LoadTx with a deterministic hash from nonce and an intended +// send time, so latency math is exact. +func loadTx(nonce uint64, intended time.Time) *types.LoadTx { + eth := ethtypes.NewTx(&ethtypes.LegacyTx{ + Nonce: nonce, + GasPrice: big.NewInt(1), + Gas: 21000, + To: &common.Address{}, + Value: big.NewInt(0), + }) + return &types.LoadTx{ + EthTx: eth, + Scenario: &types.TxScenario{Name: "test"}, + IntendedSendTime: intended, + } +} + +func inflightLen(t *testing.T, tr *InclusionTracker) int { + t.Helper() + for s := range tr.state.Lock() { + return len(s.inflight) + } + panic("unreachable") +} + +// Test 1: a match stamps InclusionTime to the injected arrival, advances the +// included count, and shrinks the map. +func TestInclusion_MatchStamps(t *testing.T) { + src := NewMockBlockSource() + tr := newTestTracker(t, time.Minute, 100, src) + + intended := time.Unix(1000, 0) + tx := loadTx(1, intended) + tr.Register(tx) + require.Equal(t, 1, inflightLen(t, tr)) + + arrival := time.Unix(1002, 0) + src.SetBlock(5, tx.EthTx.Hash()) + tr.matchBlock(context.Background(), 5, arrival) + + require.Equal(t, arrival, tx.InclusionTime, "InclusionTime is the header-arrival time") + require.Equal(t, 0, inflightLen(t, tr), "matched tx leaves the registry") + require.Equal(t, uint64(1), tr.Summary().Included) +} + +// Test 2: reaping evicts an un-included tx as expired and leaves no leak.
+func TestInclusion_ReapExpires(t *testing.T) { + tr := newTestTracker(t, 10*time.Millisecond, 100, NewMockBlockSource()) + + tx := loadTx(1, time.Now()) + tr.Register(tx) + require.Equal(t, 1, inflightLen(t, tr)) + + time.Sleep(20 * time.Millisecond) + tr.reap() + + require.Equal(t, 0, inflightLen(t, tr), "reaped tx leaves the registry (no leak)") + require.True(t, tx.InclusionTime.IsZero(), "reaped tx is never stamped") + s := tr.Summary() + require.Equal(t, uint64(1), s.Expired) + require.Equal(t, uint64(0), s.Included) +} + +// Test 3: reap vs late inclusion, both orderings → no double count, no panic. +func TestInclusion_ReapVsLateInclusion(t *testing.T) { + t.Run("reap_first", func(t *testing.T) { + src := NewMockBlockSource() + tr := newTestTracker(t, time.Nanosecond, 100, src) + tx := loadTx(1, time.Unix(1000, 0)) + tr.Register(tx) + time.Sleep(time.Millisecond) + tr.reap() // wins: expired + src.SetBlock(5, tx.EthTx.Hash()) + tr.matchBlock(context.Background(), 5, time.Unix(1002, 0)) // no-op + s := tr.Summary() + require.Equal(t, uint64(1), s.Expired) + require.Equal(t, uint64(0), s.Included) + require.Equal(t, uint64(1), s.Expired+s.Included, "exactly one terminal state") + }) + t.Run("match_first", func(t *testing.T) { + src := NewMockBlockSource() + tr := newTestTracker(t, time.Nanosecond, 100, src) + tx := loadTx(1, time.Unix(1000, 0)) + tr.Register(tx) + src.SetBlock(5, tx.EthTx.Hash()) + tr.matchBlock(context.Background(), 5, time.Unix(1002, 0)) // wins: included + time.Sleep(time.Millisecond) + tr.reap() // no-op + s := tr.Summary() + require.Equal(t, uint64(1), s.Included) + require.Equal(t, uint64(0), s.Expired) + require.Equal(t, uint64(1), s.Expired+s.Included, "exactly one terminal state") + }) +} + +// Test 4: at cap, Register drops-and-counts; the map never exceeds the cap. 
+func TestInclusion_BoundedCap(t *testing.T) { + const cap = 3 + tr := newTestTracker(t, time.Minute, cap, NewMockBlockSource()) + + for i := range uint64(10) { + tr.Register(loadTx(i, time.Now())) + require.LessOrEqual(t, inflightLen(t, tr), cap, "map never exceeds cap") + } + s := tr.Summary() + require.Equal(t, uint64(7), s.DroppedAtCap) + require.Equal(t, cap, inflightLen(t, tr)) +} + +// Test 5: conservation identity registered == included + expired + +// inflightAtShutdown, table-driven over register/match/reap mixes. +func TestInclusion_Conservation(t *testing.T) { + cases := []struct { + name string + registered int + matched int + reaped int + }{ + {"all_included", 5, 5, 0}, + {"all_expired", 5, 0, 5}, + {"mixed", 6, 2, 3}, + {"none_terminal", 4, 0, 0}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + src := NewMockBlockSource() + tr := newTestTracker(t, time.Hour, 100, src) + txs := make([]*types.LoadTx, tc.registered) + for i := range txs { + txs[i] = loadTx(uint64(i), time.Unix(1000, 0)) + tr.Register(txs[i]) + } + for i := 0; i < tc.matched; i++ { + src.SetBlock(uint64(i), txs[i].EthTx.Hash()) + tr.matchBlock(context.Background(), uint64(i), time.Unix(1001, 0)) + } + // Reap the next `reaped` txs by forcing their registeredAt past cutoff. + for s := range tr.state.Lock() { + old := time.Now().Add(-2 * time.Hour) + reaped := 0 + for i := tc.matched; i < tc.registered && reaped < tc.reaped; i++ { + if e, ok := s.inflight[txs[i].EthTx.Hash()]; ok { + e.registeredAt = old + reaped++ + } + } + } + tr.reap() + + s := tr.Summary() + require.Equal(t, uint64(tc.registered), + s.Included+s.Expired+s.InflightAtShutdown, + "registered == included + expired + inflight_at_shutdown") + }) + } +} + +// Test 6: processHead counts gaps and never backfills (no fetch of skipped +// heights). The fake records every fetch call. 
+func TestInclusion_GapNoBackfill(t *testing.T) { + src := NewMockBlockSource() + tr := newTestTracker(t, time.Minute, 100, src) + ctx := context.Background() + + last := tr.processHead(ctx, 10, time.Now(), 0) // seeds, no gap + last = tr.processHead(ctx, 11, time.Now(), last) + last = tr.processHead(ctx, 15, time.Now(), last) // gap 12,13,14 + _ = last + + require.Equal(t, int64(3), src.FetchCount(), + "exactly one fetch per arriving head; skipped heights never fetched") +} + +// Test 7: concurrent register/match/reap is race-free under -race. +func TestInclusion_ConcurrentRaceSafe(t *testing.T) { + src := NewMockBlockSource() + tr := newTestTracker(t, time.Millisecond, 10_000, src) + const n = 500 + txs := make([]*types.LoadTx, n) + for i := range txs { + txs[i] = loadTx(uint64(i), time.Unix(1000, 0)) + src.SetBlock(uint64(i), txs[i].EthTx.Hash()) + } + + var wg sync.WaitGroup + wg.Add(3) + go func() { + defer wg.Done() + for i := range txs { + tr.Register(txs[i]) + } + }() + go func() { + defer wg.Done() + for i := range txs { + tr.matchBlock(context.Background(), uint64(i), time.Unix(1001, 0)) + } + }() + go func() { + defer wg.Done() + for range 50 { + tr.reap() + time.Sleep(100 * time.Microsecond) + } + }() + wg.Wait() + tr.reap() + + s := tr.Summary() + require.Equal(t, uint64(n), s.Included+s.Expired+s.InflightAtShutdown, + "conservation holds under concurrency") +} diff --git a/stats/metrics.go b/stats/metrics.go index c66b879..a5d3884 100644 --- a/stats/metrics.go +++ b/stats/metrics.go @@ -1,7 +1,11 @@ package stats import ( + "context" + "sync" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" ) @@ -51,8 +55,65 @@ var ( "run_txs_failed_total", metric.WithDescription("Total open-loop transactions admitted and enqueued but whose send completed with an error (emitted once at run end)"), metric.WithUnit("{transactions}"))) + + // Inclusion tracker. 
inclusion_latency._count IS the included count, so no + // standalone included counter. Denominator for inclusion rate is the + // existing succeeded/txs_accepted series, never a new "registered" series. + inclusionLatency = must(meter.Float64Histogram( + "inclusion_latency", + metric.WithDescription("Latency from intended send to observed on-chain inclusion in seconds"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(0.5, 1, 2, 5, 10, 30, 60, 120))) + + inclusionOutcome = must(meter.Int64Counter( + "inclusion_outcome", + metric.WithDescription("In-flight txs that left the registry un-included, by outcome (expired, dropped_at_cap)"), + metric.WithUnit("{transactions}"))) + + inclusionBlockGaps = must(meter.Int64Counter( + "block_gaps", + metric.WithDescription("Missed block heights observed by the inclusion tracker (no backfill)"), + metric.WithUnit("{blocks}"))) + + // Run-summary: the only inclusion tally with no live series, since it is the + // terminal value of the inclusion_inflight gauge. Emitted once at run end. + runInflightAtShutdown = must(meter.Int64Gauge( + "run_inflight_at_shutdown", + metric.WithDescription("In-flight inclusion registry size at run end (emitted once at run end)"), + metric.WithUnit("{transactions}"))) ) +// meteredInclusionTrackers backs the inclusion_inflight gauge: each tracker +// registers so the callback can sample its in-flight map under lock. 
+var meteredInclusionTrackers = struct { + lock sync.RWMutex + trackers []*InclusionTracker +}{} + +func meterInclusionInflight(t *InclusionTracker) { + meteredInclusionTrackers.lock.Lock() + defer meteredInclusionTrackers.lock.Unlock() + meteredInclusionTrackers.trackers = append(meteredInclusionTrackers.trackers, t) +} + +func init() { + must(meter.Int64ObservableGauge( + "inclusion_inflight", + metric.WithDescription("Current size of the inclusion tracker's in-flight tx registry"), + metric.WithUnit("{transactions}"), + metric.WithInt64Callback(func(_ context.Context, observer metric.Int64Observer) error { + meteredInclusionTrackers.lock.RLock() + defer meteredInclusionTrackers.lock.RUnlock() + for _, t := range meteredInclusionTrackers.trackers { + for s := range t.state.Lock() { + observer.Observe(int64(len(s.inflight)), + metric.WithAttributes(attribute.String("chain_id", t.seiChainID))) + } + } + return nil + }))) +} + func must[V any](v V, err error) V { if err != nil { panic(err) diff --git a/stats/run_summary.go b/stats/run_summary.go index f3401d8..e222142 100644 --- a/stats/run_summary.go +++ b/stats/run_summary.go @@ -25,6 +25,22 @@ type RunSummary struct { // reported so the conservation invariant (see sender package doc: // scheduled = dropped + succeeded + failed) is auditable from the run summary. Failed uint64 + + // Inclusion-stage tally (see sender/doc.go). The conservation identity is + // registered == Included + Expired + InflightAtShutdown, with + // registered ⊆ succeeded. InclusionTracked disambiguates a not-tracked run + // (all zero, flag false) from a tracked run with no inclusions yet. + // TODO(PLT-467): owns run-summary schema versioning for these fields. + InclusionTracked bool + // Included is the count of txs the tracker observed on-chain (stamped). + Included uint64 + // Expired is the count of registered txs reaped un-included after reapAfter. 
+ Expired uint64 + // DroppedAtCap is the count of successful sends rejected at the in-flight cap; + // excluded from the inclusion denominator (they were never registered). + DroppedAtCap uint64 + // InflightAtShutdown is len(inflight) read after workers and tracker joined. + InflightAtShutdown uint64 } // EmitRunSummary records the run-summary gauges. Call once at shutdown. @@ -42,4 +58,7 @@ func (c *Collector) EmitRunSummary(ctx context.Context, summary RunSummary) { metric.WithAttributes(attribute.String("arrival_model", summary.ArrivalModel))) runTxsFailedTotal.Record(ctx, int64(summary.Failed), metric.WithAttributes(attribute.String("arrival_model", summary.ArrivalModel))) + if summary.InclusionTracked { + runInflightAtShutdown.Record(ctx, int64(summary.InflightAtShutdown)) + } } diff --git a/types/scenario.go b/types/scenario.go index 8d02d8e..af2f852 100644 --- a/types/scenario.go +++ b/types/scenario.go @@ -13,7 +13,7 @@ import ( // LoadTx is a wrapper that has pre-encoded json rpc payload and eth transaction. // // Lifecycle field concurrency contract: a *LoadTx is passed by pointer through -// buffered channels (txChan, sentTxs). Each lifecycle field (the timestamps and +// the buffered txChan. Each lifecycle field (the timestamps and // SequenceIndex) is written at most once, by whichever goroutine owns the tx at // that stage, and is immutable thereafter; ownership transfers with the pointer // across the channels, so the writes need no locking. The open-loop scheduler @@ -46,7 +46,7 @@ type LoadTx struct { // model (see IntendedSendTime); the run's arrival model is authoritative. SequenceIndex uint64 // AttemptedSendTime is when the send was actually attempted, written by the - // worker goroutine that owns the tx between dequeue and the sentTxs hand-off. + // worker goroutine that owns the tx between dequeue and send completion. 
AttemptedSendTime time.Time // OnComplete, if set, is invoked exactly once when the network send attempt // for this tx finishes (after sendTransaction returns), with the send error @@ -59,7 +59,11 @@ type LoadTx struct { // owning goroutine before hand-off, per the lifecycle concurrency contract. OnComplete func(err error) // InclusionTime is when the tx was observed included on-chain, written only - // by the inclusion tracker. + // by the inclusion tracker (single writer, under its registry lock). The + // clock is the wall-clock instant the including block's newHead header + // ARRIVES at the tracker (time.Now() at header receipt), cached per block + // number and applied to every tx matched in that block — NOT the body-fetch + // completion time and NOT header.Time. InclusionTime time.Time } From 03fc59edb359c262d7f9535de3f55f5b88a5f244 Mon Sep 17 00:00:00 2001 From: bdchatham Date: Mon, 15 Jun 2026 15:11:44 -0700 Subject: [PATCH 2/6] fix(stats): address PLT-459 cohort cross-review findings - Skip the inclusion-latency sample when IntendedSendTime is zero (prewarm txs are never scheduled) so the histogram isn't polluted with epoch-based durations (systems review). - Don't wire the inclusion tracker under --dry-run: simulated sends never hit the chain and would all reap as expired (security review LOW-1). - processHead short-circuits on a duplicate/out-of-order head (num <= lastSeen): no redundant re-fetch, no spurious gap count (systems nit). - Note the ~2x reapAfter worst-case eviction latency (security INFO-1). - Conservation test now exercises dropped_at_cap within the identity, proving it sits outside the registered set (systems nit). 
Co-Authored-By: Claude Opus 4.8 --- main.go | 4 +++- sender/worker.go | 20 ++++++++++--------- stats/inclusion_tracker.go | 15 ++++++++++++-- stats/inclusion_tracker_test.go | 35 ++++++++++++++++++++------------- 4 files changed, 48 insertions(+), 26 deletions(-) diff --git a/main.go b/main.go index 7624d79..956fd4c 100644 --- a/main.go +++ b/main.go @@ -268,8 +268,10 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { // tracker (the lossy per-tx receipt path is retired). maxInflight is sized // off the open-loop --max-in-flight so the registry comfortably holds the // admitted in-flight set; the ×4 headroom absorbs inclusion lag. + // Not wired under --dry-run: simulated sends never hit the chain, so they + // would all reap as expired and pollute the inclusion stats. inclusion := utils.None[*stats.InclusionTracker]() - if len(cfg.Endpoints) > 0 && cfg.Settings.TrackReceipts { + if len(cfg.Endpoints) > 0 && cfg.Settings.TrackReceipts && !cfg.Settings.DryRun { const maxInflightMultiple = 4 inclusionTracker = stats.NewInclusionTracker( cfg.SeiChainID, diff --git a/sender/worker.go b/sender/worker.go index 7fdb24d..21d33e4 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -27,15 +27,15 @@ import ( var tracer = otel.Tracer("github.com/sei-protocol/sei-load/sender") type WorkerConfig struct { - ID int - SeiChainID string - Endpoint string - BufferSize int - Tasks int - DryRun bool - Debug bool - Collector *stats.Collector - Limiter *rate.Limiter // Shared rate authority; nil disables gating. + ID int + SeiChainID string + Endpoint string + BufferSize int + Tasks int + DryRun bool + Debug bool + Collector *stats.Collector + Limiter *rate.Limiter // Shared rate authority; nil disables gating. // SkipRateLimit opts a worker out of limiter gating. Zero value (false) is the // safe default (gate when Limiter is set); set true only in open-loop, where // the scheduler owns the clock (see doc.go). 
@@ -177,6 +177,8 @@ func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) erro } w.cfg.Collector.RecordTransaction(tx.Scenario.Name, w.cfg.Endpoint, time.Since(startTime), err == nil) // Register at send-completion, only on success: registered ⊆ succeeded. + // (The tracker is wired only for live runs — see main.go; DryRun never + // gets a tracker, so simulated sends are not inclusion-tracked.) if err == nil { if t, ok := w.cfg.Inclusion.Get(); ok { t.Register(tx) diff --git a/stats/inclusion_tracker.go b/stats/inclusion_tracker.go index 590d8c1..213a2a9 100644 --- a/stats/inclusion_tracker.go +++ b/stats/inclusion_tracker.go @@ -150,6 +150,9 @@ func (t *InclusionTracker) Run(ctx context.Context, firstEndpoint string) error // processHead handles one arriving head: counts any gap (no backfill), matches // the block, and returns the new lastSeen. lastSeen==0 seeds on the first head. func (t *InclusionTracker) processHead(ctx context.Context, num uint64, arrival time.Time, lastSeen uint64) uint64 { + if lastSeen != 0 && num <= lastSeen { + return lastSeen // duplicate or out-of-order head: no re-fetch, no spurious gap. + } if lastSeen != 0 && num > lastSeen+1 { inclusionBlockGaps.Add(ctx, int64(num-lastSeen-1), metric.WithAttributes( attribute.String("chain_id", t.seiChainID))) @@ -180,12 +183,20 @@ func (t *InclusionTracker) matchBlock(ctx context.Context, num uint64, arrival t e.tx.InclusionTime = arrival delete(s.inflight, h) s.included++ - inclusionLatency.Record(ctx, arrival.Sub(e.tx.IntendedSendTime).Seconds(), - metric.WithAttributes(attribute.String("chain_id", t.seiChainID))) + // Latency needs a submit reference; a zero IntendedSendTime means + // "not scheduled" (e.g. prewarm txs), so skip the sample rather than + // record a bogus epoch-based duration. See LoadTx contract. 
+ if !e.tx.IntendedSendTime.IsZero() { + inclusionLatency.Record(ctx, arrival.Sub(e.tx.IntendedSendTime).Seconds(), + metric.WithAttributes(attribute.String("chain_id", t.seiChainID))) + } } } } +// reapLoop sweeps every reapAfter; worst-case eviction latency is ~2×reapAfter +// (a tx registered just after a tick waits a full period for the next sweep) — +// a calibration nuance, not a conservation concern. func (t *InclusionTracker) reapLoop(ctx context.Context) error { ticker := time.NewTicker(t.reapAfter) defer ticker.Stop() diff --git a/stats/inclusion_tracker_test.go b/stats/inclusion_tracker_test.go index a9ee82e..fb74cd6 100644 --- a/stats/inclusion_tracker_test.go +++ b/stats/inclusion_tracker_test.go @@ -167,21 +167,26 @@ func TestInclusion_BoundedCap(t *testing.T) { // inflightAtShutdown, table-driven over register/match/reap mixes. func TestInclusion_Conservation(t *testing.T) { cases := []struct { - name string - registered int - matched int - reaped int + name string + attempts int // Register calls + cap int // tracker maxInflight + matched int + reaped int }{ - {"all_included", 5, 5, 0}, - {"all_expired", 5, 0, 5}, - {"mixed", 6, 2, 3}, - {"none_terminal", 4, 0, 0}, + {"all_included", 5, 100, 5, 0}, + {"all_expired", 5, 100, 0, 5}, + {"mixed", 6, 100, 2, 3}, + {"none_terminal", 4, 100, 0, 0}, + // attempts (10) > cap (5): 5 are dropped_at_cap (never registered), the + // other 5 terminate as included/expired/inflight — proving dropped_at_cap + // sits OUTSIDE the registered identity. 
+ {"with_cap_drops", 10, 5, 2, 2}, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { src := NewMockBlockSource() - tr := newTestTracker(t, time.Hour, 100, src) - txs := make([]*types.LoadTx, tc.registered) + tr := newTestTracker(t, time.Hour, tc.cap, src) + txs := make([]*types.LoadTx, tc.attempts) for i := range txs { txs[i] = loadTx(uint64(i), time.Unix(1000, 0)) tr.Register(txs[i]) @@ -194,7 +199,7 @@ func TestInclusion_Conservation(t *testing.T) { for s := range tr.state.Lock() { old := time.Now().Add(-2 * time.Hour) reaped := 0 - for i := tc.matched; i < tc.registered && reaped < tc.reaped; i++ { + for i := tc.matched; i < tc.attempts && reaped < tc.reaped; i++ { if e, ok := s.inflight[txs[i].EthTx.Hash()]; ok { e.registeredAt = old reaped++ @@ -204,9 +209,11 @@ func TestInclusion_Conservation(t *testing.T) { tr.reap() s := tr.Summary() - require.Equal(t, uint64(tc.registered), - s.Included+s.Expired+s.InflightAtShutdown, - "registered == included + expired + inflight_at_shutdown") + // dropped_at_cap is excluded from the registered set, so every Register + // attempt is accounted by exactly one of the four buckets. + require.Equal(t, uint64(tc.attempts), + s.Included+s.Expired+s.InflightAtShutdown+s.DroppedAtCap, + "attempts == included + expired + inflight_at_shutdown + dropped_at_cap") }) } } From 203210283f0c2e46ae66dd69c9efe3b44874d84d Mon Sep 17 00:00:00 2001 From: bdchatham Date: Mon, 15 Jun 2026 15:16:30 -0700 Subject: [PATCH 3/6] fix(stats): floor inclusion-tracker cap to avoid zero-cap drop-all (Cursor) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A non-positive maxInflight made len(inflight) >= cap always true, so every Register dropped — silently disabling inclusion tracking when --max-in-flight is 0 (e.g. closed-loop + --track-receipts). NewInclusionTracker now floors a non-positive cap to defaultMaxInflight. Test added. 
Co-Authored-By: Claude Opus 4.8 --- stats/inclusion_tracker.go | 8 ++++++++ stats/inclusion_tracker_test.go | 11 +++++++++++ 2 files changed, 19 insertions(+) diff --git a/stats/inclusion_tracker.go b/stats/inclusion_tracker.go index 213a2a9..fdf255a 100644 --- a/stats/inclusion_tracker.go +++ b/stats/inclusion_tracker.go @@ -64,10 +64,18 @@ type InclusionTracker struct { state utils.Mutex[*inclusionState] } +// defaultMaxInflight bounds the registry when the caller passes a non-positive +// cap (e.g. --max-in-flight unset in closed-loop): a zero cap would otherwise +// make len(inflight) >= cap always true and drop every registration. +const defaultMaxInflight = 10_000 + // NewInclusionTracker builds a tracker bounded at maxInflight in-flight txs that // reaps un-included txs after reapAfter. The block source is the production // ethclient impl; tests inject via newInclusionTrackerWithSource. func NewInclusionTracker(seiChainID string, reapAfter time.Duration, maxInflight int) *InclusionTracker { + if maxInflight <= 0 { + maxInflight = defaultMaxInflight + } t := &InclusionTracker{ seiChainID: seiChainID, reapAfter: reapAfter, diff --git a/stats/inclusion_tracker_test.go b/stats/inclusion_tracker_test.go index fb74cd6..6b36d5e 100644 --- a/stats/inclusion_tracker_test.go +++ b/stats/inclusion_tracker_test.go @@ -163,6 +163,17 @@ func TestInclusion_BoundedCap(t *testing.T) { require.Equal(t, cap, inflightLen(t, tr)) } +// Test 4b: a non-positive cap falls back to the default bound instead of +// dropping every registration (len >= 0 is always true at cap 0). 
+func TestInclusion_NonPositiveCapFallsBack(t *testing.T) { + tr := newTestTracker(t, time.Minute, 0, NewMockBlockSource()) + for i := range uint64(5) { + tr.Register(loadTx(i, time.Now())) + } + require.Equal(t, 5, inflightLen(t, tr), "registrations are admitted, not all dropped") + require.Equal(t, uint64(0), tr.Summary().DroppedAtCap) +} + // Test 5: conservation identity registered == included + expired + // inflightAtShutdown, table-driven over register/match/reap mixes. func TestInclusion_Conservation(t *testing.T) { From b60ae781fcd072b45bd37babfcb2f4492f3eb434 Mon Sep 17 00:00:00 2001 From: bdchatham Date: Mon, 15 Jun 2026 15:22:56 -0700 Subject: [PATCH 4/6] fix(stats): surface inclusion block-fetch errors (Cursor) A failed BlockByNumber left the block's txs unmatched while lastSeen still advanced, so they reaped as expired with no signal. Count failures in a block_fetch_errors metric and document the conservative-undercount boundary (same treatment as a WS gap); no retry, to avoid adding RPC load to a struggling SUT. Co-Authored-By: Claude Opus 4.8 --- sender/doc.go | 4 +++- stats/inclusion_tracker.go | 4 ++++ stats/metrics.go | 5 +++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/sender/doc.go b/sender/doc.go index 8555b0d..55a3233 100644 --- a/sender/doc.go +++ b/sender/doc.go @@ -107,7 +107,9 @@ // by reorg_depth × block_time, with no canonical reconciliation. (3) A single // fetch endpoint (Endpoints[0], shared with the block collector) adds a small // read load. (4) InclusionTime is the header-arrival wall clock, not fetch -// completion and not header.Time. +// completion and not header.Time. (5) A failed block-body fetch is counted +// (block_fetch_errors) and not retried — that block's txs reap as expired, the +// same conservative undercount as a WS gap. // // Detection and baseline. 
schedule_lag (AttemptedSendTime minus IntendedSendTime) // is the primary coordinated-omission gate: it shows when sends fall behind the diff --git a/stats/inclusion_tracker.go b/stats/inclusion_tracker.go index fdf255a..353aecd 100644 --- a/stats/inclusion_tracker.go +++ b/stats/inclusion_tracker.go @@ -177,7 +177,11 @@ func (t *InclusionTracker) matchBlock(ctx context.Context, num uint64, arrival t hashes, err := t.source.BlockTxHashes(fetchCtx, num) cancel() if err != nil { + // No retry (avoids piling RPC onto a struggling SUT): the block's txs go + // unmatched and reap as expired. Surfaced so the undercount is visible. log.Printf("inclusion tracker: fetch block %d: %v", num, err) + inclusionBlockFetchErrors.Add(ctx, 1, metric.WithAttributes( + attribute.String("chain_id", t.seiChainID))) return } for s := range t.state.Lock() { diff --git a/stats/metrics.go b/stats/metrics.go index a5d3884..8020791 100644 --- a/stats/metrics.go +++ b/stats/metrics.go @@ -75,6 +75,11 @@ var ( metric.WithDescription("Missed block heights observed by the inclusion tracker (no backfill)"), metric.WithUnit("{blocks}"))) + inclusionBlockFetchErrors = must(meter.Int64Counter( + "block_fetch_errors", + metric.WithDescription("Block-body fetches that failed; the block's txs go unmatched and reap as expired (no retry)"), + metric.WithUnit("{blocks}"))) + // Run-summary: the only inclusion tally with no live series, since it is the // terminal value of the inclusion_inflight gauge. Emitted once at run end. 
runInflightAtShutdown = must(meter.Int64Gauge( From b68dbf67476fd405ca425fe8b0cf1f14afb05472 Mon Sep 17 00:00:00 2001 From: bdchatham Date: Mon, 15 Jun 2026 16:09:35 -0700 Subject: [PATCH 5/6] fix(stats): address Cursor nits on the inclusion tracker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - inclusion_latency recorded only on open-loop runs: closed-loop IntendedSendTime is enqueue time, so arrival-IntendedSendTime would mix an enqueue→inclusion latency into the histogram. Tracker is told the model at construction; included/expired counts still accrue in both. - reapAfter is now configurable (--inclusion-reap-after / inclusionReapAfter, default 30s) so congested chains with >30s inclusion don't inflate expired. - README: --track-receipts now documents the block-indexed inclusion tracker, not per-tx receipts. Co-Authored-By: Claude Opus 4.8 --- README.md | 3 ++- config/settings.go | 15 ++++++++++++--- config/settings_test.go | 2 ++ main.go | 9 +++------ sender/doc.go | 4 +++- sender/worker_test.go | 2 +- stats/inclusion_tracker.go | 27 ++++++++++++++++++--------- stats/inclusion_tracker_test.go | 31 +++++++++++++++++++++++++++++-- stats/metrics.go | 7 ++++--- 9 files changed, 74 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index c89cf2c..78c97fd 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,8 @@ Edit `my-config.json`: | `--buffer-size, -b` | 1000 | Buffer size per worker | | `--dry-run` | false | Simulate without sending | | `--debug` | false | Log each transaction | -| `--track-receipts` | false | Track transaction receipts | +| `--track-receipts` | false | Enable the block-indexed tx→inclusion tracker (stamps InclusionTime; reports included/expired/inflight-at-shutdown) | +| `--inclusion-reap-after` | 30s | How long an un-included tx waits before reaping as expired (tune to expected inclusion time on congested chains) | | `--track-blocks` | false | Track block statistics | | 
`--track-user-latency` | false | Track user latency metrics | | `--prewarm` | false | Prewarm accounts before test | diff --git a/config/settings.go b/config/settings.go index c679cf0..6f2e7ab 100644 --- a/config/settings.go +++ b/config/settings.go @@ -12,9 +12,14 @@ import ( // Settings holds all CLI-configurable parameters type Settings struct { - TasksPerEndpoint int `json:"workers,omitempty"` - TPS float64 `json:"tps,omitempty"` - StatsInterval Duration `json:"statsInterval,omitempty"` + TasksPerEndpoint int `json:"workers,omitempty"` + TPS float64 `json:"tps,omitempty"` + StatsInterval Duration `json:"statsInterval,omitempty"` + // InclusionReapAfter bounds how long an un-included tx stays in the inclusion + // registry before it is reaped as expired. Tune to expected inclusion time on + // congested chains: too short reaps slow inclusions as expired (inflated + // un-included), too long inflates the in-flight map. + InclusionReapAfter Duration `json:"inclusionReapAfter,omitempty"` BufferSize int `json:"bufferSize,omitempty"` DryRun bool `json:"dryRun,omitempty"` Debug bool `json:"debug,omitempty"` @@ -74,6 +79,7 @@ func DefaultSettings() Settings { TasksPerEndpoint: 1, TPS: 0.0, StatsInterval: Duration(10 * time.Second), + InclusionReapAfter: Duration(30 * time.Second), BufferSize: 1000, DryRun: false, Debug: false, @@ -97,6 +103,7 @@ func InitializeViper(cmd *cobra.Command) error { // Bind flags to viper with error checking flagBindings := map[string]string{ "statsInterval": "stats-interval", + "inclusionReapAfter": "inclusion-reap-after", "bufferSize": "buffer-size", "tps": "tps", "dryRun": "dry-run", @@ -125,6 +132,7 @@ func InitializeViper(cmd *cobra.Command) error { // Set defaults in Viper defaults := DefaultSettings() viper.SetDefault("statsInterval", defaults.StatsInterval.ToDuration()) + viper.SetDefault("inclusionReapAfter", defaults.InclusionReapAfter.ToDuration()) viper.SetDefault("bufferSize", defaults.BufferSize) viper.SetDefault("tps", 
defaults.TPS) viper.SetDefault("dryRun", defaults.DryRun) @@ -171,6 +179,7 @@ func ResolveSettings() *Settings { TasksPerEndpoint: viper.GetInt("workers"), TPS: viper.GetFloat64("tps"), StatsInterval: Duration(viper.GetDuration("statsInterval")), + InclusionReapAfter: Duration(viper.GetDuration("inclusionReapAfter")), BufferSize: viper.GetInt("bufferSize"), DryRun: viper.GetBool("dryRun"), Debug: viper.GetBool("debug"), diff --git a/config/settings_test.go b/config/settings_test.go index 09762fa..43d476e 100644 --- a/config/settings_test.go +++ b/config/settings_test.go @@ -82,6 +82,7 @@ func TestArgumentPrecedence(t *testing.T) { // Add flags (with zero defaults to avoid precedence issues) cmd.Flags().Duration("stats-interval", 0, "Stats interval") + cmd.Flags().Duration("inclusion-reap-after", 0, "Inclusion reap after") cmd.Flags().Int("workers", 0, "Number of workers") cmd.Flags().Float64("tps", 0, "TPS") cmd.Flags().Bool("dry-run", false, "Dry run") @@ -130,6 +131,7 @@ func TestDefaultSettings(t *testing.T) { TasksPerEndpoint: 1, TPS: 0.0, StatsInterval: Duration(10 * time.Second), + InclusionReapAfter: Duration(30 * time.Second), BufferSize: 1000, DryRun: false, Debug: false, diff --git a/main.go b/main.go index 956fd4c..ada20bb 100644 --- a/main.go +++ b/main.go @@ -34,11 +34,6 @@ var ( configFile string ) -// inclusionReapAfter bounds how long an un-included tx stays in the inclusion -// registry before it is reaped as expired. A calibration knob: too short -// undercounts slow inclusions, too long inflates the in-flight map. 
-const inclusionReapAfter = 30 * time.Second - var rootCmd = &cobra.Command{ Use: "seiload", Short: "Sei Chain Load Test v2", @@ -58,6 +53,7 @@ without actually sending requests or deploying contracts.`, func init() { rootCmd.Flags().StringVarP(&configFile, "config", "c", "", "Path to configuration file (required)") rootCmd.Flags().DurationP("stats-interval", "s", 0, "Interval for logging statistics") + rootCmd.Flags().Duration("inclusion-reap-after", 30*time.Second, "How long an un-included tx stays in the inclusion registry before reaping as expired (tune to expected inclusion time on congested chains)") rootCmd.Flags().IntP("buffer-size", "b", 0, "Buffer size per worker") rootCmd.Flags().Float64P("tps", "t", 0, "Transactions per second (0 = no limit)") rootCmd.Flags().Bool("dry-run", false, "Mock deployment and requests") @@ -275,8 +271,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { const maxInflightMultiple = 4 inclusionTracker = stats.NewInclusionTracker( cfg.SeiChainID, - inclusionReapAfter, + cfg.Settings.InclusionReapAfter.ToDuration(), cfg.Settings.MaxInFlight*maxInflightMultiple, + cfg.Settings.ArrivalModel == config.ArrivalModelOpenLoop, ) inclusion = utils.Some(inclusionTracker) s.SpawnBgNamed("inclusion tracker", func() error { diff --git a/sender/doc.go b/sender/doc.go index 55a3233..2341737 100644 --- a/sender/doc.go +++ b/sender/doc.go @@ -92,7 +92,9 @@ // send error). The tracker subscribes to new heads, fetches each arriving // block's body once (O(blocks), not O(txs)), and stamps InclusionTime on every // matched in-flight tx with the block's header-ARRIVAL time. Un-included txs are -// reaped as expired after reapAfter. +// reaped as expired after reapAfter. inclusion_latency (arrival minus +// IntendedSendTime) is an open-loop-only measure; in closed-loop IntendedSendTime +// is enqueue time, so the latency sample is omitted (counts are tracked in both). // // Conservation. 
registered == included + expired + inflight_at_shutdown, and // registered ⊆ succeeded (only successful sends are registered). The inclusion diff --git a/sender/worker_test.go b/sender/worker_test.go index c9b4e94..f280772 100644 --- a/sender/worker_test.go +++ b/sender/worker_test.go @@ -97,7 +97,7 @@ func inflightCount(tr *stats.InclusionTracker) uint64 { // a successful (DryRun) send registers the tx with the tracker, and Register // runs strictly AFTER OnComplete (the permit-release ordering in doc.go). func TestRunTxSender_RegistersSuccessfulSend(t *testing.T) { - tracker := stats.NewInclusionTracker("test-chain", time.Hour, 100) + tracker := stats.NewInclusionTracker("test-chain", time.Hour, 100, true /* openLoop */) collector := stats.NewCollector() w := NewWorker(&WorkerConfig{ ID: 0, Endpoint: "dryrun", BufferSize: 4, Tasks: 1, DryRun: true, diff --git a/stats/inclusion_tracker.go b/stats/inclusion_tracker.go index 353aecd..94b563a 100644 --- a/stats/inclusion_tracker.go +++ b/stats/inclusion_tracker.go @@ -60,8 +60,13 @@ type InclusionTracker struct { seiChainID string reapAfter time.Duration maxInflight int - source blockSource - state utils.Mutex[*inclusionState] + // openLoop gates the inclusion_latency sample: only open-loop stamps a true + // arrival schedule on IntendedSendTime. In closed-loop IntendedSendTime is + // enqueue time, so arrival-IntendedSendTime would be an enqueue→inclusion + // latency that must not be mixed into the histogram. + openLoop bool + source blockSource + state utils.Mutex[*inclusionState] } // defaultMaxInflight bounds the registry when the caller passes a non-positive @@ -70,9 +75,10 @@ type InclusionTracker struct { const defaultMaxInflight = 10_000 // NewInclusionTracker builds a tracker bounded at maxInflight in-flight txs that -// reaps un-included txs after reapAfter. The block source is the production -// ethclient impl; tests inject via newInclusionTrackerWithSource. 
-func NewInclusionTracker(seiChainID string, reapAfter time.Duration, maxInflight int) *InclusionTracker { +// reaps un-included txs after reapAfter. openLoop gates the inclusion_latency +// sample (included/expired counts are tracked in both models). The block source +// is the production ethclient impl; tests inject via newInclusionTrackerWithSource. +func NewInclusionTracker(seiChainID string, reapAfter time.Duration, maxInflight int, openLoop bool) *InclusionTracker { if maxInflight <= 0 { maxInflight = defaultMaxInflight } @@ -80,6 +86,7 @@ func NewInclusionTracker(seiChainID string, reapAfter time.Duration, maxInflight seiChainID: seiChainID, reapAfter: reapAfter, maxInflight: maxInflight, + openLoop: openLoop, state: utils.NewMutex(&inclusionState{ inflight: make(map[common.Hash]*entry), }), @@ -195,10 +202,12 @@ func (t *InclusionTracker) matchBlock(ctx context.Context, num uint64, arrival t e.tx.InclusionTime = arrival delete(s.inflight, h) s.included++ - // Latency needs a submit reference; a zero IntendedSendTime means - // "not scheduled" (e.g. prewarm txs), so skip the sample rather than - // record a bogus epoch-based duration. See LoadTx contract. - if !e.tx.IntendedSendTime.IsZero() { + // Open-loop only: IntendedSendTime is a true arrival schedule there, so + // arrival-IntendedSendTime is a real inclusion latency. In closed-loop + // it is enqueue time and the sample is omitted. A zero IntendedSendTime + // means "not scheduled" (e.g. prewarm txs); skip rather than record a + // bogus epoch-based duration. See LoadTx contract. 
+ if t.openLoop && !e.tx.IntendedSendTime.IsZero() { inclusionLatency.Record(ctx, arrival.Sub(e.tx.IntendedSendTime).Seconds(), metric.WithAttributes(attribute.String("chain_id", t.seiChainID))) } diff --git a/stats/inclusion_tracker_test.go b/stats/inclusion_tracker_test.go index 6b36d5e..e36fde1 100644 --- a/stats/inclusion_tracker_test.go +++ b/stats/inclusion_tracker_test.go @@ -47,11 +47,18 @@ func (m *MockBlockSource) BlockTxHashes(_ context.Context, n uint64) ([]common.H func (m *MockBlockSource) FetchCount() int64 { return m.fetches.Load() } -// newTestTracker builds a tracker wired to a mock source, skipping the live dial. +// newTestTracker builds an open-loop tracker wired to a mock source, skipping +// the live dial. Open-loop is the default so latency-bearing tests exercise the +// inclusion_latency path; closed-loop is covered explicitly via newTestTrackerLoop. func newTestTracker(t *testing.T, reapAfter time.Duration, maxInflight int, src blockSource) *InclusionTracker { + t.Helper() + return newTestTrackerLoop(t, reapAfter, maxInflight, src, true) +} + +func newTestTrackerLoop(t *testing.T, reapAfter time.Duration, maxInflight int, src blockSource, openLoop bool) *InclusionTracker { t.Helper() return newInclusionTrackerWithSource( - NewInclusionTracker("test-chain", reapAfter, maxInflight), src) + NewInclusionTracker("test-chain", reapAfter, maxInflight, openLoop), src) } // loadTx builds a LoadTx with a deterministic hash from nonce and an intended @@ -99,6 +106,26 @@ func TestInclusion_MatchStamps(t *testing.T) { require.Equal(t, uint64(1), tr.Summary().Included) } +// Test 1b: closed-loop still counts inclusions and stamps InclusionTime; only +// the inclusion_latency sample is gated off (IntendedSendTime is enqueue time +// there, so arrival-IntendedSendTime is not a real inclusion latency). 
The +// latency histogram is a global OTel instrument with no test-readable provider, +// so this asserts the observable contract: counts and stamping are unaffected. +func TestInclusion_ClosedLoopCountsNoLatency(t *testing.T) { + src := NewMockBlockSource() + tr := newTestTrackerLoop(t, time.Minute, 100, src, false /* closed-loop */) + require.False(t, tr.openLoop, "tracker built closed-loop: latency sample is gated") + + tx := loadTx(1, time.Unix(1000, 0)) + tr.Register(tx) + arrival := time.Unix(1002, 0) + src.SetBlock(5, tx.EthTx.Hash()) + tr.matchBlock(context.Background(), 5, arrival) + + require.Equal(t, arrival, tx.InclusionTime, "InclusionTime still stamped in closed-loop") + require.Equal(t, uint64(1), tr.Summary().Included, "included count tracked in closed-loop") +} + // Test 2: reaping evicts an un-included tx as expired and leaves no leak. func TestInclusion_ReapExpires(t *testing.T) { tr := newTestTracker(t, 10*time.Millisecond, 100, NewMockBlockSource()) diff --git a/stats/metrics.go b/stats/metrics.go index 8020791..70ffd46 100644 --- a/stats/metrics.go +++ b/stats/metrics.go @@ -56,9 +56,10 @@ var ( metric.WithDescription("Total open-loop transactions admitted and enqueued but whose send completed with an error (emitted once at run end)"), metric.WithUnit("{transactions}"))) - // Inclusion tracker. inclusion_latency._count IS the included count, so no - // standalone included counter. Denominator for inclusion rate is the - // existing succeeded/txs_accepted series, never a new "registered" series. + // Inclusion tracker. inclusion_latency is open-loop only (closed-loop + // IntendedSendTime is enqueue time, not a schedule); its _count is the + // included count only there. Denominator for inclusion rate is the existing + // succeeded/txs_accepted series, never a new "registered" series. 
inclusionLatency = must(meter.Float64Histogram( "inclusion_latency", metric.WithDescription("Latency from intended send to observed on-chain inclusion in seconds"), From 740eef636797f7743a72f186e73b7a307bf0f9c2 Mon Sep 17 00:00:00 2001 From: bdchatham Date: Mon, 15 Jun 2026 16:22:18 -0700 Subject: [PATCH 6/6] fix(stats): floor reapAfter + size inclusion cap by residency (Cursor) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - reapAfter<=0 (explicit --inclusion-reap-after=0) made reapLoop call time.NewTicker(0), which panics and crashes the tracker. Floor to defaultInclusionReapAfter (30s), mirroring the maxInflight<=0 floor. - The registry cap was MaxInFlight×4, but MaxInFlight bounds concurrent SENDS while a registry entry lives from send to inclusion (much longer). By Little's law size it as max(MaxInFlight×4, ceil(TPS×reapAfter×1.5)) so healthy high-TPS runs don't hit dropped_at_cap and undercount inclusion. - Document the late-register one-shot-match race as an accepted boundary (microsecond window vs block time; rare conservative undercount). Co-Authored-By: Claude Opus 4.8 --- main.go | 32 ++++++++++++++++++++++++++------ sender/doc.go | 5 ++++- stats/inclusion_tracker.go | 8 ++++++++ stats/inclusion_tracker_test.go | 16 ++++++++++++++++ 4 files changed, 54 insertions(+), 7 deletions(-) diff --git a/main.go b/main.go index ada20bb..4b0101c 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log" + "math" "net/http" "os" "os/signal" @@ -261,18 +262,16 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { } // The --track-receipts flag now enables the block-indexed inclusion - // tracker (the lossy per-tx receipt path is retired). maxInflight is sized - // off the open-loop --max-in-flight so the registry comfortably holds the - // admitted in-flight set; the ×4 headroom absorbs inclusion lag. + // tracker (the lossy per-tx receipt path is retired). 
// Not wired under --dry-run: simulated sends never hit the chain, so they // would all reap as expired and pollute the inclusion stats. inclusion := utils.None[*stats.InclusionTracker]() if len(cfg.Endpoints) > 0 && cfg.Settings.TrackReceipts && !cfg.Settings.DryRun { - const maxInflightMultiple = 4 + reapAfter := cfg.Settings.InclusionReapAfter.ToDuration() inclusionTracker = stats.NewInclusionTracker( cfg.SeiChainID, - cfg.Settings.InclusionReapAfter.ToDuration(), - cfg.Settings.MaxInFlight*maxInflightMultiple, + reapAfter, + inclusionRegistryCap(cfg.Settings.MaxInFlight, cfg.Settings.TPS, reapAfter), cfg.Settings.ArrivalModel == config.ArrivalModelOpenLoop, ) inclusion = utils.Some(inclusionTracker) @@ -433,6 +432,27 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { return err } +// inclusionRegistryCap sizes the inclusion registry. A registry entry lives from +// send-completion until block-match or reapAfter — far longer than a send is +// in-flight — so MaxInFlight (which bounds concurrent SENDS) under-sizes it. By +// Little's law the steady-state registry size ≈ sendRate × residency, so for a +// fixed rate the cap must come from TPS × reapAfter (×1.5 headroom for jitter), +// not send concurrency, or healthy high-TPS runs hit dropped_at_cap and +// undercount inclusion. We take the MAX of that term and the legacy MaxInFlight×4 +// floor. For TPS<=0 (a ramped run with no fixed rate known at config time) the +// Little's-law term is 0 and we fall back to the floor; if the ramp peak exceeds +// it the run surfaces dropped_at_cap (un-defer: derive from the ramp peak then). 
+func inclusionRegistryCap(maxInFlight int, tps float64, reapAfter time.Duration) int { + const maxInflightMultiple = 4 + const headroom = 1.5 + floor := maxInFlight * maxInflightMultiple + little := int(math.Ceil(tps * reapAfter.Seconds() * headroom)) + if little > floor { + return little + } + return floor +} + // loadConfig reads and parses the configuration file func loadConfig(filename string) (*config.LoadConfig, error) { data, err := os.ReadFile(filename) diff --git a/sender/doc.go b/sender/doc.go index 2341737..2d3f259 100644 --- a/sender/doc.go +++ b/sender/doc.go @@ -111,7 +111,10 @@ // read load. (4) InclusionTime is the header-arrival wall clock, not fetch // completion and not header.Time. (5) A failed block-body fetch is counted // (block_fetch_errors) and not retried — that block's txs reap as expired, the -// same conservative undercount as a WS gap. +// same conservative undercount as a WS gap. (6) A tx registered after its +// including block was already scanned is missed and reaps as expired — bounded +// by the microsecond register window versus block time, a rare conservative +// undercount, the same direction as a WS gap. // // Detection and baseline. schedule_lag (AttemptedSendTime minus IntendedSendTime) // is the primary coordinated-omission gate: it shows when sends fall behind the diff --git a/stats/inclusion_tracker.go b/stats/inclusion_tracker.go index 94b563a..0d4210f 100644 --- a/stats/inclusion_tracker.go +++ b/stats/inclusion_tracker.go @@ -74,6 +74,11 @@ type InclusionTracker struct { // make len(inflight) >= cap always true and drop every registration. const defaultMaxInflight = 10_000 +// defaultInclusionReapAfter floors a non-positive reapAfter: time.NewTicker +// panics on a <=0 period, so an explicit 0 (--inclusion-reap-after=0) would +// crash reapLoop. Defense-in-depth; the config default is already 30s. 
+const defaultInclusionReapAfter = 30 * time.Second + // NewInclusionTracker builds a tracker bounded at maxInflight in-flight txs that // reaps un-included txs after reapAfter. openLoop gates the inclusion_latency // sample (included/expired counts are tracked in both models). The block source @@ -82,6 +87,9 @@ func NewInclusionTracker(seiChainID string, reapAfter time.Duration, maxInflight if maxInflight <= 0 { maxInflight = defaultMaxInflight } + if reapAfter <= 0 { + reapAfter = defaultInclusionReapAfter + } t := &InclusionTracker{ seiChainID: seiChainID, reapAfter: reapAfter, diff --git a/stats/inclusion_tracker_test.go b/stats/inclusion_tracker_test.go index e36fde1..cf00cac 100644 --- a/stats/inclusion_tracker_test.go +++ b/stats/inclusion_tracker_test.go @@ -201,6 +201,22 @@ func TestInclusion_NonPositiveCapFallsBack(t *testing.T) { require.Equal(t, uint64(0), tr.Summary().DroppedAtCap) } +// Test 4c: a non-positive reapAfter is floored to a positive default. A zero +// period would otherwise panic time.NewTicker and crash reapLoop, so we also +// confirm reapLoop starts and stops cleanly with the floored value. +func TestInclusion_NonPositiveReapAfterFloored(t *testing.T) { + tr := newTestTracker(t, 0, 100, NewMockBlockSource()) + require.Positive(t, tr.reapAfter, "non-positive reapAfter is floored to a positive default") + require.Equal(t, defaultInclusionReapAfter, tr.reapAfter) + + // reapLoop must not panic on the floored period; cancel returns it promptly. + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- tr.reapLoop(ctx) }() + cancel() + require.ErrorIs(t, <-done, context.Canceled) +} + // Test 5: conservation identity registered == included + expired + // inflightAtShutdown, table-driven over register/match/reap mixes. func TestInclusion_Conservation(t *testing.T) {