diff --git a/bin/refresh-all-issues b/bin/refresh-all-issues index 7123fa26..7f591c15 100755 --- a/bin/refresh-all-issues +++ b/bin/refresh-all-issues @@ -1,11 +1,13 @@ #!/usr/bin/env node import { default as debugInit } from '../lib/debug.js'; -import refresh, { reportFailures } from '../lib/refresh.js'; +import { createPacedRefresh, reportFailures } from '../lib/refresh.js'; import selectReposArg from '../lib/select-repos-arg.js'; import db from '../lib/db.js'; debugInit('pulldasher:refresh*'); +const refresh = createPacedRefresh(); + refresh.allIssues(selectReposArg()) .done(function (result) { reportFailures(result); diff --git a/bin/refresh-all-pulls b/bin/refresh-all-pulls index ebaa025e..e042fb30 100755 --- a/bin/refresh-all-pulls +++ b/bin/refresh-all-pulls @@ -1,11 +1,13 @@ #!/usr/bin/env node import { default as debugInit } from '../lib/debug.js'; -import refresh, { reportFailures } from '../lib/refresh.js'; +import { createPacedRefresh, reportFailures } from '../lib/refresh.js'; import selectReposArg from '../lib/select-repos-arg.js'; import db from '../lib/db.js'; debugInit('pulldasher:refresh*'); +const refresh = createPacedRefresh(); + refresh.allPulls(selectReposArg()) .done(function (result) { reportFailures(result); diff --git a/bin/refresh-open-issues b/bin/refresh-open-issues index 86f56d4a..29d1ef1e 100755 --- a/bin/refresh-open-issues +++ b/bin/refresh-open-issues @@ -1,11 +1,13 @@ #!/usr/bin/env node import { default as debugInit } from '../lib/debug.js'; -import refresh, { reportFailures } from '../lib/refresh.js'; +import { createPacedRefresh, reportFailures } from '../lib/refresh.js'; import selectReposArg from '../lib/select-repos-arg.js'; import db from '../lib/db.js'; debugInit('pulldasher:refresh*'); +const refresh = createPacedRefresh(); + refresh.openIssues(selectReposArg()) .done(function (result) { reportFailures(result); diff --git a/bin/refresh-open-pulls b/bin/refresh-open-pulls index fef36e78..602ee1ef 100755 --- a/bin/refresh-open-pulls +++ b/bin/refresh-open-pulls @@ -1,11 +1,13 @@ #!/usr/bin/env node import { default as debugInit } from '../lib/debug.js'; -import refresh, { reportFailures } from '../lib/refresh.js'; +import { createPacedRefresh, reportFailures } from '../lib/refresh.js'; import selectReposArg from '../lib/select-repos-arg.js'; import db from '../lib/db.js'; debugInit('pulldasher:refresh*'); +const refresh = createPacedRefresh(); + refresh.openPulls(selectReposArg()) .done(function (result) { reportFailures(result); diff --git a/config.example.js b/config.example.js index 97789c12..de99381a 100644 --- a/config.example.js +++ b/config.example.js @@ -50,6 +50,11 @@ module.exports = { callbackURL: "http://localhost:" + port + "/auth/github/callback", // An API token for the backend to make API requests with. token: "oauth api token for server-side api calls", + // How much of the token's hourly GitHub API quota to reserve for live + // webhook/socket traffic. Bulk backfills (bin/refresh-*) pace themselves to + // stay above this floor and pause when remaining quota hits it, so a large + // backfill never starves normal operation. Defaults to 1000 if omitted. + bulkReserve: 1000, // This will need to be the same secret you use on the Webhooks page for // the repo Pulldasher is going to monitor. hook_secret: diff --git a/lib/git-manager.js b/lib/git-manager.js index b9ae5259..d142f6bd 100644 --- a/lib/git-manager.js +++ b/lib/git-manager.js @@ -14,6 +14,7 @@ import Label from "../models/label.js"; import Status from "../models/status.js"; import Signature from "../models/signature.js"; import getLogin from "./get-user-login.js"; +import { noopPacer } from "./pacer.js"; const MyOctokit = Octokit.plugin(throttling, retry); const gitDebug = debug("pulldasher:github"); @@ -74,6 +75,34 @@ github.hook.error("request", (error, options) => { throw error; }); +/** + * Feed a pacer the quota headers from every GitHub response, so it can pace bulk + * work against real consumption — crucially the per-pull/issue fan-out deep in + * `parse`/`parseIssue`, which is the bulk of a backfill's spend and which the + * caller can't see to observe itself. Only the CLI backfill bins call this (once + * at startup, with their pacer); the server never installs an observer, so its + * webhook/socket responses aren't tracked and nothing is paced. + * + * Installs the hooks at most once per process: they close over the first + * pacer, and a second call would only stack duplicate observers on the shared + * client (double-counting every response). The bins call this exactly once. + */ +let rateLimitObserved = false; +export function observeRateLimit(pacer) { + if (rateLimitObserved) { + return; + } + rateLimitObserved = true; + github.hook.after("request", (response) => { + pacer.observe(response.headers); + }); + github.hook.error("request", (error) => { + // A failed response still reports the current quota. + pacer.observe(error.response && error.response.headers); + throw error; + }); +} + const githubRest = github.rest; export default { @@ -98,9 +127,9 @@ export default { * * Returns a promise which resolves to an array of all open pull requests */ - getOpenPulls: function (repo) { + getOpenPulls: function (repo, pacer = noopPacer) { return logErrors( - github.paginate(githubRest.pulls.list, params({ state: "open" }, repo)), + pacedPaginate(githubRest.pulls.list, params({ state: "open" }, repo), pacer), "Getting open pulls in repo %s", repo ); @@ -111,9 +140,9 @@ export default { * * Returns a promise which resolves to an array of all pull requests */ - getAllPulls: function (repo) { + getAllPulls: function (repo, pacer = noopPacer) { return logErrors( - github.paginate(githubRest.pulls.list, params({ state: "all" }, repo)), + pacedPaginate(githubRest.pulls.list, params({ state: "all" }, repo), pacer), "Getting all pulls in repo %s", repo ); @@ -131,11 +160,10 @@ export default { * * Returns a promise which resolves to an array of all open issues */ - getOpenIssues: function (repo) { + getOpenIssues: function (repo, pacer = noopPacer) { const searchParams = params({ state: "open" }, repo); return logErrors( - github - .paginate(githubRest.issues.listForRepo, searchParams) + pacedPaginate(githubRest.issues.listForRepo, searchParams, pacer) .then(filterOutPulls) .then(addRepo(searchParams)), "Getting open issues in repo %s", @@ -148,11 +176,10 @@ export default { * * Returns a promise which resolves to an array of all issues */ - getAllIssues: function (repo) { + getAllIssues: function (repo, pacer = noopPacer) { const searchParams = params({ state: "all" }, repo); return logErrors( - github - .paginate(githubRest.issues.listForRepo, searchParams) + pacedPaginate(githubRest.issues.listForRepo, searchParams, pacer) .then(filterOutPulls) .then(addRepo(searchParams)), "Getting all issues in repo %s", @@ -645,3 +672,35 @@ function logErrors(promise, ...messageAndArgs) { throw err; }); } + +/** + * Paginate a bulk listing, pacing between pages so a full-repo list can't burst + * through the quota. Page one fetches un-paced and seeds the pacer's quota view + * (the bin's observeRateLimit hook records each response); each `gate()` then + * spaces the fetch of the next page. The gate fronts a fetch, so it only runs + * when another page follows — gating after the last page would delay the result + * (or, below the reserve floor, pause for a full reset window) with no fetch + * left to pace. The `pacer` is injected by the caller — the CLI backfill bins + * pass a real one; everything else takes the default no-op, so the pages fetch + * unpaced (the server's startup open-pulls listing, and the live webhook/socket + * path, which never lists whole repos anyway). + */ +async function pacedPaginate(route, parameters, pacer = noopPacer) { + const items = []; + for await (const response of github.paginate.iterator(route, parameters)) { + items.push(...response.data); + if (hasNextPage(response)) { + await pacer.gate(); + } + } + return items; +} + +/** + * Whether a paginated response links to a further page — the same `rel="next"` + * Link-header signal Octokit's own iterator uses to decide whether to continue. + */ +function hasNextPage(response) { + const link = response.headers && response.headers.link; + return Boolean(link) && link.includes('rel="next"'); +} diff --git a/lib/pacer.js b/lib/pacer.js new file mode 100644 index 00000000..9cbfb61c --- /dev/null +++ b/lib/pacer.js @@ -0,0 +1,147 @@ +import config from "./config-loader.js"; +import debug from "./debug.js"; +import Promise from "bluebird"; + +const pacerDebug = debug("pulldasher:pacer"); + +// Only log a paced delay once it's long enough to explain a visibly slow +// backfill — short spacing isn't worth the noise. +const LOG_DELAY_THRESHOLD_MS = 5000; + +/** + * A pacer proactively paces bulk GitHub requests so a backfill yields to live + * traffic instead of draining the shared token's quota to zero. It is a + * per-process instance the entry point constructs and installs: only the CLI + * backfill bins build a real one (`createPacer`); the server installs nothing + * and runs against `noopPacer`, so live webhook/socket refreshes are never + * delayed. Rate state is global to the token, so within a process every bulk + * call gates against the same view, fed by `observe` on every response (live + * calls included). + * + * `observe(headers)` — record the latest quota from any response (free). + * `gate()` — await this before each *bulk* unit of work (a consumer drain or a + * pagination page); it spreads bulk work across the reset window and blocks + * entirely while below the reserve floor. + * + * Pacing is calibrated by actual consumption, not by gate count: each gate + * advances a token-bucket cursor by the number of requests spent since the last + * gate (`x-ratelimit-used` delta), so a pull whose processing fans out to a + * dozen calls correctly waits a dozen intervals — and a live-traffic spike, + * which also bumps `used`, pushes the cursor out and makes bulk yield. + */ +export function createPacer({ reserve = config.github.bulkReserve ?? 1000 } = {}) { + let remaining = null; + let reset = null; + let used = null; + let lastUsed = null; + let nextSlot = 0; + + return { + observe: function (headers) { + if (!headers) { + return; + } + if (headers["x-ratelimit-remaining"] !== undefined) { + remaining = Number(headers["x-ratelimit-remaining"]); + } + if (headers["x-ratelimit-reset"] !== undefined) { + reset = Number(headers["x-ratelimit-reset"]); + } + if (headers["x-ratelimit-used"] !== undefined) { + used = Number(headers["x-ratelimit-used"]); + } + }, + + gate: function () { + const now = Date.now(); + const result = computePace({ + remaining, + reset, + used, + reserve, + now, + nextSlot, + lastUsed, + }); + nextSlot = result.nextSlot; + lastUsed = result.lastUsed; + const delayMs = result.delayMs; + + // The floor-pause is the only positive delay that isn't even-spread pacing. + if (delayMs > 0 && remaining != null && remaining <= reserve) { + pacerDebug( + "bulk paused: remaining %s ≤ reserve %s, waiting %ss for quota reset", + remaining, + reserve, + Math.round(delayMs / 1000) + ); + } else if (delayMs >= LOG_DELAY_THRESHOLD_MS) { + pacerDebug( + "pacing bulk: waiting %sms (remaining %s)", + Math.round(delayMs), + remaining + ); + } + + return Promise.delay(delayMs); + }, + }; +} + +// The Null Object pacer the server and webhook/socket path run against: every +// call is total, so call sites stay unconditional (no null guards on the hot +// Octokit hooks or the queue consumer). +export const noopPacer = { + observe: function () {}, + gate: function () { + return Promise.resolve(); + }, +}; + +/** + * Pure pacing decision. Given the latest quota view and a token-bucket cursor, + * returns how long the next bulk request should wait, the advanced cursor, and + * the `used` baseline to diff against next time. + * + * - unknown quota, or a `reset` already in the past (stale window) → allow + * immediately; the next response refreshes the view. + * - `remaining ≤ reserve` → pause until `reset` (let the window refill). + * - otherwise → even-spread: the spendable budget (`remaining − reserve`) + * divided across the time left in the window gives the per-request interval. + * Advance the cursor by `spent × interval`, where `spent` is the requests + * consumed since the last gate (`used − lastUsed`), so the spacing tracks + * real consumption — a multi-call item or a live spike pushes the next slot + * further out. A shrinking `remaining` also widens the interval, so bulk + * yields on both signals. + */ +export function computePace({ + remaining, + reset, + used, + reserve, + now, + nextSlot, + lastUsed, +}) { + const baseline = used ?? lastUsed; + // No quota view, or a window that already rolled over (our `remaining` is + // stale): proceed now and let the next response refresh the view. + const allowNow = { delayMs: 0, nextSlot: now, lastUsed: baseline }; + if (remaining == null || reset == null) { + return allowNow; + } + const resetMs = reset * 1000; + if (resetMs <= now) { + return allowNow; + } + if (remaining <= reserve) { + return { delayMs: resetMs - now, nextSlot: resetMs, lastUsed: baseline }; + } + const interval = (resetMs - now) / (remaining - reserve); + // Clamp at 0: across a window rollover the server resets `used` to ~0 while + // our `lastUsed` still holds the pre-reset high, so the delta can go negative. + const spent = + used == null || lastUsed == null ? 0 : Math.max(0, used - lastUsed); + const slot = Math.max(now, nextSlot + spent * interval); + return { delayMs: slot - now, nextSlot: slot, lastUsed: baseline }; +} diff --git a/lib/refresh.js b/lib/refresh.js index 16b64b90..047f9f65 100644 --- a/lib/refresh.js +++ b/lib/refresh.js @@ -1,59 +1,137 @@ -import gitManager from "./git-manager.js"; +import gitManager, { observeRateLimit } from "./git-manager.js"; import dbManager from "./db-manager.js"; import utils from "./utils.js"; import NotifyQueue from "notify-queue"; import debug from "./debug.js"; import Promise from "bluebird"; - -// Queues for making all refreshes be synchronous, one at a time. -var issueQueue = new NotifyQueue(); -var pullQueue = new NotifyQueue(); +import { createPacer, noopPacer } from "./pacer.js"; const refreshDebug = debug("pulldasher:refresh"); -export default { - /////// Issues ///////// - - issue: function refreshIssue(repo, number) { - refreshDebug("refresh issue %s", number); - return gitManager.getIssue(repo, number).then(pushOnQueue(issueQueue)); - }, - - allIssues: function refreshAllIssues(repos) { - refreshDebug("refresh all issues"); - return utils - .forEachRepo(gitManager.getAllIssues, { repos: repos }) - .then(drainThrough(issueQueue)); - }, - - openIssues: function refreshOpenIssues(repos) { - refreshDebug("refresh all open issues"); - return utils - .forEachRepo(gitManager.getOpenIssues, { repos: repos }) - .then(drainThrough(issueQueue)); - }, - - /////// Pulls ///////// - - pull: function refreshPull(repo, number) { - refreshDebug("refresh pull %s", number); - return gitManager.getPull(repo, number).then(pushOnQueue(pullQueue)); - }, - - allPulls: function refreshAllPulls(repos) { - refreshDebug("refresh all pull"); - return utils - .forEachRepo(gitManager.getAllPulls, { repos: repos }) - .then(drainThrough(pullQueue)); - }, - - openPulls: function refreshOpenPulls(repos) { - refreshDebug("refresh all open pulls"); - return utils - .forEachRepo(gitManager.getOpenPulls, { repos: repos }) - .then(drainThrough(pullQueue)); - }, -}; +/** + * Build a refresh API over its own pair of serial queues. The injected `pacer` + * gates the *consumer* — the drain, where the API call actually spends quota + * and reports the rate-limit headers — before each bulk work item. It defaults + * to a no-op, so the server (webhook/socket refreshes and the startup open-pulls + * refresh) runs unpaced. Only the CLI backfill bins pass a real pacer, and in + * that process the consumer drains nothing but bulk, so parking it on a paced or + * floor-paused slot starves nothing. Single-item webhook/socket refreshes push + * straight through; the gate only fronts work items, never the batch-done + * sentinel. + */ +export function createRefresh({ pacer = noopPacer } = {}) { + // Queues for making all refreshes be synchronous, one at a time. + const issueQueue = new NotifyQueue(); + const pullQueue = new NotifyQueue(); + + issueQueue.pop( + makeQueueConsumer(pacer, processIssueItem, { + parseIssue: gitManager.parseIssue, + updateAllIssueData: dbManager.updateAllIssueData, + }) + ); + + pullQueue.pop( + makeQueueConsumer(pacer, processPullItem, { + parse: gitManager.parse, + updateAllPullData: dbManager.updateAllPullData, + }) + ); + + return { + /////// Issues ///////// + + issue: function refreshIssue(repo, number) { + refreshDebug("refresh issue %s", number); + return gitManager.getIssue(repo, number).then(pushOnQueue(issueQueue)); + }, + + allIssues: function refreshAllIssues(repos) { + refreshDebug("refresh all issues"); + return utils + .forEachRepo((repo) => gitManager.getAllIssues(repo, pacer), { + repos: repos, + }) + .then(drainThrough(issueQueue)); + }, + + openIssues: function refreshOpenIssues(repos) { + refreshDebug("refresh all open issues"); + return utils + .forEachRepo((repo) => gitManager.getOpenIssues(repo, pacer), { + repos: repos, + }) + .then(drainThrough(issueQueue)); + }, + + /////// Pulls ///////// + + pull: function refreshPull(repo, number) { + refreshDebug("refresh pull %s", number); + return gitManager.getPull(repo, number).then(pushOnQueue(pullQueue)); + }, + + allPulls: function refreshAllPulls(repos) { + refreshDebug("refresh all pull"); + return utils + .forEachRepo((repo) => gitManager.getAllPulls(repo, pacer), { + repos: repos, + }) + .then(drainThrough(pullQueue)); + }, + + openPulls: function refreshOpenPulls(repos) { + refreshDebug("refresh all open pulls"); + return utils + .forEachRepo((repo) => gitManager.getOpenPulls(repo, pacer), { + repos: repos, + }) + .then(drainThrough(pullQueue)); + }, + }; +} + +// The default instance is unpaced (no-op pacer): the server imports this for +// its startup open-pulls refresh and webhook/socket refreshes. The CLI backfill +// bins build their own paced instance via createPacedRefresh. +export default createRefresh(); + +/** + * Build the refresh API a CLI backfill bin runs against. One per-process pacer + * is installed as a rate-limit observer on the GitHub client (so every + * response's quota headers feed it) and threaded into the queue consumers, so + * the whole bulk drain — the per-item fan-out, where the spend actually lands — + * and the list pagination pace against the shared token's quota, reserving + * headroom for the live server's webhook/socket refreshes (a separate process, + * same token). + */ +export function createPacedRefresh() { + const pacer = createPacer(); + observeRateLimit(pacer); + return createRefresh({ pacer }); +} + +/** + * Build a queue pop consumer. The queue holds two kinds of entries: work items + * `{ response, onFailure }` and a bare function batch-done sentinel (pushed + * after a batch). A sentinel just runs and advances — never gated. For a work + * item, await the pacer's slot *first* — the drain is where the API call spends + * quota — then hand the unwrapped response, plus the item's own onFailure + * collector, to the process function. onFailure rides on the item (not the + * fixed deps) so it stays scoped to the run that enqueued it: a bulk run + * collects its own failures, webhook/socket refreshes push null and don't + * accumulate. + */ +export function makeQueueConsumer(pacer, processItem, deps) { + return async function (item, next) { + if (typeof item === "function") { + item(); + return next(); + } + await pacer.gate(); + processItem(item.response, next, { ...deps, onFailure: item.onFailure }); + }; +} /** * Returns a function that will: @@ -61,12 +139,13 @@ export default { * and return a promise that is fulfilled when the item is fully processed. * * Single-item refreshes (webhooks, socket) have no end-of-run report, so no - * failure collector is attached. + * failure collector is attached — and they aren't quota-paced, so live traffic + * is never delayed. */ function pushOnQueue(queue) { return function (githubResponse) { - queue.push({ response: githubResponse, onFailure: null }); return new Promise(function (resolve) { + queue.push({ response: githubResponse, onFailure: null }); queue.push(resolve); }); }; @@ -80,7 +159,9 @@ function pushOnQueue(queue) { * * Each item carries the run's `onFailure` collector (null for single-item * webhook/socket refreshes), so a failed parse is recorded for that run's - * end-of-run report. + * end-of-run report. Enqueueing is unpaced — quota pacing happens at the + * consumer (see createRefresh), so the queue can fill freely while the drain + * spaces the actual API spend. */ function pushAllOnQueue(queue, onFailure) { return function (githubResponses) { @@ -205,34 +286,3 @@ export function processPullItem( next(); }); } - -// The queue holds two kinds of entries: a batch-done sentinel (the bare resolve -// function pushed after a batch, see pushAllOnQueue) and work items -// `{ response, onFailure }`. onFailure rides on the item rather than on the -// queue or the fixed pop deps, so it stays scoped to the run that enqueued it — -// a bulk run collects its own failures, while webhook/socket refreshes (which -// push onFailure null) don't accumulate. Unwrap so the process functions see -// one response plus their injected collaborators. -issueQueue.pop(function (item, next) { - if (typeof item === "function") { - item(); - return next(); - } - processIssueItem(item.response, next, { - parseIssue: gitManager.parseIssue, - updateAllIssueData: dbManager.updateAllIssueData, - onFailure: item.onFailure, - }); -}); - -pullQueue.pop(function (item, next) { - if (typeof item === "function") { - item(); - return next(); - } - processPullItem(item.response, next, { - parse: gitManager.parse, - updateAllPullData: dbManager.updateAllPullData, - onFailure: item.onFailure, - }); -}); diff --git a/test/pacer.test.js b/test/pacer.test.js new file mode 100644 index 00000000..6077151e --- /dev/null +++ b/test/pacer.test.js @@ -0,0 +1,131 @@ +import { test } from "node:test"; +import assert from "node:assert/strict"; +import { computePace, createPacer, noopPacer } from "../lib/pacer.js"; + +const NOW = 1_700_000_000_000; // fixed clock (ms); reset values are NOW-relative +const RESERVE = 1000; +const resetIn = (seconds) => NOW / 1000 + seconds; + +// Before the first response is observed there's no quota view, so a bulk +// request must proceed immediately rather than stall forever. +test("computePace allows the request when quota is unknown", () => { + const { delayMs } = computePace({ + remaining: null, + reset: null, + used: null, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: null, + }); + assert.equal(delayMs, 0); +}); + +// A reset timestamp in the past means the window already rolled over and our +// remaining is stale — allow the request and let the next response refresh it. +test("computePace allows the request when the reset window is stale", () => { + const { delayMs } = computePace({ + remaining: 0, + reset: resetIn(-10), + used: 5000, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: 4000, + }); + assert.equal(delayMs, 0); +}); + +// At or below the reserve floor, bulk must pause until the quota refills so it +// stops competing with live traffic. +test("computePace pauses until reset when remaining is at or below the reserve", () => { + const { delayMs, nextSlot } = computePace({ + remaining: 500, + reset: resetIn(600), + used: 4500, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: 4490, + }); + assert.equal(delayMs, 600_000); + // The cursor jumps to the reset so post-refill pacing starts from the window edge. + assert.equal(nextSlot, NOW + 600_000); +}); + +// The real first-gate state: page one was fetched un-paced, so the after-hook +// `observe` has already populated the quota view (remaining/reset/used) — but no +// prior gate has set a `used` baseline. With nothing to diff against, the gate +// can't know how much was spent yet, so it proceeds immediately and just records +// the baseline (returned as lastUsed) for the next gate. +test("computePace does not delay the first gate, but records the usage baseline", () => { + const { delayMs, nextSlot, lastUsed } = computePace({ + remaining: 5000, + reset: resetIn(3600), + used: 100, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: null, + }); + assert.equal(delayMs, 0); + assert.equal(nextSlot, NOW); + assert.equal(lastUsed, 100); +}); + +// The cursor advances by the requests actually spent since the last gate, not +// by one per gate — so a multi-call item waits proportionally longer. +test("computePace advances the cursor by the requests spent since the last gate", () => { + const params = { + remaining: 1100, // budget 100 + reset: resetIn(100), // window 100_000ms → interval = 1000ms / request + used: undefined, // set per case + reserve: RESERVE, + now: NOW, + nextSlot: NOW, + lastUsed: 1000, + }; + + const oneCall = computePace({ ...params, used: 1001 }); + assert.equal(oneCall.delayMs, 1000); + + const eightCalls = computePace({ ...params, used: 1008 }); + assert.equal(eightCalls.delayMs, 8000); +}); + +// The no-op pacer the server/webhook path runs against: every call is total, so +// call sites never need null guards. gate resolves immediately, observe is inert. +test("noopPacer gate resolves and observe is harmless", async () => { + assert.doesNotThrow(() => noopPacer.observe({ "x-ratelimit-remaining": "5" })); + assert.doesNotThrow(() => noopPacer.observe(undefined)); + assert.equal(await noopPacer.gate(), undefined); +}); + +// What a real pacer does that the no-op can't: it spaces gates by the +// consumption it observes. The first gate (no spend yet) is free and sets the +// baseline; after a burst bumps `x-ratelimit-used`, the next gate must wait out +// the even-spread interval. Timing the actual delay also exercises the wrapper +// glue the pure computePace tests don't reach: header parsing, the used-delta +// cursor across calls, and Promise.delay. Only the lower bound is asserted — +// timers fire late, never early, so this can't flake. +test("createPacer spaces consecutive gates by observed consumption", async () => { + const pacer = createPacer({ reserve: 10 }); + // Window ~1–2s out, budget of 10 spendable requests → ~100–200ms per request. + const reset = Math.ceil(Date.now() / 1000) + 2; + const headers = { + "x-ratelimit-remaining": "20", + "x-ratelimit-reset": String(reset), + "x-ratelimit-used": "100", + }; + + pacer.observe(headers); + await pacer.gate(); // first gate sets the baseline; no spend yet + + pacer.observe({ ...headers, "x-ratelimit-used": "101" }); // one request spent + const beforeSecond = Date.now(); + await pacer.gate(); + assert.ok( + Date.now() - beforeSecond >= 80, + "second gate should pace out the request spent since the last gate" + ); +}); diff --git a/test/refresh-queue.test.js b/test/refresh-queue.test.js index a181c55d..aaef7018 100644 --- a/test/refresh-queue.test.js +++ b/test/refresh-queue.test.js @@ -1,6 +1,10 @@ import { test } from "node:test"; import assert from "node:assert/strict"; -import { processIssueItem, processPullItem } from "../lib/refresh.js"; +import { + processIssueItem, + processPullItem, + makeQueueConsumer, +} from "../lib/refresh.js"; // A transient failure parsing one item must not abort the sweep: the handler // logs, records the failure via the injected onFailure, and still calls next() @@ -42,3 +46,42 @@ test("processPullItem records the failure and continues past a failing parse", a assert.equal(nextCalled, 1); assert.deepEqual(failures, [{ repo: "test/repo-a", number: 9 }]); }); + +// Bulk pacing lives at the consumer: a work item awaits the pacer's slot before +// processing (the drain is where the API call spends quota), while the bare +// batch-done sentinel runs straight through, never gated. The item's onFailure +// is merged into the injected deps so it stays scoped to the run. +test("makeQueueConsumer gates a work item but never the batch-done sentinel", async () => { + let gateCalls = 0; + const pacer = { + gate: () => { + gateCalls++; + return Promise.resolve(); + }, + }; + const processed = []; + const consume = makeQueueConsumer( + pacer, + (response, next, deps) => { + processed.push({ response, deps }); + next(); + }, + { parse: "injected-dep" } + ); + + let nextCalled = 0; + const onFailure = () => {}; + await consume({ response: { number: 1 }, onFailure }, () => nextCalled++); + assert.equal(gateCalls, 1); + assert.equal(nextCalled, 1); + assert.deepEqual(processed[0].response, { number: 1 }); + assert.equal(processed[0].deps.parse, "injected-dep"); + assert.equal(processed[0].deps.onFailure, onFailure); + + let sentinelRan = 0; + await consume(() => sentinelRan++, () => nextCalled++); + assert.equal(sentinelRan, 1); + assert.equal(gateCalls, 1); // sentinel was not gated + assert.equal(nextCalled, 2); + assert.equal(processed.length, 1); // sentinel did not reach processItem +});