From b4ca2de2ad9e36695bdfd319e0d40bb42ef8acd6 Mon Sep 17 00:00:00 2001 From: Shawn Tice Date: Mon, 22 Jun 2026 17:37:18 -0500 Subject: [PATCH 1/3] Pace bulk refresh against the GitHub API quota MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The refresh-* backfills and the startup open-pulls refresh share one GitHub token — and one hourly quota — with the live webhook/socket refreshes that keep Pulldasher current. A bulk run is a tight loop, so it drains the quota to zero and starves normal operation, taking Pulldasher down. The throttle plugin only reacts once the quota is already exhausted, which is too late to protect live traffic. Add a proactive pacer (lib/pacer.js). It observes x-ratelimit-* on every response (live calls included, so its view stays fresh) and gates only the bulk path: requests are spread across the rate-limit reset window, and bulk pauses entirely below a configurable reserve floor (config.github.bulkReserve, default 1000). The token-bucket cursor advances by the x-ratelimit-used delta between gates, so a pull's multi-call fan-out — and any live-traffic spike — pushes bulk's next slot out; bulk yields automatically on both signals. Gate at enqueue (pushAllOnQueue) and between fetch pages (pacedPaginate), never inside the shared queue consumer. The wait happens off the queue, so a paced or floor-paused backfill can't park the single consumer that webhook refreshes also depend on — live traffic keeps draining throughout. Co-Authored-By: Claude Opus 4.8 (1M context) --- config.example.js | 5 ++ lib/git-manager.js | 36 ++++++++++-- lib/pacer.js | 138 +++++++++++++++++++++++++++++++++++++++++++++ lib/refresh.js | 23 ++++++-- test/pacer.test.js | 94 ++++++++++++++++++++++++++++++ 5 files changed, 284 insertions(+), 12 deletions(-) create mode 100644 lib/pacer.js create mode 100644 test/pacer.test.js diff --git a/config.example.js b/config.example.js index 97789c12..de99381a 100644 --- a/config.example.js +++ b/config.example.js @@ -50,6 +50,11 @@ module.exports = { callbackURL: "http://localhost:" + port + "/auth/github/callback", // An API token for the backend to make API requests with. token: "oauth api token for server-side api calls", + // How much of the token's hourly GitHub API quota to reserve for live + // webhook/socket traffic. Bulk backfills (bin/refresh-*) pace themselves to + // stay above this floor and pause when remaining quota hits it, so a large + // backfill never starves normal operation. Defaults to 1000 if omitted. + bulkReserve: 1000, // This will need to be the same secret you use on the Webhooks page for // the repo Pulldasher is going to monitor. hook_secret: diff --git a/lib/git-manager.js b/lib/git-manager.js index 99d61223..60e9e934 100644 --- a/lib/git-manager.js +++ b/lib/git-manager.js @@ -14,6 +14,7 @@ import Label from "../models/label.js"; import Status from "../models/status.js"; import Signature from "../models/signature.js"; import getLogin from "./get-user-login.js"; +import pacer from "./pacer.js"; const MyOctokit = Octokit.plugin(throttling, retry); const gitDebug = debug("pulldasher:github"); @@ -61,6 +62,8 @@ const github = new MyOctokit({ // (5xx / network) — the throttle plugin already logs 4xx rate limits — and // rethrow untouched so plugin-retry's retry logic is unaffected. github.hook.error("request", (error, options) => { + // A failed response still reports the current quota; feed it to the pacer. + pacer.observe(error.response && error.response.headers); const status = error.status; if (status === undefined || status >= 500) { gitDebug( @@ -74,6 +77,13 @@ github.hook.error("request", (error, options) => { throw error; }); +// Track the shared quota from every response's x-ratelimit-* headers so the +// pacer (which gates only bulk requests) always has a current view — live +// webhook/socket calls update it too, which is how bulk yields to live traffic. +github.hook.after("request", (response) => { + pacer.observe(response.headers); +}); + const githubRest = github.rest; export default { @@ -100,7 +110,7 @@ export default { */ getOpenPulls: function (repo) { return logErrors( - github.paginate(githubRest.pulls.list, params({ state: "open" }, repo)), + pacedPaginate(githubRest.pulls.list, params({ state: "open" }, repo)), "Getting open pulls in repo %s", repo ); @@ -113,7 +123,7 @@ export default { */ getAllPulls: function (repo) { return logErrors( - github.paginate(githubRest.pulls.list, params({ state: "all" }, repo)), + pacedPaginate(githubRest.pulls.list, params({ state: "all" }, repo)), "Getting all pulls in repo %s", repo ); @@ -134,8 +144,7 @@ export default { getOpenIssues: function (repo) { const searchParams = params({ state: "open" }, repo); return logErrors( - github - .paginate(githubRest.issues.listForRepo, searchParams) + pacedPaginate(githubRest.issues.listForRepo, searchParams) .then(filterOutPulls) .then(addRepo(searchParams)), "Getting open issues in repo %s", @@ -151,8 +160,7 @@ export default { getAllIssues: function (repo) { const searchParams = params({ state: "all" }, repo); return logErrors( - github - .paginate(githubRest.issues.listForRepo, searchParams) + pacedPaginate(githubRest.issues.listForRepo, searchParams) .then(filterOutPulls) .then(addRepo(searchParams)), "Getting all issues in repo %s", @@ -636,3 +644,19 @@ function logErrors(promise, ...messageAndArgs) { throw err; }); } + +/** + * Paginate a bulk listing, pacing between pages so a full-repo list can't burst + * through the quota. Page one fetches un-paced and seeds the pacer's quota view + * (via the after-hook observe); each `gate()` then spaces the fetch of the next + * page. Bulk-only: the live webhook/socket path never lists whole repos, so + * gating every page is safe. + */ +async function pacedPaginate(route, parameters) { + const items = []; + for await (const response of github.paginate.iterator(route, parameters)) { + items.push(...response.data); + await pacer.gate(); + } + return items; +} diff --git a/lib/pacer.js b/lib/pacer.js new file mode 100644 index 00000000..c8f25d13 --- /dev/null +++ b/lib/pacer.js @@ -0,0 +1,138 @@ +import config from "./config-loader.js"; +import debug from "./debug.js"; +import Promise from "bluebird"; + +const pacerDebug = debug("pulldasher:pacer"); + +// Reserve a slice of the hourly quota for live (webhook/socket) traffic; bulk +// backfills pause rather than spend below it. See config.github.bulkReserve. +const reserve = config.github.bulkReserve ?? 1000; + +// Only log a paced delay once it's long enough to explain a visibly slow +// backfill — short spacing isn't worth the noise. +const LOG_DELAY_THRESHOLD_MS = 5000; + +/** + * Proactively paces bulk GitHub requests so a backfill yields to live traffic + * instead of draining the shared token's quota to zero. A single process-wide + * instance: rate state is global to the token, so every bulk call gates against + * the same view, fed by `observe` on every response (live calls included). + * + * `observe(headers)` — record the latest quota from any response (free). + * `gate()` — await this before pushing a *bulk* item; it spreads bulk work + * across the reset window and blocks entirely while below the reserve floor. + * + * Pacing is calibrated by actual consumption, not by gate count: each gate + * advances a token-bucket cursor by the number of requests spent since the last + * gate (`x-ratelimit-used` delta), so a pull whose processing fans out to a + * dozen calls correctly waits a dozen intervals — and a live-traffic spike, + * which also bumps `used`, pushes the cursor out and makes bulk yield. + */ +const pacer = { + remaining: null, + reset: null, + used: null, + lastUsed: null, + nextSlot: 0, + + observe: function (headers) { + if (!headers) { + return; + } + const remaining = headers["x-ratelimit-remaining"]; + const reset = headers["x-ratelimit-reset"]; + const used = headers["x-ratelimit-used"]; + if (remaining !== undefined) { + this.remaining = Number(remaining); + } + if (reset !== undefined) { + this.reset = Number(reset); + } + if (used !== undefined) { + this.used = Number(used); + } + }, + + gate: function () { + const now = Date.now(); + const { delayMs, nextSlot, lastUsed } = computePace({ + remaining: this.remaining, + reset: this.reset, + used: this.used, + reserve: reserve, + now: now, + nextSlot: this.nextSlot, + lastUsed: this.lastUsed, + }); + this.nextSlot = nextSlot; + this.lastUsed = lastUsed; + + // The floor-pause is the only positive delay that isn't even-spread pacing. + if (delayMs > 0 && this.remaining != null && this.remaining <= reserve) { + pacerDebug( + "bulk paused: remaining %s ≤ reserve %s, waiting %ss for quota reset", + this.remaining, + reserve, + Math.round(delayMs / 1000) + ); + } else if (delayMs >= LOG_DELAY_THRESHOLD_MS) { + pacerDebug( + "pacing bulk: waiting %sms (remaining %s)", + Math.round(delayMs), + this.remaining + ); + } + + return Promise.delay(delayMs); + }, +}; + +export default pacer; + +/** + * Pure pacing decision. Given the latest quota view and a token-bucket cursor, + * returns how long the next bulk request should wait, the advanced cursor, and + * the `used` baseline to diff against next time. + * + * - unknown quota, or a `reset` already in the past (stale window) → allow + * immediately; the next response refreshes the view. + * - `remaining ≤ reserve` → pause until `reset` (let the window refill). + * - otherwise → even-spread: the spendable budget (`remaining − reserve`) + * divided across the time left in the window gives the per-request interval. + * Advance the cursor by `spent × interval`, where `spent` is the requests + * consumed since the last gate (`used − lastUsed`), so the spacing tracks + * real consumption — a multi-call item or a live spike pushes the next slot + * further out. A shrinking `remaining` also widens the interval, so bulk + * yields on both signals. + */ +export function computePace({ + remaining, + reset, + used, + reserve, + now, + nextSlot, + lastUsed, +}) { + const baseline = used ?? lastUsed; + // No quota view, or a window that already rolled over (our `remaining` is + // stale): proceed now and let the next response refresh the view. + const allowNow = { delayMs: 0, nextSlot: now, lastUsed: baseline }; + if (remaining == null || reset == null) { + return allowNow; + } + const resetMs = reset * 1000; + if (resetMs <= now) { + return allowNow; + } + if (remaining <= reserve) { + return { delayMs: resetMs - now, nextSlot: resetMs, lastUsed: baseline }; + } + const interval = (resetMs - now) / (remaining - reserve); + // Clamp at 0: across a window rollover the server resets `used` to ~0 while + // our `lastUsed` still holds the pre-reset high, so the delta can go negative. + const spent = + used == null || lastUsed == null ? 0 : Math.max(0, used - lastUsed); + const slot = Math.max(now, nextSlot + spent * interval); + return { delayMs: slot - now, nextSlot: slot, lastUsed: baseline }; +} diff --git a/lib/refresh.js b/lib/refresh.js index 16b64b90..9d19faee 100644 --- a/lib/refresh.js +++ b/lib/refresh.js @@ -4,6 +4,7 @@ import utils from "./utils.js"; import NotifyQueue from "notify-queue"; import debug from "./debug.js"; import Promise from "bluebird"; +import pacer from "./pacer.js"; // Queues for making all refreshes be synchronous, one at a time. var issueQueue = new NotifyQueue(); @@ -61,7 +62,8 @@ export default { * and return a promise that is fulfilled when the item is fully processed. * * Single-item refreshes (webhooks, socket) have no end-of-run report, so no - * failure collector is attached. + * failure collector is attached — and they aren't quota-paced, so live traffic + * is never delayed. */ function pushOnQueue(queue) { return function (githubResponse) { @@ -81,13 +83,20 @@ function pushOnQueue(queue) { * Each item carries the run's `onFailure` collector (null for single-item * webhook/socket refreshes), so a failed parse is recorded for that run's * end-of-run report. + * + * This is the bulk path, so it paces enqueueing against the GitHub quota: + * `pacer.gate()` waits for each item's slot *before* pushing it, keeping the + * shared queue shallow. The wait happens here, off the queue — so a paced or + * floor-paused backfill never blocks the single consumer, and live + * webhook/socket refreshes keep draining promptly. */ function pushAllOnQueue(queue, onFailure) { - return function (githubResponses) { + return async function (githubResponses) { + for (const githubResponse of githubResponses) { + await pacer.gate(); + queue.push({ response: githubResponse, onFailure: onFailure }); + } return new Promise(function (resolve) { - githubResponses.forEach(function (githubResponse) { - queue.push({ response: githubResponse, onFailure: onFailure }); - }); queue.push(resolve); }); }; @@ -212,7 +221,9 @@ export function processPullItem( // queue or the fixed pop deps, so it stays scoped to the run that enqueued it — // a bulk run collects its own failures, while webhook/socket refreshes (which // push onFailure null) don't accumulate. Unwrap so the process functions see -// one response plus their injected collaborators. +// one response plus their injected collaborators. (Quota pacing happens at +// enqueue time, in pushAllOnQueue, not here — so a paused backfill can't block +// the single consumer that webhook refreshes also depend on.) issueQueue.pop(function (item, next) { if (typeof item === "function") { item(); diff --git a/test/pacer.test.js b/test/pacer.test.js new file mode 100644 index 00000000..f9e449c2 --- /dev/null +++ b/test/pacer.test.js @@ -0,0 +1,94 @@ +import { test } from "node:test"; +import assert from "node:assert/strict"; +import { computePace } from "../lib/pacer.js"; + +const NOW = 1_700_000_000_000; // fixed clock (ms); reset values are NOW-relative +const RESERVE = 1000; +const resetIn = (seconds) => NOW / 1000 + seconds; + +// Before the first response is observed there's no quota view, so a bulk +// request must proceed immediately rather than stall forever. +test("computePace allows the request when quota is unknown", () => { + const { delayMs } = computePace({ + remaining: null, + reset: null, + used: null, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: null, + }); + assert.equal(delayMs, 0); +}); + +// A reset timestamp in the past means the window already rolled over and our +// remaining is stale — allow the request and let the next response refresh it. +test("computePace allows the request when the reset window is stale", () => { + const { delayMs } = computePace({ + remaining: 0, + reset: resetIn(-10), + used: 5000, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: 4000, + }); + assert.equal(delayMs, 0); +}); + +// At or below the reserve floor, bulk must pause until the quota refills so it +// stops competing with live traffic. +test("computePace pauses until reset when remaining is at or below the reserve", () => { + const { delayMs, nextSlot } = computePace({ + remaining: 500, + reset: resetIn(600), + used: 4500, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: 4490, + }); + assert.equal(delayMs, 600_000); + // The cursor jumps to the reset so post-refill pacing starts from the window edge. + assert.equal(nextSlot, NOW + 600_000); +}); + +// The real first-gate state: page one was fetched un-paced, so the after-hook +// `observe` has already populated the quota view (remaining/reset/used) — but no +// prior gate has set a `used` baseline. With nothing to diff against, the gate +// can't know how much was spent yet, so it proceeds immediately and just records +// the baseline (returned as lastUsed) for the next gate. +test("computePace does not delay the first gate, but records the usage baseline", () => { + const { delayMs, nextSlot, lastUsed } = computePace({ + remaining: 5000, + reset: resetIn(3600), + used: 100, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: null, + }); + assert.equal(delayMs, 0); + assert.equal(nextSlot, NOW); + assert.equal(lastUsed, 100); +}); + +// The cursor advances by the requests actually spent since the last gate, not +// by one per gate — so a multi-call item waits proportionally longer. +test("computePace advances the cursor by the requests spent since the last gate", () => { + const params = { + remaining: 1100, // budget 100 + reset: resetIn(100), // window 100_000ms → interval = 1000ms / request + used: undefined, // set per case + reserve: RESERVE, + now: NOW, + nextSlot: NOW, + lastUsed: 1000, + }; + + const oneCall = computePace({ ...params, used: 1001 }); + assert.equal(oneCall.delayMs, 1000); + + const eightCalls = computePace({ ...params, used: 1008 }); + assert.equal(eightCalls.delayMs, 8000); +}); From 67ca46f54a64ce1839a596b11880168d04d39e84 Mon Sep 17 00:00:00 2001 From: Shawn Tice Date: Tue, 30 Jun 2026 11:56:14 -0500 Subject: [PATCH 2/3] Pace bulk backfills at the queue consumer, not at enqueue The first pass gated at enqueue (in `pushAllOnQueue`) and ran the server's startup `openPulls()` refresh through that same paced path. Both share the one queue consumer, so a floor-pause (waiting out the reserve once quota drops below the floor) could park the consumer that live webhook and socket refreshes also drain through, taking pulldasher down for the duration. Move the gate onto the consumer drain instead, where each pull's `parse()` fans out to the dozen-plus API calls that are the bulk of a backfill's spend. Only the CLI backfill bins install a real pacer (`createPacedRefresh`); the server, webhooks, and socket refreshes run an unpaced no-op pacer, so the gate never fronts live traffic. In a bin process the consumer drains nothing but bulk, so parking it on a paced slot starves nothing. Drop the mutable module-global pacer from `git-manager` in favor of explicit injection. `observeRateLimit(pacer)` installs a one-time quota observer on the shared Octokit client so the deep `parse()` fan-out (which the caller can't see) still feeds the pacer; the list functions take an optional pacer to pace pagination. `pacedPaginate` gates only between pages, never after the last. Co-Authored-By: Claude Opus 4.8 (1M context) --- bin/refresh-all-issues | 4 +- bin/refresh-all-pulls | 4 +- bin/refresh-open-issues | 4 +- bin/refresh-open-pulls | 4 +- lib/git-manager.js | 79 +++++++++---- lib/pacer.js | 137 +++++++++++----------- lib/refresh.js | 225 ++++++++++++++++++++++--------------- test/pacer.test.js | 39 ++++++- test/refresh-queue.test.js | 45 +++++++- 9 files changed, 356 insertions(+), 185 deletions(-) diff --git a/bin/refresh-all-issues b/bin/refresh-all-issues index 7123fa26..7f591c15 100755 --- a/bin/refresh-all-issues +++ b/bin/refresh-all-issues @@ -1,11 +1,13 @@ #!/usr/bin/env node import { default as debugInit } from '../lib/debug.js'; -import refresh, { reportFailures } from '../lib/refresh.js'; +import { createPacedRefresh, reportFailures } from '../lib/refresh.js'; import selectReposArg from '../lib/select-repos-arg.js'; import db from '../lib/db.js'; debugInit('pulldasher:refresh*'); +const refresh = createPacedRefresh(); + refresh.allIssues(selectReposArg()) .done(function (result) { reportFailures(result); diff --git a/bin/refresh-all-pulls b/bin/refresh-all-pulls index ebaa025e..e042fb30 100755 --- a/bin/refresh-all-pulls +++ b/bin/refresh-all-pulls @@ -1,11 +1,13 @@ #!/usr/bin/env node import { default as debugInit } from '../lib/debug.js'; -import refresh, { reportFailures } from '../lib/refresh.js'; +import { createPacedRefresh, reportFailures } from '../lib/refresh.js'; import selectReposArg from '../lib/select-repos-arg.js'; import db from '../lib/db.js'; debugInit('pulldasher:refresh*'); +const refresh = createPacedRefresh(); + refresh.allPulls(selectReposArg()) .done(function (result) { reportFailures(result); diff --git a/bin/refresh-open-issues b/bin/refresh-open-issues index 86f56d4a..29d1ef1e 100755 --- a/bin/refresh-open-issues +++ b/bin/refresh-open-issues @@ -1,11 +1,13 @@ #!/usr/bin/env node import { default as debugInit } from '../lib/debug.js'; -import refresh, { reportFailures } from '../lib/refresh.js'; +import { createPacedRefresh, reportFailures } from '../lib/refresh.js'; import selectReposArg from '../lib/select-repos-arg.js'; import db from '../lib/db.js'; debugInit('pulldasher:refresh*'); +const refresh = createPacedRefresh(); + refresh.openIssues(selectReposArg()) .done(function (result) { reportFailures(result); diff --git a/bin/refresh-open-pulls b/bin/refresh-open-pulls index fef36e78..602ee1ef 100755 --- a/bin/refresh-open-pulls +++ b/bin/refresh-open-pulls @@ -1,11 +1,13 @@ #!/usr/bin/env node import { default as debugInit } from '../lib/debug.js'; -import refresh, { reportFailures } from '../lib/refresh.js'; +import { createPacedRefresh, reportFailures } from '../lib/refresh.js'; import selectReposArg from '../lib/select-repos-arg.js'; import db from '../lib/db.js'; debugInit('pulldasher:refresh*'); +const refresh = createPacedRefresh(); + refresh.openPulls(selectReposArg()) .done(function (result) { reportFailures(result); diff --git a/lib/git-manager.js b/lib/git-manager.js index 935f40c8..d142f6bd 100644 --- a/lib/git-manager.js +++ b/lib/git-manager.js @@ -14,7 +14,7 @@ import Label from "../models/label.js"; import Status from "../models/status.js"; import Signature from "../models/signature.js"; import getLogin from "./get-user-login.js"; -import pacer from "./pacer.js"; +import { noopPacer } from "./pacer.js"; const MyOctokit = Octokit.plugin(throttling, retry); const gitDebug = debug("pulldasher:github"); @@ -62,8 +62,6 @@ const github = new MyOctokit({ // (5xx / network) — the throttle plugin already logs 4xx rate limits — and // rethrow untouched so plugin-retry's retry logic is unaffected. github.hook.error("request", (error, options) => { - // A failed response still reports the current quota; feed it to the pacer. - pacer.observe(error.response && error.response.headers); const status = error.status; if (status === undefined || status >= 500) { gitDebug( @@ -77,12 +75,33 @@ github.hook.error("request", (error, options) => { throw error; }); -// Track the shared quota from every response's x-ratelimit-* headers so the -// pacer (which gates only bulk requests) always has a current view — live -// webhook/socket calls update it too, which is how bulk yields to live traffic. -github.hook.after("request", (response) => { - pacer.observe(response.headers); -}); +/** + * Feed a pacer the quota headers from every GitHub response, so it can pace bulk + * work against real consumption — crucially the per-pull/issue fan-out deep in + * `parse`/`parseIssue`, which is the bulk of a backfill's spend and which the + * caller can't see to observe itself. Only the CLI backfill bins call this (once + * at startup, with their pacer); the server never installs an observer, so its + * webhook/socket responses aren't tracked and nothing is paced. + * + * Installs the hooks at most once per process: they close over the first + * pacer, and a second call would only stack duplicate observers on the shared + * client (double-counting every response). The bins call this exactly once. + */ +let rateLimitObserved = false; +export function observeRateLimit(pacer) { + if (rateLimitObserved) { + return; + } + rateLimitObserved = true; + github.hook.after("request", (response) => { + pacer.observe(response.headers); + }); + github.hook.error("request", (error) => { + // A failed response still reports the current quota. + pacer.observe(error.response && error.response.headers); + throw error; + }); +} const githubRest = github.rest; @@ -108,9 +127,9 @@ export default { * * Returns a promise which resolves to an array of all open pull requests */ - getOpenPulls: function (repo) { + getOpenPulls: function (repo, pacer = noopPacer) { return logErrors( - pacedPaginate(githubRest.pulls.list, params({ state: "open" }, repo)), + pacedPaginate(githubRest.pulls.list, params({ state: "open" }, repo), pacer), "Getting open pulls in repo %s", repo ); @@ -121,9 +140,9 @@ export default { * * Returns a promise which resolves to an array of all pull requests */ - getAllPulls: function (repo) { + getAllPulls: function (repo, pacer = noopPacer) { return logErrors( - pacedPaginate(githubRest.pulls.list, params({ state: "all" }, repo)), + pacedPaginate(githubRest.pulls.list, params({ state: "all" }, repo), pacer), "Getting all pulls in repo %s", repo ); @@ -141,10 +160,10 @@ export default { * * Returns a promise which resolves to an array of all open issues */ - getOpenIssues: function (repo) { + getOpenIssues: function (repo, pacer = noopPacer) { const searchParams = params({ state: "open" }, repo); return logErrors( - pacedPaginate(githubRest.issues.listForRepo, searchParams) + pacedPaginate(githubRest.issues.listForRepo, searchParams, pacer) .then(filterOutPulls) .then(addRepo(searchParams)), "Getting open issues in repo %s", @@ -157,10 +176,10 @@ export default { * * Returns a promise which resolves to an array of all issues */ - getAllIssues: function (repo) { + getAllIssues: function (repo, pacer = noopPacer) { const searchParams = params({ state: "all" }, repo); return logErrors( - pacedPaginate(githubRest.issues.listForRepo, searchParams) + pacedPaginate(githubRest.issues.listForRepo, searchParams, pacer) .then(filterOutPulls) .then(addRepo(searchParams)), "Getting all issues in repo %s", @@ -657,15 +676,31 @@ function logErrors(promise, ...messageAndArgs) { /** * Paginate a bulk listing, pacing between pages so a full-repo list can't burst * through the quota. Page one fetches un-paced and seeds the pacer's quota view - * (via the after-hook observe); each `gate()` then spaces the fetch of the next - * page. Bulk-only: the live webhook/socket path never lists whole repos, so - * gating every page is safe. + * (the bin's observeRateLimit hook records each response); each `gate()` then + * spaces the fetch of the next page. The gate fronts a fetch, so it only runs + * when another page follows — gating after the last page would delay the result + * (or, below the reserve floor, pause for a full reset window) with no fetch + * left to pace. The `pacer` is injected by the caller — the CLI backfill bins + * pass a real one; everything else takes the default no-op, so the pages fetch + * unpaced (the server's startup open-pulls listing, and the live webhook/socket + * path, which never lists whole repos anyway). */ -async function pacedPaginate(route, parameters) { +async function pacedPaginate(route, parameters, pacer = noopPacer) { const items = []; for await (const response of github.paginate.iterator(route, parameters)) { items.push(...response.data); - await pacer.gate(); + if (hasNextPage(response)) { + await pacer.gate(); + } } return items; } + +/** + * Whether a paginated response links to a further page — the same `rel="next"` + * Link-header signal Octokit's own iterator uses to decide whether to continue. + */ +function hasNextPage(response) { + const link = response.headers && response.headers.link; + return Boolean(link) && link.includes('rel="next"'); +} diff --git a/lib/pacer.js b/lib/pacer.js index c8f25d13..9cbfb61c 100644 --- a/lib/pacer.js +++ b/lib/pacer.js @@ -4,23 +4,24 @@ import Promise from "bluebird"; const pacerDebug = debug("pulldasher:pacer"); -// Reserve a slice of the hourly quota for live (webhook/socket) traffic; bulk -// backfills pause rather than spend below it. See config.github.bulkReserve. -const reserve = config.github.bulkReserve ?? 1000; - // Only log a paced delay once it's long enough to explain a visibly slow // backfill — short spacing isn't worth the noise. const LOG_DELAY_THRESHOLD_MS = 5000; /** - * Proactively paces bulk GitHub requests so a backfill yields to live traffic - * instead of draining the shared token's quota to zero. A single process-wide - * instance: rate state is global to the token, so every bulk call gates against - * the same view, fed by `observe` on every response (live calls included). + * A pacer proactively paces bulk GitHub requests so a backfill yields to live + * traffic instead of draining the shared token's quota to zero. It is a + * per-process instance the entry point constructs and installs: only the CLI + * backfill bins build a real one (`createPacer`); the server installs nothing + * and runs against `noopPacer`, so live webhook/socket refreshes are never + * delayed. Rate state is global to the token, so within a process every bulk + * call gates against the same view, fed by `observe` on every response (live + * calls included). * * `observe(headers)` — record the latest quota from any response (free). - * `gate()` — await this before pushing a *bulk* item; it spreads bulk work - * across the reset window and blocks entirely while below the reserve floor. + * `gate()` — await this before each *bulk* unit of work (a consumer drain or a + * pagination page); it spreads bulk work across the reset window and blocks + * entirely while below the reserve floor. * * Pacing is calibrated by actual consumption, not by gate count: each gate * advances a token-bucket cursor by the number of requests spent since the last @@ -28,67 +29,75 @@ const LOG_DELAY_THRESHOLD_MS = 5000; * dozen calls correctly waits a dozen intervals — and a live-traffic spike, * which also bumps `used`, pushes the cursor out and makes bulk yield. */ -const pacer = { - remaining: null, - reset: null, - used: null, - lastUsed: null, - nextSlot: 0, +export function createPacer({ reserve = config.github.bulkReserve ?? 1000 } = {}) { + let remaining = null; + let reset = null; + let used = null; + let lastUsed = null; + let nextSlot = 0; - observe: function (headers) { - if (!headers) { - return; - } - const remaining = headers["x-ratelimit-remaining"]; - const reset = headers["x-ratelimit-reset"]; - const used = headers["x-ratelimit-used"]; - if (remaining !== undefined) { - this.remaining = Number(remaining); - } - if (reset !== undefined) { - this.reset = Number(reset); - } - if (used !== undefined) { - this.used = Number(used); - } - }, + return { + observe: function (headers) { + if (!headers) { + return; + } + if (headers["x-ratelimit-remaining"] !== undefined) { + remaining = Number(headers["x-ratelimit-remaining"]); + } + if (headers["x-ratelimit-reset"] !== undefined) { + reset = Number(headers["x-ratelimit-reset"]); + } + if (headers["x-ratelimit-used"] !== undefined) { + used = Number(headers["x-ratelimit-used"]); + } + }, - gate: function () { - const now = Date.now(); - const { delayMs, nextSlot, lastUsed } = computePace({ - remaining: this.remaining, - reset: this.reset, - used: this.used, - reserve: reserve, - now: now, - nextSlot: this.nextSlot, - lastUsed: this.lastUsed, - }); - this.nextSlot = nextSlot; - this.lastUsed = lastUsed; - - // The floor-pause is the only positive delay that isn't even-spread pacing. - if (delayMs > 0 && this.remaining != null && this.remaining <= reserve) { - pacerDebug( - "bulk paused: remaining %s ≤ reserve %s, waiting %ss for quota reset", - this.remaining, + gate: function () { + const now = Date.now(); + const result = computePace({ + remaining, + reset, + used, reserve, - Math.round(delayMs / 1000) - ); - } else if (delayMs >= LOG_DELAY_THRESHOLD_MS) { - pacerDebug( - "pacing bulk: waiting %sms (remaining %s)", - Math.round(delayMs), - this.remaining - ); - } + now, + nextSlot, + lastUsed, + }); + nextSlot = result.nextSlot; + lastUsed = result.lastUsed; + const delayMs = result.delayMs; + + // The floor-pause is the only positive delay that isn't even-spread pacing. + if (delayMs > 0 && remaining != null && remaining <= reserve) { + pacerDebug( + "bulk paused: remaining %s ≤ reserve %s, waiting %ss for quota reset", + remaining, + reserve, + Math.round(delayMs / 1000) + ); + } else if (delayMs >= LOG_DELAY_THRESHOLD_MS) { + pacerDebug( + "pacing bulk: waiting %sms (remaining %s)", + Math.round(delayMs), + remaining + ); + } - return Promise.delay(delayMs); + return Promise.delay(delayMs); + }, + }; +} + +// The Null Object pacer the server and webhook/socket path run against: every +// call is total, so call sites stay unconditional (no null guards on the hot +// Octokit hooks or the queue consumer). +export const noopPacer = { + observe: function () {}, + gate: function () { + return Promise.resolve(); }, }; -export default pacer; - /** * Pure pacing decision. Given the latest quota view and a token-bucket cursor, * returns how long the next bulk request should wait, the advanced cursor, and diff --git a/lib/refresh.js b/lib/refresh.js index 9d19faee..3d256cf1 100644 --- a/lib/refresh.js +++ b/lib/refresh.js @@ -1,60 +1,137 @@ -import gitManager from "./git-manager.js"; +import gitManager, { observeRateLimit } from "./git-manager.js"; import dbManager from "./db-manager.js"; import utils from "./utils.js"; import NotifyQueue from "notify-queue"; import debug from "./debug.js"; import Promise from "bluebird"; -import pacer from "./pacer.js"; - -// Queues for making all refreshes be synchronous, one at a time. -var issueQueue = new NotifyQueue(); -var pullQueue = new NotifyQueue(); +import { createPacer, noopPacer } from "./pacer.js"; const refreshDebug = debug("pulldasher:refresh"); -export default { - /////// Issues ///////// - - issue: function refreshIssue(repo, number) { - refreshDebug("refresh issue %s", number); - return gitManager.getIssue(repo, number).then(pushOnQueue(issueQueue)); - }, - - allIssues: function refreshAllIssues(repos) { - refreshDebug("refresh all issues"); - return utils - .forEachRepo(gitManager.getAllIssues, { repos: repos }) - .then(drainThrough(issueQueue)); - }, - - openIssues: function refreshOpenIssues(repos) { - refreshDebug("refresh all open issues"); - return utils - .forEachRepo(gitManager.getOpenIssues, { repos: repos }) - .then(drainThrough(issueQueue)); - }, - - /////// Pulls ///////// - - pull: function refreshPull(repo, number) { - refreshDebug("refresh pull %s", number); - return gitManager.getPull(repo, number).then(pushOnQueue(pullQueue)); - }, - - allPulls: function refreshAllPulls(repos) { - refreshDebug("refresh all pull"); - return utils - .forEachRepo(gitManager.getAllPulls, { repos: repos }) - .then(drainThrough(pullQueue)); - }, - - openPulls: function refreshOpenPulls(repos) { - refreshDebug("refresh all open pulls"); - return utils - .forEachRepo(gitManager.getOpenPulls, { repos: repos }) - .then(drainThrough(pullQueue)); - }, -}; +/** + * Build a refresh API over its own pair of serial queues. The injected `pacer` + * gates the *consumer* — the drain, where the API call actually spends quota + * and reports the rate-limit headers — before each bulk work item. It defaults + * to a no-op, so the server (webhook/socket refreshes and the startup open-pulls + * refresh) runs unpaced. Only the CLI backfill bins pass a real pacer, and in + * that process the consumer drains nothing but bulk, so parking it on a paced or + * floor-paused slot starves nothing. Single-item webhook/socket refreshes push + * straight through; the gate only fronts work items, never the batch-done + * sentinel. + */ +export function createRefresh({ pacer = noopPacer } = {}) { + // Queues for making all refreshes be synchronous, one at a time. + const issueQueue = new NotifyQueue(); + const pullQueue = new NotifyQueue(); + + issueQueue.pop( + makeQueueConsumer(pacer, processIssueItem, { + parseIssue: gitManager.parseIssue, + updateAllIssueData: dbManager.updateAllIssueData, + }) + ); + + pullQueue.pop( + makeQueueConsumer(pacer, processPullItem, { + parse: gitManager.parse, + updateAllPullData: dbManager.updateAllPullData, + }) + ); + + return { + /////// Issues ///////// + + issue: function refreshIssue(repo, number) { + refreshDebug("refresh issue %s", number); + return gitManager.getIssue(repo, number).then(pushOnQueue(issueQueue)); + }, + + allIssues: function refreshAllIssues(repos) { + refreshDebug("refresh all issues"); + return utils + .forEachRepo((repo) => gitManager.getAllIssues(repo, pacer), { + repos: repos, + }) + .then(drainThrough(issueQueue)); + }, + + openIssues: function refreshOpenIssues(repos) { + refreshDebug("refresh all open issues"); + return utils + .forEachRepo((repo) => gitManager.getOpenIssues(repo, pacer), { + repos: repos, + }) + .then(drainThrough(issueQueue)); + }, + + /////// Pulls ///////// + + pull: function refreshPull(repo, number) { + refreshDebug("refresh pull %s", number); + return gitManager.getPull(repo, number).then(pushOnQueue(pullQueue)); + }, + + allPulls: function refreshAllPulls(repos) { + refreshDebug("refresh all pull"); + return utils + .forEachRepo((repo) => gitManager.getAllPulls(repo, pacer), { + repos: repos, + }) + .then(drainThrough(pullQueue)); + }, + + openPulls: function refreshOpenPulls(repos) { + refreshDebug("refresh all open pulls"); + return utils + .forEachRepo((repo) => gitManager.getOpenPulls(repo, pacer), { + repos: repos, + }) + .then(drainThrough(pullQueue)); + }, + }; +} + +// The default instance is unpaced (no-op pacer): the server imports this for +// its startup open-pulls refresh and webhook/socket refreshes. The CLI backfill +// bins build their own paced instance via createPacedRefresh. +export default createRefresh(); + +/** + * Build the refresh API a CLI backfill bin runs against. One per-process pacer + * is installed as a rate-limit observer on the GitHub client (so every + * response's quota headers feed it) and threaded into the queue consumers, so + * the whole bulk drain — the per-item fan-out, where the spend actually lands — + * and the list pagination pace against the shared token's quota, reserving + * headroom for the live server's webhook/socket refreshes (a separate process, + * same token). + */ +export function createPacedRefresh() { + const pacer = createPacer(); + observeRateLimit(pacer); + return createRefresh({ pacer }); +} + +/** + * Build a queue pop consumer. The queue holds two kinds of entries: work items + * `{ response, onFailure }` and a bare function batch-done sentinel (pushed + * after a batch). A sentinel just runs and advances — never gated. For a work + * item, await the pacer's slot *first* — the drain is where the API call spends + * quota — then hand the unwrapped response, plus the item's own onFailure + * collector, to the process function. onFailure rides on the item (not the + * fixed deps) so it stays scoped to the run that enqueued it: a bulk run + * collects its own failures, webhook/socket refreshes push null and don't + * accumulate. + */ +export function makeQueueConsumer(pacer, processItem, deps) { + return async function (item, next) { + if (typeof item === "function") { + item(); + return next(); + } + await pacer.gate(); + processItem(item.response, next, { ...deps, onFailure: item.onFailure }); + }; +} /** * Returns a function that will: @@ -82,20 +159,15 @@ function pushOnQueue(queue) { * * Each item carries the run's `onFailure` collector (null for single-item * webhook/socket refreshes), so a failed parse is recorded for that run's - * end-of-run report. - * - * This is the bulk path, so it paces enqueueing against the GitHub quota: - * `pacer.gate()` waits for each item's slot *before* pushing it, keeping the - * shared queue shallow. The wait happens here, off the queue — so a paced or - * floor-paused backfill never blocks the single consumer, and live - * webhook/socket refreshes keep draining promptly. + * end-of-run report. Enqueueing is unpaced — quota pacing happens at the + * consumer (see createRefresh), so the queue can fill freely while the drain + * spaces the actual API spend. */ function pushAllOnQueue(queue, onFailure) { - return async function (githubResponses) { - for (const githubResponse of githubResponses) { - await pacer.gate(); + return function (githubResponses) { + githubResponses.forEach(function (githubResponse) { queue.push({ response: githubResponse, onFailure: onFailure }); - } + }); return new Promise(function (resolve) { queue.push(resolve); }); @@ -214,36 +286,3 @@ export function processPullItem( next(); }); } - -// The queue holds two kinds of entries: a batch-done sentinel (the bare resolve -// function pushed after a batch, see pushAllOnQueue) and work items -// `{ response, onFailure }`. onFailure rides on the item rather than on the -// queue or the fixed pop deps, so it stays scoped to the run that enqueued it — -// a bulk run collects its own failures, while webhook/socket refreshes (which -// push onFailure null) don't accumulate. Unwrap so the process functions see -// one response plus their injected collaborators. (Quota pacing happens at -// enqueue time, in pushAllOnQueue, not here — so a paused backfill can't block -// the single consumer that webhook refreshes also depend on.) -issueQueue.pop(function (item, next) { - if (typeof item === "function") { - item(); - return next(); - } - processIssueItem(item.response, next, { - parseIssue: gitManager.parseIssue, - updateAllIssueData: dbManager.updateAllIssueData, - onFailure: item.onFailure, - }); -}); - -pullQueue.pop(function (item, next) { - if (typeof item === "function") { - item(); - return next(); - } - processPullItem(item.response, next, { - parse: gitManager.parse, - updateAllPullData: dbManager.updateAllPullData, - onFailure: item.onFailure, - }); -}); diff --git a/test/pacer.test.js b/test/pacer.test.js index f9e449c2..6077151e 100644 --- a/test/pacer.test.js +++ b/test/pacer.test.js @@ -1,6 +1,6 @@ import { test } from "node:test"; import assert from "node:assert/strict"; -import { computePace } from "../lib/pacer.js"; +import { computePace, createPacer, noopPacer } from "../lib/pacer.js"; const NOW = 1_700_000_000_000; // fixed clock (ms); reset values are NOW-relative const RESERVE = 1000; @@ -92,3 +92,40 @@ test("computePace advances the cursor by the requests spent since the last gate" const eightCalls = computePace({ ...params, used: 1008 }); assert.equal(eightCalls.delayMs, 8000); }); + +// The no-op pacer the server/webhook path runs against: every call is total, so +// call sites never need null guards. gate resolves immediately, observe is inert. +test("noopPacer gate resolves and observe is harmless", async () => { + assert.doesNotThrow(() => noopPacer.observe({ "x-ratelimit-remaining": "5" })); + assert.doesNotThrow(() => noopPacer.observe(undefined)); + assert.equal(await noopPacer.gate(), undefined); +}); + +// What a real pacer does that the no-op can't: it spaces gates by the +// consumption it observes. The first gate (no spend yet) is free and sets the +// baseline; after a burst bumps `x-ratelimit-used`, the next gate must wait out +// the even-spread interval. Timing the actual delay also exercises the wrapper +// glue the pure computePace tests don't reach: header parsing, the used-delta +// cursor across calls, and Promise.delay. Only the lower bound is asserted — +// timers fire late, never early, so this can't flake. +test("createPacer spaces consecutive gates by observed consumption", async () => { + const pacer = createPacer({ reserve: 10 }); + // Window ~1–2s out, budget of 10 spendable requests → ~100–200ms per request. + const reset = Math.ceil(Date.now() / 1000) + 2; + const headers = { + "x-ratelimit-remaining": "20", + "x-ratelimit-reset": String(reset), + "x-ratelimit-used": "100", + }; + + pacer.observe(headers); + await pacer.gate(); // first gate sets the baseline; no spend yet + + pacer.observe({ ...headers, "x-ratelimit-used": "101" }); // one request spent + const beforeSecond = Date.now(); + await pacer.gate(); + assert.ok( + Date.now() - beforeSecond >= 80, + "second gate should pace out the request spent since the last gate" + ); +}); diff --git a/test/refresh-queue.test.js b/test/refresh-queue.test.js index a181c55d..aaef7018 100644 --- a/test/refresh-queue.test.js +++ b/test/refresh-queue.test.js @@ -1,6 +1,10 @@ import { test } from "node:test"; import assert from "node:assert/strict"; -import { processIssueItem, processPullItem } from "../lib/refresh.js"; +import { + processIssueItem, + processPullItem, + makeQueueConsumer, +} from "../lib/refresh.js"; // A transient failure parsing one item must not abort the sweep: the handler // logs, records the failure via the injected onFailure, and still calls next() @@ -42,3 +46,42 @@ test("processPullItem records the failure and continues past a failing parse", a assert.equal(nextCalled, 1); assert.deepEqual(failures, [{ repo: "test/repo-a", number: 9 }]); }); + +// Bulk pacing lives at the consumer: a work item awaits the pacer's slot before +// processing (the drain is where the API call spends quota), while the bare +// batch-done sentinel runs straight through, never gated. The item's onFailure +// is merged into the injected deps so it stays scoped to the run. +test("makeQueueConsumer gates a work item but never the batch-done sentinel", async () => { + let gateCalls = 0; + const pacer = { + gate: () => { + gateCalls++; + return Promise.resolve(); + }, + }; + const processed = []; + const consume = makeQueueConsumer( + pacer, + (response, next, deps) => { + processed.push({ response, deps }); + next(); + }, + { parse: "injected-dep" } + ); + + let nextCalled = 0; + const onFailure = () => {}; + await consume({ response: { number: 1 }, onFailure }, () => nextCalled++); + assert.equal(gateCalls, 1); + assert.equal(nextCalled, 1); + assert.deepEqual(processed[0].response, { number: 1 }); + assert.equal(processed[0].deps.parse, "injected-dep"); + assert.equal(processed[0].deps.onFailure, onFailure); + + let sentinelRan = 0; + await consume(() => sentinelRan++, () => nextCalled++); + assert.equal(sentinelRan, 1); + assert.equal(gateCalls, 1); // sentinel was not gated + assert.equal(nextCalled, 2); + assert.equal(processed.length, 1); // sentinel did not reach processItem +}); From 548eb29a6aca2dc69ef7ded2e3ba229e65ad7b81 Mon Sep 17 00:00:00 2001 From: Shawn Tice Date: Tue, 30 Jun 2026 18:19:27 -0500 Subject: [PATCH 3/3] Fold the queue pushes into the drain promise's executor The split (push the items, then `return new Promise`) was load-bearing only while enqueueing was async: the old loop awaited `pacer.gate()` between pushes, which can't run inside a synchronous Promise executor. With pacing moved to the consumer the loop is a plain `forEach`, so the pushes fold into the executor. It runs synchronously, so the queue ordering (items, then the resolve sentinel) is unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) --- lib/refresh.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/refresh.js b/lib/refresh.js index 3d256cf1..047f9f65 100644 --- a/lib/refresh.js +++ b/lib/refresh.js @@ -144,8 +144,8 @@ export function makeQueueConsumer(pacer, processItem, deps) { */ function pushOnQueue(queue) { return function (githubResponse) { - queue.push({ response: githubResponse, onFailure: null }); return new Promise(function (resolve) { + queue.push({ response: githubResponse, onFailure: null }); queue.push(resolve); }); }; @@ -165,10 +165,10 @@ function pushOnQueue(queue) { */ function pushAllOnQueue(queue, onFailure) { return function (githubResponses) { - githubResponses.forEach(function (githubResponse) { - queue.push({ response: githubResponse, onFailure: onFailure }); - }); return new Promise(function (resolve) { + githubResponses.forEach(function (githubResponse) { + queue.push({ response: githubResponse, onFailure: onFailure }); + }); queue.push(resolve); }); };