diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index 95e8196ad3865..f0cc462161287 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -202,7 +202,10 @@ private void collectEvent(final Event event) { enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes()); if (enrichedEvent.getPipeName() != null - && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) { + && (pendingQueue.isEventFromDroppedPipe(enrichedEvent) + || (enrichedEvent.getCommitterKey() == null + && pendingQueue.isPipeDropped( + enrichedEvent.getPipeName(), creationTime, regionId)))) { enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName()); return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java index f972bba0e6ede..e35763c30123a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java @@ -360,12 +360,16 @@ public void discardAllEvents() { @Override public void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + @Override + public void discardEventsOfPipe(final CommitterKey committerKey) { + super.discardEventsOfPipe(committerKey); tsfileInsertEventDeque.removeIf( event -> { if (event instanceof EnrichedEvent - && isEventFromPipe( - ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { + && isEventFromPipe((EnrichedEvent) event, committerKey)) { if (((EnrichedEvent) event) .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) { eventCounter.decreaseEventCount(event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 1e7c50f389e51..c855eb5714013 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; @@ -201,10 +202,9 @@ public void close() { * When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard * its queued events in the output pipe connector. */ - public void discardEventsOfPipe( - final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + public void discardEventsOfPipe(final CommitterKey committerKey) { // Try to remove the events as much as possible - inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + inputPendingQueue.discardEventsOfPipe(committerKey); try { increaseHighPriorityTaskCount(); @@ -217,9 +217,7 @@ public void discardEventsOfPipe( // use a new thread to stop all the pipes, we will not encounter deadlock here. Or else we // will. if (lastEvent instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName()) - && creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime() - && regionId == ((EnrichedEvent) lastEvent).getRegionId()) { + && isEventFromPipe((EnrichedEvent) lastEvent, committerKey)) { // Do not clear the last event's reference counts because it may be on transferring lastEvent = null; // Submit self to avoid that the lastEvent has been retried "max times" times and has @@ -241,9 +239,7 @@ public void discardEventsOfPipe( // clear the lastExceptionEvent. It's safe to potentially clear it twice because we have the // "nonnull" detection. if (lastExceptionEvent instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName()) - && creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime() - && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) { + && isEventFromPipe((EnrichedEvent) lastExceptionEvent, committerKey)) { clearReferenceCountAndReleaseLastExceptionEvent(); } } @@ -252,11 +248,18 @@ public void discardEventsOfPipe( } if (outputPipeConnector instanceof PipeConnectorWithEventDiscard) { - ((PipeConnectorWithEventDiscard) outputPipeConnector) - .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + ((PipeConnectorWithEventDiscard) outputPipeConnector).discardEventsOfPipe(committerKey); } } + private static boolean isEventFromPipe( + final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); + } + //////////////////////////// APIs provided for metric framework //////////////////////////// public String getAttributeSortedString() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java index 1780f5a87efa8..61a064e1637a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.sink; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.pipe.api.event.Event; @@ -86,19 +87,17 @@ public synchronized void register() { * Otherwise, the {@link PipeSinkSubtaskLifeCycle#runningTaskCount} might be inconsistent with the * {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} because of parallel connector scheduling. * - * @param pipeNameToDeregister pipe name - * @param regionId region id + * @param committerKey committer key of the pipe task to deregister * @return {@code true} if the {@link PipeSinkSubtask} is out of life cycle, indicating that the * {@link PipeSinkSubtask} should never be used again * @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0 */ - public synchronized boolean deregister( - final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { + public synchronized boolean deregister(final CommitterKey committerKey) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } - subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId); + subtask.discardEventsOfPipe(committerKey); try { if (registeredTaskCount > 1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index d7f81c12dbc98..817471c785ab6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; @@ -209,7 +210,10 @@ public synchronized void deregister( // Shall not be empty final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor; - lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId)); + final CommitterKey committerKey = + PipeEventCommitManager.getInstance().getCommitterKey(pipeName, creationTime, regionId); + + lifeCycles.removeIf(o -> o.deregister(committerKey)); if (lifeCycles.isEmpty()) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index c44e12a4bbf20..7058b88575015 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; @@ -156,11 +157,13 @@ public synchronized void close() { */ public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { events.removeIf( event -> { - if (pipeNameToDrop.equals(event.getPipeName()) - && creationTimeToDrop == event.getCreationTime() - && regionId == event.getRegionId()) { + if (isEventFromPipe(event, committerKey)) { event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); return true; } @@ -168,6 +171,14 @@ public synchronized void discardEventsOfPipe( }); } + private static boolean isEventFromPipe( + final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); + } + public synchronized void decreaseEventsReferenceCount( final String holderMessage, final boolean shouldReport) { events.forEach(event -> event.decreaseReferenceCount(holderMessage, shouldReport)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index ac5f568f1c6af..45264138596ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -197,10 +198,12 @@ public boolean isEmpty() { public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); - endPointToBatch - .values() - .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId)); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { + defaultBatch.discardEventsOfPipe(committerKey); + endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(committerKey)); } public int size() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 13bcb537ae1ed..81e745dc6a7ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.airgap; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.utils.RetryUtils; @@ -546,8 +547,13 @@ protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException { @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + @Override + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { if (Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(committerKey); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index fe3d44bedb488..69dcb922e4a24 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -22,8 +22,8 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; @@ -123,9 +123,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { private final Map pendingHandlers = new ConcurrentHashMap<>(); - // Pipe name, creation time, region id - private final Set> droppedPipeTaskKeys = - ConcurrentHashMap.newKeySet(); + private final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); private boolean enableSendTsFileLimit; private volatile boolean isConnectionException; @@ -722,16 +720,20 @@ public boolean isEnableSendTsFileLimit() { @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + @Override + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { + droppedPipeTaskKeys.add(committerKey); if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(committerKey); } retryEventQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && isDroppedPipe( - (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { + && isDroppedPipe((EnrichedEvent) event, committerKey)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -742,8 +744,7 @@ && isDroppedPipe( retryTsFileQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && isDroppedPipe( - (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { + && isDroppedPipe((EnrichedEvent) event, committerKey)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -845,18 +846,14 @@ public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) { } private boolean isDroppedPipe(final EnrichedEvent event) { - return droppedPipeTaskKeys.contains( - new Triple<>(event.getPipeName(), event.getCreationTime(), event.getRegionId())); - } - - private static boolean isDroppedPipe( - final EnrichedEvent event, - final String pipeNameToDrop, - final long creationTimeToDrop, - final int regionId) { - return pipeNameToDrop.equals(event.getPipeName()) - && creationTimeToDrop == event.getCreationTime() - && regionId == event.getRegionId(); + return droppedPipeTaskKeys.stream().anyMatch(key -> isDroppedPipe(event, key)); + } + + private static boolean isDroppedPipe(final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 552b8cf1cae1d..ef3d59f0d2a54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient; import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; @@ -523,8 +524,13 @@ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOEx @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + @Override + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { if (Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(committerKey); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index 2a8b5c8c3c07d..0ecd1ad6e340b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket; -import org.apache.iotdb.commons.pipe.datastructure.Triple; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.pipe.api.event.Event; @@ -59,9 +59,7 @@ public class WebSocketConnectorServer extends WebSocketServer { private final ConcurrentHashMap> eventsWaitingForAck = new ConcurrentHashMap<>(); - // Pipe name, creation time, region id - private final Set> droppedPipeTaskKeys = - ConcurrentHashMap.newKeySet(); + private final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); private final BidiMap router = new DualTreeBidiMap(null, Comparator.comparing(Object::hashCode)) {}; @@ -117,33 +115,33 @@ public synchronized void unregister(WebSocketSink connector) { .forEach((eventId, eventWrapper) -> discardEvent(eventWrapper.event)); } - droppedPipeTaskKeys.removeIf(key -> key.getFirst().equals(pipeName)); + droppedPipeTaskKeys.removeIf(key -> key.getPipeName().equals(pipeName)); } public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { + droppedPipeTaskKeys.add(committerKey); final PriorityBlockingQueue eventTransferQueue = - eventsWaitingForTransfer.get(pipeNameToDrop); + eventsWaitingForTransfer.get(committerKey.getPipeName()); if (eventTransferQueue != null) { eventTransferQueue.removeIf( - eventWrapper -> - discardIfMatches(eventWrapper.event, pipeNameToDrop, creationTimeToDrop, regionId)); + eventWrapper -> discardIfMatches(eventWrapper.event, committerKey)); synchronized (eventTransferQueue) { eventTransferQueue.notifyAll(); } } final ConcurrentHashMap eventId2EventMap = - eventsWaitingForAck.get(pipeNameToDrop); + eventsWaitingForAck.get(committerKey.getPipeName()); if (eventId2EventMap != null) { eventId2EventMap .entrySet() - .removeIf( - entry -> - discardIfMatches( - entry.getValue().event, pipeNameToDrop, creationTimeToDrop, regionId)); + .removeIf(entry -> discardIfMatches(entry.getValue().event, committerKey)); } } @@ -515,19 +513,13 @@ public EventWaitingForAck(WebSocketSink connector, Event event) { } } - private boolean discardIfMatches( - final Event event, - final String pipeNameToDrop, - final long creationTimeToDrop, - final int regionId) { + private boolean discardIfMatches(final Event event, final CommitterKey committerKey) { if (!(event instanceof EnrichedEvent)) { return false; } final EnrichedEvent enrichedEvent = (EnrichedEvent) event; - if (!pipeNameToDrop.equals(enrichedEvent.getPipeName()) - || creationTimeToDrop != enrichedEvent.getCreationTime() - || regionId != enrichedEvent.getRegionId()) { + if (!isEventFromPipe(enrichedEvent, committerKey)) { return false; } @@ -537,11 +529,16 @@ private boolean discardIfMatches( private boolean isDroppedPipe(final Event event) { return event instanceof EnrichedEvent - && droppedPipeTaskKeys.contains( - new Triple<>( - ((EnrichedEvent) event).getPipeName(), - ((EnrichedEvent) event).getCreationTime(), - ((EnrichedEvent) event).getRegionId())); + && droppedPipeTaskKeys.stream() + .anyMatch(key -> isEventFromPipe((EnrichedEvent) event, key)); + } + + private static boolean isEventFromPipe( + final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); } private boolean isQueueAvailable( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java index 40fccc12c999f..7841e0199b250 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; @@ -173,6 +174,13 @@ public void discardEventsOfPipe( } } + @Override + public void discardEventsOfPipe(final CommitterKey committerKey) { + if (server != null) { + server.discardEventsOfPipe(committerKey); + } + } + public void commit(EnrichedEvent enrichedEvent) { Optional.ofNullable(enrichedEvent) .ifPresent(event -> event.decreaseReferenceCount(WebSocketSink.class.getName(), true)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java index af871feaa7e0d..c24098f44fca6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.subscription.task.subtask; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor; import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask; import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtaskLifeCycle; @@ -63,8 +64,7 @@ public synchronized void register() { } @Override - public synchronized boolean deregister( - final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { + public synchronized boolean deregister(final CommitterKey committerKey) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java index f4547673eaa42..1c081888b3577 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; @@ -167,7 +168,11 @@ public synchronized void deregister( final PipeSinkSubtaskLifeCycle lifeCycle = attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString); - if (lifeCycle.deregister(pipeName, creationTime, regionId)) { + + final CommitterKey committerKey = + PipeEventCommitManager.getInstance().getCommitterKey(pipeName, creationTime, regionId); + + if (lifeCycle.deregister(committerKey)) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java index ddfc699721b92..2a15fb9ea181b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.sink; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.pipe.api.PipeConnector; @@ -51,9 +52,10 @@ public void testDiscardEventsOfPipeDelegatesToConnector() { connector)); try { - subtask.discardEventsOfPipe("pipe", 1L, 1); + final CommitterKey committerKey = new CommitterKey("pipe", 1L, 1, -1); + subtask.discardEventsOfPipe(committerKey); - verify((PipeConnectorWithEventDiscard) connector).discardEventsOfPipe("pipe", 1L, 1); + verify((PipeConnectorWithEventDiscard) connector).discardEventsOfPipe(committerKey); } finally { subtask.close(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index 8d920121363a3..c7b91f36d222b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -19,8 +19,8 @@ package org.apache.iotdb.commons.pipe.agent.task.connection; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.metric.PipeEventCounter; import org.apache.iotdb.pipe.api.event.Event; @@ -47,9 +47,7 @@ public abstract class BlockingPendingQueue { protected final AtomicBoolean isClosed = new AtomicBoolean(false); - // Pipe name, creation time, region id - protected final Set> droppedPipeTaskKeys = - ConcurrentHashMap.newKeySet(); + protected final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); protected BlockingPendingQueue( final BlockingQueue pendingQueue, final PipeEventCounter eventCounter) { @@ -138,12 +136,15 @@ public void discardAllEvents() { public void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1)); + } + + public void discardEventsOfPipe(final CommitterKey committerKey) { + droppedPipeTaskKeys.add(committerKey); pendingQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && isEventFromPipe( - ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { + && isEventFromPipe((EnrichedEvent) event, committerKey)) { if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) { eventCounter.decreaseEventCount(event); } @@ -191,16 +192,30 @@ protected static boolean isEventFromPipe( && regionId == event.getRegionId(); } + protected static boolean isEventFromPipe( + final EnrichedEvent event, final CommitterKey committerKey) { + return committerKey.getPipeName().equals(event.getPipeName()) + && committerKey.getCreationTime() == event.getCreationTime() + && committerKey.getRegionId() == event.getRegionId() + && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey())); + } + protected boolean isEventFromDroppedPipe(final E event) { return event instanceof EnrichedEvent && ((EnrichedEvent) event).getPipeName() != null - && isPipeDropped( - ((EnrichedEvent) event).getPipeName(), - ((EnrichedEvent) event).getCreationTime(), - ((EnrichedEvent) event).getRegionId()); + && isEventFromDroppedPipe((EnrichedEvent) event); + } + + public boolean isEventFromDroppedPipe(final EnrichedEvent event) { + return droppedPipeTaskKeys.stream().anyMatch(key -> isEventFromPipe(event, key)); } public boolean isPipeDropped(final String pipeName, final long creationTime, final int regionId) { - return droppedPipeTaskKeys.contains(new Triple<>(pipeName, creationTime, regionId)); + return droppedPipeTaskKeys.stream() + .anyMatch( + key -> + key.getPipeName().equals(pipeName) + && key.getCreationTime() == creationTime + && key.getRegionId() == regionId); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java index 7056b052a3ee1..9e1653a25167d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java @@ -169,6 +169,11 @@ private boolean commitSingleId( return true; } + public CommitterKey getCommitterKey( + final String pipeName, final long creationTime, final int regionId) { + return generateCommitterKey(pipeName, creationTime, regionId); + } + private CommitterKey generateCommitterKey( final String pipeName, final long creationTime, final int regionId) { return taskAgent.getCommitterKey( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java index ab4dbcf90750f..4ffc0c25ed244 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java @@ -19,7 +19,14 @@ package org.apache.iotdb.commons.pipe.sink.protocol; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; + public interface PipeConnectorWithEventDiscard { void discardEventsOfPipe(String pipeName, long creationTime, int regionId); + + default void discardEventsOfPipe(final CommitterKey committerKey) { + discardEventsOfPipe( + committerKey.getPipeName(), committerKey.getCreationTime(), committerKey.getRegionId()); + } }