Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ type Settings struct {
// txs that would exceed it at their scheduled instant are dropped and
// counted rather than throttling the arrival clock. Ignored in closed-loop.
MaxInFlight int `json:"maxInFlight,omitempty"`
// ScheduleLagVoidThreshold is the fraction of the arrival interval (1/λ) that
// schedule_lag_p99 may reach before an open-loop run is VOID. Zero
// uses the provisional built-in default; set via config to retune without a
// rebuild. Ignored in closed-loop.
ScheduleLagVoidThreshold float64 `json:"scheduleLagVoidThreshold,omitempty"`
}

// Arrival model identifiers for the ArrivalModel setting.
Expand Down Expand Up @@ -94,7 +99,8 @@ func DefaultSettings() Settings {
NumBlocksToWrite: 100,
PostSummaryFlushDelay: Duration(25 * time.Second),
ArrivalModel: ArrivalModelClosedLoop,
MaxInFlight: 10_000,
MaxInFlight: 10_000,
ScheduleLagVoidThreshold: 0,
}
}

Expand Down Expand Up @@ -150,6 +156,7 @@ func InitializeViper(cmd *cobra.Command) error {
viper.SetDefault("postSummaryFlushDelay", defaults.PostSummaryFlushDelay.ToDuration())
viper.SetDefault("arrivalModel", defaults.ArrivalModel)
viper.SetDefault("maxInFlight", defaults.MaxInFlight)
viper.SetDefault("scheduleLagVoidThreshold", defaults.ScheduleLagVoidThreshold)
return nil
}

Expand Down Expand Up @@ -193,7 +200,8 @@ func ResolveSettings() *Settings {
TargetGas: viper.GetUint64("targetGas"),
NumBlocksToWrite: viper.GetInt("numBlocksToWrite"),
PostSummaryFlushDelay: Duration(viper.GetDuration("postSummaryFlushDelay")),
ArrivalModel: viper.GetString("arrivalModel"),
MaxInFlight: viper.GetInt("maxInFlight"),
ArrivalModel: viper.GetString("arrivalModel"),
MaxInFlight: viper.GetInt("maxInFlight"),
ScheduleLagVoidThreshold: viper.GetFloat64("scheduleLagVoidThreshold"),
}
}
50 changes: 48 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
})
}

// The --track-receipts flag now enables the block-indexed inclusion
// tracker (the lossy per-tx receipt path is retired).
// --track-receipts enables the block-indexed inclusion tracker.
// Not wired under --dry-run: simulated sends never hit the chain, so they
// would all reap as expired and pollute the inclusion stats.
inclusion := utils.None[*stats.InclusionTracker]()
Expand Down Expand Up @@ -323,6 +322,13 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
switch {
case openLoop && cfg.Settings.TxsDir == "":
dispatcher.SetOpenLoop(sharedLimiter, cfg.Settings.MaxInFlight)
// Arm the unsampled over-bound counter for this fixed-λ open-loop run:
// same VOID bound the verdict uses (threshold × 1/λ), known here at run
// start. Skipped under RampUp (verdict is N/A — no single 1/λ) and on
// non-fixed-λ runs, so the counter stays inert where it isn't judged.
if bound := stats.ScheduleLagBound(cfg.Settings.TPS, cfg.Settings.ScheduleLagVoidThreshold); bound > 0 && !cfg.Settings.RampUp {
collector.SetScheduleLagBound(bound)
}
log.Printf("📤 Arrival model: open_loop (max in-flight: %d)", cfg.Settings.MaxInFlight)
case openLoop:
// open_loop was requested but the txs-writer path has no arrival clock,
Expand Down Expand Up @@ -396,11 +402,13 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
ramper.LogFinalStats()
}
summary := stats.RunSummary{ArrivalModel: config.ArrivalModelClosedLoop}
var admitted uint64
if dispatcher != nil {
summary.ArrivalModel = string(dispatcher.ArrivalModel())
dstats := dispatcher.GetStats()
summary.Dropped = dstats.Dropped
summary.Failed = dstats.Failed
admitted = dstats.TotalSent + dstats.Failed
if summary.Dropped > 0 {
log.Printf("⚠️ Open-loop dropped %d txs (in-flight saturated; not throttled)", summary.Dropped)
}
Expand All @@ -420,6 +428,44 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
log.Printf("📦 Inclusion: included=%d expired=%d dropped_at_cap=%d inflight_at_shutdown=%d",
incl.Included, incl.Expired, incl.DroppedAtCap, incl.InflightAtShutdown)
}
// Open-loop self-check: compute schedule_lag_p99 and the run verdict. Gated
// on the model the run actually used (summary.ArrivalModel, not the requested
// flag — the txs-writer path downgrades to closed_loop).
openLoopRun := summary.ArrivalModel == config.ArrivalModelOpenLoop
lagTotal, lagOverBound, lagMax := collector.ScheduleLagTail()
verdict := stats.EvaluateScheduleLag(stats.ScheduleLagInputs{
Samples: collector.ScheduleLagSamples(),
TargetTPS: cfg.Settings.TPS,
OpenLoop: openLoopRun,
Ramped: cfg.Settings.RampUp,
Admitted: admitted,
Threshold: cfg.Settings.ScheduleLagVoidThreshold,
OverBoundCount: lagOverBound,
OverBoundTotal: lagTotal,
MaxLag: lagMax,
})
summary.ScheduleLagP99 = verdict.ScheduleLagP99
summary.ScheduleLagMax = verdict.MaxLag
summary.ScheduleLagOverBoundCount = verdict.OverBoundCount
summary.ScheduleLagTotal = verdict.OverBoundTotal
summary.Verdict = verdict.Verdict
summary.VoidReason = verdict.VoidReason
if verdict.Anomaly {
log.Printf("🚨 no schedule_lag samples despite %d admitted txs — recorder may be mis-wired", admitted)
}
switch verdict.Verdict {
case stats.VerdictVoid:
log.Printf("⚠️ VOID: %s (schedule_lag_p99=%s, samples=%d)",
verdict.VoidReason, verdict.ScheduleLagP99.Round(time.Microsecond), verdict.SampleCount)
case stats.VerdictNA:
log.Printf("🧪 Run verdict: N/A — %s | schedule_lag_p99=%s (samples=%d)",
verdict.NAReason, verdict.ScheduleLagP99.Round(time.Microsecond), verdict.SampleCount)
default:
log.Printf("🧪 Run verdict: %s | schedule_lag_p99=%s (samples=%d, arrival_interval=%s)",
verdict.Verdict, verdict.ScheduleLagP99.Round(time.Microsecond),
verdict.SampleCount, verdict.ArrivalInterval.Round(time.Microsecond))
}

collector.EmitRunSummary(ctx, summary)
if d := cfg.Settings.PostSummaryFlushDelay.ToDuration(); d > 0 {
log.Printf("⏳ Holding pod for post-summary scrape window (%s)...", d)
Expand Down
11 changes: 9 additions & 2 deletions sender/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) erro
startTime := time.Now()
// Sole owner between dequeue and hand-off: stamp is race-free (see LoadTx).
tx.AttemptedSendTime = startTime
// schedule_lag self-check: only open-loop txs carry a true scheduled
// instant. A zero IntendedSendTime (prewarm) is excluded here; the
// closed-loop enqueue time is excluded at the run level (the verdict gates
// on the arrival model, see stats.EvaluateScheduleLag).
if !tx.IntendedSendTime.IsZero() {
w.cfg.Collector.RecordScheduleLag(startTime.Sub(tx.IntendedSendTime))
}
err = w.sendTransaction(ctx, client, tx)
// OnComplete must fire only after the real send returns — that is what
// bounds true unacked in-flight (see doc.go). Nil on closed-loop/batch.
Expand All @@ -177,8 +184,8 @@ func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) erro
}
w.cfg.Collector.RecordTransaction(tx.Scenario.Name, w.cfg.Endpoint, time.Since(startTime), err == nil)
// Register at send-completion, only on success: registered ⊆ succeeded.
// (The tracker is wired only for live runs — see main.go; DryRun never
// gets a tracker, so simulated sends are not inclusion-tracked.)
// The tracker is present only for live runs (wired in main.go; never under
// DryRun).
if err == nil {
if t, ok := w.cfg.Inclusion.Get(); ok {
t.Register(tx)
Expand Down
97 changes: 97 additions & 0 deletions stats/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@ package stats

import (
"fmt"
"math/rand"
"sort"
"sync"
"time"
)

// scheduleLagReservoirCap is the maximum number of schedule_lag samples the
// collector retains. A long run can record far more lags than it is sensible to
// keep, so RecordScheduleLag holds at most this many and, once full, swaps new
// values in at random (reservoir sampling, Algorithm R). The retained set is
// therefore a uniform sample of the whole run rather than its most recent
// window, which keeps a percentile computed from it representative of the full
// run. 2^14 entries of time.Duration (8 bytes each) is 128 KiB.
const scheduleLagReservoirCap = 1 << 14

// Collector tracks comprehensive statistics for load testing
type Collector struct {
mu sync.RWMutex
Expand Down Expand Up @@ -34,6 +43,26 @@ type Collector struct {
totalTxs uint64
lastWindowTime time.Time

// schedule_lag reservoir: a bounded uniform sample of per-tx send lag
// (AttemptedSendTime − IntendedSendTime), used for the open-loop self-check
// verdict. scheduleLagSeen is the total count of recorded samples (the
// reservoir's n), needed for Algorithm R replacement probability.
scheduleLag []time.Duration
scheduleLagSeen uint64
scheduleLagRand *rand.Rand

// Unsampled schedule_lag tail signal: the reservoir is a uniform sample, so a
// sub-percentile late-run tail can stay under the whole-run p99 yet still mean
// the generator fell behind. These exact (un-sampled) counters give the
// verdict a tail-degradation signal the reservoir cannot dilute.
// scheduleLagBound is the VOID bound (threshold × 1/λ) set once at run start
// for a fixed-λ open-loop run; zero leaves over-bound counting inert (ramped /
// closed-loop / no-λ runs are N/A anyway). scheduleLagMax is the largest lag
// ever recorded, surfaced for diagnostics.
scheduleLagBound time.Duration
scheduleLagOverBound uint64
scheduleLagMax time.Duration

// Configuration
maxLatencyHistory int // Limit latency history to prevent memory leaks
}
Expand All @@ -56,9 +85,77 @@ func NewCollector() *Collector {
startTime: time.Now(),
lastWindowTime: time.Now(),
maxLatencyHistory: 10000, // Keep last 10k latencies per endpoint
scheduleLag: make([]time.Duration, 0, scheduleLagReservoirCap),
// Local source: this is a self-check sample, not security-sensitive, and a
// per-collector source avoids contending the global rand mutex on the hot
// send path.
scheduleLagRand: rand.New(rand.NewSource(time.Now().UnixNano())), //nolint:gosec // sampling, not crypto
}
}

// RecordScheduleLag folds one send lag (AttemptedSendTime − IntendedSendTime)
// into the collector's schedule_lag state.
//
// Every call updates the exact, un-sampled figures: the total number of lags
// seen, the largest lag seen, and — only while a positive bound is armed — the
// number of lags strictly greater than that bound. The lag is then offered to
// the bounded reservoir: appended while there is room, otherwise it replaces a
// random slot with probability cap/seen (Algorithm R), so the retained samples
// stay a uniform sample of everything recorded.
//
// The sender worker calls this right after stamping AttemptedSendTime and skips
// txs whose IntendedSendTime is zero; this method itself applies no arrival-model
// filtering. A negative lag (presumably clock skew between the scheduler's and
// the worker's clock reads) is clamped to zero so it cannot pull the percentile
// down.
func (c *Collector) RecordScheduleLag(lag time.Duration) {
	if lag < 0 {
		lag = 0
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// Exact counters first: they must see every lag, including the ones the
	// reservoir ends up discarding.
	c.scheduleLagSeen++
	if c.scheduleLagMax < lag {
		c.scheduleLagMax = lag
	}
	if bound := c.scheduleLagBound; bound > 0 && lag > bound {
		c.scheduleLagOverBound++
	}

	if len(c.scheduleLag) >= scheduleLagReservoirCap {
		// Reservoir is full. Draw a slot from [0, seen); only draws that land
		// inside the reservoir overwrite, which is the cap/seen replacement
		// probability Algorithm R requires.
		if slot := c.scheduleLagRand.Int63n(int64(c.scheduleLagSeen)); slot < int64(scheduleLagReservoirCap) {
			c.scheduleLag[slot] = lag
		}
		return
	}

	// Still filling: keep every sample.
	c.scheduleLag = append(c.scheduleLag, lag)
}

// ScheduleLagSamples returns a snapshot of the schedule_lag reservoir taken
// under the read lock. The returned slice has its own backing array, so the
// caller may sort or otherwise mutate it without touching the collector's live
// samples. The result is never nil; it is empty when nothing has been recorded.
func (c *Collector) ScheduleLagSamples() []time.Duration {
	c.mu.RLock()
	defer c.mu.RUnlock()

	snapshot := make([]time.Duration, 0, len(c.scheduleLag))
	return append(snapshot, c.scheduleLag...)
}

// SetScheduleLagBound stores the lag bound that RecordScheduleLag compares each
// lag against when counting over-bound sends. The caller is expected to pass the
// run's VOID bound (threshold × 1/λ) once at run start. A bound that is zero or
// negative — including the never-set zero value — leaves over-bound counting
// switched off, because only a positive bound is ever compared against.
func (c *Collector) SetScheduleLagBound(bound time.Duration) {
	c.mu.Lock()
	c.scheduleLagBound = bound
	c.mu.Unlock()
}

// ScheduleLagTail reports the exact (un-sampled) schedule_lag figures, read
// together under the read lock so they describe one consistent moment.
//
// Returns:
//   - total: how many lags have been recorded in all. This is the true count,
//     not the length of the bounded reservoir.
//   - overBound: how many of those lags exceeded the armed bound; zero when no
//     positive bound was armed.
//   - maxLag: the largest lag recorded so far (zero if none).
//
// The third result is named maxLag rather than max so it does not shadow the
// predeclared max builtin.
func (c *Collector) ScheduleLagTail() (total, overBound uint64, maxLag time.Duration) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.scheduleLagSeen, c.scheduleLagOverBound, c.scheduleLagMax
}

// RecordTransaction records a transaction attempt
func (c *Collector) RecordTransaction(scenario, endpoint string, latency time.Duration, success bool) {
c.mu.Lock()
Expand Down
21 changes: 21 additions & 0 deletions stats/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,27 @@ var (
"run_inflight_at_shutdown",
metric.WithDescription("In-flight inclusion registry size at run end (emitted once at run end)"),
metric.WithUnit("{transactions}")))

// Open-loop self-check. Emitted once at run end on every run; the
// verdict label distinguishes VALID / VOID / N/A so a generator-bound run is
// queryable, not just a log line.
runScheduleLagP99 = must(meter.Float64Gauge(
"run_schedule_lag_p99",
metric.WithDescription("p99 of per-tx send lag (attempted − intended) over this open-loop run (emitted once at run end)"),
metric.WithUnit("s")))

// Unsampled tail signal: the reservoir p99 above can dilute a sub-percentile
// late-run tail, so the verdict also gates on the exact over-bound fraction.
// Max is diagnostic; fraction is the gate.
runScheduleLagMax = must(meter.Float64Gauge(
"run_schedule_lag_max",
metric.WithDescription("max per-tx send lag (attempted − intended) over this open-loop run, un-sampled (emitted once at run end)"),
metric.WithUnit("s")))

runScheduleLagOverBoundFraction = must(meter.Float64Gauge(
"run_schedule_lag_over_bound_fraction",
metric.WithDescription("exact fraction of sends whose lag exceeded the VOID bound over this open-loop run (emitted once at run end)"),
metric.WithUnit("1")))
)

// meteredInclusionTrackers backs the inclusion_inflight gauge: each tracker
Expand Down
37 changes: 36 additions & 1 deletion stats/run_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type RunSummary struct {
// registered == Included + Expired + InflightAtShutdown, with
// registered ⊆ succeeded. InclusionTracked disambiguates a not-tracked run
// (all zero, flag false) from a tracked run with no inclusions yet.
// TODO(PLT-467): owns run-summary schema versioning for these fields.
InclusionTracked bool
// Included is the count of txs the tracker observed on-chain (stamped).
Included uint64
Expand All @@ -41,6 +40,26 @@ type RunSummary struct {
DroppedAtCap uint64
// InflightAtShutdown is len(inflight) read after workers and tracker joined.
InflightAtShutdown uint64

// Open-loop self-check: schedule_lag = AttemptedSendTime −
// IntendedSendTime per tx. A p99 above the threshold fraction of the arrival
// interval (1/λ) means the generator could not keep its own schedule, so the
// run was generator-bound, not open-loop, and is VOID. Reported on every run
// regardless of verdict; Verdict is N/A for closed-loop or non-fixed-λ runs.
ScheduleLagP99 time.Duration
// ScheduleLagMax is the largest single send lag recorded (un-sampled),
// surfaced for diagnostics; the verdict gates on a fraction, not this max.
ScheduleLagMax time.Duration
// ScheduleLagOverBoundCount / ScheduleLagTotal are the exact (un-sampled)
// count of sends past the VOID bound and the total recorded; their ratio is
// the tail-degradation signal the sampled p99 cannot dilute. Total zero / no
// bound armed means the figures are inert (non-fixed-λ run).
ScheduleLagOverBoundCount uint64
ScheduleLagTotal uint64
// Verdict is VerdictValid, VerdictVoid, or VerdictNA.
Verdict string
// VoidReason explains a VOID verdict; empty otherwise.
VoidReason string
}

// EmitRunSummary records the run-summary gauges. Call once at shutdown.
Expand All @@ -61,4 +80,20 @@ func (c *Collector) EmitRunSummary(ctx context.Context, summary RunSummary) {
if summary.InclusionTracked {
runInflightAtShutdown.Record(ctx, int64(summary.InflightAtShutdown))
}
runScheduleLagP99.Record(ctx, summary.ScheduleLagP99.Seconds(),
metric.WithAttributes(
attribute.String("arrival_model", summary.ArrivalModel),
attribute.String("verdict", summary.Verdict)))
runScheduleLagMax.Record(ctx, summary.ScheduleLagMax.Seconds(),
metric.WithAttributes(
attribute.String("arrival_model", summary.ArrivalModel),
attribute.String("verdict", summary.Verdict)))
var overBoundFraction float64
if summary.ScheduleLagTotal > 0 {
overBoundFraction = float64(summary.ScheduleLagOverBoundCount) / float64(summary.ScheduleLagTotal)
}
runScheduleLagOverBoundFraction.Record(ctx, overBoundFraction,
metric.WithAttributes(
attribute.String("arrival_model", summary.ArrivalModel),
attribute.String("verdict", summary.Verdict)))
}
Loading
Loading