diff --git a/config/settings.go b/config/settings.go index 6f2e7ab..06f085d 100644 --- a/config/settings.go +++ b/config/settings.go @@ -42,6 +42,11 @@ type Settings struct { // txs that would exceed it at their scheduled instant are dropped and // counted rather than throttling the arrival clock. Ignored in closed-loop. MaxInFlight int `json:"maxInFlight,omitempty"` + // ScheduleLagVoidThreshold is the fraction of the arrival interval (1/λ) that + // schedule_lag_p99 may reach before an open-loop run is VOID. Zero + // uses the provisional built-in default; set via config to retune without a + // rebuild. Ignored in closed-loop. + ScheduleLagVoidThreshold float64 `json:"scheduleLagVoidThreshold,omitempty"` } // Arrival model identifiers for the ArrivalModel setting. @@ -94,7 +99,8 @@ func DefaultSettings() Settings { NumBlocksToWrite: 100, PostSummaryFlushDelay: Duration(25 * time.Second), ArrivalModel: ArrivalModelClosedLoop, - MaxInFlight: 10_000, + MaxInFlight: 10_000, + ScheduleLagVoidThreshold: 0, } } @@ -150,6 +156,7 @@ func InitializeViper(cmd *cobra.Command) error { viper.SetDefault("postSummaryFlushDelay", defaults.PostSummaryFlushDelay.ToDuration()) viper.SetDefault("arrivalModel", defaults.ArrivalModel) viper.SetDefault("maxInFlight", defaults.MaxInFlight) + viper.SetDefault("scheduleLagVoidThreshold", defaults.ScheduleLagVoidThreshold) return nil } @@ -193,7 +200,8 @@ func ResolveSettings() *Settings { TargetGas: viper.GetUint64("targetGas"), NumBlocksToWrite: viper.GetInt("numBlocksToWrite"), PostSummaryFlushDelay: Duration(viper.GetDuration("postSummaryFlushDelay")), - ArrivalModel: viper.GetString("arrivalModel"), - MaxInFlight: viper.GetInt("maxInFlight"), + ArrivalModel: viper.GetString("arrivalModel"), + MaxInFlight: viper.GetInt("maxInFlight"), + ScheduleLagVoidThreshold: viper.GetFloat64("scheduleLagVoidThreshold"), } } diff --git a/main.go b/main.go index 4b0101c..0d3eccc 100644 --- a/main.go +++ b/main.go @@ -261,8 +261,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { }) } - // The --track-receipts flag now enables the block-indexed inclusion - // tracker (the lossy per-tx receipt path is retired). + // --track-receipts enables the block-indexed inclusion tracker. // Not wired under --dry-run: simulated sends never hit the chain, so they // would all reap as expired and pollute the inclusion stats. inclusion := utils.None[*stats.InclusionTracker]() @@ -323,6 +322,13 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { switch { case openLoop && cfg.Settings.TxsDir == "": dispatcher.SetOpenLoop(sharedLimiter, cfg.Settings.MaxInFlight) + // Arm the unsampled over-bound counter for this fixed-λ open-loop run: + // same VOID bound the verdict uses (threshold × 1/λ), known here at run + // start. Skipped under RampUp (verdict is N/A — no single 1/λ) and on + // non-fixed-λ runs, so the counter stays inert where it isn't judged. + if bound := stats.ScheduleLagBound(cfg.Settings.TPS, cfg.Settings.ScheduleLagVoidThreshold); bound > 0 && !cfg.Settings.RampUp { + collector.SetScheduleLagBound(bound) + } log.Printf("📤 Arrival model: open_loop (max in-flight: %d)", cfg.Settings.MaxInFlight) case openLoop: // open_loop was requested but the txs-writer path has no arrival clock, @@ -396,11 +402,13 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { ramper.LogFinalStats() } summary := stats.RunSummary{ArrivalModel: config.ArrivalModelClosedLoop} + var admitted uint64 if dispatcher != nil { summary.ArrivalModel = string(dispatcher.ArrivalModel()) dstats := dispatcher.GetStats() summary.Dropped = dstats.Dropped summary.Failed = dstats.Failed + admitted = dstats.TotalSent + dstats.Failed if summary.Dropped > 0 { log.Printf("⚠️ Open-loop dropped %d txs (in-flight saturated; not throttled)", summary.Dropped) } @@ -420,6 +428,44 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error { log.Printf("📦 Inclusion: included=%d expired=%d dropped_at_cap=%d inflight_at_shutdown=%d", incl.Included, incl.Expired, incl.DroppedAtCap, incl.InflightAtShutdown) } + // Open-loop self-check: compute schedule_lag_p99 and the run verdict. Gated + // on the model the run actually used (summary.ArrivalModel, not the requested + // flag — the txs-writer path downgrades to closed_loop). + openLoopRun := summary.ArrivalModel == config.ArrivalModelOpenLoop + lagTotal, lagOverBound, lagMax := collector.ScheduleLagTail() + verdict := stats.EvaluateScheduleLag(stats.ScheduleLagInputs{ + Samples: collector.ScheduleLagSamples(), + TargetTPS: cfg.Settings.TPS, + OpenLoop: openLoopRun, + Ramped: cfg.Settings.RampUp, + Admitted: admitted, + Threshold: cfg.Settings.ScheduleLagVoidThreshold, + OverBoundCount: lagOverBound, + OverBoundTotal: lagTotal, + MaxLag: lagMax, + }) + summary.ScheduleLagP99 = verdict.ScheduleLagP99 + summary.ScheduleLagMax = verdict.MaxLag + summary.ScheduleLagOverBoundCount = verdict.OverBoundCount + summary.ScheduleLagTotal = verdict.OverBoundTotal + summary.Verdict = verdict.Verdict + summary.VoidReason = verdict.VoidReason + if verdict.Anomaly { + log.Printf("🚨 no schedule_lag samples despite %d admitted txs — recorder may be mis-wired", admitted) + } + switch verdict.Verdict { + case stats.VerdictVoid: + log.Printf("⚠️ VOID: %s (schedule_lag_p99=%s, samples=%d)", + verdict.VoidReason, verdict.ScheduleLagP99.Round(time.Microsecond), verdict.SampleCount) + case stats.VerdictNA: + log.Printf("🧪 Run verdict: N/A — %s | schedule_lag_p99=%s (samples=%d)", + verdict.NAReason, verdict.ScheduleLagP99.Round(time.Microsecond), verdict.SampleCount) + default: + log.Printf("🧪 Run verdict: %s | schedule_lag_p99=%s (samples=%d, arrival_interval=%s)", + verdict.Verdict, verdict.ScheduleLagP99.Round(time.Microsecond), + verdict.SampleCount, verdict.ArrivalInterval.Round(time.Microsecond)) + } + collector.EmitRunSummary(ctx, summary) if d := cfg.Settings.PostSummaryFlushDelay.ToDuration(); d > 0 { log.Printf("⏳ Holding pod for post-summary scrape window (%s)...", d) diff --git a/sender/worker.go b/sender/worker.go index 21d33e4..d26e2c0 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -169,6 +169,13 @@ func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) erro startTime := time.Now() // Sole owner between dequeue and hand-off: stamp is race-free (see LoadTx). tx.AttemptedSendTime = startTime + // schedule_lag self-check: only open-loop txs carry a true scheduled + // instant. A zero IntendedSendTime (prewarm) is excluded here; the + // closed-loop enqueue time is excluded at the run level (the verdict gates + // on the arrival model, see stats.EvaluateScheduleLag). + if !tx.IntendedSendTime.IsZero() { + w.cfg.Collector.RecordScheduleLag(startTime.Sub(tx.IntendedSendTime)) + } err = w.sendTransaction(ctx, client, tx) // OnComplete must fire only after the real send returns — that is what // bounds true unacked in-flight (see doc.go). Nil on closed-loop/batch. @@ -177,8 +184,8 @@ func (w *Worker) runTxSender(ctx context.Context, client *ethclient.Client) erro } w.cfg.Collector.RecordTransaction(tx.Scenario.Name, w.cfg.Endpoint, time.Since(startTime), err == nil) // Register at send-completion, only on success: registered ⊆ succeeded. - // (The tracker is wired only for live runs — see main.go; DryRun never - // gets a tracker, so simulated sends are not inclusion-tracked.) + // The tracker is present only for live runs (wired in main.go; never under + // DryRun). if err == nil { if t, ok := w.cfg.Inclusion.Get(); ok { t.Register(tx) diff --git a/stats/collector.go b/stats/collector.go index 3c2ea7e..9523d7f 100644 --- a/stats/collector.go +++ b/stats/collector.go @@ -2,11 +2,20 @@ package stats import ( "fmt" + "math/rand" "sort" "sync" "time" ) +// scheduleLagReservoirCap bounds the schedule_lag sample set. Open-loop runs can +// emit millions of txs; storing every lag is unbounded memory. A reservoir of +// this size keeps a uniform random sample of the full run (Algorithm R), so the +// p99 stays representative regardless of run length — unlike tail-trimming, +// which would bias the percentile toward the run's final window. ~16k * +// time.Duration (8B) ≈ 128KB, negligible against the tx working set. +const scheduleLagReservoirCap = 16384 + // Collector tracks comprehensive statistics for load testing type Collector struct { mu sync.RWMutex @@ -34,6 +43,26 @@ type Collector struct { totalTxs uint64 lastWindowTime time.Time + // schedule_lag reservoir: a bounded uniform sample of per-tx send lag + // (AttemptedSendTime − IntendedSendTime), used for the open-loop self-check + // verdict. scheduleLagSeen is the total count of recorded samples (the + // reservoir's n), needed for Algorithm R replacement probability. + scheduleLag []time.Duration + scheduleLagSeen uint64 + scheduleLagRand *rand.Rand + + // Unsampled schedule_lag tail signal: the reservoir is a uniform sample, so a + // sub-percentile late-run tail can stay under the whole-run p99 yet still mean + // the generator fell behind. These exact (un-sampled) counters give the + // verdict a tail-degradation signal the reservoir cannot dilute. + // scheduleLagBound is the VOID bound (threshold × 1/λ) set once at run start + // for a fixed-λ open-loop run; zero leaves over-bound counting inert (ramped / + // closed-loop / no-λ runs are N/A anyway). scheduleLagMax is the largest lag + // ever recorded, surfaced for diagnostics. + scheduleLagBound time.Duration + scheduleLagOverBound uint64 + scheduleLagMax time.Duration + // Configuration maxLatencyHistory int // Limit latency history to prevent memory leaks } @@ -56,9 +85,77 @@ func NewCollector() *Collector { startTime: time.Now(), lastWindowTime: time.Now(), maxLatencyHistory: 10000, // Keep last 10k latencies per endpoint + scheduleLag: make([]time.Duration, 0, scheduleLagReservoirCap), + // Local source: this is a self-check sample, not security-sensitive, and a + // per-collector source avoids contending the global rand mutex on the hot + // send path. + scheduleLagRand: rand.New(rand.NewSource(time.Now().UnixNano())), //nolint:gosec // sampling, not crypto + } +} + +// RecordScheduleLag records one open-loop send lag (AttemptedSendTime − +// IntendedSendTime) into the bounded reservoir. The worker calls it right after +// stamping AttemptedSendTime, only when IntendedSendTime is set (open-loop txs; +// closed-loop/prewarm pass a zero IntendedSendTime and are excluded by the +// caller). Negative lags (clock skew between scheduler and worker reads) are +// clamped to zero so they cannot deflate the p99. +func (c *Collector) RecordScheduleLag(lag time.Duration) { + if lag < 0 { + lag = 0 + } + c.mu.Lock() + defer c.mu.Unlock() + + if lag > c.scheduleLagMax { + c.scheduleLagMax = lag + } + if c.scheduleLagBound > 0 && lag > c.scheduleLagBound { + c.scheduleLagOverBound++ + } + + c.scheduleLagSeen++ + if len(c.scheduleLag) < scheduleLagReservoirCap { + c.scheduleLag = append(c.scheduleLag, lag) + return + } + // Reservoir full: replace a uniformly random slot with probability + // cap/seen (Algorithm R), keeping the retained set a uniform sample. + j := c.scheduleLagRand.Int63n(int64(c.scheduleLagSeen)) + if j < int64(scheduleLagReservoirCap) { + c.scheduleLag[j] = lag } } +// ScheduleLagSamples returns a copy of the current schedule_lag reservoir. Call +// at run end to feed EvaluateScheduleLag; the copy keeps the caller's percentile +// sort off the live slice. +func (c *Collector) ScheduleLagSamples() []time.Duration { + c.mu.RLock() + defer c.mu.RUnlock() + out := make([]time.Duration, len(c.scheduleLag)) + copy(out, c.scheduleLag) + return out +} + +// SetScheduleLagBound arms the unsampled over-bound counter with the run's VOID +// bound (threshold × 1/λ). Call once at run start, only for a fixed-λ open-loop +// run; left unset (or set to <=0), over-bound counting stays inert. +func (c *Collector) SetScheduleLagBound(bound time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.scheduleLagBound = bound +} + +// ScheduleLagTail reports the exact (un-sampled) tail figures: total lags +// recorded, how many exceeded the VOID bound, and the max lag observed. total is +// the true count, distinct from the bounded reservoir's len. overBound is zero +// when no bound was armed. +func (c *Collector) ScheduleLagTail() (total, overBound uint64, max time.Duration) { + c.mu.RLock() + defer c.mu.RUnlock() + return c.scheduleLagSeen, c.scheduleLagOverBound, c.scheduleLagMax +} + // RecordTransaction records a transaction attempt func (c *Collector) RecordTransaction(scenario, endpoint string, latency time.Duration, success bool) { c.mu.Lock() diff --git a/stats/metrics.go b/stats/metrics.go index 70ffd46..5b27dab 100644 --- a/stats/metrics.go +++ b/stats/metrics.go @@ -87,6 +87,27 @@ var ( "run_inflight_at_shutdown", metric.WithDescription("In-flight inclusion registry size at run end (emitted once at run end)"), metric.WithUnit("{transactions}"))) + + // Open-loop self-check. Emitted once at run end on every run; the + // verdict label distinguishes VALID / VOID / N/A so a generator-bound run is + // queryable, not just a log line. + runScheduleLagP99 = must(meter.Float64Gauge( + "run_schedule_lag_p99", + metric.WithDescription("p99 of per-tx send lag (attempted − intended) over this open-loop run (emitted once at run end)"), + metric.WithUnit("s"))) + + // Unsampled tail signal: the reservoir p99 above can dilute a sub-percentile + // late-run tail, so the verdict also gates on the exact over-bound fraction. + // Max is diagnostic; fraction is the gate. + runScheduleLagMax = must(meter.Float64Gauge( + "run_schedule_lag_max", + metric.WithDescription("max per-tx send lag (attempted − intended) over this open-loop run, un-sampled (emitted once at run end)"), + metric.WithUnit("s"))) + + runScheduleLagOverBoundFraction = must(meter.Float64Gauge( + "run_schedule_lag_over_bound_fraction", + metric.WithDescription("exact fraction of sends whose lag exceeded the VOID bound over this open-loop run (emitted once at run end)"), + metric.WithUnit("1"))) ) // meteredInclusionTrackers backs the inclusion_inflight gauge: each tracker diff --git a/stats/run_summary.go b/stats/run_summary.go index e222142..591d211 100644 --- a/stats/run_summary.go +++ b/stats/run_summary.go @@ -30,7 +30,6 @@ type RunSummary struct { // registered == Included + Expired + InflightAtShutdown, with // registered ⊆ succeeded. InclusionTracked disambiguates a not-tracked run // (all zero, flag false) from a tracked run with no inclusions yet. - // TODO(PLT-467): owns run-summary schema versioning for these fields. InclusionTracked bool // Included is the count of txs the tracker observed on-chain (stamped). Included uint64 @@ -41,6 +40,26 @@ type RunSummary struct { DroppedAtCap uint64 // InflightAtShutdown is len(inflight) read after workers and tracker joined. InflightAtShutdown uint64 + + // Open-loop self-check: schedule_lag = AttemptedSendTime − + // IntendedSendTime per tx. A p99 above the threshold fraction of the arrival + // interval (1/λ) means the generator could not keep its own schedule, so the + // run was generator-bound, not open-loop, and is VOID. Reported on every run + // regardless of verdict; Verdict is N/A for closed-loop or non-fixed-λ runs. + ScheduleLagP99 time.Duration + // ScheduleLagMax is the largest single send lag recorded (un-sampled), + // surfaced for diagnostics; the verdict gates on a fraction, not this max. + ScheduleLagMax time.Duration + // ScheduleLagOverBoundCount / ScheduleLagTotal are the exact (un-sampled) + // count of sends past the VOID bound and the total recorded; their ratio is + // the tail-degradation signal the sampled p99 cannot dilute. Total zero / no + // bound armed means the figures are inert (non-fixed-λ run). + ScheduleLagOverBoundCount uint64 + ScheduleLagTotal uint64 + // Verdict is VerdictValid, VerdictVoid, or VerdictNA. + Verdict string + // VoidReason explains a VOID verdict; empty otherwise. + VoidReason string } // EmitRunSummary records the run-summary gauges. Call once at shutdown. @@ -61,4 +80,20 @@ func (c *Collector) EmitRunSummary(ctx context.Context, summary RunSummary) { if summary.InclusionTracked { runInflightAtShutdown.Record(ctx, int64(summary.InflightAtShutdown)) } + runScheduleLagP99.Record(ctx, summary.ScheduleLagP99.Seconds(), + metric.WithAttributes( + attribute.String("arrival_model", summary.ArrivalModel), + attribute.String("verdict", summary.Verdict))) + runScheduleLagMax.Record(ctx, summary.ScheduleLagMax.Seconds(), + metric.WithAttributes( + attribute.String("arrival_model", summary.ArrivalModel), + attribute.String("verdict", summary.Verdict))) + var overBoundFraction float64 + if summary.ScheduleLagTotal > 0 { + overBoundFraction = float64(summary.ScheduleLagOverBoundCount) / float64(summary.ScheduleLagTotal) + } + runScheduleLagOverBoundFraction.Record(ctx, overBoundFraction, + metric.WithAttributes( + attribute.String("arrival_model", summary.ArrivalModel), + attribute.String("verdict", summary.Verdict))) } diff --git a/stats/schedule_lag_test.go b/stats/schedule_lag_test.go new file mode 100644 index 0000000..90b96f1 --- /dev/null +++ b/stats/schedule_lag_test.go @@ -0,0 +1,90 @@ +package stats + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestRecordScheduleLag_SamplesRoundTrip(t *testing.T) { + c := NewCollector() + c.RecordScheduleLag(1 * time.Millisecond) + c.RecordScheduleLag(2 * time.Millisecond) + c.RecordScheduleLag(3 * time.Millisecond) + + got := c.ScheduleLagSamples() + require.ElementsMatch(t, []time.Duration{1 * time.Millisecond, 2 * time.Millisecond, 3 * time.Millisecond}, got) + + // Returned slice is a copy: mutating it must not affect the collector. + got[0] = 999 * time.Second + require.NotContains(t, c.ScheduleLagSamples(), 999*time.Second) +} + +// Negative lags (scheduler/worker clock-read skew) clamp to zero so they cannot +// deflate the p99. +func TestRecordScheduleLag_NegativeClampsToZero(t *testing.T) { + c := NewCollector() + c.RecordScheduleLag(-5 * time.Millisecond) + require.Equal(t, []time.Duration{0}, c.ScheduleLagSamples()) +} + +// The reservoir is bounded: recording far past the cap never grows the sample +// set beyond scheduleLagReservoirCap. +func TestRecordScheduleLag_ReservoirBounded(t *testing.T) { + c := NewCollector() + for i := range scheduleLagReservoirCap * 4 { + c.RecordScheduleLag(time.Duration(i) * time.Nanosecond) + } + require.Len(t, c.ScheduleLagSamples(), scheduleLagReservoirCap) +} + +// End-to-end through the collector: a known sample set yields the expected p99 +// verdict, proving the record → sample → evaluate path agrees. +func TestRecordScheduleLag_FeedsVerdict(t *testing.T) { + c := NewCollector() + for range 99 { + c.RecordScheduleLag(100 * time.Microsecond) + } + c.RecordScheduleLag(50 * time.Millisecond) + + v := EvaluateScheduleLag(ScheduleLagInputs{ + Samples: c.ScheduleLagSamples(), TargetTPS: 100, OpenLoop: true, Admitted: 100, + }) + require.Equal(t, VerdictVoid, v.Verdict) +} + +// The unsampled tail counters are exact, not reservoir-diluted: with the bound +// armed, every over-bound lag is counted regardless of reservoir replacement, +// and the max is the true max. +func TestRecordScheduleLag_UnsampledTailCounters(t *testing.T) { + c := NewCollector() + // Bound = 10% of 1/100 = 1ms (matches ScheduleLagBound(100, 0.10)). + c.SetScheduleLagBound(ScheduleLagBound(100, 0.10)) + + // Record far more than the reservoir cap so sampling is in play. + const over = 50 + for range scheduleLagReservoirCap * 2 { + c.RecordScheduleLag(100 * time.Microsecond) // under bound + } + for range over { + c.RecordScheduleLag(5 * time.Millisecond) // over the 1ms bound + } + c.RecordScheduleLag(80 * time.Millisecond) // the max + + total, overBound, max := c.ScheduleLagTail() + require.Equal(t, uint64(scheduleLagReservoirCap*2+over+1), total) + require.Equal(t, uint64(over+1), overBound) // exact, not sampled + require.Equal(t, 80*time.Millisecond, max) +} + +// Without an armed bound the over-bound counter stays inert (ramped / +// closed-loop / no-λ runs), but the max is still tracked for diagnostics. +func TestRecordScheduleLag_OverBoundInertWhenUnset(t *testing.T) { + c := NewCollector() + c.RecordScheduleLag(500 * time.Millisecond) + total, overBound, max := c.ScheduleLagTail() + require.Equal(t, uint64(1), total) + require.Equal(t, uint64(0), overBound) // no bound armed → inert + require.Equal(t, 500*time.Millisecond, max) +} diff --git a/stats/verdict.go b/stats/verdict.go new file mode 100644 index 0000000..3a8c3c7 --- /dev/null +++ b/stats/verdict.go @@ -0,0 +1,210 @@ +package stats + +import ( + "fmt" + "sort" + "time" +) + +// scheduleLagVoidThreshold is the fraction of the arrival interval (1/λ) that +// schedule_lag_p99 may reach before the run is VOID: a p99 send lag above this +// fraction means the generator could not keep up with its own schedule, so the +// load was generator-bound, not open-loop, and the run does not measure the SUT. +// Provisional value — tune from first calibration run. +const scheduleLagVoidThreshold = 0.10 + +// scheduleLagOverBoundFraction is the share of recorded sends that may exceed the +// VOID bound before the run is VOID on the unsampled tail signal. The whole-run +// p99 is computed from a uniform reservoir sample, so a sub-percentile late-run +// tail (the generator hiccupping near the end of a long run) can stay under the +// p99 yet still mean the generator fell behind. This exact (un-sampled) fraction +// catches that tail; it is a fraction, not a single max-lag, so a lone GC-pause +// outlier does not trip it. Provisional — tune from first calibration run. +const scheduleLagOverBoundFraction = 0.005 + +// Verdict labels for a run's open-loop self-check. +const ( + VerdictValid = "VALID" + VerdictVoid = "VOID" + // VerdictNA marks a run where the self-check does not apply: closed-loop, a + // ramped λ (no single 1/λ), no fixed arrival rate, or a fixed-λ run that + // recorded zero schedule_lag samples (cannot prove open-loop either way). + // schedule_lag_p99 is still reported, but no pass/fail gate is rendered. + VerdictNA = "N/A" +) + +// ScheduleLagVerdict is the self-check result that proves an open-loop run was +// actually open-loop. schedule_lag = AttemptedSendTime − IntendedSendTime per +// tx; its p99 is checked against threshold × (1/λ). It is computed on every run +// and reported regardless of outcome. +type ScheduleLagVerdict struct { + // Verdict is VerdictValid, VerdictVoid, or VerdictNA. + Verdict string + // VoidReason is a human-readable explanation, empty unless Verdict is VOID. + VoidReason string + // NAReason explains an N/A verdict (why no gate applies); empty otherwise. + NAReason string + // Anomaly is true when the inputs are self-inconsistent — admitted txs but + // zero schedule_lag samples — so the caller can log loudly: the recorder is + // likely mis-wired rather than the run being clean. + Anomaly bool + // ScheduleLagP99 is the 99th-percentile send lag across sampled open-loop + // txs; zero when no open-loop samples were recorded. + ScheduleLagP99 time.Duration + // SampleCount is the number of schedule_lag samples the verdict is based on. + SampleCount int + // ArrivalInterval is 1/λ, the bound's reference interval; zero when λ is not + // a single fixed rate (e.g. ramping with no configured TPS). + ArrivalInterval time.Duration + // Threshold is the fraction of ArrivalInterval used as the VOID boundary. + Threshold float64 + // OverBoundCount is the exact (un-sampled) count of sends whose lag exceeded + // the VOID bound; OverBoundTotal is the exact total recorded. Their ratio is + // the tail-degradation signal the reservoir p99 cannot dilute. + OverBoundCount uint64 + OverBoundTotal uint64 + // MaxLag is the largest lag recorded over the run (un-sampled), surfaced for + // diagnostics; it is not a gate on its own (a fraction is, to survive a lone + // outlier). + MaxLag time.Duration +} + +// ScheduleLagInputs carries the verdict inputs. It replaces a long positional +// signature (the tail figures pushed it past the point where adjacent bools and +// uints read clearly at the call site). +type ScheduleLagInputs struct { + // Samples is the reservoir copy used for the p99. + Samples []time.Duration + // TargetTPS is the configured λ; <=0 means no fixed rate → N/A. + TargetTPS float64 + // OpenLoop and Ramped gate applicability: only a fixed-λ open-loop, + // non-ramped run is evaluated. + OpenLoop bool + Ramped bool + // Admitted is the count of admitted txs, used only to flag the + // admitted-but-no-samples anomaly. + Admitted uint64 + // Threshold is the VOID fraction of 1/λ for the p99 bound; <=0 falls back to + // the provisional default. + Threshold float64 + // OverBoundCount / OverBoundTotal / MaxLag are the collector's exact + // (un-sampled) tail figures (see Collector.ScheduleLagTail). + OverBoundCount uint64 + OverBoundTotal uint64 + MaxLag time.Duration +} + +// EvaluateScheduleLag computes the open-loop self-check verdict. p99 is the +// sorted-slice percentile of the reservoir sample, matching the repo's block-time +// percentile idiom; the run is also VOID on the exact (un-sampled) over-bound +// fraction, the tail signal the reservoir cannot dilute. +// +// The verdict is N/A — reported, never a gate — when the model is not open-loop, +// when the run ramped λ (a ramp has no single 1/λ to bound against, and the +// ramper drives the live limit so the configured λ is stale), or when λ is not a +// single fixed rate (TargetTPS <= 0). A fixed-λ open-loop run that recorded zero +// schedule_lag samples is also N/A, not VALID: zero samples cannot distinguish a +// SUT that kept up from a recorder that never fired. When Admitted > 0 yet no +// samples landed, Anomaly is set so the caller logs the mis-wiring loudly. +// schedule_lag_p99 is still reported in every case. +func EvaluateScheduleLag(in ScheduleLagInputs) ScheduleLagVerdict { + threshold := in.Threshold + if threshold <= 0 { + threshold = scheduleLagVoidThreshold + } + + v := ScheduleLagVerdict{ + Verdict: VerdictNA, + ScheduleLagP99: scheduleLagPercentile(in.Samples, 99), + SampleCount: len(in.Samples), + Threshold: threshold, + OverBoundCount: in.OverBoundCount, + OverBoundTotal: in.OverBoundTotal, + MaxLag: in.MaxLag, + } + + if !in.OpenLoop { + v.NAReason = "closed-loop run: open-loop self-check does not apply" + return v + } + if in.Ramped { + v.NAReason = "ramped λ has no single arrival interval" + return v + } + if in.TargetTPS <= 0 { + v.NAReason = "no fixed arrival rate (λ): nothing to bound against" + return v + } + + arrivalInterval := time.Duration(float64(time.Second) / in.TargetTPS) + v.ArrivalInterval = arrivalInterval + + // Zero samples is N/A, not VALID: it cannot tell a SUT that kept up from a + // recorder that never fired or a run that dropped every tick. Admitted txs + // with no samples is an outright anomaly — flag it for the caller. + if len(in.Samples) == 0 { + v.NAReason = "no schedule_lag samples recorded" + v.Anomaly = in.Admitted > 0 + return v + } + + bound := ScheduleLagBound(in.TargetTPS, threshold) + + // Whole-run p99 over bound: the run was generator-bound across the sample. + if v.ScheduleLagP99 > bound { + v.Verdict = VerdictVoid + v.VoidReason = formatP99VoidReason(v.ScheduleLagP99, bound, threshold, arrivalInterval) + return v + } + // Unsampled tail: a sub-percentile share over the bound that the reservoir + // p99 diluted. Checked only when the bound was armed (OverBoundTotal > 0). + if in.OverBoundTotal > 0 { + if frac := float64(in.OverBoundCount) / float64(in.OverBoundTotal); frac > scheduleLagOverBoundFraction { + v.Verdict = VerdictVoid + v.VoidReason = formatTailVoidReason(in.OverBoundCount, in.OverBoundTotal, frac, bound) + return v + } + } + v.Verdict = VerdictValid + return v +} + +// ScheduleLagBound is the VOID bound, threshold × 1/λ, for a fixed-λ open-loop +// run. Returns zero when there is no single fixed rate (targetTPS <= 0), so the +// caller leaves the collector's over-bound counter inert. threshold <= 0 falls +// back to the provisional default, matching EvaluateScheduleLag. +func ScheduleLagBound(targetTPS, threshold float64) time.Duration { + if targetTPS <= 0 { + return 0 + } + if threshold <= 0 { + threshold = scheduleLagVoidThreshold + } + arrivalInterval := time.Duration(float64(time.Second) / targetTPS) + return time.Duration(threshold * float64(arrivalInterval)) +} + +func formatP99VoidReason(p99, bound time.Duration, threshold float64, arrivalInterval time.Duration) string { + return fmt.Sprintf( + "generator-bound: schedule_lag_p99 %s exceeds %s bound (%.0f%% of arrival interval %s) — load was not open-loop", + p99.Round(time.Microsecond), bound.Round(time.Microsecond), threshold*100, arrivalInterval.Round(time.Microsecond)) +} + +func formatTailVoidReason(overBound, total uint64, frac float64, bound time.Duration) string { + return fmt.Sprintf( + "tail degradation: %.2f%% of sends (%d/%d) exceeded the %s bound — generator fell behind on a sub-percentile tail the p99 missed", + frac*100, overBound, total, bound.Round(time.Microsecond)) +} + +// scheduleLagPercentile returns the percentile of a copy-then-sort of samples, +// reusing the repo's calculatePercentile index rule. Copies so the caller's +// slice order is preserved. +func scheduleLagPercentile(samples []time.Duration, percentile int) time.Duration { + if len(samples) == 0 { + return 0 + } + sorted := make([]time.Duration, len(samples)) + copy(sorted, samples) + sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] }) + return calculatePercentile(sorted, percentile) +} diff --git a/stats/verdict_test.go b/stats/verdict_test.go new file mode 100644 index 0000000..9bb123b --- /dev/null +++ b/stats/verdict_test.go @@ -0,0 +1,188 @@ +package stats + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// lags builds a sample slice from millisecond values for readability. +func lags(ms ...int) []time.Duration { + out := make([]time.Duration, len(ms)) + for i, m := range ms { + out[i] = time.Duration(m) * time.Millisecond + } + return out +} + +// At 100 TPS the arrival interval is 10ms, so the VOID bound at the default 10% +// threshold is 1ms. An over-driven run whose p99 sits well above that is VOID. +func TestEvaluateScheduleLag_OverDrivenIsVoid(t *testing.T) { + // 100 samples mostly small, but the top tail (p99 index = 99) is large. + samples := make([]time.Duration, 0, 100) + for range 99 { + samples = append(samples, 100*time.Microsecond) + } + samples = append(samples, 50*time.Millisecond) // the p99 element + + v := EvaluateScheduleLag(ScheduleLagInputs{ + Samples: samples, TargetTPS: 100, OpenLoop: true, Admitted: 100, + }) + + require.Equal(t, VerdictVoid, v.Verdict) + require.NotEmpty(t, v.VoidReason) + require.Equal(t, 50*time.Millisecond, v.ScheduleLagP99) + require.Equal(t, 10*time.Millisecond, v.ArrivalInterval) +} + +// A healthy run keeps p99 below 10% of the 10ms interval (1ms) → VALID. +func TestEvaluateScheduleLag_HealthyIsValid(t *testing.T) { + samples := lags(0, 0, 0, 0, 0, 0, 0, 0, 0, 0) // all 0ms, p99 = 0 + samples = append(samples, 200*time.Microsecond) + + v := EvaluateScheduleLag(ScheduleLagInputs{ + Samples: samples, TargetTPS: 100, OpenLoop: true, Admitted: 100, + }) + + require.Equal(t, VerdictValid, v.Verdict) + require.Empty(t, v.VoidReason) + require.Less(t, v.ScheduleLagP99, time.Millisecond) +} + +// p99 must match the repo's sorted-slice index rule for a known set. +func TestEvaluateScheduleLag_P99ComputedCorrectly(t *testing.T) { + // 100 samples 1ms..100ms; index = (100*99)/100 = 99 → sorted[99] = 100ms. + samples := make([]time.Duration, 0, 100) + for i := 1; i <= 100; i++ { + samples = append(samples, time.Duration(i)*time.Millisecond) + } + + // targetTPS=0 keeps verdict N/A but still reports p99. + v := EvaluateScheduleLag(ScheduleLagInputs{ + Samples: samples, TargetTPS: 0, OpenLoop: true, Admitted: 100, + }) + require.Equal(t, 100*time.Millisecond, v.ScheduleLagP99) + require.Equal(t, 100, v.SampleCount) +} + +// Closed-loop runs are reported but never gated: N/A regardless of lag size. +func TestEvaluateScheduleLag_ClosedLoopIsNA(t *testing.T) { + samples := lags(500, 500, 500) // huge lag, would be VOID if open-loop + + v := EvaluateScheduleLag(ScheduleLagInputs{ + Samples: samples, TargetTPS: 100, OpenLoop: false, Admitted: 3, + }) + + require.Equal(t, VerdictNA, v.Verdict) + require.Empty(t, v.VoidReason) + require.Equal(t, 500*time.Millisecond, v.ScheduleLagP99) // still reported +} + +// Open-loop with no fixed λ (TPS=0) cannot bound against 1/λ → N/A. +func TestEvaluateScheduleLag_NoFixedRateIsNA(t *testing.T) { + v := EvaluateScheduleLag(ScheduleLagInputs{ + Samples: lags(100, 200, 300), TargetTPS: 0, OpenLoop: true, Admitted: 3, + }) + require.Equal(t, VerdictNA, v.Verdict) + require.Equal(t, time.Duration(0), v.ArrivalInterval) +} + +// A ramped run drives λ via the limiter, so the configured TPS is stale and +// there is no single 1/λ to gate against — N/A regardless of TPS. +func TestEvaluateScheduleLag_RampedIsNA(t *testing.T) { + // TPS>0 but ramped: must still be N/A, not gated against the stale 1/TPS. + v := EvaluateScheduleLag(ScheduleLagInputs{ + Samples: lags(500, 500, 500), TargetTPS: 100, OpenLoop: true, Ramped: true, Admitted: 3, + }) + require.Equal(t, VerdictNA, v.Verdict) + require.Empty(t, v.VoidReason) + require.Equal(t, "ramped λ has no single arrival interval", v.NAReason) + require.Equal(t, time.Duration(0), v.ArrivalInterval) +} + +// No samples on a fixed-λ run is N/A, not VALID: it cannot distinguish a SUT +// that kept up from a recorder that never fired. +func TestEvaluateScheduleLag_NoSamplesIsNA(t *testing.T) { + v := EvaluateScheduleLag(ScheduleLagInputs{ + Samples: nil, TargetTPS: 100, OpenLoop: true, Admitted: 0, + }) + require.Equal(t, VerdictNA, v.Verdict) + require.Equal(t, "no schedule_lag samples recorded", v.NAReason) + require.False(t, v.Anomaly) // zero admitted: no anomaly, just an empty run + require.Equal(t, time.Duration(0), v.ScheduleLagP99) +} + +// Admitted txs but zero samples is an anomaly: the recorder likely never fired. +func TestEvaluateScheduleLag_AdmittedButNoSamplesIsAnomaly(t *testing.T) { + v := EvaluateScheduleLag(ScheduleLagInputs{ + Samples: nil, TargetTPS: 100, OpenLoop: true, Admitted: 5000, + }) + require.Equal(t, VerdictNA, v.Verdict) + require.Equal(t, "no schedule_lag samples recorded", v.NAReason) + require.True(t, v.Anomaly) +} + +// A configured threshold overrides the default boundary. +func TestEvaluateScheduleLag_ConfiguredThreshold(t *testing.T) { + samples := lags(2) // p99 = 2ms; interval at 100 TPS = 10ms + // 10% bound = 1ms → VOID; 50% bound = 5ms → VALID. + require.Equal(t, VerdictVoid, EvaluateScheduleLag(ScheduleLagInputs{ + Samples: samples, TargetTPS: 100, OpenLoop: true, Admitted: 1, Threshold: 0.10, + }).Verdict) + require.Equal(t, VerdictValid, EvaluateScheduleLag(ScheduleLagInputs{ + Samples: samples, TargetTPS: 100, OpenLoop: true, Admitted: 1, Threshold: 0.50, + }).Verdict) +} + +// A late-run sub-percentile tail: whole-run p99 sits UNDER the bound (the +// reservoir diluted the tail), but the exact over-bound fraction exceeds the +// threshold → VOID with the tail reason. At 100 TPS / 10% the bound is 1ms. +func TestEvaluateScheduleLag_TailDegradationIsVoid(t *testing.T) { + // p99 of the sample is comfortably under the 1ms bound. + samples := lags(0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + // 0.8% of 100k sends exceeded the bound — above the 0.5% fraction. + v := EvaluateScheduleLag(ScheduleLagInputs{ + Samples: samples, TargetTPS: 100, OpenLoop: true, Admitted: 100_000, + OverBoundCount: 800, OverBoundTotal: 100_000, MaxLag: 80 * time.Millisecond, + }) + require.Equal(t, VerdictVoid, v.Verdict) + require.Contains(t, v.VoidReason, "tail degradation") + require.Contains(t, v.VoidReason, "0.80%") + require.Less(t, v.ScheduleLagP99, time.Millisecond) // p99 alone would pass + require.Equal(t, 80*time.Millisecond, v.MaxLag) // surfaced for diagnostics +} + +// A single over-bound outlier (one GC pause) well under the fraction must NOT +// trip the tail gate: the run stays VALID. This is why the gate is a fraction, +// not maxLag alone. +func TestEvaluateScheduleLag_LoneOutlierStaysValid(t *testing.T) { + samples := lags(0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + // 1 / 100k = 0.001% over bound, far below the 0.5% fraction. + v := EvaluateScheduleLag(ScheduleLagInputs{ + Samples: samples, TargetTPS: 100, OpenLoop: true, Admitted: 100_000, + OverBoundCount: 1, OverBoundTotal: 100_000, MaxLag: 200 * time.Millisecond, + }) + require.Equal(t, VerdictValid, v.Verdict) + require.Empty(t, v.VoidReason) + require.Equal(t, 200*time.Millisecond, v.MaxLag) // still surfaced +} + +// A clean run with no over-bound sends is VALID; the tail gate is a no-op. +func TestEvaluateScheduleLag_NoOverBoundIsValid(t *testing.T) { + samples := lags(0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + v := EvaluateScheduleLag(ScheduleLagInputs{ + Samples: samples, TargetTPS: 100, OpenLoop: true, Admitted: 100_000, + OverBoundCount: 0, OverBoundTotal: 100_000, MaxLag: 500 * time.Microsecond, + }) + require.Equal(t, VerdictValid, v.Verdict) + require.Empty(t, v.VoidReason) +} + +// ScheduleLagBound returns threshold × 1/λ, zero when λ is not fixed, and falls +// back to the default threshold — the single source the collector arms from. +func TestScheduleLagBound(t *testing.T) { + require.Equal(t, time.Millisecond, ScheduleLagBound(100, 0.10)) // 10% of 10ms + require.Equal(t, time.Millisecond, ScheduleLagBound(100, 0)) // default 0.10 + require.Equal(t, time.Duration(0), ScheduleLagBound(0, 0.10)) // no fixed λ +}