fix: distinguish graceful EventQueue shutdown #1082
🧪 Code Coverage (vs
| File | Base | PR | Delta |
|---|---|---|---|
| src/a2a/server/events/event_queue_v2.py | 91.19% | 92.12% | 🟢 +0.93% |
| src/a2a/utils/telemetry.py | 90.63% | 91.41% | 🟢 +0.78% |
| Total | 93.07% | 93.10% | 🟢 +0.03% |
Generated by coverage-comment.yml
Code Review
This pull request introduces a new exception, QueueShutDownNormal, to distinguish between graceful and immediate queue shutdowns in the EventQueueSink. The implementation tracks the shutdown type using a new _closed_immediate flag, ensuring that dequeue_event raises the appropriate exception based on how the queue was closed. Tests have been updated to verify this behavior. The reviewer identified a potential issue where subsequent calls to close() could incorrectly overwrite the shutdown state, and provided a code suggestion to ensure the state is correctly maintained.
    logger.debug('Closing EventQueueSink.')
    async with self._lock:
        self._is_closed = True
        self._closed_immediate = immediate
If close() is called multiple times (which is safe and idempotent), a subsequent call with immediate=False after an immediate=True call would overwrite self._closed_immediate to False. This would incorrectly mark the shutdown as graceful, causing subsequent dequeues to raise QueueShutDownNormal instead of the generic QueueShutDown, even though the queue was actually flushed immediately and events were lost. Using self._closed_immediate = bool(self._closed_immediate) or immediate ensures that once the queue is closed immediately, it cannot be downgraded to a graceful shutdown status.
    -        self._closed_immediate = immediate
    +        self._closed_immediate = bool(self._closed_immediate) or immediate
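To make the reviewer's point concrete, here is a minimal sketch of the sticky flag in isolation. `SinkSketch`, `shutdown_error`, and `demo` are hypothetical names used only for illustration, and the exception classes are local stand-ins. Treating `QueueShutDownNormal` as a subclass of `QueueShutDown` is an assumption, not something this PR confirms.

```python
import asyncio


class QueueShutDown(Exception):
    """Stand-in for the generic shutdown exception (hypothetical definition)."""


class QueueShutDownNormal(QueueShutDown):
    """Stand-in for the graceful variant; the subclass relationship is assumed."""


class SinkSketch:
    """Hypothetical, stripped-down sink that models only the close state."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._is_closed = False
        self._closed_immediate = False

    async def close(self, immediate: bool = False) -> None:
        async with self._lock:
            self._is_closed = True
            # Sticky flag: once an immediate close has happened, a later
            # graceful close() call cannot downgrade the recorded state.
            self._closed_immediate = bool(self._closed_immediate) or immediate

    def shutdown_error(self) -> QueueShutDown:
        # Pick the exception a dequeue on the closed sink would raise.
        if self._closed_immediate:
            return QueueShutDown('closed immediately')
        return QueueShutDownNormal('closed gracefully')


async def demo() -> tuple[str, str]:
    sink = SinkSketch()
    await sink.close(immediate=True)
    first = type(sink.shutdown_error()).__name__
    # A second, graceful close must not change the reported shutdown type.
    await sink.close(immediate=False)
    second = type(sink.shutdown_error()).__name__
    return first, second
```

With the plain assignment `self._closed_immediate = immediate`, the second call in `demo` would flip the state back to graceful, which is the downgrade the review comment describes.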
Summary
Motivation
The v2 queue currently uses the same QueueShutDown class for graceful stream teardown and abnormal lifecycle paths such as immediate close or attempts to use a closed queue. Telemetry wrappers and consumers cannot distinguish those cases after the generic exception is raised.
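As an illustration of what the distinction enables, the sketch below shows how a consumer might tell the two outcomes apart. `make_dequeue` and `consume` are hypothetical helpers, not part of the library, and the exception classes are local stand-ins. The sketch assumes `QueueShutDownNormal` subclasses `QueueShutDown`, which is why the narrower `except` clause comes first.

```python
import asyncio
from collections.abc import Awaitable, Callable


class QueueShutDown(Exception):
    """Stand-in for the generic shutdown exception (hypothetical definition)."""


class QueueShutDownNormal(QueueShutDown):
    """Stand-in for the graceful variant; the subclass relationship is assumed."""


def make_dequeue(
    events: list[str], final: type[QueueShutDown]
) -> Callable[[], Awaitable[str]]:
    """Build a fake dequeue coroutine that yields events, then raises `final`."""
    pending = list(events)

    async def dequeue() -> str:
        if pending:
            return pending.pop(0)
        raise final()

    return dequeue


async def consume(
    dequeue: Callable[[], Awaitable[str]],
) -> tuple[list[str], str]:
    """Drain events until shutdown and report how the stream ended."""
    seen: list[str] = []
    try:
        while True:
            seen.append(await dequeue())
    except QueueShutDownNormal:
        # Graceful teardown: the stream finished as expected.
        return seen, 'graceful'
    except QueueShutDown:
        # Immediate close or use of a closed queue: events may have been lost.
        return seen, 'abnormal'
```

A telemetry wrapper could use the same split, for example by recording the `'abnormal'` outcome as an error while treating the `'graceful'` one as a normal end of stream.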
Validation