Codecov Report
❌ Patch coverage is
@@ Coverage Diff @@
## master #17748 +/- ##
============================================
- Coverage 40.59% 40.57% -0.03%
Complexity 2574 2574
============================================
Files 5179 5180 +1
Lines 349979 350366 +387
Branches 44749 44803 +54
============================================
+ Hits 142082 142153 +71
- Misses 207897 208213 +316
luoluoyuyu
left a comment
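Review summary
Using CommitterKey (which includes restartTimes) to exactly match the dropped pipe instance fixes the problem where, after a DROP, events of a new pipe with the same name are discarded by mistake when that pipe is restarted or recreated. The design is sound.
Before merging, please confirm the two semantic points raised inline and add an IT.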
if (enrichedEvent.getPipeName() != null
&& pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) {
&& (pendingQueue.isEventFromDroppedPipe(enrichedEvent)
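The discard logic has two branches:
- isEventFromDroppedPipe(enrichedEvent): exact match when the event has a committerKey
- when committerKey == null, fall back to (pipeName, creationTime, regionId)
Please confirm: can events with committerKey == null only come from a transient "not yet enriched" state? If some paths keep it null long-term, events of a new pipe could still be discarded by mistake after a drop, based on the old triple (if creationTime is reused).
Suggest documenting the lifecycle of a null committerKey in a comment, or guaranteeing that enrich completes before enqueueing.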
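To make the concern concrete, here is a minimal sketch of the two-branch check, assuming that the exact-match branch never matches a null key. `NullKeyFallbackSketch`, `Event`, `markDropped`, and the string-encoded keys are hypothetical stand-ins, not the real classes in this PR, and the sketch ignores the `restartTimes < 0` wildcard discussed further down.

```java
import java.util.HashSet;
import java.util.Set;

class NullKeyFallbackSketch {
  /** Simplified stand-in for an enriched event; committerKey may be null. */
  static final class Event {
    final String pipeName;
    final long creationTime;
    final int regionId;
    final String committerKey; // hypothetical: the real code uses a CommitterKey object

    Event(String pipeName, long creationTime, int regionId, String committerKey) {
      this.pipeName = pipeName;
      this.creationTime = creationTime;
      this.regionId = regionId;
      this.committerKey = committerKey;
    }
  }

  // Hypothetical bookkeeping: exact keys and legacy triples of dropped pipes.
  private final Set<String> droppedKeys = new HashSet<>();
  private final Set<String> droppedTriples = new HashSet<>();

  static String triple(String pipeName, long creationTime, int regionId) {
    return pipeName + "/" + creationTime + "/" + regionId;
  }

  void markDropped(String committerKey, String pipeName, long creationTime, int regionId) {
    droppedKeys.add(committerKey);
    droppedTriples.add(triple(pipeName, creationTime, regionId));
  }

  boolean shouldDiscard(Event e) {
    if (e.pipeName == null) {
      return false; // events without a pipe name are never discarded here
    }
    if (e.committerKey != null) {
      return droppedKeys.contains(e.committerKey); // branch 1: exact match
    }
    // branch 2: null key, fall back to the coarser triple
    return droppedTriples.contains(triple(e.pipeName, e.creationTime, e.regionId));
  }
}
```

In this sketch, an event whose key stays null is judged only by the triple, so it is discarded whenever a pipe with the same triple was dropped earlier. That is the case the question above is trying to rule out.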
return committerKey.getPipeName().equals(event.getPipeName())
&& committerKey.getCreationTime() == event.getCreationTime()
&& committerKey.getRegionId() == event.getRegionId()
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
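When committerKey.getRestartTimes() < 0, the restartTimes comparison is skipped, so that a drop discards all restart instances under that creationTime.
Suggestions:
- In CommitterKey or at the call site, use a constant or documentation to explain the semantics of -1 (e.g. DROP_ALL_RESTARTS)
- Add an IT: CREATE pipe A → DROP → CREATE pipe B with the same name (new creationTime) → verify that B's events are not discarded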
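A rough sketch of how the named sentinel and the suggested scenario could look. `Key` is a simplified stand-in for CommitterKey, `DROP_ALL_RESTARTS` is only the name proposed above, and `matchesDropped` compares two keys directly instead of reading the fields off an event as the real method does. All of these are assumptions for illustration.

```java
import java.util.Objects;

class DroppedPipeMatchSketch {
  /** Hypothetical constant: a negative restartTimes means "match every restart". */
  static final int DROP_ALL_RESTARTS = -1;

  /** Simplified stand-in for CommitterKey; not the real IoTDB class. */
  static final class Key {
    final String pipeName;
    final long creationTime;
    final int regionId;
    final int restartTimes;

    Key(String pipeName, long creationTime, int regionId, int restartTimes) {
      this.pipeName = pipeName;
      this.creationTime = creationTime;
      this.regionId = regionId;
      this.restartTimes = restartTimes;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof Key)) return false;
      Key k = (Key) o;
      return creationTime == k.creationTime
          && regionId == k.regionId
          && restartTimes == k.restartTimes
          && pipeName.equals(k.pipeName);
    }

    @Override
    public int hashCode() {
      return Objects.hash(pipeName, creationTime, regionId, restartTimes);
    }
  }

  /** True if an event carrying eventKey belongs to the dropped pipe instance. */
  static boolean matchesDropped(Key dropped, Key eventKey) {
    if (eventKey == null) {
      return false; // the null-key fallback is out of scope for this sketch
    }
    boolean sameTask =
        dropped.pipeName.equals(eventKey.pipeName)
            && dropped.creationTime == eventKey.creationTime
            && dropped.regionId == eventKey.regionId;
    // Wildcard drop matches any restart generation; otherwise require exact equality.
    return sameTask && (dropped.restartTimes < 0 || dropped.equals(eventKey));
  }

  public static void main(String[] args) {
    Key droppedA = new Key("p", 100L, 1, DROP_ALL_RESTARTS);
    // Pipe A, restart generation 3: covered by the wildcard drop.
    System.out.println(matchesDropped(droppedA, new Key("p", 100L, 1, 3)));
    // Pipe B, same name but new creationTime: must not be matched.
    System.out.println(matchesDropped(droppedA, new Key("p", 200L, 1, 0)));
  }
}
```

The second call in `main` mirrors the IT scenario: pipe B reuses the name but has a new creationTime, so the wildcard drop of A must not cover it.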


Description
This PR fixes pipe event discard logic when dropping a pipe task by using CommitterKey instead of only (pipeName, creationTime, regionId).
Previously, queued/retry/batched events were matched only by pipe name, creation time, and region id, which could incorrectly discard events from a restarted pipe task. This change propagates the full committer key through pending queues, sink subtasks, batch builders, and sink implementations so discard checks can distinguish task restart generations.
Changes
CommitterKey for dropped pipe task tracking and event matching.
This PR has:
for an unfamiliar reader.
for code coverage.
Key changed/added classes (or packages if there are too many classes) in this PR