From 9dfaf5e7b93d0d676221f1e2dcdebb42d7453d44 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 21 May 2026 14:42:08 +0800 Subject: [PATCH 1/2] Pipe: Mask sensitive attributes in sink subtask display strings Use masked PipeParameters display string for logs, metrics and subtask names while keeping unmasked sorted string for internal lifecycle map keys. Also treat scp.password as a sensitive parameter. Co-authored-by: Cursor --- .../customizer/parameter/PipeParameters.java | 1 + .../subtask/sink/PipeSinkSubtaskManager.java | 24 +++++++++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index c13f87ae5794f..f9ef2a64a8346 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -429,6 +429,7 @@ public static class ValueHider { static { KEYS.add("ssl.trust-store-pwd"); + KEYS.add("scp.password"); KEYS.add("password"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index 367b92104062d..83f2a6eb82020 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -91,6 +91,7 @@ public synchronized String register( final int sinkNum; boolean realTimeFirst = false; String attributeSortedString = generateAttributeSortedString(pipeSinkParameters); + final String attributeDisplayString = generateAttributeDisplayString(pipeSinkParameters); if (isDataRegionSink) { sinkNum = pipeSinkParameters.getIntOrDefault( @@ -119,7 +120,9 @@ public synchronized String register( sinkNum = 1; attributeSortedString = "schema_" + attributeSortedString; } - environment.setAttributeSortedString(attributeSortedString); + final String attributeDisplayStringWithPrefix = + isDataRegionSink ? "data_" + attributeDisplayString : "schema_" + attributeDisplayString; + environment.setAttributeSortedString(attributeDisplayStringWithPrefix); if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { final PipeSinkSubtaskExecutor executor = executorSupplier.get(); @@ -168,9 +171,10 @@ public synchronized String register( final PipeSinkSubtask pipeSinkSubtask = new PipeSinkSubtask( String.format( - "%s_%s_%s", attributeSortedString, environment.getCreationTime(), sinkIndex), + "%s_%s_%s", + attributeDisplayStringWithPrefix, environment.getCreationTime(), sinkIndex), environment.getCreationTime(), - attributeSortedString, + attributeDisplayStringWithPrefix, sinkIndex, pendingQueue, pipeSink); @@ -181,7 +185,7 @@ public synchronized String register( LOGGER.info( DataNodePipeMessages.PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED, - attributeSortedString, + attributeDisplayStringWithPrefix, executor.getWorkingThreadName(), executor.getCallbackThreadName()); attributeSortedString2SubtaskLifeCycleMap.put( @@ -264,13 +268,23 @@ public UnboundedBlockingPendingQueue getPipeSinkPendingQueue( .getPendingQueue(); } - private String generateAttributeSortedString(final PipeParameters pipeConnectorParameters) { + private static String generateAttributeSortedString( + final PipeParameters pipeConnectorParameters) { final TreeMap sortedStringSourceMap = new TreeMap<>(pipeConnectorParameters.getAttribute()); sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY); return sortedStringSourceMap.toString(); } + /** Masked attribute string for logs, metrics and exception messages. */ + private static String generateAttributeDisplayString( + final PipeParameters pipeConnectorParameters) { + final TreeMap filteredAttributes = + new TreeMap<>(pipeConnectorParameters.getAttribute()); + filteredAttributes.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY); + return new PipeParameters(filteredAttributes).toString(); + } + ///////////////////////// Singleton Instance Holder ///////////////////////// private PipeSinkSubtaskManager() { From 7f12308da4cd979c753e689d93ae6b2e4292bd08 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Mon, 25 May 2026 15:05:58 +0800 Subject: [PATCH 2/2] Pipe: Fix sink compression timer keying and masked error paths Key compressionTimerMap by per-subtask taskID instead of masked attribute string to avoid collisions when only sensitive fields differ. Use masked display strings in subtask-not-found exceptions and pass sinkTaskId from runtime environment to IoTDB sinks for timer lookup. Co-authored-by: Cursor --- .../subtask/sink/PipeSinkSubtaskManager.java | 34 +++++++++++++++---- .../sink/PipeDataRegionSinkMetrics.java | 10 +++--- .../airgap/IoTDBDataRegionAirGapSink.java | 5 ++- .../async/IoTDBDataRegionAsyncSink.java | 5 ++- .../thrift/sync/IoTDBDataRegionSyncSink.java | 5 ++- .../env/PipeTaskSinkRuntimeEnvironment.java | 9 +++++ .../commons/pipe/sink/protocol/IoTDBSink.java | 7 ++-- 7 files changed, 52 insertions(+), 23 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index 83f2a6eb82020..ca2d5820ec956 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -63,6 +63,8 @@ public class PipeSinkSubtaskManager { private final Map> attributeSortedString2SubtaskLifeCycleMap = new HashMap<>(); + private final Map attributeSortedString2DisplayString = new HashMap<>(); + public synchronized String register( final Supplier executorSupplier, final PipeParameters pipeSinkParameters, @@ -140,6 +142,12 @@ public synchronized String register( } for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) { + final String taskID = + String.format( + "%s_%s_%s", + attributeDisplayStringWithPrefix, environment.getCreationTime(), sinkIndex); + environment.setSinkTaskId(taskID); + final PipeConnector pipeSink = isDataRegionSink ? PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters) @@ -170,9 +178,7 @@ public synchronized String register( // 2. Construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle final PipeSinkSubtask pipeSinkSubtask = new PipeSinkSubtask( - String.format( - "%s_%s_%s", - attributeDisplayStringWithPrefix, environment.getCreationTime(), sinkIndex), + taskID, environment.getCreationTime(), attributeDisplayStringWithPrefix, sinkIndex, @@ -190,6 +196,8 @@ public synchronized String register( executor.getCallbackThreadName()); attributeSortedString2SubtaskLifeCycleMap.put( attributeSortedString, pipeSinkSubtaskLifeCycleList); + attributeSortedString2DisplayString.put( + attributeSortedString, attributeDisplayStringWithPrefix); } for (final PipeSinkSubtaskLifeCycle lifeCycle : @@ -206,7 +214,7 @@ public synchronized void deregister( final int regionId, final String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { - throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); + throwNoSuchSubtaskException(attributeSortedString); } final List lifeCycles = @@ -219,6 +227,7 @@ public synchronized void deregister( if (lifeCycles.isEmpty()) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); + attributeSortedString2DisplayString.remove(attributeSortedString); executor.shutdown(); LOGGER.info( DataNodePipeMessages.THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN, @@ -234,7 +243,7 @@ public synchronized void deregister( public synchronized void start(final String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { - throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); + throwNoSuchSubtaskException(attributeSortedString); } for (final PipeSinkSubtaskLifeCycle lifeCycle : @@ -245,7 +254,7 @@ public synchronized void start(final String attributeSortedString) { public synchronized void stop(final String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { - throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); + throwNoSuchSubtaskException(attributeSortedString); } for (final PipeSinkSubtaskLifeCycle lifeCycle : @@ -258,7 +267,8 @@ public UnboundedBlockingPendingQueue getPipeSinkPendingQueue( final String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { throw new PipeException( - DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK + attributeSortedString); + DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK + + getDisplayStringForException(attributeSortedString)); } // All subtasks share the same pending queue @@ -285,6 +295,16 @@ private static String generateAttributeDisplayString( return new PipeParameters(filteredAttributes).toString(); } + private void throwNoSuchSubtaskException(final String attributeSortedString) { + throw new PipeException( + FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + + getDisplayStringForException(attributeSortedString)); + } + + private String getDisplayStringForException(final String attributeSortedString) { + return attributeSortedString2DisplayString.getOrDefault(attributeSortedString, "unknown"); + } + ///////////////////////// Singleton Instance Holder ///////////////////////// private PipeSinkSubtaskManager() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java index dd7707d1b9685..9b2f876ff2d20 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java @@ -199,8 +199,8 @@ private void createRate(final String taskID) { private void createTimer(final String taskID) { final PipeSinkSubtask sink = sinkMap.get(taskID); - compressionTimerMap.putIfAbsent( - sink.getAttributeSortedString(), + compressionTimerMap.put( + taskID, metricService.getOrCreateTimer( Metric.PIPE_COMPRESSION_TIME.toString(), MetricLevel.IMPORTANT, @@ -394,7 +394,7 @@ private void removeTimer(final String taskID) { sink.getAttributeSortedString(), Tag.CREATION_TIME.toString(), String.valueOf(sink.getCreationTime())); - compressionTimerMap.remove(sink.getAttributeSortedString()); + compressionTimerMap.remove(taskID); } private void removeHistogram(final String taskID) { @@ -492,8 +492,8 @@ public void markPipeHeartbeatEvent(final String taskID) { rate.mark(); } - public Timer getCompressionTimer(final String attributeSortedString) { - return Objects.isNull(metricService) ? null : compressionTimerMap.get(attributeSortedString); + public Timer getCompressionTimer(final String taskID) { + return Objects.isNull(metricService) ? null : compressionTimerMap.get(taskID); } //////////////////////////// singleton //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 649ef35c4ce0c..d8c6624f75501 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -603,9 +603,8 @@ protected byte[] getTransferMultiFilePieceBytes( @Override protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException { - if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) { - compressionTimer = - PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString); + if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) { + compressionTimer = PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId); } return super.compressIfNeeded(reqInBytes); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 9adbcf6cf16d1..c35320338ff9f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -510,9 +510,8 @@ private void transferBatchedEventsIfNecessary() throws IOException, WriteProcess @Override public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException { - if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) { - compressionTimer = - PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString); + if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) { + compressionTimer = PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId); } return super.compressIfNeeded(req); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 5e6297d843851..8352f987d7996 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -594,9 +594,8 @@ private void doTransfer( @Override public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException { - if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) { - compressionTimer = - PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString); + if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) { + compressionTimer = PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId); } return super.compressIfNeeded(req); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java index b838289134882..26081d9c78a6a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java @@ -21,6 +21,7 @@ public class PipeTaskSinkRuntimeEnvironment extends PipeTaskRuntimeEnvironment { private String attributeSortedString; + private String sinkTaskId; public PipeTaskSinkRuntimeEnvironment( final String pipeName, final long creationTime, final int regionId) { @@ -34,4 +35,12 @@ public String getAttributeSortedString() { public void setAttributeSortedString(String attributeSortedString) { this.attributeSortedString = attributeSortedString; } + + public String getSinkTaskId() { + return sinkTaskId; + } + + public void setSinkTaskId(final String sinkTaskId) { + this.sinkTaskId = sinkTaskId; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index a52779650f82a..b5662aeec2ce9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -189,6 +189,7 @@ public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEvent private final AtomicLong totalUncompressedSize = new AtomicLong(0); private final AtomicLong totalCompressedSize = new AtomicLong(0); protected String attributeSortedString; + protected String sinkTaskId; protected Timer compressionTimer; protected boolean isRealtimeFirst; @@ -391,8 +392,10 @@ public void customize( throws Exception { final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment(); if (environment instanceof PipeTaskSinkRuntimeEnvironment) { - attributeSortedString = - ((PipeTaskSinkRuntimeEnvironment) environment).getAttributeSortedString(); + final PipeTaskSinkRuntimeEnvironment sinkEnvironment = + (PipeTaskSinkRuntimeEnvironment) environment; + attributeSortedString = sinkEnvironment.getAttributeSortedString(); + sinkTaskId = sinkEnvironment.getSinkTaskId(); } nodeUrls.clear();