Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ Edit `my-config.json`:
| `--buffer-size, -b` | 1000 | Buffer size per worker |
| `--dry-run` | false | Simulate without sending |
| `--debug` | false | Log each transaction |
| `--track-receipts` | false | Track transaction receipts |
| `--track-receipts` | false | Enable the block-indexed tx→inclusion tracker (stamps InclusionTime; reports included/expired/inflight-at-shutdown) |
| `--inclusion-reap-after` | 30s | How long an un-included tx waits before reaping as expired (tune to expected inclusion time on congested chains) |
| `--track-blocks` | false | Track block statistics |
| `--track-user-latency` | false | Track user latency metrics |
| `--prewarm` | false | Prewarm accounts before test |
Expand Down
15 changes: 12 additions & 3 deletions config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@ import (

// Settings holds all CLI-configurable parameters
type Settings struct {
TasksPerEndpoint int `json:"workers,omitempty"`
TPS float64 `json:"tps,omitempty"`
StatsInterval Duration `json:"statsInterval,omitempty"`
TasksPerEndpoint int `json:"workers,omitempty"`
TPS float64 `json:"tps,omitempty"`
StatsInterval Duration `json:"statsInterval,omitempty"`
// InclusionReapAfter bounds how long an un-included tx stays in the inclusion
// registry before it is reaped as expired. Tune to expected inclusion time on
// congested chains: too short reaps slow inclusions as expired (inflated
// un-included), too long inflates the in-flight map.
InclusionReapAfter Duration `json:"inclusionReapAfter,omitempty"`
BufferSize int `json:"bufferSize,omitempty"`
DryRun bool `json:"dryRun,omitempty"`
Debug bool `json:"debug,omitempty"`
Expand Down Expand Up @@ -74,6 +79,7 @@ func DefaultSettings() Settings {
TasksPerEndpoint: 1,
TPS: 0.0,
StatsInterval: Duration(10 * time.Second),
InclusionReapAfter: Duration(30 * time.Second),
BufferSize: 1000,
DryRun: false,
Debug: false,
Expand All @@ -97,6 +103,7 @@ func InitializeViper(cmd *cobra.Command) error {
// Bind flags to viper with error checking
flagBindings := map[string]string{
"statsInterval": "stats-interval",
"inclusionReapAfter": "inclusion-reap-after",
"bufferSize": "buffer-size",
"tps": "tps",
"dryRun": "dry-run",
Expand Down Expand Up @@ -125,6 +132,7 @@ func InitializeViper(cmd *cobra.Command) error {
// Set defaults in Viper
defaults := DefaultSettings()
viper.SetDefault("statsInterval", defaults.StatsInterval.ToDuration())
viper.SetDefault("inclusionReapAfter", defaults.InclusionReapAfter.ToDuration())
viper.SetDefault("bufferSize", defaults.BufferSize)
viper.SetDefault("tps", defaults.TPS)
viper.SetDefault("dryRun", defaults.DryRun)
Expand Down Expand Up @@ -171,6 +179,7 @@ func ResolveSettings() *Settings {
TasksPerEndpoint: viper.GetInt("workers"),
TPS: viper.GetFloat64("tps"),
StatsInterval: Duration(viper.GetDuration("statsInterval")),
InclusionReapAfter: Duration(viper.GetDuration("inclusionReapAfter")),
BufferSize: viper.GetInt("bufferSize"),
DryRun: viper.GetBool("dryRun"),
Debug: viper.GetBool("debug"),
Expand Down
2 changes: 2 additions & 0 deletions config/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestArgumentPrecedence(t *testing.T) {

// Add flags (with zero defaults to avoid precedence issues)
cmd.Flags().Duration("stats-interval", 0, "Stats interval")
cmd.Flags().Duration("inclusion-reap-after", 0, "Inclusion reap after")
cmd.Flags().Int("workers", 0, "Number of workers")
cmd.Flags().Float64("tps", 0, "TPS")
cmd.Flags().Bool("dry-run", false, "Dry run")
Expand Down Expand Up @@ -130,6 +131,7 @@ func TestDefaultSettings(t *testing.T) {
TasksPerEndpoint: 1,
TPS: 0.0,
StatsInterval: Duration(10 * time.Second),
InclusionReapAfter: Duration(30 * time.Second),
BufferSize: 1000,
DryRun: false,
Debug: false,
Expand Down
57 changes: 56 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"log"
"math"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -53,6 +54,7 @@ without actually sending requests or deploying contracts.`,
func init() {
rootCmd.Flags().StringVarP(&configFile, "config", "c", "", "Path to configuration file (required)")
rootCmd.Flags().DurationP("stats-interval", "s", 0, "Interval for logging statistics")
rootCmd.Flags().Duration("inclusion-reap-after", 30*time.Second, "How long an un-included tx stays in the inclusion registry before reaping as expired (tune to expected inclusion time on congested chains)")
rootCmd.Flags().IntP("buffer-size", "b", 0, "Buffer size per worker")
rootCmd.Flags().Float64P("tps", "t", 0, "Transactions per second (0 = no limit)")
rootCmd.Flags().Bool("dry-run", false, "Mock deployment and requests")
Expand Down Expand Up @@ -208,6 +210,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
logger := stats.NewLogger(collector, cfg.Settings.StatsInterval.ToDuration(), cfg.Settings.ReportPath, cfg.Settings.Debug)
var ramper *sender.Ramper
var dispatcher *sender.Dispatcher
var inclusionTracker *stats.InclusionTracker

err = service.Run(ctx, func(ctx context.Context, s service.Scope) error {
// Create the generator from the config struct
Expand Down Expand Up @@ -258,8 +261,27 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
})
}

// The --track-receipts flag now enables the block-indexed inclusion
// tracker (the lossy per-tx receipt path is retired).
// Not wired under --dry-run: simulated sends never hit the chain, so they
// would all reap as expired and pollute the inclusion stats.
inclusion := utils.None[*stats.InclusionTracker]()
if len(cfg.Endpoints) > 0 && cfg.Settings.TrackReceipts && !cfg.Settings.DryRun {
reapAfter := cfg.Settings.InclusionReapAfter.ToDuration()
inclusionTracker = stats.NewInclusionTracker(
cfg.SeiChainID,
reapAfter,
inclusionRegistryCap(cfg.Settings.MaxInFlight, cfg.Settings.TPS, reapAfter),
cfg.Settings.ArrivalModel == config.ArrivalModelOpenLoop,
)
inclusion = utils.Some(inclusionTracker)
s.SpawnBgNamed("inclusion tracker", func() error {
return inclusionTracker.Run(ctx, cfg.Endpoints[0])
})
}

// Create the sender from the config struct
snd, err := sender.NewShardedSender(cfg, sharedLimiter, collector)
snd, err := sender.NewShardedSender(cfg, sharedLimiter, collector, inclusion)
if err != nil {
return fmt.Errorf("failed to create sender: %w", err)
}
Expand Down Expand Up @@ -386,6 +408,18 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
log.Printf("⚠️ Open-loop %d txs failed to send (admitted but errored; not lost)", summary.Failed)
}
}
// Read AFTER service.Run returns: both workers and the tracker have joined,
// so inflightAtShutdown is final and the conservation identity holds.
if inclusionTracker != nil {
incl := inclusionTracker.Summary()
summary.InclusionTracked = true
summary.Included = incl.Included
summary.Expired = incl.Expired
summary.DroppedAtCap = incl.DroppedAtCap
summary.InflightAtShutdown = incl.InflightAtShutdown
log.Printf("📦 Inclusion: included=%d expired=%d dropped_at_cap=%d inflight_at_shutdown=%d",
incl.Included, incl.Expired, incl.DroppedAtCap, incl.InflightAtShutdown)
}
collector.EmitRunSummary(ctx, summary)
if d := cfg.Settings.PostSummaryFlushDelay.ToDuration(); d > 0 {
log.Printf("⏳ Holding pod for post-summary scrape window (%s)...", d)
Expand All @@ -398,6 +432,27 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
return err
}

// inclusionRegistryCap sizes the inclusion registry. A registry entry lives from
// send-completion until block-match or reapAfter — far longer than a send is
// in-flight — so MaxInFlight (which bounds concurrent SENDS) under-sizes it. By
// Little's law the steady-state registry size ≈ sendRate × residency, so for a
// fixed rate the cap must come from TPS × reapAfter (×1.5 headroom for jitter),
// not send concurrency, or healthy high-TPS runs hit dropped_at_cap and
// undercount inclusion. We take the MAX of that term and the legacy MaxInFlight×4
// floor. For TPS<=0 (a ramped run with no fixed rate known at config time) the
// Little's-law term is 0 and we fall back to the floor; if the ramp peak exceeds
// it the run surfaces dropped_at_cap (un-defer: derive from the ramp peak then).
func inclusionRegistryCap(maxInFlight int, tps float64, reapAfter time.Duration) int {
const maxInflightMultiple = 4
const headroom = 1.5
floor := maxInFlight * maxInflightMultiple
little := int(math.Ceil(tps * reapAfter.Seconds() * headroom))
if little > floor {
return little
}
return floor
}

// loadConfig reads and parses the configuration file
func loadConfig(filename string) (*config.LoadConfig, error) {
data, err := os.ReadFile(filename)
Expand Down
36 changes: 34 additions & 2 deletions sender/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
// the [Dispatcher] times their arrival and hands each off to a [TxSender]; the
// [ShardedSender] routes each tx to one of N per-endpoint [Worker]s by shard;
// the worker's send loop stamps the attempt and calls the go-ethereum client
// (eth_sendRawTransaction). Receipts, when tracked, are
// polled on a separate worker loop. A shared [golang.org/x/time/rate.Limiter] is
// (eth_sendRawTransaction). Inclusion, when tracked, is observed by the
// block-indexed [stats.InclusionTracker] (see Inclusion stage below), not by
// per-tx receipt polling. A shared [golang.org/x/time/rate.Limiter] is
// the single rate authority for the whole pipeline; the [Ramper] drives its
// limit up or down via SetLimit.
//
Expand Down Expand Up @@ -84,6 +85,37 @@
// run's arrival model, not of any per-tx field. Latency and schedule-lag consumers
// must gate on the run-level arrival model.
//
// # Inclusion stage
//
// When enabled (--track-receipts), the worker hands each successful send to the
// [stats.InclusionTracker] at send-completion (after OnComplete, only on a nil
// send error). The tracker subscribes to new heads, fetches each arriving
// block's body once (O(blocks), not O(txs)), and stamps InclusionTime on every
// matched in-flight tx with the block's header-ARRIVAL time. Un-included txs are
// reaped as expired after reapAfter. inclusion_latency (arrival minus
// IntendedSendTime) is an open-loop-only measure; in closed-loop IntendedSendTime
// is enqueue time, so the latency sample is omitted (counts are tracked in both).
//
// Conservation. registered == included + expired + inflight_at_shutdown, and
// registered ⊆ succeeded (only successful sends are registered). The inclusion
// denominator is succeeded (txs_accepted), never a minted "registered" series;
// dropped_at_cap txs are excluded from it. inflight_at_shutdown is read only
// after both the workers and the tracker have joined.
//
// Accepted boundaries. (1) WS gaps degrade conservatively: a missed head is
// counted (block_gaps) but never backfilled, so its txs reap as expired —
// an undercount of inclusions, never a miscount. (2) Reorgs use
// first-observation-wins (stamp + delete); the inclusion-time error is bounded
// by reorg_depth × block_time, with no canonical reconciliation. (3) A single
// fetch endpoint (Endpoints[0], shared with the block collector) adds a small
// read load. (4) InclusionTime is the header-arrival wall clock, not fetch
// completion and not header.Time. (5) A failed block-body fetch is counted
// (block_fetch_errors) and not retried — that block's txs reap as expired, the
// same conservative undercount as a WS gap. (6) A tx registered after its
// including block was already scanned is missed and reaps as expired — bounded
// by the microsecond register window versus block time, a rare conservative
// undercount, the same direction as a WS gap.
//
// Detection and baseline. schedule_lag (AttemptedSendTime minus IntendedSendTime)
// is the primary coordinated-omission gate: it shows when sends fall behind the
// arrival schedule even before any tx is shed. The drop count measures only
Expand Down
6 changes: 0 additions & 6 deletions sender/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ var (
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(0.1, 0.2, 0.3, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0, 20.0)))

receiptLatency = must(meter.Float64Histogram(
"receipt_latency",
metric.WithDescription("Latency from transaction submission to receipt confirmation in seconds"),
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(0.1, 0.2, 0.3, 0.5, 1.0, 2.0, 3.0, 5.0, 10.0, 20.0)))

txsAccepted = must(meter.Int64Counter(
"txs_accepted",
metric.WithDescription("Transactions successfully submitted to an endpoint"),
Expand Down
4 changes: 2 additions & 2 deletions sender/scheduler_realworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ func (g *signedTxGenerator) issuedCount() int {

// newRealWorker builds the production Worker against the given endpoint, in the
// open-loop configuration (SkipRateLimit=true so the scheduler owns the clock,
// TrackReceipts=false so watchTransactions returns immediately and we exercise
// only the send path). It is the real TxSender the scheduler drives.
// no inclusion tracker so we exercise only the send path). It is the real
// TxSender the scheduler drives.
func newRealWorker(endpoint string, tasks, buffer int) *Worker {
return NewWorker(&WorkerConfig{
ID: 0,
Expand Down
9 changes: 6 additions & 3 deletions sender/sharded_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/sei-protocol/sei-load/config"
"github.com/sei-protocol/sei-load/stats"
"github.com/sei-protocol/sei-load/types"
"github.com/sei-protocol/sei-load/utils"
"github.com/sei-protocol/sei-load/utils/service"
)

Expand All @@ -17,8 +18,10 @@ type ShardedSender struct {
workers []*Worker
}

// NewShardedSender creates a new sharded sender with workers for each endpoint
func NewShardedSender(cfg *config.LoadConfig, limiter *rate.Limiter, collector *stats.Collector) (*ShardedSender, error) {
// NewShardedSender creates a new sharded sender with workers for each endpoint.
// inclusion, when present, is shared across all workers so each routes its
// successful sends to the one tracker.
func NewShardedSender(cfg *config.LoadConfig, limiter *rate.Limiter, collector *stats.Collector, inclusion utils.Option[*stats.InclusionTracker]) (*ShardedSender, error) {
if len(cfg.Endpoints) == 0 {
return nil, fmt.Errorf("no endpoints configured")
}
Expand All @@ -36,11 +39,11 @@ func NewShardedSender(cfg *config.LoadConfig, limiter *rate.Limiter, collector *
BufferSize: cfg.Settings.BufferSize,
Tasks: cfg.Settings.TasksPerEndpoint,
DryRun: cfg.Settings.DryRun,
TrackReceipts: cfg.Settings.TrackReceipts,
Debug: cfg.Settings.Debug,
Collector: collector,
Limiter: limiter,
SkipRateLimit: skipRateLimit,
Inclusion: inclusion,
})
}

Expand Down
Loading
Loading