-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Fix CQ recovery gap and stale callback contamination #17734
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,7 +43,10 @@ | |
|
|
||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ConcurrentMap; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.locks.ReadWriteLock; | ||
| import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
|
|
||
|
|
@@ -57,11 +60,14 @@ | |
|
|
||
| private final ReadWriteLock lock; | ||
|
|
||
| private final ConcurrentMap<String, String> locallyScheduledCQs; | ||
|
|
||
| private ScheduledExecutorService executor; | ||
|
|
||
| public CQManager(ConfigManager configManager) { | ||
| this.configManager = configManager; | ||
| this.lock = new ReentrantReadWriteLock(); | ||
| this.locallyScheduledCQs = new ConcurrentHashMap<>(); | ||
| this.executor = | ||
| IoTDBThreadPoolFactory.newScheduledThreadPool( | ||
| CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName()); | ||
|
|
@@ -79,7 +85,11 @@ | |
|
|
||
| public TSStatus dropCQ(TDropCQReq req) { | ||
| try { | ||
| return configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); | ||
| TSStatus status = configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); | ||
| if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { | ||
| locallyScheduledCQs.remove(req.cqId); | ||
| } | ||
| return status; | ||
| } catch (ConsensusException e) { | ||
| LOGGER.warn(ManagerMessages.UNEXPECTED_ERROR_HAPPENED_WHILE_DROPPING_CQ, req.cqId, e); | ||
| // consensus layer related errors | ||
|
|
@@ -113,7 +123,7 @@ | |
| return res; | ||
| } | ||
|
|
||
| public void startCQScheduler() { | ||
|
Check failure on line 126 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
|
||
| lock.writeLock().lock(); | ||
| try { | ||
| // 1. shutdown previous cq schedule thread pool | ||
|
|
@@ -132,6 +142,7 @@ | |
| executor = | ||
| IoTDBThreadPoolFactory.newScheduledThreadPool( | ||
| CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName()); | ||
| locallyScheduledCQs.clear(); | ||
|
|
||
| // 3. get all CQs | ||
| List<CQInfo.CQEntry> allCQs = null; | ||
|
|
@@ -155,8 +166,16 @@ | |
| if (allCQs != null) { | ||
| for (CQInfo.CQEntry entry : allCQs) { | ||
| if (entry.getState() == CQState.ACTIVE) { | ||
| if (!markCQLocallyScheduled(entry.getCqId(), entry.getMd5())) { | ||
| continue; | ||
| } | ||
| CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager); | ||
| cqScheduleTask.submitSelf(); | ||
| try { | ||
| cqScheduleTask.submitSelf(); | ||
| } catch (RuntimeException e) { | ||
| unmarkCQLocallyScheduled(entry.getCqId(), entry.getMd5()); | ||
| throw e; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -176,11 +195,30 @@ | |
| try { | ||
| previous = executor; | ||
| executor = null; | ||
| locallyScheduledCQs.clear(); | ||
| } finally { | ||
| lock.writeLock().unlock(); | ||
| } | ||
| if (previous != null) { | ||
| previous.shutdown(); | ||
| } | ||
| } | ||
|
|
||
| public boolean markCQLocallyScheduled(String cqId, String md5) { | ||
|
Check warning on line 207 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
|
||
| AtomicBoolean shouldSchedule = new AtomicBoolean(false); | ||
| locallyScheduledCQs.compute( | ||
| cqId, | ||
| (ignored, previousMd5) -> { | ||
| if (!md5.equals(previousMd5)) { | ||
| shouldSchedule.set(true); | ||
| return md5; | ||
| } | ||
| return previousMd5; | ||
| }); | ||
| return shouldSchedule.get(); | ||
| } | ||
|
|
||
| public void unmarkCQLocallyScheduled(String cqId, String md5) { | ||
|
Check warning on line 221 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
|
||
| locallyScheduledCQs.remove(cqId, md5); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using remove(cqId, md5) ensures we only unmark the entry we own after a failed submitSelf. Good defensive pattern. |
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When previousMd5 is null, !md5.equals(previousMd5) schedules the CQ as expected. When previousMd5 equals md5, scheduling is skipped, which avoids duplicates.
If the same cqId is updated with a new md5, this returns true and schedules again. Please confirm in this PR that any previously running CQScheduleTask for the old md5 is stopped or superseded; otherwise two tasks could run briefly.