From 3fbba9bb94bbc47dc38037f9030cd28fd3afb545 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 25 May 2026 10:23:41 +0800 Subject: [PATCH] Pipe: Do not listen to tsFiles when no sources need (#17669) * assigner * Update PipeDataRegionAssigner.java * fix (cherry picked from commit 906b86f784bc60b515b3d51347b18478b103803c) --- .../assigner/PipeDataRegionAssigner.java | 48 ++++++++++- .../PipeInsertionDataNodeListener.java | 47 ++++------- .../pipe/source/PipeRealtimeExtractTest.java | 81 +++++++++++++++++++ 3 files changed, 139 insertions(+), 37 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 0b4eb547144af..2375726e42772 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -56,6 +56,9 @@ public class PipeDataRegionAssigner implements Closeable { private final String dataRegionId; + private volatile int listenToTsFileSourceCount = 0; + private volatile int listenToInsertNodeSourceCount = 0; + private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter(); public String getDataRegionId() { @@ -194,12 +197,34 @@ private void assignToSource( }); } - public void startAssignTo(final PipeRealtimeDataRegionSource extractor) { - matcher.register(extractor); + public synchronized void startAssignTo(final PipeRealtimeDataRegionSource source) { + matcher.register(source); + if (source.isNeedListenToTsFile()) { + listenToTsFileSourceCount++; + } + if (source.isNeedListenToInsertNode()) { + listenToInsertNodeSourceCount++; + } + logSourceAssignmentChange("registered", source); + } + + public synchronized void stopAssignTo(final PipeRealtimeDataRegionSource source) { + matcher.deregister(source); + if (source.isNeedListenToTsFile()) { + listenToTsFileSourceCount--; + } + if (source.isNeedListenToInsertNode()) { + listenToInsertNodeSourceCount--; + } + logSourceAssignmentChange("deregistered", source); } - public void stopAssignTo(final PipeRealtimeDataRegionSource extractor) { - matcher.deregister(extractor); + public boolean shouldListenToTsFile() { + return listenToTsFileSourceCount > 0; + } + + public boolean shouldListenToInsertNode() { + return listenToInsertNodeSourceCount > 0; } public boolean notMoreSourceNeededToBeAssigned() { @@ -236,4 +261,19 @@ public int getTsFileInsertionEventCount() { public int getPipeHeartbeatEventCount() { return eventCounter.getPipeHeartbeatEventCount(); } + + private void logSourceAssignmentChange( + final String action, final PipeRealtimeDataRegionSource source) { + LOGGER.info( + "Pipe {}@{} {} realtime source on data region {} (listenToTsFile={}, listenToInsertNode={}, registeredSourceCount={}, tsFileSourceCount={}, insertNodeSourceCount={}).", + source.getPipeName(), + source.getCreationTime(), + action, + dataRegionId, + source.isNeedListenToTsFile(), + source.isNeedListenToInsertNode(), + matcher.getRegisterCount(), + listenToTsFileSourceCount, + listenToInsertNodeSourceCount); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index 882d4aff0d8eb..ad3586df830e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; /** * PipeInsertionEventListener is a singleton in each data node. @@ -48,23 +47,20 @@ public class PipeInsertionDataNodeListener { private final ConcurrentMap dataRegionId2Assigner = new ConcurrentHashMap<>(); - private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0); - private final AtomicInteger listenToInsertNodeSourceCount = new AtomicInteger(0); - //////////////////////////// start & stop //////////////////////////// public synchronized void startListenAndAssign( final String dataRegionId, final PipeRealtimeDataRegionSource source) { - dataRegionId2Assigner - .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner(dataRegionId)) - .startAssignTo(source); - - if (source.isNeedListenToTsFile()) { - listenToTsFileSourceCount.incrementAndGet(); - } - if (source.isNeedListenToInsertNode()) { - listenToInsertNodeSourceCount.incrementAndGet(); - } + // Keep registration inside compute so the assigner is fully started before it becomes visible + // to concurrent listeners. + dataRegionId2Assigner.compute( + dataRegionId, + (id, assigner) -> { + final PipeDataRegionAssigner actualAssigner = + assigner == null ? new PipeDataRegionAssigner(dataRegionId) : assigner; + actualAssigner.startAssignTo(source); + return actualAssigner; + }); } public synchronized void stopListenAndAssign( @@ -79,13 +75,6 @@ public synchronized void stopListenAndAssign( assigner.stopAssignTo(source); - if (source.isNeedListenToTsFile()) { - listenToTsFileSourceCount.decrementAndGet(); - } - if (source.isNeedListenToInsertNode()) { - listenToInsertNodeSourceCount.decrementAndGet(); - } - if (assigner.notMoreSourceNeededToBeAssigned()) { // The removed assigner will is the same as the one referenced by the variable `assigner` dataRegionId2Assigner.remove(dataRegionId); @@ -104,14 +93,10 @@ public synchronized void stopListenAndAssign( public void listenToTsFile( final String dataRegionId, final TsFileResource tsFileResource, final boolean isLoaded) { - // We don't judge whether listenToTsFileSourceCount.get() == 0 here on purpose - // because sources may use tsfile events when some exceptions occur in the - // insert nodes listening process. - final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); - // only events from registered data region will be extracted - if (assigner == null) { + // only events from registered data region with tsfile listeners will be extracted + if (assigner == null || !assigner.shouldListenToTsFile()) { return; } @@ -121,14 +106,10 @@ public void listenToTsFile( public void listenToInsertNode( final String dataRegionId, final InsertNode insertNode, final TsFileResource tsFileResource) { - if (listenToInsertNodeSourceCount.get() == 0) { - return; - } - final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); - // only events from registered data region will be extracted - if (assigner == null) { + // only events from registered data region with insert listeners will be extracted + if (assigner == null || !assigner.shouldListenToInsertNode()) { return; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java index 9e5b42bb93c4d..2cba4c8eebcaa 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment; import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionHybridSource; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource; @@ -39,6 +40,7 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.enums.TSDataType; @@ -63,6 +65,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; public class PipeRealtimeExtractTest { @@ -268,6 +271,52 @@ public void testRealtimeExtractProcess() { } } + @Test + public void testListenToTsFileSkipsAssignerWithoutTsFileSource() throws Exception { + try (final NoTsFileRealtimeDataRegionSource extractor = + new NoTsFileRealtimeDataRegionSource()) { + final PipeParameters parameters = + new PipeParameters( + new HashMap() { + { + put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, pattern1); + } + }); + final PipeTaskRuntimeConfiguration configuration = + new PipeTaskRuntimeConfiguration( + new PipeTaskSourceRuntimeEnvironment( + "1", + 1, + Integer.parseInt(dataRegion1), + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); + + extractor.validate(new PipeParameterValidator(parameters)); + extractor.customize(parameters, configuration); + extractor.start(); + + final File dataRegionDir = + new File(tsFileDir.getPath() + File.separator + dataRegion1 + File.separator + "0"); + final boolean ignored = dataRegionDir.mkdirs(); + final File tsFile = new File(dataRegionDir, "0-0-0-0.tsfile"); + Assert.assertTrue(tsFile.createNewFile()); + + final TsFileResource resource = new TsFileResource(tsFile); + resource.updateStartTime( + new PlainDeviceID(String.join(TsFileConstant.PATH_SEPARATOR, device)), 0); + resource.close(); + + PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegion1, resource, false); + + final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(1); + while (System.currentTimeMillis() < deadline + && extractor.getObservedTsFileEventCount() == 0) { + TimeUnit.MILLISECONDS.sleep(10); + } + + Assert.assertEquals(0, extractor.getObservedTsFileEventCount()); + } + } + private Future write2DataRegion( final int writeNum, final String dataRegionId, final int startNum) { final File dataRegionDir = @@ -351,4 +400,36 @@ private Future listen( } }); } + + private static class NoTsFileRealtimeDataRegionSource extends PipeRealtimeDataRegionSource { + + private final AtomicInteger observedTsFileEventCount = new AtomicInteger(0); + + @Override + public Event supply() { + return null; + } + + @Override + protected void doExtract(final PipeRealtimeEvent event) { + if (event.getEvent() instanceof TsFileInsertionEvent) { + observedTsFileEventCount.incrementAndGet(); + } + event.decreaseReferenceCount(NoTsFileRealtimeDataRegionSource.class.getName(), false); + } + + @Override + public boolean isNeedListenToTsFile() { + return false; + } + + @Override + public boolean isNeedListenToInsertNode() { + return false; + } + + private int getObservedTsFileEventCount() { + return observedTsFileEventCount.get(); + } + } }