Skip to content

fix: distinguish graceful EventQueue shutdown#1082

Draft
AXEG0 wants to merge 1 commit into
a2aproject:mainfrom
AXEG0:codex-queueshutdown-normal
Draft

fix: distinguish graceful EventQueue shutdown#1082
AXEG0 wants to merge 1 commit into
a2aproject:mainfrom
AXEG0:codex-queueshutdown-normal

Conversation

@AXEG0
Copy link
Copy Markdown

@AXEG0 AXEG0 commented May 27, 2026

Summary

  • add QueueShutDownNormal for graceful EventQueueSink shutdowns
  • map graceful sink dequeue shutdowns to the narrower subclass
  • keep immediate shutdown and closed enqueue/tap paths on the generic QueueShutDown signal

Motivation

The v2 queue currently uses the same QueueShutDown class for graceful stream teardown and abnormal lifecycle paths such as immediate close or attempts to use a closed queue. Telemetry wrappers and consumers cannot distinguish those cases after the generic exception is raised.

Validation

  • UV_CACHE_DIR=/var/tmp/argylle/tmp/task-1779877269025068489-2f550c95/uv-cache UV_PYTHON_INSTALL_DIR=/var/tmp/argylle/tmp/task-1779877269025068489-2f550c95/uv-python uv run pytest tests/server/events/test_event_queue_v2.py -q

@github-actions
Copy link
Copy Markdown

🧪 Code Coverage (vs main)

⬇️ Download Full Report

Base PR Delta
src/a2a/server/events/event_queue_v2.py 91.19% 92.12% 🟢 +0.93%
src/a2a/utils/telemetry.py 90.63% 91.41% 🟢 +0.78%
Total 93.07% 93.10% 🟢 +0.03%

Generated by coverage-comment.yml

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new exception, QueueShutDownNormal, to distinguish between graceful and immediate queue shutdowns in the EventQueueSink. The implementation tracks the shutdown type using a new _closed_immediate flag, ensuring that dequeue_event raises the appropriate exception based on how the queue was closed. Tests have been updated to verify this behavior. The reviewer identified a potential issue where subsequent calls to close() could incorrectly overwrite the shutdown state, and provided a code suggestion to ensure the state is correctly maintained.

logger.debug('Closing EventQueueSink.')
async with self._lock:
self._is_closed = True
self._closed_immediate = immediate
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If close() is called multiple times (which is safe and idempotent), a subsequent call with immediate=False after an immediate=True call would overwrite self._closed_immediate to False. This would incorrectly mark the shutdown as graceful, causing subsequent dequeues to raise QueueShutDownNormal instead of the generic QueueShutDown, even though the queue was actually flushed immediately and events were lost. Using self._closed_immediate = bool(self._closed_immediate) or immediate ensures that once the queue is closed immediately, it cannot be downgraded to a graceful shutdown status.

Suggested change
self._closed_immediate = immediate
self._closed_immediate = bool(self._closed_immediate) or immediate

@AXEG0 AXEG0 changed the title Distinguish graceful EventQueue shutdown fix: distinguish graceful EventQueue shutdown May 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant