Skip to content

Fix pipe drop event discard with restart-aware committer keys#17748

Merged
jt2594838 merged 5 commits into
masterfrom
Fix-drop
May 27, 2026
Merged

Fix pipe drop event discard with restart-aware committer keys#17748
jt2594838 merged 5 commits into
masterfrom
Fix-drop

Conversation

@Caideyipi
Copy link
Copy Markdown
Collaborator

@Caideyipi Caideyipi commented May 22, 2026

Description

This PR fixes pipe event discard logic when dropping a pipe task by using CommitterKey instead of only (pipeName, creationTime, regionId).

Previously, queued/retry/batched events were matched only by pipe name, creation time, and region id, which could incorrectly
discard events from a restarted pipe task. This change propagates the full committer key through pending queues, sink
subtasks, batch builders, and sink implementations so discard checks can distinguish task restart generations.

Changes

  • Use CommitterKey for dropped pipe task tracking and event matching.
  • Propagate restart-aware discard APIs across pipe sink subtasks and connectors.
  • Update async, sync, airgap, and websocket sinks to discard only matching pipe task events.
  • Keep backward-compatible discard methods with wildcard restart time.

This PR has:

  • been self-reviewed.
    • concurrent read
    • concurrent write
    • concurrent read and write
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods.
  • added or updated version, license, or notice information
  • added comments explaining the "why" and the intent of the code wherever would not be obvious
    for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold
    for code coverage.
  • added integration tests.
  • been tested in a test IoTDB cluster.

Key changed/added classes (or packages if there are too many classes) in this PR

@Caideyipi Caideyipi changed the title Fixed the drop pipe logic to avoid losing data Fix pipe drop event discard with restart-aware committer keys May 22, 2026
@sonarqubecloud
Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed

Failed conditions
7.3% Duplication on New Code (required ≤ 5%)

See analysis details on SonarQube Cloud

@codecov
Copy link
Copy Markdown

codecov Bot commented May 26, 2026

Codecov Report

❌ Patch coverage is 56.32184% with 38 lines in your changes missing coverage. Please review.
✅ Project coverage is 40.57%. Comparing base (0d1b838) to head (a20c1d3).
⚠️ Report is 13 commits behind head on master.

Files with missing lines Patch % Lines
.../payload/evolvable/batch/PipeTabletEventBatch.java 0.00% 7 Missing ⚠️
.../pipe/agent/task/subtask/sink/PipeSinkSubtask.java 25.00% 6 Missing ⚠️
...ink/protocol/airgap/IoTDBDataRegionAirGapSink.java 0.00% 3 Missing ⚠️
.../protocol/thrift/sync/IoTDBDataRegionSyncSink.java 0.00% 3 Missing ⚠️
...db/pipe/sink/protocol/websocket/WebSocketSink.java 0.00% 3 Missing ⚠️
...e/sink/protocol/PipeConnectorWithEventDiscard.java 0.00% 3 Missing ⚠️
...gent/task/subtask/sink/PipeSinkSubtaskManager.java 0.00% 2 Missing ⚠️
...d/evolvable/batch/PipeTransferBatchReqBuilder.java 50.00% 2 Missing ⚠️
...rotocol/thrift/async/IoTDBDataRegionAsyncSink.java 83.33% 2 Missing ⚠️
...n/task/subtask/SubscriptionSinkSubtaskManager.java 0.00% 2 Missing ⚠️
... and 4 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17748      +/-   ##
============================================
- Coverage     40.59%   40.57%   -0.03%     
  Complexity     2574     2574              
============================================
  Files          5179     5180       +1     
  Lines        349979   350366     +387     
  Branches      44749    44803      +54     
============================================
+ Hits         142082   142153      +71     
- Misses       207897   208213     +316     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@jt2594838 jt2594838 merged commit e7ab13f into master May 27, 2026
42 of 44 checks passed
Copy link
Copy Markdown
Member

@luoluoyuyu luoluoyuyu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review 总结

CommitterKey(含 restartTimes)精确匹配 drop 的 pipe 实例,修复「DROP 后同名 pipe 重启/重建时误丢新 pipe 事件」的问题,设计合理。

建议合入前确认行内 2 点语义并补 IT。


if (enrichedEvent.getPipeName() != null
&& pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) {
&& (pendingQueue.isEventFromDroppedPipe(enrichedEvent)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

丢弃逻辑分两支:

  1. isEventFromDroppedPipe(enrichedEvent) — 有 committerKey 的精确匹配
  2. committerKey == null 时回退到 (pipeName, creationTime, regionId)

请确认committerKey == null 的事件是否只可能来自「尚未 enrich」的瞬时状态?若某些路径长期为 null,可能在 drop 后仍按旧三元组误丢 pipe(若 creationTime 复用)。

建议在注释中写明 null committerKey 的生命周期,或保证 enrich 在入队前完成。

return committerKey.getPipeName().equals(event.getPipeName())
&& committerKey.getCreationTime() == event.getCreationTime()
&& committerKey.getRegionId() == event.getRegionId()
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

committerKey.getRestartTimes() < 0 时跳过 restartTimes 比较,用于 drop 时丢弃该 creationTime 下所有 restart 实例。

建议

  1. CommitterKey 或调用处用常量/document 说明 -1 语义(如 DROP_ALL_RESTARTS
  2. 补 IT:CREATE pipe A → DROP → CREATE 同名 pipe B(新 creationTime)→ 验证 B 的事件不被 discard

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants