Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ private void collectEvent(final Event event) {
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());

if (enrichedEvent.getPipeName() != null
&& pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) {
&& (pendingQueue.isEventFromDroppedPipe(enrichedEvent)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drop logic uses isEventFromDroppedPipe when possible, and falls back to isPipeDropped only when committerKey is null. Please document when committerKey can still be null at collection time; if that window is wide, the fallback could discard events for a recreated pipe with the same name and creationTime.

|| (enrichedEvent.getCommitterKey() == null
&& pendingQueue.isPipeDropped(
enrichedEvent.getPipeName(), creationTime, regionId)))) {
enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName());
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@
return tsfileInsertEventDeque.peek();
}

public synchronized void replace(

Check warning on line 202 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 113 to 64, Complexity from 15 to 14, Nesting Level from 5 to 2, Number of Variables from 24 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5c6SaOUjVO0RCkIE9X&open=AZ5c6SaOUjVO0RCkIE9X&pullRequest=17748
String dataRegionId, Set<TsFileResource> sourceFiles, List<TsFileResource> targetFiles) {

final int regionId = Integer.parseInt(dataRegionId);
Expand Down Expand Up @@ -356,12 +356,16 @@
@Override
public void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
}

@Override
public void discardEventsOfPipe(final CommitterKey committerKey) {
super.discardEventsOfPipe(committerKey);
tsfileInsertEventDeque.removeIf(
event -> {
if (event instanceof EnrichedEvent
&& isEventFromPipe(
((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) {
&& isEventFromPipe((EnrichedEvent) event, committerKey)) {
if (((EnrichedEvent) event)
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
eventCounter.decreaseEventCount(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
Expand Down Expand Up @@ -201,10 +202,9 @@ public void close() {
* When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard
* its queued events in the output pipe connector.
*/
public void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
public void discardEventsOfPipe(final CommitterKey committerKey) {
// Try to remove the events as much as possible
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
inputPendingQueue.discardEventsOfPipe(committerKey);

try {
increaseHighPriorityTaskCount();
Expand All @@ -217,9 +217,7 @@ public void discardEventsOfPipe(
// use a new thread to stop all the pipes, we will not encounter deadlock here. Or else we
// will.
if (lastEvent instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
&& creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime()
&& regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
&& isEventFromPipe((EnrichedEvent) lastEvent, committerKey)) {
// Do not clear the last event's reference counts because it may be on transferring
lastEvent = null;
// Submit self to avoid that the lastEvent has been retried "max times" times and has
Expand All @@ -241,9 +239,7 @@ public void discardEventsOfPipe(
// clear the lastExceptionEvent. It's safe to potentially clear it twice because we have the
// "nonnull" detection.
if (lastExceptionEvent instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName())
&& creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime()
&& regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) {
&& isEventFromPipe((EnrichedEvent) lastExceptionEvent, committerKey)) {
clearReferenceCountAndReleaseLastExceptionEvent();
}
}
Expand All @@ -252,11 +248,18 @@ public void discardEventsOfPipe(
}

if (outputPipeSink instanceof PipeConnectorWithEventDiscard) {
((PipeConnectorWithEventDiscard) outputPipeSink)
.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
((PipeConnectorWithEventDiscard) outputPipeSink).discardEventsOfPipe(committerKey);
}
}

private static boolean isEventFromPipe(
final EnrichedEvent event, final CommitterKey committerKey) {
return committerKey.getPipeName().equals(event.getPipeName())
&& committerKey.getCreationTime() == event.getCreationTime()
&& committerKey.getRegionId() == event.getRegionId()
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restartTimes < 0 skips restart comparison for drop-all-restarts behavior. Consider a named constant (e.g. DROP_ALL_RESTARTS) instead of a magic -1, and add a test: drop pipe, recreate same name, verify new events are not discarded.

}

//////////////////////////// APIs provided for metric framework ////////////////////////////

public String getAttributeSortedString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;

import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
Expand Down Expand Up @@ -87,19 +88,17 @@ public synchronized void register() {
* Otherwise, the {@link PipeSinkSubtaskLifeCycle#runningTaskCount} might be inconsistent with the
* {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} because of parallel connector scheduling.
*
* @param pipeNameToDeregister pipe name
* @param regionId region id
* @param committerKey committer key of the pipe task to deregister
* @return {@code true} if the {@link PipeSinkSubtask} is out of life cycle, indicating that the
* {@link PipeSinkSubtask} should never be used again
* @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0
*/
public synchronized boolean deregister(
final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) {
public synchronized boolean deregister(final CommitterKey committerKey) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException(DataNodePipeMessages.REGISTEREDTASKCOUNT_0_1);
}

subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId);
subtask.discardEventsOfPipe(committerKey);

try {
if (registeredTaskCount > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
Expand Down Expand Up @@ -211,7 +212,10 @@ public synchronized void deregister(
// Shall not be empty
final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor;

lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId));
final CommitterKey committerKey =
PipeEventCommitManager.getInstance().getCommitterKey(pipeName, creationTime, regionId);

lifeCycles.removeIf(o -> o.deregister(committerKey));

if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;

import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
Expand Down Expand Up @@ -157,18 +158,28 @@ public synchronized void close() {
*/
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
}

public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
events.removeIf(
event -> {
if (pipeNameToDrop.equals(event.getPipeName())
&& creationTimeToDrop == event.getCreationTime()
&& regionId == event.getRegionId()) {
if (isEventFromPipe(event, committerKey)) {
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
return true;
}
return false;
});
}

private static boolean isEventFromPipe(
final EnrichedEvent event, final CommitterKey committerKey) {
return committerKey.getPipeName().equals(event.getPipeName())
&& committerKey.getCreationTime() == event.getCreationTime()
&& committerKey.getRegionId() == event.getRegionId()
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
}

public synchronized void decreaseEventsReferenceCount(
final String holderMessage, final boolean shouldReport) {
events.forEach(event -> event.decreaseReferenceCount(holderMessage, shouldReport));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
Expand Down Expand Up @@ -201,10 +202,12 @@ public boolean isEmpty() {

public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
endPointToBatch
.values()
.forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId));
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
}

public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
defaultBatch.discardEventsOfPipe(committerKey);
endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(committerKey));
}

public int size() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.airgap;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
import org.apache.iotdb.commons.utils.RetryUtils;
Expand Down Expand Up @@ -613,8 +614,13 @@ protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException {
@Override
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
}

@Override
public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
if (Objects.nonNull(tabletBatchBuilder)) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
tabletBatchBuilder.discardEventsOfPipe(committerKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.Triple;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
Expand Down Expand Up @@ -130,9 +130,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers =
new ConcurrentHashMap<>();

// Pipe name, creation time, region id
private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
ConcurrentHashMap.newKeySet();
private final Set<CommitterKey> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet();

private boolean enableSendTsFileLimit;
private volatile boolean isConnectionException;
Expand Down Expand Up @@ -749,16 +747,20 @@ public boolean isEnableSendTsFileLimit() {
@Override
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId));
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
}

@Override
public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
droppedPipeTaskKeys.add(committerKey);

if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
tabletBatchBuilder.discardEventsOfPipe(committerKey);
}
retryEventQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
&& isDroppedPipe(
(EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) {
&& isDroppedPipe((EnrichedEvent) event, committerKey)) {
((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
retryEventQueueEventCounter.decreaseEventCount(event);
return true;
Expand All @@ -769,8 +771,7 @@ && isDroppedPipe(
retryTsFileQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
&& isDroppedPipe(
(EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) {
&& isDroppedPipe((EnrichedEvent) event, committerKey)) {
((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
retryEventQueueEventCounter.decreaseEventCount(event);
return true;
Expand Down Expand Up @@ -872,18 +873,14 @@ public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) {
}

private boolean isDroppedPipe(final EnrichedEvent event) {
return droppedPipeTaskKeys.contains(
new Triple<>(event.getPipeName(), event.getCreationTime(), event.getRegionId()));
}

private static boolean isDroppedPipe(
final EnrichedEvent event,
final String pipeNameToDrop,
final long creationTimeToDrop,
final int regionId) {
return pipeNameToDrop.equals(event.getPipeName())
&& creationTimeToDrop == event.getCreationTime()
&& regionId == event.getRegionId();
return droppedPipeTaskKeys.stream().anyMatch(key -> isDroppedPipe(event, key));
}

private static boolean isDroppedPipe(final EnrichedEvent event, final CommitterKey committerKey) {
return committerKey.getPipeName().equals(event.getPipeName())
&& committerKey.getCreationTime() == event.getCreationTime()
&& committerKey.getRegionId() == event.getRegionId()
&& (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
Expand Down Expand Up @@ -604,8 +605,13 @@ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOEx
@Override
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop, regionId, -1));
}

@Override
public synchronized void discardEventsOfPipe(final CommitterKey committerKey) {
if (Objects.nonNull(tabletBatchBuilder)) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
tabletBatchBuilder.discardEventsOfPipe(committerKey);
}
}

Expand Down
Loading
Loading