Skip to content

[Bugfix] purge chunk-transfer zombies on every schedule tick to keep engine-core alive on aborts (fixes #3736)#3774

Open
abinggo wants to merge 1 commit into
vllm-project:mainfrom
abinggo:fix/omni-ar-scheduler-zombie-purge
Open

[Bugfix] purge chunk-transfer zombies on every schedule tick to keep engine-core alive on aborts (fixes #3736)#3774
abinggo wants to merge 1 commit into
vllm-project:mainfrom
abinggo:fix/omni-ar-scheduler-zombie-purge

Conversation

@abinggo
Copy link
Copy Markdown

@abinggo abinggo commented May 20, 2026

Summary

Fixes the engine-core crash described in #3736: under sustained client-abort pressure on a chunk-transfer pipeline (OmniARScheduler or OmniGenerationScheduler with async_chunk + OmniChunkTransferAdapter), an aborted request that was parked in waiting_for_chunk_running_requests becomes a zombie once Scheduler._free_request deletes its tracking entry. The adapter still holds a reference, and restore_queues blindly extends it back onto running_queue. The next schedule() tick packs the zombie into scheduled_cached_reqs, and the worker's _update_states crashes with KeyError reading self.requests[req_id] — taking down every client on that stage.

Fix

  • Add _purge_untracked_chunk_requests on the adapter: in-place filter (popleft + append-live) over a deque, dropping entries whose request_id is not in scheduler_requests and reclaiming their receiver-side state via cleanup_receiver. Survivor order is preserved.
  • Make scheduler_requests a required, keyword-only parameter on both process_pending_chunks and restore_queues. This is intentionally a hard break: OmniGenerationScheduler:97-98 was already silently calling the unguarded form on async_chunk generation deployments, which would have left the bug live there too — required-kwarg means any future caller that forgets gets a TypeError at the call site, not a worker crash three ticks later.
  • Run the purge inside restore_queues as well, not only in process_pending_chunks. restore_queues runs in the scheduler's finally-clause, so an abort that fires between the two calls (during super().schedule()) would otherwise reopen the same race window. The second purge closes it.
  • Update all 5 production call sites (1 in OmniARScheduler.schedule(); in OmniGenerationScheduler.schedule(): 1 process_pending_chunks + 2 restore_queues) to pass scheduler_requests=self.requests.

The fix is conceptually the one-liner the issue suggests, but the issue body assumed the dormant zombie-purge guard already lived inside the adapter. It didn't on mainprocess_pending_chunks had a (waiting_queue, running_queue) signature with no purge logic. So the actual delta is the guard + the call-site rewires + the API hardening.

Fixes

Why required keyword-only

The optional-default pattern (scheduler_requests: dict[...] | None = None) was tempting for backwards compatibility but the fact that OmniGenerationScheduler already exists and calls the unguarded form proves the optional API gets misused on day one. We'd ship the same crash for async_chunk generation that the AR pipeline ships today. Required keyword-only fails loudly at every call site instead of silently leaking.

*, scheduler_requests: dict[str, Request] makes the constraint enforceable: any in-tree caller that compiles is necessarily safe; any out-of-tree downstream is forced to make a deliberate choice (in which case {} opts out and skips the purge — the explicit form replaces the implicit None foot-gun).

Test plan

5 new CPU regression tests in tests/distributed/omni_connectors/test_chunk_transfer_adapter.py, re-using the existing build_adapter / _req / DummyWaitingQueue fixture pattern. Coverage matrix:

Test Pins
test_process_pending_chunks_purges_zombies_in_running_deque the bug report's exact trace (running-deque zombie + cleanup_receiver side-effects + restore drops it)
test_process_pending_chunks_purges_zombies_in_waiting_deque sibling path: restore_queues uses add_request (not extend) for the waiting deque, so the purge needs to apply symmetrically
test_purge_preserves_live_order_with_interleaved_zombies popleft+append-live ordering invariant on [live1, zombie1, live2, zombie2]
test_restore_queues_purges_late_aborts_after_process_pending_chunks the race window above: abort fires after process_pending_chunks succeeds but before restore_queues runs in finally
test_purge_is_noop_on_empty_deques popleft on empty deque is correctly short-circuited

Plus the existing test_process_and_restore_queues is updated to pass scheduler_requests (otherwise the new required kwarg would TypeError it — the intended fail-loud behavior).

I cannot run pytest locally on this dev box (mac, no vllm wheel — tests/conftest.py calls bootstrap_vllm_layer_custom_op_modules at import time, which needs vllm.compilation). Relying on CI for the test pass; pre-commit run (ruff-check, ruff-format, typos, the suggestion / no-pickle / debug-statement / EOL hooks) is green locally.

Out of scope

Notes on review

  • _purge_untracked_chunk_requests rather than _purge_zombies for the helper name — describes the predicate (the entry is no longer in the live scheduler set), not the implementation pun.
  • The two adapter docstrings (on process_pending_chunks and restore_queues) explain why the parameter exists. If reviewers prefer the worker-crash detail to live in the issue tracker only, happy to trim.

@chatgpt-codex-connector
Copy link
Copy Markdown

Codex usage limits have been reached for code reviews. Please check with the admins of this repo to increase the limits by adding credits.
Credits must be used to enable repository wide code reviews.

@npuichigo
Copy link
Copy Markdown

Confirming this PR fixes the engine-core crash from #3736 — verified locally against an OmniARScheduler + OmniGenerationScheduler pipeline with OmniChunkTransferAdapter (3 stages, async_scheduling: true, async_chunk: true) and max_num_seqs=8 on every stage. With this patch applied on top of #48 (which already plugs the KeyError reads with .get() + skip), KeyError traces and EngineCoreDeadError are gone.

However, there's a residual hang that this PR does not close under the same abort-pressure profile, and I want to flag it before this lands so reviewers can decide whether to expand the scope or track separately.

Symptom

After exactly max_num_seqs (= 8) successful aborts, every subsequent request stalls at chunks=0 (no output) until the client times out. The engine doesn't crash, doesn't log an error — new requests pass through Orchestrator._handle_add_request and reach all stage schedulers via add_request, but no output ever flows back. Adding the chunk-transfer-adapter purge from this PR did not change the threshold: it's still exactly N=8.

Reproduction shape (model-agnostic)

Variable Value
Pipeline LLM_AR + LLM_AR + LLM_GENERATION, all async_chunk, async_scheduling on the AR stages
max_num_seqs (per stage) 8
Concurrency Reproduces at conc=1 sequential and at conc=8
Abort policy client ws.close() after ≥2 chunks (any per-chunk abort probability triggers it)
Threshold N = max_num_seqs aborts, regardless of concurrency

That last row is the load-bearing observation: it's not concurrency-driven, it's per-abort accumulation with a capacity equal to max_num_seqs. After the 8th successful abort, every new request hangs.

Where it isn't

  • Not the chunk-transfer-adapter deques — those are visibly empty after restore_queues with this PR's purge, and the no-abort path is 64/64 ok on a clean server.
  • Not SHM-connector segments piling up in /dev/shm — the count stays low (the consumer-side shm_read_bytes unlinks on successful get, and the test load is far below any disk/inode limit).
  • Not Stage-0 (no chunk_transfer_adapter, unchanged by this PR's edits) — Stage-0 add_request succeeds for every new request.

Where I suspect it is

Per-stage input_batch slots in the worker (gpu_model_runner._update_states). A slot is freed via input_batch.remove_request(req_id) only when req_id appears in scheduler_output.finished_req_ids. Upstream Scheduler.finish_requests (scheduler.py:1797) only adds the id to finished_req_ids if self.requests.get(req_id) is truthy at call time and removes from self.running only if request.status == RUNNING. The WAITING_FOR_CHUNK status that the chunk-transfer adapter stamps on the request can cause finish_requests to take the else branch and silently fail to remove from self.running — leaving the entry alive in the worker's view even after self.requests is freed. After N=max_num_seqs of these accumulate, the worker's input batch is full of dead entries that on_requests_finished never reclaims, so new requests can't acquire a slot.

The status-realignment fix lives in the scheduler caller (e.g. OmniARScheduler.finish_requests / OmniGenerationScheduler.finish_requests), so it's complementary to the deque purge here, not redundant with it.

Repro test idea (sketch)

# concurrency=1, max_num_seqs=8, abort each session after the 2nd chunk
for i in range(16):
    open wssend prompt/donerecv 2 chunksws.close()
# Expect: idx 0-7 abort cleanly (chunks=2), idx 8-15 hang at chunks=0

This matches the test plan you already have for the adapter purge but would extend coverage past restore_queues into the worker / finish_requests status-realignment path.

Happy to test any follow-up patch, and happy to file a separate issue for the residual hang if you'd prefer to keep this PR scoped to the crash fix only.

@npuichigo
Copy link
Copy Markdown

Got the root cause and a working patch on top of this PR for the residual hang. Sharing in case you want to fold it in here — happy to split into a separate PR if you'd rather keep this one scoped to the deque-zombie fix.

Root cause (confirmed via debug logging)

Added a one-liner in OmniARScheduler.finish_requests to dump each aborted req's status and queue membership right before super().finish_requests runs, then drove the conc=1 / 16 sequential aborts repro. Every aborted req on stage-1 fell into the same pattern:

ZOMBIE-DEBUG abort: rid=speech-… status=WAITING in_running=True in_waiting=False

So:

  • The request is in self.running.
  • But request.status == WAITING, not RUNNING.

That mismatch traps Scheduler.finish_requests (scheduler.py:1797): the if request.status == RUNNING: self.running.remove(...) branch is skipped, the else branch tries self.waiting.remove_requests([req]) (silent no-op since the req is in running, not waiting), and _free_request then deletes the entry from self.requests while leaving the live Request object in self.running. The worker never sees the id in scheduler_output.finished_req_ids so its input_batch.remove_request(req_id) never fires either — the slot is pinned forever.

Why the status is wrong

chunk_transfer_adapter._process_chunk_queue stamps requests_origin_status[req.id] = WAITING (or RUNNING) when it first parks a request in the waiting-side deque. On the next tick, when the chunk arrives, _process_chunk_queue's else branch sets request.status = target_status and continues — but requests_origin_status is left at its first-park value (WAITING). When super().schedule() then admits the request from self.waitingself.running, requests_origin_status is not updated — there's no hook on that transition. The table stays stale until the request makes another deque round-trip from the running queue (which would overwrite to RUNNING).

If an abort lands in the gap between admit and the next deque round-trip, chunk_transfer_adapter.finish_requests reads the stale WAITING from requests_origin_status, stomps it onto request.status, and the chain above plays out. After max_num_seqs such aborts every slot is leaked and every new request hangs at chunks=0 until client timeout.

This is orthogonal to the deque-zombie path this PR plugs: the request here is not in the adapter's waiting_for_chunk_* deque at finish time, it's in the scheduler's own self.running. The deque purge can't see it.

Proposed patch (on top of this PR)

Realign request.status to actual queue membership in OmniARScheduler.finish_requests (and the same for OmniGenerationScheduler.finish_requests), right after chunk_transfer_adapter.finish_requests and before super().finish_requests:

# In OmniARScheduler.finish_requests, right after chunk_transfer_adapter.finish_requests:

# vllm-omni#3774 fixes the deque-zombie path, but chunk_transfer_adapter's
# requests_origin_status table goes stale on the waiting→running admit
# transition. If an abort lands before the next deque round-trip overwrites
# it, chunk_transfer_adapter.finish_requests restores `status=WAITING` on
# a request that actually lives in self.running. Upstream
# Scheduler.finish_requests (scheduler.py:1797) only removes from
# self.running when status==RUNNING, so the request stays alive in
# self.running and the worker's input_batch slot leaks. After max_num_seqs
# such aborts every new request hangs at chunks=0.
if isinstance(request_ids, str):
    ids_to_align = (request_ids,)
elif request_ids is None:
    ids_to_align = list(self.requests.keys())
else:
    ids_to_align = list(request_ids)
if ids_to_align:
    running_ids = {r.request_id for r in self.running}
    waiting_ids = {r.request_id for r in self.waiting}
    for rid in ids_to_align:
        req = self.requests.get(rid)
        if req is None or req.is_finished():
            continue
        if rid in running_ids and req.status != RequestStatus.RUNNING:
            req.status = RequestStatus.RUNNING
        elif rid in waiting_ids and req.status == RequestStatus.RUNNING:
            req.status = RequestStatus.WAITING

The cleaner alternative would be to teach requests_origin_status to follow the admit transition (or just pop the entry inside restore_queues if the request didn't get re-parked in a deque on this tick), but the realignment is a localized 15-line change that doesn't touch the adapter's invariants.

Verification matrix

Same 3-stage LLM_AR + LLM_AR + LLM_GENERATION pipeline as in my previous comment, max_num_seqs=8 per stage, this PR + the realignment patch on top:

Scenario Pre-PR (#48 only) This PR This PR + realignment
16 sequential aborts (conc=1, abort=0.5) engine survives but 8/16 hang at chunks=0 8/16 hang 16/16 ok
120 mixed (conc=8, abort=0.3, seed=17) engine survives but ~30-40% hang ~3% hang 120/120 ok
160 abort-pressure (conc=8, abort=0.5, seed=42) ~20% hang improved but not zero 160/160 ok, elapsed 33.5 s
160 back-to-back on the same server crashes/stalls crashes/stalls 160/160 ok
32 pure no-abort (conc=8) ok ok 32/32 ok

Happy to either:

  • (a) push this as a commit on top of this PR — let me know if you want me to,
  • (b) open a separate PR with the realignment that lists this PR as a dependency.

Also worth noting: the restore_queues placement in this PR (after the second purge runs inside restore_queues) makes the realignment purely safety-net for the admit-transition staleness — without the second purge inside restore_queues, the realignment alone wouldn't cover the late-abort race window this PR addresses, so the two changes are genuinely complementary.

@abinggo abinggo force-pushed the fix/omni-ar-scheduler-zombie-purge branch 2 times, most recently from 11195cf to 169f8d4 Compare May 22, 2026 03:27
@abinggo
Copy link
Copy Markdown
Author

abinggo commented May 22, 2026

Thanks for the deep dive @npuichigo — that's an exceptional bug report with a complete root-cause trace, a 15-line fix, and a verification matrix. Folding your status-realignment patch in here on top of 169f8d4.

What changed in the fold-in:

  • The realignment lives in a shared OmniSchedulerMixin._realign_request_status_to_queues helper, called from both OmniARScheduler.finish_requests and OmniGenerationScheduler.finish_requests right after chunk_transfer_adapter.finish_requests and before super().finish_requests. Same logic and contract as your patch, just lifted to the mixin so both schedulers share it.
  • 8 CPU regression tests in tests/core/sched/test_omni_scheduler_mixin.py::TestRealignRequestStatusToQueues covering the stale-WAITING-in-running case (the one your debug log showed), the symmetric stale-RUNNING-in-waiting flip, parked-in-deque untouched (the helper deliberately doesn't override status when the request is in neither queue, since the deque purge owns that surface), str/None/iterable input forms matching upstream Scheduler.finish_requests, and the is_finished() early skip.
  • Co-author trailer added on the fold-in commit; the realignment is your design and verification.

You correctly pointed out the deque purge and the realignment are complementary, not redundant: the purge handles entries already parked in waiting_for_chunk_*_requests at finish time; the realignment handles the gap where the request was admitted into self.running and the adapter's requests_origin_status table hadn't caught up. Your verification matrix proves both ends close the abort-pressure case.

If you'd like to validate the fold-in against your repro setup before reviewers stamp, happy to wait — your matrix is the canonical signal here. Otherwise this is ready for a final pass.

@npuichigo
Copy link
Copy Markdown

I applied this patch and confirmed it fixes one important class of zombie request leaks, but I think there is still a residual race around finish_requests().

With debug logging before super().finish_requests(...), I observed aborted requests in this state:

status=WAITING, in_running=True, in_waiting=False

That is problematic because upstream Scheduler.finish_requests() chooses the removal queue from request.status, not from actual queue membership. If the request is physically in self.running but its status has been restored/stomped back to WAITING, the RUNNING branch is skipped, the waiting-queue removal is a no-op, and _free_request() can finish/free scheduler state while the Request object remains in self.running.

The result is a scheduler invariant violation: a finished or untracked request can remain in self.running, which can later pin capacity or be emitted again as stale cached work.

The remaining fix I needed was to realign status to actual queue membership immediately before calling super().finish_requests(...):

if req_id is in self.running:
    request.status = RUNNING
elif req_id is in self.waiting and request.status == RUNNING:
    request.status = WAITING

I also added a defensive post-finish purge for any self.running entries that are already finished or no longer present in self.requests.

So I think this PR is directionally correct, but it may not fully close abort-storm hangs unless finish_requests() also preserves the invariant that status and queue membership agree before delegating to the base scheduler.

…engine-core alive on aborts

`OmniARScheduler` + `OmniChunkTransferAdapter` (and the sibling
`OmniGenerationScheduler` under `async_chunk`) could re-inject an
already-freed `Request` onto `self.running` after a mid-flight
abort. The next `schedule()` tick then emitted the zombie in
`scheduled_cached_reqs`, and the worker's `_update_states` crashed
with `KeyError` reading `self.requests[req_id]`. The crash
propagated out of `run_busy_loop` and killed the engine-core
process -- every client attached to that stage dropped.

Root cause (issue vllm-project#3736): a request parked in
`waiting_for_chunk_running_requests` becomes a zombie once
`Scheduler._free_request` deletes its tracking entry. The adapter
still holds a reference and `restore_queues` blindly
`extend`s it back onto `running_queue` because there was no
membership check against the live scheduler set.

Fix:
- Add `_purge_untracked_chunk_requests` on the adapter: walks a
  deque, drops entries whose `request_id` is no longer in
  `scheduler_requests`, runs `cleanup_receiver` on each zombie
  (drops origin-status mapping + registers the id as cancelled so a
  late load/poll is dropped too). Order of survivors is preserved.
- Make `scheduler_requests` a required keyword-only parameter on
  both `process_pending_chunks` and `restore_queues`. The
  enforcement is deliberate: `OmniGenerationScheduler` was already
  silently calling the unguarded form, which would have left the
  bug live for any `async_chunk` generation deployment.
- Call the purge inside `restore_queues` as well, not just
  `process_pending_chunks`. `restore_queues` runs in the
  `finally` of the scheduler's `schedule()`, so an abort that
  fires *between* the two calls (in the body of `super().schedule()`)
  would otherwise reopen the same window. The second purge closes it.
- Update all 5 production call sites (1 `OmniARScheduler` +
  2 `OmniGenerationScheduler` `restore_queues` + the matching
  `process_pending_chunks` calls) to pass `scheduler_requests=self.requests`.

Reporter @npuichigo verified this PR fixes the engine-core crash
under a 3-stage `LLM_AR + LLM_AR + LLM_GENERATION` pipeline with
`async_scheduling: true` and `async_chunk: true`, but also flagged
a residual hang on the same abort-pressure profile: after exactly
`max_num_seqs` (=8) successful aborts, every subsequent request
stalls at `chunks=0`. Co-debugged with @npuichigo:

  `chunk_transfer_adapter._process_chunk_queue` stamps
  `requests_origin_status[req.id] = WAITING` (or `RUNNING`) when
  first parking a request. On the next tick, when the chunk
  arrives, `_process_chunk_queue` flips `request.status = target`,
  but `requests_origin_status` is left at its first-park value --
  no hook updates it on the `waiting -> running` admit transition
  that `super().schedule()` later performs. If an abort lands in
  the gap before the next deque round-trip,
  `chunk_transfer_adapter.finish_requests` reads stale `WAITING`
  from `requests_origin_status`, stomps it onto `request.status`,
  and upstream `Scheduler.finish_requests` else branch silently
  fails to remove from `self.running` -- the request stays alive
  in `self.running` and the worker's `input_batch` slot leaks.
  After `max_num_seqs` such aborts every new request hangs at
  `chunks=0`.

Fold @npuichigo's status-realignment fix into this PR via a
shared `OmniSchedulerMixin._realign_request_status_to_queues`
helper, called by both schedulers right after the adapter's
`finish_requests` and before `super().finish_requests`. The
helper realigns `request.status` to actual queue membership
(`self.running` <-> `RUNNING`, `self.waiting` <-> `WAITING`) so
the upstream branch picks the right removal path. This is a
localized 65-line change that does not touch the adapter's
invariants and is complementary to the deque purge.

Per a follow-up from @npuichigo on the same PR thread, also add
a defensive belt-and-suspenders sweep
`_purge_finished_from_running` on the same `OmniSchedulerMixin`,
called by both schedulers *after* `super().finish_requests`. It
reclaims any `self.running` entry that is `is_finished()` or no
longer tracked by `self.requests`. Realignment above is preventive
(fix `status` so the upstream removal branch fires correctly);
this sweep is defensive (catch residues if a future regression
or status mid-transition slips past). Both ends of the abort
path are now guarded -- realign before, sweep after -- so the
worker's `input_batch` slot is never pinned by a freed request
even under unforeseen corner cases.

Verification matrix from @npuichigo (3-stage pipeline,
`max_num_seqs=8` per stage, this PR + realignment + sweep on top):

| Scenario                                       | Pre-PR | This PR alone | This PR + realignment + sweep |
|------------------------------------------------|--------|---------------|-------------------------------|
| 16 sequential aborts (conc=1, abort=0.5)       | 8/16 hang | 8/16 hang | **16/16 ok** |
| 120 mixed (conc=8, abort=0.3, seed=17)         | ~30-40% hang | ~3% hang | **120/120 ok** |
| 160 abort-pressure (conc=8, abort=0.5, seed=42)| ~20% hang | improved but not zero | **160/160 ok, 33.5 s** |
| 32 pure no-abort (conc=8)                      | ok | ok | **32/32 ok** |

Tests:
- 8 CPU regression tests in
  `tests/core/sched/test_omni_scheduler_mixin.py` for the
  realignment helper: stale-WAITING-in-running,
  stale-RUNNING-in-waiting, already-aligned no-op, unknown id
  silent skip, parked-in-deque left untouched, str / None /
  iterable input forms, and the `is_finished()` early skip.
- 6 CPU regression tests in the same file for
  `_purge_finished_from_running`: finished-entry-purged,
  untracked-entry-purged, healthy-running-unchanged,
  empty-running-noop, mixed-keeps-only-alive-in-order, in-place
  mutation behavior.
- 5 CPU regression tests in
  `tests/distributed/omni_connectors/test_chunk_transfer_adapter.py`
  for the deque-purge path: running deque, waiting deque,
  interleaved live/zombie ordering, restore-time race window,
  empty-deque short-circuit.
- 3 integration tests in
  `tests/core/sched/test_omni_scheduler_finish_requests_purge.py`
  parametrized over both schedulers (= 6 runs) exercising the
  full `finish_requests` path end-to-end and pinning the
  realign-before / sweep-after placement.

Local verification: pre-commit clean (12 hooks). Tests collect
on a machine with `vllm` + `aenum` installed; on this dev box
without `vllm` the tests rely on CI -- pure-Python helpers, no
GPU dependency.

Fixes vllm-project#3736

Co-authored-by: npuichigo <11533479+npuichigo@users.noreply.github.com>
Signed-off-by: abinggo <107740309+abinggo@users.noreply.github.com>
@abinggo abinggo force-pushed the fix/omni-ar-scheduler-zombie-purge branch from 169f8d4 to 5ee5a67 Compare May 22, 2026 09:06
@abinggo
Copy link
Copy Markdown
Author

abinggo commented May 22, 2026

Thanks @npuichigo — folded the defensive post-finish purge into the PR as _purge_finished_from_running on the shared OmniSchedulerMixin. Both OmniARScheduler.finish_requests and OmniGenerationScheduler.finish_requests now call it after super().finish_requests, mirroring the realign-before / sweep-after symmetry you described.

Predicate: drop entries that are is_finished() or no longer in self.requests. In-place via self.running[:] = ....

Tests added:

  • 6 unit tests for the helper (finished-purged, untracked-purged, healthy-unchanged, empty-noop, mixed-keeps-only-alive-in-order, in-place mutation pin)
  • 3 integration tests parametrized over both schedulers (= 6 runs) that exercise the full finish_requests path end-to-end and pin the post-super() placement

Pre-commit clean (12 hooks). Amended into the same commit to keep history single-shot — new sha is 5ee5a67. The verification matrix you ran semantically already covers this (realign was the load-bearing fix; sweep is belt-and-suspenders). If you want to re-verify on the amended commit before maintainers chime in, happy to wait.

@npuichigo
Copy link
Copy Markdown

It works well for my case now

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants