From dab45fb2325685cc4cde21b33115e4eb3fc48cfb Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 26 May 2026 16:15:20 +0800 Subject: [PATCH 1/7] [Pipe] Optimize memory usage --- .../db/pipe/event/common/row/PipeRow.java | 4 +- .../tablet/PipeRawTabletInsertionEvent.java | 1 + .../event/common/tablet/PipeTabletUtils.java | 248 ++++++++++++++++++ .../parser/TabletInsertionEventParser.java | 66 ++--- ...abletInsertionEventTablePatternParser.java | 6 +- ...TabletInsertionEventTreePatternParser.java | 6 +- .../TsFileInsertionEventQueryParser.java | 5 +- ...sertionEventQueryParserTabletIterator.java | 44 ++-- .../scan/TsFileInsertionEventScanParser.java | 125 ++++++--- ...sertionEventTableParserTabletIterator.java | 78 ++++-- .../request/PipeTransferTabletBatchReq.java | 6 +- .../request/PipeTransferTabletBatchReqV2.java | 8 +- .../request/PipeTransferTabletRawReq.java | 41 +-- .../request/PipeTransferTabletRawReqV2.java | 26 +- .../protocol/opcua/server/OpcUaNameSpace.java | 2 +- .../sink/util/TabletStatementConverter.java | 69 +++-- .../util/sorter/PipeInsertEventSorter.java | 3 +- .../plan/node/write/InsertTabletNode.java | 44 +++- .../write/RelationalInsertTabletNode.java | 8 +- .../statement/crud/InsertTabletStatement.java | 22 +- .../event/PipeTabletInsertionEventTest.java | 57 ++-- .../event/TsFileInsertionEventParserTest.java | 120 +++++++++ .../sink/PipeDataNodeThriftRequestTest.java | 44 ++++ .../node/write/WritePlanNodeSplitTest.java | 20 ++ 24 files changed, 859 insertions(+), 194 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java index 6d4d25b954242..2c86fd0f2abbd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java @@ -152,7 +152,9 @@ public Type getDataType(final int columnIndex) { @Override public boolean isNull(final int columnIndex) { - return bitMaps[columnIndex].isMarked(rowIndex); + return bitMaps != null + && bitMaps[columnIndex] != null + && bitMaps[columnIndex].isMarked(rowIndex); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 59a1a87b25fa8..6829f099b476a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -455,6 +455,7 @@ public boolean isAligned() { public Tablet convertToTablet() { if (!shouldParseTimeOrPattern()) { + PipeTabletUtils.compactBitMaps(tablet); return tablet; } return initEventParser().convertToTablet(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java new file mode 100644 index 0000000000000..04f33d72a9673 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tablet; + +import org.apache.iotdb.db.i18n.DataNodePipeMessages; + +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.UnSupportedDataTypeException; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; + +import java.time.LocalDate; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public final class PipeTabletUtils { + + private PipeTabletUtils() {} + + public static final class TabletStringInternPool { + + private final Map internedStrings = new HashMap<>(); + + public String intern(final String value) { + if (Objects.isNull(value)) { + return null; + } + + final String internedValue = internedStrings.get(value); + if (Objects.nonNull(internedValue)) { + return internedValue; + } + + internedStrings.put(value, value); + return value; + } + + public void intern(final String[] values) { + if (Objects.isNull(values)) { + return; + } + + for (int i = 0; i < values.length; ++i) { + values[i] = intern(values[i]); + } + } + + public void intern(final List values) { + if (Objects.isNull(values)) { + return; + } + + for (int i = 0; i < values.size(); ++i) { + values.set(i, intern(values.get(i))); + } + } + + public Tablet intern(final Tablet tablet) { + if (Objects.isNull(tablet)) { + return null; + } + + tablet.setInsertTargetName(intern(tablet.getDeviceId())); + internMeasurementSchemas(tablet.getSchemas()); + return tablet; + } + + public void internMeasurementSchemas(final List schemas) { + if (Objects.isNull(schemas)) { + return; + } + + for (final IMeasurementSchema schema : schemas) { + if (schema instanceof MeasurementSchema) { + intern((MeasurementSchema) schema); + } + } + } + + public MeasurementSchema intern(final MeasurementSchema schema) { + if (Objects.isNull(schema)) { + return null; + } + + schema.setMeasurementName(intern(schema.getMeasurementName())); + schema.setProps(intern(schema.getProps())); + return schema; + } + + private Map intern(final Map props) { + if (Objects.isNull(props) || props.isEmpty()) { + return props; + } + + final Map internedProps = new HashMap<>(props.size()); + for (final Map.Entry entry : props.entrySet()) { + internedProps.put(intern(entry.getKey()), intern(entry.getValue())); + } + return internedProps; + } + } + + public static Tablet internTablet( + final Tablet tablet, final TabletStringInternPool tabletStringInternPool) { + return Objects.nonNull(tabletStringInternPool) ? tabletStringInternPool.intern(tablet) : tablet; + } + + public static void initEmptyBitMaps(final Tablet tablet) { + tablet.setBitMaps(new BitMap[getColumnCount(tablet)]); + } + + public static void compactBitMaps(final Tablet tablet) { + if (Objects.isNull(tablet)) { + return; + } + tablet.setBitMaps(compactBitMaps(tablet.getBitMaps(), tablet.getRowSize())); + } + + public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int rowCount) { + if (Objects.isNull(bitMaps)) { + return null; + } + + boolean hasMarkedBitMap = false; + for (int i = 0; i < bitMaps.length; ++i) { + if (Objects.nonNull(bitMaps[i]) + && bitMaps[i].isAllUnmarked(Math.min(rowCount, bitMaps[i].getSize()))) { + bitMaps[i] = null; + } + if (Objects.nonNull(bitMaps[i])) { + hasMarkedBitMap = true; + } + } + + return hasMarkedBitMap ? bitMaps : null; + } + + public static void markNullValue(final Tablet tablet, final int rowIndex, final int columnIndex) { + final BitMap[] bitMaps = ensureBitMaps(tablet, columnIndex + 1); + if (Objects.isNull(bitMaps[columnIndex])) { + bitMaps[columnIndex] = new BitMap(tablet.getMaxRowNumber()); + } + bitMaps[columnIndex].mark(rowIndex); + } + + public static void putTimestamp(final Tablet tablet, final int rowIndex, final long timestamp) { + tablet.getTimestamps()[rowIndex] = timestamp; + tablet.setRowSize(Math.max(tablet.getRowSize(), rowIndex + 1)); + } + + public static void putValue( + final Tablet tablet, + final int rowIndex, + final int columnIndex, + final TSDataType dataType, + final Object value) { + switch (dataType) { + case BOOLEAN: + ((boolean[]) tablet.getValues()[columnIndex])[rowIndex] = (Boolean) value; + break; + case INT32: + ((int[]) tablet.getValues()[columnIndex])[rowIndex] = (Integer) value; + break; + case DATE: + ((LocalDate[]) tablet.getValues()[columnIndex])[rowIndex] = (LocalDate) value; + break; + case INT64: + case TIMESTAMP: + ((long[]) tablet.getValues()[columnIndex])[rowIndex] = (Long) value; + break; + case FLOAT: + ((float[]) tablet.getValues()[columnIndex])[rowIndex] = (Float) value; + break; + case DOUBLE: + ((double[]) tablet.getValues()[columnIndex])[rowIndex] = (Double) value; + break; + case TEXT: + case BLOB: + case STRING: + ((Binary[]) tablet.getValues()[columnIndex])[rowIndex] = toBinary(value); + break; + default: + throw new UnSupportedDataTypeException(DataNodePipeMessages.UNSUPPORTED + dataType); + } + } + + private static BitMap[] ensureBitMaps(final Tablet tablet, final int minColumnCount) { + final int columnCount = Math.max(getColumnCount(tablet), minColumnCount); + BitMap[] bitMaps = tablet.getBitMaps(); + if (Objects.isNull(bitMaps)) { + bitMaps = new BitMap[columnCount]; + tablet.setBitMaps(bitMaps); + } else if (bitMaps.length < columnCount) { + final BitMap[] expandedBitMaps = new BitMap[columnCount]; + System.arraycopy(bitMaps, 0, expandedBitMaps, 0, bitMaps.length); + bitMaps = expandedBitMaps; + tablet.setBitMaps(bitMaps); + } + return bitMaps; + } + + private static int getColumnCount(final Tablet tablet) { + if (Objects.nonNull(tablet.getSchemas())) { + return tablet.getSchemas().size(); + } + return Objects.nonNull(tablet.getValues()) ? tablet.getValues().length : 0; + } + + private static Binary toBinary(final Object value) { + if (Objects.isNull(value)) { + return Binary.EMPTY_VALUE; + } + if (value instanceof Binary) { + return (Binary) value; + } + if (value instanceof byte[]) { + return new Binary((byte[]) value); + } + if (value instanceof String) { + return new Binary(((String) value).getBytes(TSFileConfig.STRING_CHARSET)); + } + throw new IllegalArgumentException( + String.format("Expected Binary, byte[] or String, but was %s.", value.getClass())); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java index 15fca0ef0b785..6903504129385 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.pipe.api.access.Row; @@ -169,18 +170,14 @@ protected void parse(final InsertRowNode insertRowNode) { } else { this.valueColumns[filteredColumnIndex] = filterValueColumnsByRowIndexList( - originValueDataTypes[i], - originValues[i], - rowIndexList, - true, - bitMap, // use the output bitmap since there is no bitmap in InsertRowNode - bitMap); + originValueDataTypes[i], originValues[i], rowIndexList, true, null, bitMap); } this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap; } } this.rowCount = this.timestampColumn.length; + this.nullValueColumnBitmaps = PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount); if (this.rowCount == 0 && LOGGER.isDebugEnabled()) { LOGGER.debug( DataNodePipeMessages.INSERTROWNODE_IS_PARSED_TO_ZERO_ROWS_ACCORDING, @@ -202,7 +199,6 @@ protected void parse(final InsertTabletNode insertTabletNode) throws IllegalPath this.isAligned = insertTabletNode.isAligned(); final long[] originTimestampColumn = insertTabletNode.getTimes(); - final int originRowSize = originTimestampColumn.length; final List rowIndexList = generateRowIndexList(originTimestampColumn); this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray(); @@ -228,18 +224,7 @@ protected void parse(final InsertTabletNode insertTabletNode) throws IllegalPath final TsTableColumnCategory[] originColumnCategories = insertTabletNode.getColumnCategories(); final TSDataType[] originValueColumnDataTypes = insertTabletNode.getDataTypes(); final Object[] originValueColumns = insertTabletNode.getColumns(); - final BitMap[] originBitMapList = - (insertTabletNode.getBitMaps() == null - ? IntStream.range(0, originColumnSize) - .boxed() - .map(o -> new BitMap(originRowSize)) - .toArray(BitMap[]::new) - : insertTabletNode.getBitMaps()); - for (int i = 0; i < originBitMapList.length; i++) { - if (originBitMapList[i] == null) { - originBitMapList[i] = new BitMap(originRowSize); - } - } + final BitMap[] originBitMapList = insertTabletNode.getBitMaps(); for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) { @@ -267,7 +252,7 @@ protected void parse(final InsertTabletNode insertTabletNode) throws IllegalPath originValueColumns[i], rowIndexList, false, - originBitMapList[i], + getBitMap(originBitMapList, i), bitMap); } this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap; @@ -275,6 +260,7 @@ protected void parse(final InsertTabletNode insertTabletNode) throws IllegalPath } this.rowCount = this.timestampColumn.length; + this.nullValueColumnBitmaps = PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount); if (rowCount == 0 && LOGGER.isDebugEnabled()) { LOGGER.debug( DataNodePipeMessages.INSERTTABLETNODE_IS_PARSED_TO_ZERO_ROWS_ACCORDING, @@ -337,18 +323,7 @@ protected void parse(final Tablet tablet, final boolean isAligned) { } final Object[] originValueColumns = tablet.getValues(); // we do not reduce value columns here by origin row size - final BitMap[] originBitMapList = - tablet.getBitMaps() == null - ? IntStream.range(0, originColumnSize) - .boxed() - .map(o -> new BitMap(tablet.getMaxRowNumber())) - .toArray(BitMap[]::new) - : tablet.getBitMaps(); // We do not reduce bitmaps here by origin row size - for (int i = 0; i < originBitMapList.length; i++) { - if (originBitMapList[i] == null) { - originBitMapList[i] = new BitMap(tablet.getMaxRowNumber()); - } - } + final BitMap[] originBitMapList = tablet.getBitMaps(); for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) { @@ -374,7 +349,7 @@ protected void parse(final Tablet tablet, final boolean isAligned) { originValueColumns[i], rowIndexList, false, - originBitMapList[i], + getBitMap(originBitMapList, i), bitMap); } this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap; @@ -382,6 +357,7 @@ protected void parse(final Tablet tablet, final boolean isAligned) { } this.rowCount = this.timestampColumn.length; + this.nullValueColumnBitmaps = PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount); if (this.rowCount == 0 && LOGGER.isDebugEnabled()) { LOGGER.debug( DataNodePipeMessages.TABLET_IS_PARSED_TO_ZERO_ROWS_ACCORDING, @@ -443,7 +419,7 @@ private static Object filterValueColumnsByRowIndexList( : (int[]) originValueColumn; final int[] valueColumns = new int[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = 0; nullValueColumnBitmap.mark(i); } else { @@ -465,7 +441,7 @@ private static Object filterValueColumnsByRowIndexList( : (LocalDate[]) originValueColumn; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = EMPTY_LOCALDATE; nullValueColumnBitmap.mark(i); } else { @@ -479,7 +455,7 @@ private static Object filterValueColumnsByRowIndexList( ? new int[] {(int) originValueColumn} : (int[]) originValueColumn; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = EMPTY_LOCALDATE; nullValueColumnBitmap.mark(i); } else { @@ -499,7 +475,7 @@ private static Object filterValueColumnsByRowIndexList( : (long[]) originValueColumn; final long[] valueColumns = new long[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = 0L; nullValueColumnBitmap.mark(i); } else { @@ -516,7 +492,7 @@ private static Object filterValueColumnsByRowIndexList( : (float[]) originValueColumn; final float[] valueColumns = new float[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = 0F; nullValueColumnBitmap.mark(i); } else { @@ -533,7 +509,7 @@ private static Object filterValueColumnsByRowIndexList( : (double[]) originValueColumn; final double[] valueColumns = new double[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = 0D; nullValueColumnBitmap.mark(i); } else { @@ -550,7 +526,7 @@ private static Object filterValueColumnsByRowIndexList( : (boolean[]) originValueColumn; final boolean[] valueColumns = new boolean[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = false; nullValueColumnBitmap.mark(i); } else { @@ -571,7 +547,7 @@ private static Object filterValueColumnsByRowIndexList( for (int i = 0; i < rowIndexList.size(); ++i) { if (Objects.isNull(binaryValueColumns[rowIndexList.get(i)]) || Objects.isNull(binaryValueColumns[rowIndexList.get(i)].getValues()) - || originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + || isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = Binary.EMPTY_VALUE; nullValueColumnBitmap.mark(i); } else { @@ -631,6 +607,14 @@ private void fillNullValue( } } + private static BitMap getBitMap(final BitMap[] bitMaps, final int index) { + return Objects.nonNull(bitMaps) && index < bitMaps.length ? bitMaps[index] : null; + } + + private static boolean isNullValue(final BitMap bitMap, final int rowIndex) { + return Objects.nonNull(bitMap) && bitMap.isMarked(rowIndex); + } + //////////////////////////// process //////////////////////////// public abstract List processRowByRow( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java index f234045007aaa..4b2a34964e854 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; @@ -141,7 +142,10 @@ public Tablet convertToTablet() { Arrays.asList(valueColumnTypes), timestampColumn, valueColumns, - nullValueColumnBitmaps, + nullValueColumnBitmaps == null + ? null + : PipeTabletUtils.compactBitMaps( + Arrays.copyOf(nullValueColumnBitmaps, nullValueColumnBitmaps.length), rowCount), rowCount); tablet = newTablet; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java index 4e2cc3102e24f..9655175759e96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.pipe.event.common.row.PipeRow; import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector; import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletCollector; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; @@ -202,7 +203,10 @@ public Tablet convertToTablet() { Arrays.asList(measurementSchemaList), timestampColumn, valueColumns, - nullValueColumnBitmaps, + nullValueColumnBitmaps == null + ? null + : PipeTabletUtils.compactBitMaps( + Arrays.copyOf(nullValueColumnBitmaps, nullValueColumnBitmaps.length), rowCount), rowCount); return tablet; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java index 9069a99cbd733..2656ec7d72d4b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java @@ -33,6 +33,7 @@ import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -79,6 +80,7 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser private final Iterator>> deviceMeasurementsMapIterator; private final Map deviceIsAlignedMap; private final Map measurementDataTypeMap; + private final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); @TestOnly public TsFileInsertionEventQueryParser( @@ -418,7 +420,8 @@ public boolean hasNext() { entry.getValue(), timeFilterExpression, allocatedMemoryBlockForTablet, - currentModifications); + currentModifications, + tabletStringInternPool); } catch (final Exception e) { close(); throw new PipeException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java index 20ba62496ca79..26413d93752f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java @@ -21,6 +21,8 @@ import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; @@ -39,6 +41,7 @@ import org.apache.tsfile.read.expression.IExpression; import org.apache.tsfile.read.expression.QueryExpression; import org.apache.tsfile.read.query.dataset.QueryDataSet; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -59,7 +62,9 @@ public class TsFileInsertionEventQueryParserTabletIterator implements Iterator measurementDataTypeMap; private final IDeviceID deviceId; + private final String deviceIdString; private final List measurements; + private final List schemas; private final IExpression timeFilterExpression; @@ -79,20 +84,29 @@ public class TsFileInsertionEventQueryParserTabletIterator implements Iterator measurements, final IExpression timeFilterExpression, final PipeMemoryBlock allocatedBlockForTablet, - final PatternTreeMap currentModifications) + final PatternTreeMap currentModifications, + final TabletStringInternPool tabletStringInternPool) throws IOException { this.tsFileReader = tsFileReader; this.measurementDataTypeMap = measurementDataTypeMap; this.deviceId = deviceId; + this.deviceIdString = tabletStringInternPool.intern(deviceId.toString()); this.measurements = measurements.stream() .filter( measurement -> // time column in aligned time-series should not be a query column measurement != null && !measurement.isEmpty()) + .map(tabletStringInternPool::intern) .sorted() .collect(Collectors.toList()); + this.schemas = new ArrayList<>(); + for (final String measurement : this.measurements) { + final TSDataType dataType = + measurementDataTypeMap.get(deviceIdString + TsFileConstant.PATH_SEPARATOR + measurement); + schemas.add(new MeasurementSchema(measurement, dataType)); + } this.timeFilterExpression = timeFilterExpression; @@ -136,20 +150,14 @@ public Tablet next() { } private Tablet buildNextTablet() throws IOException { - final List schemas = new ArrayList<>(); - for (final String measurement : measurements) { - final TSDataType dataType = - measurementDataTypeMap.get(deviceId + TsFileConstant.PATH_SEPARATOR + measurement); - schemas.add(new MeasurementSchema(measurement, dataType)); - } - Tablet tablet = null; if (!queryDataSet.hasNext()) { tablet = new Tablet( // Used for tree model - deviceId.toString(), schemas, 1); - tablet.initBitMaps(); + deviceIdString, schemas, 1); + PipeTabletUtils.initEmptyBitMaps(tablet); + PipeTabletUtils.compactBitMaps(tablet); return tablet; } @@ -164,8 +172,8 @@ private Tablet buildNextTablet() throws IOException { tablet = new Tablet( // Used for tree model - deviceId.toString(), schemas, rowCountAndMemorySize.getLeft()); - tablet.initBitMaps(); + deviceIdString, schemas, rowCountAndMemorySize.getLeft()); + PipeTabletUtils.initEmptyBitMaps(tablet); if (allocatedBlockForTablet.getMemoryUsageInBytes() < rowCountAndMemorySize.getRight()) { PipeDataNodeResourceManager.memory() .forceResize(allocatedBlockForTablet, rowCountAndMemorySize.getRight()); @@ -182,17 +190,22 @@ private Tablet buildNextTablet() throws IOException { for (int i = 0; i < fieldSize; i++) { final Field field = fields.get(i); final String measurement = measurements.get(i); + final TSDataType dataType = schemas.get(i).getType(); // Check if this value is deleted by mods if (field == null || ModsOperationUtil.isDelete(rowRecord.getTimestamp(), measurementModsList.get(i))) { - tablet.getBitMaps()[i].mark(rowIndex); + if (dataType.isBinary()) { + PipeTabletUtils.putValue(tablet, rowIndex, i, dataType, Binary.EMPTY_VALUE); + } + PipeTabletUtils.markNullValue(tablet, rowIndex, i); } else { - tablet.addValue(measurement, rowIndex, field.getObjectValue(schemas.get(i).getType())); + PipeTabletUtils.putValue( + tablet, rowIndex, i, dataType, field.getObjectValue(schemas.get(i).getType())); isNeedFillTime = true; } } if (isNeedFillTime) { - tablet.addTimestamp(rowIndex, rowRecord.getTimestamp()); + PipeTabletUtils.putTimestamp(tablet, rowIndex, rowRecord.getTimestamp()); } if (tablet.getRowSize() == tablet.getMaxRowNumber()) { @@ -200,6 +213,7 @@ private Tablet buildNextTablet() throws IOException { } } + PipeTabletUtils.compactBitMaps(tablet); return tablet; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index b1f3a5a4c1480..79f92c8d13f00 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -33,6 +33,8 @@ import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -92,8 +94,10 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { private boolean currentIsMultiPage; private IDeviceID currentDevice; + private String currentDeviceString; private boolean currentIsAligned; private final List currentMeasurements = new ArrayList<>(); + private final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); private final List modsInfos = new ArrayList<>(); // Cached time chunk private final List timeChunkList = new ArrayList<>(); @@ -304,8 +308,9 @@ private Tablet getNextTablet() { Tablet tablet = null; if (!data.hasCurrent()) { - tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1); - tablet.initBitMaps(); + tablet = new Tablet(currentDeviceString, currentMeasurements, 1); + PipeTabletUtils.initEmptyBitMaps(tablet); + PipeTabletUtils.compactBitMaps(tablet); return tablet; } @@ -319,8 +324,8 @@ private Tablet getNextTablet() { PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(data); tablet = new Tablet( - currentDevice.toString(), currentMeasurements, rowCountAndMemorySize.getLeft()); - tablet.initBitMaps(); + currentDeviceString, currentMeasurements, rowCountAndMemorySize.getLeft()); + PipeTabletUtils.initEmptyBitMaps(tablet); if (allocatedMemoryBlockForTablet.getMemoryUsageInBytes() < rowCountAndMemorySize.getRight()) { PipeDataNodeResourceManager.memory() @@ -332,7 +337,7 @@ private Tablet getNextTablet() { final int rowIndex = tablet.getRowSize(); if (putValueToColumns(data, tablet, rowIndex)) { - tablet.addTimestamp(rowIndex, data.currentTime()); + PipeTabletUtils.putTimestamp(tablet, rowIndex, data.currentTime()); } } @@ -347,14 +352,15 @@ private Tablet getNextTablet() { } if (tablet == null) { - tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1); - tablet.initBitMaps(); + tablet = new Tablet(currentDeviceString, currentMeasurements, 1); + PipeTabletUtils.initEmptyBitMaps(tablet); } // Switch chunk reader iff current chunk is all consumed if (!data.hasCurrent()) { prepareData(); } + PipeTabletUtils.compactBitMaps(tablet); return tablet; } catch (final Exception e) { close(); @@ -412,37 +418,68 @@ private boolean putValueToColumns(final BatchData data, final Tablet tablet, fin case TEXT: case BLOB: case STRING: - tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues()); + PipeTabletUtils.putValue( + tablet, rowIndex, i, tablet.getSchemas().get(i).getType(), Binary.EMPTY_VALUE); } - tablet.getBitMaps()[i].mark(rowIndex); + PipeTabletUtils.markNullValue(tablet, rowIndex, i); continue; } isNeedFillTime = true; switch (tablet.getSchemas().get(i).getType()) { case BOOLEAN: - tablet.addValue(rowIndex, i, primitiveType.getBoolean()); + PipeTabletUtils.putValue( + tablet, + rowIndex, + i, + tablet.getSchemas().get(i).getType(), + primitiveType.getBoolean()); break; case INT32: - tablet.addValue(rowIndex, i, primitiveType.getInt()); + PipeTabletUtils.putValue( + tablet, rowIndex, i, tablet.getSchemas().get(i).getType(), primitiveType.getInt()); break; case DATE: - tablet.addValue(rowIndex, i, DateUtils.parseIntToLocalDate(primitiveType.getInt())); + PipeTabletUtils.putValue( + tablet, + rowIndex, + i, + tablet.getSchemas().get(i).getType(), + DateUtils.parseIntToLocalDate(primitiveType.getInt())); break; case INT64: case TIMESTAMP: - tablet.addValue(rowIndex, i, primitiveType.getLong()); + PipeTabletUtils.putValue( + tablet, rowIndex, i, tablet.getSchemas().get(i).getType(), primitiveType.getLong()); break; case FLOAT: - tablet.addValue(rowIndex, i, primitiveType.getFloat()); + PipeTabletUtils.putValue( + tablet, + rowIndex, + i, + tablet.getSchemas().get(i).getType(), + primitiveType.getFloat()); break; case DOUBLE: - tablet.addValue(rowIndex, i, primitiveType.getDouble()); + PipeTabletUtils.putValue( + tablet, + rowIndex, + i, + tablet.getSchemas().get(i).getType(), + primitiveType.getDouble()); break; case TEXT: case BLOB: case STRING: - tablet.addValue(rowIndex, i, primitiveType.getBinary().getValues()); + final Binary binary = primitiveType.getBinary(); + PipeTabletUtils.putValue( + tablet, + rowIndex, + i, + tablet.getSchemas().get(i).getType(), + Objects.isNull(binary) || Objects.isNull(binary.getValues()) + ? Binary.EMPTY_VALUE + : binary); break; default: throw new UnSupportedDataTypeException( @@ -458,28 +495,46 @@ private boolean putValueToColumns(final BatchData data, final Tablet tablet, fin isNeedFillTime = true; switch (tablet.getSchemas().get(0).getType()) { case BOOLEAN: - tablet.addValue(rowIndex, 0, data.getBoolean()); + PipeTabletUtils.putValue( + tablet, rowIndex, 0, tablet.getSchemas().get(0).getType(), data.getBoolean()); break; case INT32: - tablet.addValue(rowIndex, 0, data.getInt()); + PipeTabletUtils.putValue( + tablet, rowIndex, 0, tablet.getSchemas().get(0).getType(), data.getInt()); break; case DATE: - tablet.addValue(rowIndex, 0, DateUtils.parseIntToLocalDate(data.getInt())); + PipeTabletUtils.putValue( + tablet, + rowIndex, + 0, + tablet.getSchemas().get(0).getType(), + DateUtils.parseIntToLocalDate(data.getInt())); break; case INT64: case TIMESTAMP: - tablet.addValue(rowIndex, 0, data.getLong()); + PipeTabletUtils.putValue( + tablet, rowIndex, 0, tablet.getSchemas().get(0).getType(), data.getLong()); break; case FLOAT: - tablet.addValue(rowIndex, 0, data.getFloat()); + PipeTabletUtils.putValue( + tablet, rowIndex, 0, tablet.getSchemas().get(0).getType(), data.getFloat()); break; case DOUBLE: - tablet.addValue(rowIndex, 0, data.getDouble()); + PipeTabletUtils.putValue( + tablet, rowIndex, 0, tablet.getSchemas().get(0).getType(), data.getDouble()); break; case TEXT: case BLOB: case STRING: - tablet.addValue(rowIndex, 0, data.getBinary().getValues()); + final Binary binary = data.getBinary(); + PipeTabletUtils.putValue( + tablet, + rowIndex, + 0, + tablet.getSchemas().get(0).getType(), + Objects.isNull(binary) || Objects.isNull(binary.getValues()) + ? Binary.EMPTY_VALUE + : binary); break; default: throw new UnSupportedDataTypeException( @@ -542,17 +597,17 @@ private void moveToNextChunkReader() ? new ChunkReader(chunk, filter) : new SinglePageWholeChunkReader(chunk); currentIsAligned = false; + final String measurementID = + tabletStringInternPool.intern(chunkHeader.getMeasurementID()); currentMeasurements.add( new MeasurementSchema( - chunkHeader.getMeasurementID(), + measurementID, chunkHeader.getDataType(), chunkHeader.getEncodingType(), chunkHeader.getCompressionType())); modsInfos.addAll( ModsOperationUtil.initializeMeasurementMods( - currentDevice, - Collections.singletonList(chunkHeader.getMeasurementID()), - currentModifications)); + currentDevice, Collections.singletonList(measurementID), currentModifications)); return; } case MetaMarker.VALUE_CHUNK_HEADER: @@ -569,9 +624,11 @@ private void moveToNextChunkReader() } // Increase value index + final String measurementID = + tabletStringInternPool.intern(chunkHeader.getMeasurementID()); final int valueIndex = measurementIndexMap.compute( - chunkHeader.getMeasurementID(), + measurementID, (measurement, index) -> Objects.nonNull(index) ? index + 1 : 0); // Emit when encountered non-sequential value chunk, or the chunk size exceeds @@ -631,17 +688,17 @@ > getPageDataMemoryLimitInBytes()) { valueChunkSize += chunkHeader.getDataSize(); valueChunkPageMemorySize += currentValueChunkPageMemorySize; valueChunkList.add(chunk); + final String measurementID = + tabletStringInternPool.intern(chunkHeader.getMeasurementID()); currentMeasurements.add( new MeasurementSchema( - chunkHeader.getMeasurementID(), + measurementID, chunkHeader.getDataType(), chunkHeader.getEncodingType(), chunkHeader.getCompressionType())); modsInfos.addAll( ModsOperationUtil.initializeMeasurementMods( - currentDevice, - Collections.singletonList(chunkHeader.getMeasurementID()), - currentModifications)); + currentDevice, Collections.singletonList(measurementID), currentModifications)); break; } case MetaMarker.CHUNK_GROUP_HEADER: @@ -658,6 +715,10 @@ > getPageDataMemoryLimitInBytes()) { measurementIndexMap.clear(); final IDeviceID deviceID = tsFileSequenceReader.readChunkGroupHeader().getDeviceID(); currentDevice = treePattern.mayOverlapWithDevice(deviceID) ? deviceID : null; + currentDeviceString = + Objects.nonNull(currentDevice) + ? tabletStringInternPool.intern(currentDevice.toString()) + : null; break; } case MetaMarker.OPERATION_INDEX_RANGE: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java index e3caecd144d3c..9f707a747c2d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -21,6 +21,8 @@ import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; @@ -52,6 +54,7 @@ import org.apache.tsfile.write.UnSupportedDataTypeException; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; import java.io.IOException; import java.util.ArrayList; @@ -72,6 +75,7 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator> filteredTableSchemaIterator; + private final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); // For memory control private final PipeMemoryBlock allocatedMemoryBlockForTablet; @@ -237,7 +241,7 @@ public boolean hasNext() { case INIT_DEVICE_META: if (filteredTableSchemaIterator != null && filteredTableSchemaIterator.hasNext()) { final Map.Entry entry = filteredTableSchemaIterator.next(); - tableName = entry.getKey(); + tableName = tabletStringInternPool.intern(entry.getKey()); final TableSchema tableSchema = entry.getValue(); // The table name has changed, set to false isSameTableName = false; @@ -257,7 +261,7 @@ public boolean hasNext() { if (schema != null && schema.getMeasurementName() != null && !schema.getMeasurementName().isEmpty()) { - final String measurementName = schema.getMeasurementName(); + final String measurementName = internMeasurementName(schema); if (ColumnCategory.TAG.equals(columnCategory)) { columnTypes.add(ColumnCategory.TAG); measurementList.add(measurementName); @@ -321,7 +325,7 @@ private Tablet buildNextTablet() { new ArrayList<>(dataTypeList), new ArrayList<>(columnTypes), rowCountAndMemorySize.getLeft()); - tablet.initBitMaps(); + PipeTabletUtils.initEmptyBitMaps(tablet); isFirstRow = false; } final int rowIndex = tablet.getRowSize(); @@ -331,7 +335,7 @@ private Tablet buildNextTablet() { if (fillMeasurementValueColumns(batchData, tablet, rowIndex)) { fillDeviceIdColumns(deviceID, tablet, rowIndex); - tablet.addTimestamp(rowIndex, batchData.currentTime()); + PipeTabletUtils.putTimestamp(tablet, rowIndex, batchData.currentTime()); } } @@ -342,9 +346,10 @@ private Tablet buildNextTablet() { if (isFirstRow) { tablet = new Tablet(tableName, measurementList, dataTypeList, columnTypes, 0); - tablet.initBitMaps(); + PipeTabletUtils.initEmptyBitMaps(tablet); } + PipeTabletUtils.compactBitMaps(tablet); return tablet; } @@ -396,14 +401,15 @@ private void initChunkReader(final AbstractAlignedChunkMetadata alignedChunkMeta boolean hasSelectedNonNullChunk = false; for (; offset < fieldSchemaList.size(); ++offset) { final IMeasurementSchema schema = fieldSchemaList.get(offset); + final String measurementName = internMeasurementName(schema); if (isFieldDeletedByMods( - schema.getMeasurementName(), + measurementName, alignedChunkMetadata.getStartTime(), alignedChunkMetadata.getEndTime())) { continue; } - final IChunkMetadata metadata = valueChunkMetadataMap.get(schema.getMeasurementName()); + final IChunkMetadata metadata = valueChunkMetadataMap.get(measurementName); Chunk chunk = null; if (metadata != null) { chunk = reader.readMemChunk((ChunkMetadata) metadata); @@ -422,7 +428,7 @@ private void initChunkReader(final AbstractAlignedChunkMetadata alignedChunkMeta hasSelectedNonNullChunk = true; } columnTypes.add(ColumnCategory.FIELD); - measurementList.add(schema.getMeasurementName()); + measurementList.add(measurementName); dataTypeList.add(schema.getType()); valueChunkList.add(chunk); hasSelectedField = true; @@ -452,7 +458,7 @@ private boolean areAllFieldsDeletedByMods( for (final IMeasurementSchema schema : fieldSchemaList) { if (!ModsOperationUtil.isAllDeletedByMods( currentDeviceID, - schema.getMeasurementName(), + internMeasurementName(schema), alignedChunkMetadata.getStartTime(), alignedChunkMetadata.getEndTime(), modifications)) { @@ -469,6 +475,13 @@ private boolean isFieldDeletedByMods( deviceID, measurementID, startTime, endTime, modifications); } + private String internMeasurementName(final IMeasurementSchema schema) { + if (schema instanceof MeasurementSchema) { + tabletStringInternPool.intern((MeasurementSchema) schema); + } + return tabletStringInternPool.intern(schema.getMeasurementName()); + } + private boolean fillMeasurementValueColumns( final BatchData data, final Tablet tablet, final int rowIndex) { final TsPrimitiveType[] primitiveTypes = @@ -488,41 +501,55 @@ private boolean fillMeasurementValueColumns( case TEXT: case BLOB: case STRING: - tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues()); + PipeTabletUtils.putValue(tablet, rowIndex, i, dataTypeList.get(i), Binary.EMPTY_VALUE); } - tablet.getBitMaps()[i].mark(rowIndex); + PipeTabletUtils.markNullValue(tablet, rowIndex, i); continue; } needFillTime = true; switch (dataTypeList.get(i)) { case BOOLEAN: - tablet.addValue(rowIndex, i, primitiveType.getBoolean()); + PipeTabletUtils.putValue( + tablet, rowIndex, i, dataTypeList.get(i), primitiveType.getBoolean()); break; case INT32: - tablet.addValue(rowIndex, i, primitiveType.getInt()); + PipeTabletUtils.putValue( + tablet, rowIndex, i, dataTypeList.get(i), primitiveType.getInt()); break; case DATE: - tablet.addValue(rowIndex, i, DateUtils.parseIntToLocalDate(primitiveType.getInt())); + PipeTabletUtils.putValue( + tablet, + rowIndex, + i, + dataTypeList.get(i), + DateUtils.parseIntToLocalDate(primitiveType.getInt())); break; case INT64: case TIMESTAMP: - tablet.addValue(rowIndex, i, primitiveType.getLong()); + PipeTabletUtils.putValue( + tablet, rowIndex, i, dataTypeList.get(i), primitiveType.getLong()); break; case FLOAT: - tablet.addValue(rowIndex, i, primitiveType.getFloat()); + PipeTabletUtils.putValue( + tablet, rowIndex, i, dataTypeList.get(i), primitiveType.getFloat()); break; case DOUBLE: - tablet.addValue(rowIndex, i, primitiveType.getDouble()); + PipeTabletUtils.putValue( + tablet, rowIndex, i, dataTypeList.get(i), primitiveType.getDouble()); break; case TEXT: case BLOB: case STRING: Binary binary = primitiveType.getBinary(); - tablet.addValue( + PipeTabletUtils.putValue( + tablet, rowIndex, i, - binary.getValues() == null ? Binary.EMPTY_VALUE.getValues() : binary.getValues()); + dataTypeList.get(i), + Objects.isNull(binary) || Objects.isNull(binary.getValues()) + ? Binary.EMPTY_VALUE + : binary); break; default: throw new UnSupportedDataTypeException( @@ -538,16 +565,19 @@ private void fillDeviceIdColumns( int i = 1; for (int totalColumns = deviceIdSegments.length; i < totalColumns; i++) { if (deviceIdSegments[i] == null) { - tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues()); - tablet.getBitMaps()[i - 1].mark(rowIndex); + PipeTabletUtils.putValue( + tablet, rowIndex, i - 1, dataTypeList.get(i - 1), Binary.EMPTY_VALUE); + PipeTabletUtils.markNullValue(tablet, rowIndex, i - 1); continue; } - tablet.addValue(rowIndex, i - 1, deviceIdSegments[i]); + PipeTabletUtils.putValue( + tablet, rowIndex, i - 1, dataTypeList.get(i - 1), deviceIdSegments[i]); } while (i <= deviceIdSize) { - tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues()); - tablet.getBitMaps()[i - 1].mark(rowIndex); + PipeTabletUtils.putValue( + tablet, rowIndex, i - 1, dataTypeList.get(i - 1), Binary.EMPTY_VALUE); + PipeTabletUtils.markNullValue(tablet, rowIndex, i - 1); i++; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java index 8d75e9864bb52..ede3370f5b0d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; @@ -34,7 +35,6 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; -import org.apache.tsfile.write.record.Tablet; import java.io.DataOutputStream; import java.io.IOException; @@ -130,6 +130,7 @@ public static PipeTransferTabletBatchReq toTPipeTransferReq( public static PipeTransferTabletBatchReq fromTPipeTransferReq( final TPipeTransferReq transferReq) { final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq(); + final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); // Binary size, for rolling upgrade ReadWriteIOUtils.readInt(transferReq.body); @@ -143,8 +144,7 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq( size = ReadWriteIOUtils.readInt(transferReq.body); for (int i = 0; i < size; ++i) { batchReq.tabletReqs.add( - PipeTransferTabletRawReq.toTPipeTransferRawReq( - Tablet.deserialize(transferReq.body), ReadWriteIOUtils.readBool(transferReq.body))); + PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body, tabletStringInternPool)); } batchReq.version = transferReq.version; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java index 80550b6350f05..e7278158876f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; @@ -211,6 +212,7 @@ public static PipeTransferTabletBatchReqV2 toTPipeTransferReq( public static PipeTransferTabletBatchReqV2 fromTPipeTransferReq( final org.apache.iotdb.service.rpc.thrift.TPipeTransferReq transferReq) { final PipeTransferTabletBatchReqV2 batchReq = new PipeTransferTabletBatchReqV2(); + final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); // Binary req, for rolling upgrade ReadWriteIOUtils.readInt(transferReq.body); @@ -220,12 +222,14 @@ public static PipeTransferTabletBatchReqV2 fromTPipeTransferReq( batchReq.insertNodeReqs.add( PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq( (InsertNode) PlanFragment.deserializeHelper(transferReq.body, null), - ReadWriteIOUtils.readString(transferReq.body))); + tabletStringInternPool.intern(ReadWriteIOUtils.readString(transferReq.body)))); } size = ReadWriteIOUtils.readInt(transferReq.body); for (int i = 0; i < size; ++i) { - batchReq.tabletReqs.add(PipeTransferTabletRawReqV2.toTPipeTransferRawReq(transferReq.body)); + batchReq.tabletReqs.add( + PipeTransferTabletRawReqV2.toTPipeTransferRawReq( + transferReq.body, tabletStringInternPool)); } batchReq.version = transferReq.version; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java index 1504b3eadb9b9..98ea83b6d7631 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java @@ -23,6 +23,8 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; @@ -112,6 +114,27 @@ public static PipeTransferTabletRawReq toTPipeTransferRawReq( return tabletReq; } + public static PipeTransferTabletRawReq toTPipeTransferRawReq( + final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) { + final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq(); + + final int startPosition = buffer.position(); + try { + final InsertTabletStatement insertTabletStatement = + TabletStatementConverter.deserializeStatementFromTabletFormat( + buffer, false, tabletStringInternPool); + tabletReq.isAligned = insertTabletStatement.isAligned(); + tabletReq.statement = insertTabletStatement; + } catch (final Exception e) { + buffer.position(startPosition); + tabletReq.tablet = + PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool); + tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer); + } + + return tabletReq; + } + /////////////////////////////// Thrift /////////////////////////////// public static PipeTransferTabletRawReq toTPipeTransferReq( @@ -135,22 +158,8 @@ public static PipeTransferTabletRawReq toTPipeTransferReq( } public static PipeTransferTabletRawReq fromTPipeTransferReq(final TPipeTransferReq transferReq) { - final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq(); - - final ByteBuffer buffer = transferReq.body; - final int startPosition = buffer.position(); - try { - // V1: no databaseName, readDatabaseName = false - final InsertTabletStatement insertTabletStatement = - TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, false); - tabletReq.isAligned = insertTabletStatement.isAligned(); - // devicePath is already set in deserializeStatementFromTabletFormat for V1 format - tabletReq.statement = insertTabletStatement; - } catch (final Exception e) { - buffer.position(startPosition); - tabletReq.tablet = Tablet.deserialize(buffer); - tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer); - } + final PipeTransferTabletRawReq tabletReq = + toTPipeTransferRawReq(transferReq.body, new TabletStringInternPool()); tabletReq.version = transferReq.version; tabletReq.type = transferReq.type; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java index 2458e5e243f98..d395bf6cf5f26 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter; @@ -116,9 +118,14 @@ public static PipeTransferTabletRawReqV2 toTPipeTransferRawReq( } public static PipeTransferTabletRawReqV2 toTPipeTransferRawReq(final ByteBuffer buffer) { + return toTPipeTransferRawReq(buffer, new TabletStringInternPool()); + } + + public static PipeTransferTabletRawReqV2 toTPipeTransferRawReq( + final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) { final PipeTransferTabletRawReqV2 tabletReq = new PipeTransferTabletRawReqV2(); - tabletReq.deserializeTPipeTransferRawReq(buffer); + tabletReq.deserializeTPipeTransferRawReq(buffer, tabletStringInternPool); tabletReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); tabletReq.type = PipeRequestType.TRANSFER_TABLET_RAW_V2.getType(); @@ -153,7 +160,7 @@ public static PipeTransferTabletRawReqV2 fromTPipeTransferReq( final TPipeTransferReq transferReq) { final PipeTransferTabletRawReqV2 tabletReq = new PipeTransferTabletRawReqV2(); - tabletReq.deserializeTPipeTransferRawReq(transferReq.body); + tabletReq.deserializeTPipeTransferRawReq(transferReq.body, new TabletStringInternPool()); tabletReq.body = transferReq.body; tabletReq.version = transferReq.version; @@ -202,11 +209,20 @@ public int hashCode() { /////////////////////////////// Util /////////////////////////////// public void deserializeTPipeTransferRawReq(final ByteBuffer buffer) { + deserializeTPipeTransferRawReq(buffer, new TabletStringInternPool()); + } + + public void deserializeTPipeTransferRawReq( + final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) { + final TabletStringInternPool internPool = + Objects.nonNull(tabletStringInternPool) + ? tabletStringInternPool + : new TabletStringInternPool(); final int startPosition = buffer.position(); try { // V2: read databaseName, readDatabaseName = true final InsertTabletStatement insertTabletStatement = - TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, true); + TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, true, internPool); this.isAligned = insertTabletStatement.isAligned(); // databaseName is already set in deserializeStatementFromTabletFormat when // readDatabaseName=true @@ -216,9 +232,9 @@ public void deserializeTPipeTransferRawReq(final ByteBuffer buffer) { // If Statement deserialization fails, fallback to Tablet format // Reset buffer position for Tablet deserialization buffer.position(startPosition); - this.tablet = Tablet.deserialize(buffer); + this.tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), internPool); this.isAligned = ReadWriteIOUtils.readBool(buffer); - this.dataBaseName = ReadWriteIOUtils.readString(buffer); + this.dataBaseName = internPool.intern(ReadWriteIOUtils.readString(buffer)); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java index 917720220bf2d..58df22a20f495 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java @@ -460,7 +460,7 @@ private void transferTabletForPubSubModel( for (int rowIndex = 0; rowIndex < tablet.getRowSize(); ++rowIndex) { // Filter null value - if (tablet.getBitMaps()[columnIndex].isMarked(rowIndex)) { + if (tablet.isNull(rowIndex, columnIndex)) { continue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java index e8b8e36cb49c9..773d40e99d1f7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.utils.PathUtils; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; @@ -40,6 +41,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Objects; /** * Utility class for converting between InsertTabletStatement and Tablet format ByteBuffer. This @@ -76,12 +78,21 @@ private TabletStatementConverter() { */ public static InsertTabletStatement deserializeStatementFromTabletFormat( final ByteBuffer byteBuffer, final boolean readDatabaseName) throws IllegalPathException { + return deserializeStatementFromTabletFormat(byteBuffer, readDatabaseName, null); + } + + public static InsertTabletStatement deserializeStatementFromTabletFormat( + final ByteBuffer byteBuffer, + final boolean readDatabaseName, + final TabletStringInternPool tabletStringInternPool) + throws IllegalPathException { final InsertTabletStatement statement = new InsertTabletStatement(); // Calculate memory size during deserialization, use INSTANCE_SIZE constant long memorySize = InsertTabletStatement.getInstanceSize(); - final String insertTargetName = ReadWriteIOUtils.readString(byteBuffer); + final String insertTargetName = + intern(ReadWriteIOUtils.readString(byteBuffer), tabletStringInternPool); final int rowSize = ReadWriteIOUtils.readInt(byteBuffer); @@ -118,7 +129,7 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat( for (int i = 0; i < schemaSize; i++) { final boolean hasSchema = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); if (hasSchema) { - final Pair pair = readMeasurement(byteBuffer); + final Pair pair = readMeasurement(byteBuffer, tabletStringInternPool); measurement[i] = pair.getLeft(); dataTypes[i] = pair.getRight(); columnCategories[i] = @@ -169,15 +180,12 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat( if (isBitMapsNotNull) { // Use the method that returns both BitMap array and memory size final Pair bitMapsAndMemory = - readBitMapsFromBufferWithMemory(byteBuffer, schemaSize); + readBitMapsFromBufferWithMemory(byteBuffer, schemaSize, rowSize); bitMaps = bitMapsAndMemory.getLeft(); bitMapsMemorySize = bitMapsAndMemory.getRight(); } else { - // Calculate memory for empty BitMap array: array header + references - bitMaps = new BitMap[schemaSize]; - bitMapsMemorySize = - org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( - NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize); + bitMaps = null; + bitMapsMemorySize = 0; } // Add bitMaps memory to total @@ -217,7 +225,8 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat( // Read databaseName if requested (V2 format) if (readDatabaseName) { - final String databaseName = ReadWriteIOUtils.readString(byteBuffer); + final String databaseName = + intern(ReadWriteIOUtils.readString(byteBuffer), tabletStringInternPool); if (databaseName != null) { statement.setDatabaseName(databaseName); // Calculate memory for databaseName @@ -226,7 +235,9 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat( if (PathUtils.isTableModelDatabase(databaseName)) { statement.setWriteToTable(true); // For table model, insertTargetName is table name, convert to lowercase - statement.setDevicePath(new PartialPath(insertTargetName.toLowerCase(), false)); + statement.setDevicePath( + new PartialPath( + intern(insertTargetName.toLowerCase(), tabletStringInternPool), false)); statement.setColumnCategories(columnCategories); memorySize += columnCategoriesMemorySize; @@ -269,6 +280,11 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat( return deserializeStatementFromTabletFormat(byteBuffer, false); } + private static String intern( + final String value, final TabletStringInternPool tabletStringInternPool) { + return Objects.nonNull(tabletStringInternPool) ? tabletStringInternPool.intern(value) : value; + } + /** * Skip a string in ByteBuffer without reading it. This is more efficient than reading and * discarding the string. @@ -289,10 +305,13 @@ private static void skipString(final ByteBuffer buffer) { * @param buffer ByteBuffer containing serialized measurement schema * @return Pair of measurement name and data type */ - private static Pair readMeasurement(final ByteBuffer buffer) { + private static Pair readMeasurement( + final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) { // Read measurement name and data type final Pair pair = - new Pair<>(ReadWriteIOUtils.readString(buffer), TSDataType.deserializeFrom(buffer)); + new Pair<>( + intern(ReadWriteIOUtils.readString(buffer), tabletStringInternPool), + TSDataType.deserializeFrom(buffer)); // Skip encoding type (byte) and compression type (byte) - 2 bytes total buffer.position(buffer.position() + 2); @@ -315,13 +334,11 @@ private static Pair readMeasurement(final ByteBuffer buffer) * array and the calculated memory size. */ private static Pair readBitMapsFromBufferWithMemory( - final ByteBuffer byteBuffer, final int columns) { + final ByteBuffer byteBuffer, final int columns, final int rowSize) { final BitMap[] bitMaps = new BitMap[columns]; - // Calculate memory: array header + object references - long memorySize = - org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( - NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns); + long bitMapsMemorySize = 0; + boolean hasMarkedBitMap = false; for (int i = 0; i < columns; i++) { final boolean hasBitMap = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); @@ -329,18 +346,30 @@ private static Pair readBitMapsFromBufferWithMemory( final int size = ReadWriteIOUtils.readInt(byteBuffer); final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer); final byte[] byteArray = valueBinary.getValues(); - bitMaps[i] = new BitMap(size, byteArray); + final BitMap bitMap = new BitMap(size, byteArray); + if (bitMap.isAllUnmarked(Math.min(rowSize, bitMap.getSize()))) { + continue; + } + bitMaps[i] = bitMap; + hasMarkedBitMap = true; // Calculate memory for this BitMap: BitMap object + byte array // BitMap shallow size + byte array (array header + array length) - memorySize += + bitMapsMemorySize += SIZE_OF_BITMAP + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( NUM_BYTES_ARRAY_HEADER + byteArray.length); } } - return new Pair<>(bitMaps, memorySize); + if (!hasMarkedBitMap) { + return new Pair<>(null, 0L); + } + return new Pair<>( + bitMaps, + bitMapsMemorySize + + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns)); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java index 46a3fc6df9429..55c7a3ea788c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.sink.util.sorter; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.tsfile.enums.TSDataType; @@ -106,7 +107,7 @@ protected void sortAndMayDeduplicateValuesAndBitMaps() { } if (bitMapsModified) { - dataAdapter.setBitMaps(bitMaps); + dataAdapter.setBitMaps(PipeTabletUtils.compactBitMaps(bitMaps, deDuplicatedSize)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index a6289141531d5..5f5ec8d42efa1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -332,7 +332,7 @@ protected List doSplit(Map> spli protected InsertTabletNode getEmptySplit(int count) { long[] subTimes = new long[count]; Object[] values = initTabletValues(dataTypes.length, count, dataTypes); - BitMap[] newBitMaps = this.bitMaps == null ? null : initBitmaps(dataTypes.length, count); + BitMap[] newBitMaps = initBitmapsForSplit(dataTypes.length, count); return new InsertTabletNode( getPlanNodeId(), targetPath, @@ -370,7 +370,10 @@ protected WritePlanNode generateOneSplit(Map.Entry 0 ? rowCount : times == null ? 0 : times.length; + final BitMap[] splitBitMaps = new BitMap[columnSize]; + boolean hasBitMap = false; + for (int i = 0; i < columnSize && i < this.bitMaps.length; ++i) { + if (this.bitMaps[i] != null + && !this.bitMaps[i].isAllUnmarked(Math.min(sourceRowCount, this.bitMaps[i].getSize()))) { + splitBitMaps[i] = new BitMap(rowSize); + hasBitMap = true; + } + } + return hasBitMap ? splitBitMaps : null; + } + + protected static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int rowCount) { + if (bitMaps == null) { + return null; + } + + boolean hasBitMap = false; + for (int i = 0; i < bitMaps.length; ++i) { + if (bitMaps[i] != null + && bitMaps[i].isAllUnmarked(Math.min(rowCount, bitMaps[i].getSize()))) { + bitMaps[i] = null; + } + if (bitMaps[i] != null) { + hasBitMap = true; + } + } + return hasBitMap ? bitMaps : null; + } + @Override public void markFailedMeasurement(int index) { if (measurements[index] == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index 8d24ad7736434..03f42cd7a3831 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -180,7 +180,7 @@ public R accept(IPlanVisitor visitor, C context) { protected InsertTabletNode getEmptySplit(int count) { long[] subTimes = new long[count]; Object[] values = initTabletValues(dataTypes.length, count, dataTypes); - BitMap[] newBitMaps = this.bitMaps == null ? null : initBitmaps(dataTypes.length, count); + BitMap[] newBitMaps = initBitmapsForSplit(dataTypes.length, count); RelationalInsertTabletNode split = new RelationalInsertTabletNode( getPlanNodeId(), @@ -442,7 +442,10 @@ private List generateOneSplitList( if (dataTypes[i] != null) { System.arraycopy(columns[i], start, subNode.columns[i], destLoc, length); } - if (subNode.bitMaps != null && this.bitMaps[i] != null) { + if (subNode.bitMaps != null + && subNode.bitMaps[i] != null + && i < this.bitMaps.length + && this.bitMaps[i] != null) { BitMap.copyOfRange(this.bitMaps[i], start, subNode.bitMaps[i], destLoc, length); } } @@ -451,6 +454,7 @@ private List generateOneSplitList( subNode.setFailedMeasurementNumber(getFailedMeasurementNumber()); subNode.setRange(locs); subNode.setDataRegionReplicaSet(entry.getKey()); + subNode.bitMaps = compactBitMaps(subNode.bitMaps, subNode.rowCount); result.add(subNode); return result; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index 199463055708e..786a9efcfc540 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -390,7 +390,7 @@ public List getSplitList() { statement.setMeasurementSchemas(measurementSchemas); statement.setDataTypes(dataTypes); if (this.nullBitMaps != null) { - statement.setBitMaps(copiedBitMaps); + statement.setBitMaps(compactBitMaps(copiedBitMaps, rowCount)); } statement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info); insertTabletStatementList.add(statement); @@ -833,7 +833,7 @@ public Tablet convertToTablet() throws MetadataException { tabletColumnTypes, timestamps, tabletValues, - bitMaps, + compactBitMaps(bitMaps, rowSize), rowSize); } catch (final Exception e) { throw new MetadataException( @@ -900,6 +900,24 @@ private Object convertColumnToTablet( return columnValue; } + private static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int rowCount) { + if (bitMaps == null) { + return null; + } + + boolean hasBitMap = false; + for (int i = 0; i < bitMaps.length; ++i) { + if (bitMaps[i] != null + && bitMaps[i].isAllUnmarked(Math.min(rowCount, bitMaps[i].getSize()))) { + bitMaps[i] = null; + } + if (bitMaps[i] != null) { + hasBitMap = true; + } + } + return hasBitMap ? bitMaps : null; + } + @Override public String toString() { final int size = CommonDescriptor.getInstance().getConfig().getPathLogMaxSize(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index 367578f2a400d..da3dee91caad9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -198,11 +198,6 @@ private void createTablet() { final Object[] values = new Object[schemas.length]; // create tablet for insertRowNode - BitMap[] bitMapsForInsertRowNode = new BitMap[schemas.length]; - for (int i = 0; i < schemas.length; i++) { - bitMapsForInsertRowNode[i] = new BitMap(1); - } - values[0] = new int[1]; values[1] = new long[1]; values[2] = new float[1]; @@ -228,20 +223,9 @@ private void createTablet() { } tabletForInsertRowNode = - new Tablet( - deviceId, - Arrays.asList(schemas), - new long[] {times[0]}, - values, - bitMapsForInsertRowNode, - 1); + new Tablet(deviceId, Arrays.asList(schemas), new long[] {times[0]}, values, null, 1); // create tablet for insertTabletNode - BitMap[] bitMapsForInsertTabletNode = new BitMap[schemas.length]; - for (int i = 0; i < schemas.length; i++) { - bitMapsForInsertTabletNode[i] = new BitMap(times.length); - } - values[0] = new int[times.length]; values[1] = new long[times.length]; values[2] = new float[times.length]; @@ -268,13 +252,7 @@ private void createTablet() { tabletForInsertTabletNode = new Tablet(deviceId, Arrays.asList(schemas), times.length); tabletForInsertTabletNode = - new Tablet( - deviceId, - Arrays.asList(schemas), - times, - values, - bitMapsForInsertTabletNode, - times.length); + new Tablet(deviceId, Arrays.asList(schemas), times, values, null, times.length); } @Test @@ -341,6 +319,37 @@ public void convertToAlignedTabletForTest() throws Exception { Assert.assertTrue(isAligned4); } + @Test + public void convertToTabletSkipsUnnecessaryBitMapsForTest() throws Exception { + final BitMap[] bitMaps = new BitMap[schemas.length]; + bitMaps[0] = new BitMap(times.length); + bitMaps[1] = new BitMap(times.length); + bitMaps[1].mark(1); + + final InsertTabletNode nodeWithSparseColumn = + new InsertTabletNode( + new PlanNodeId("plannode bitmap"), + new PartialPath(deviceId), + false, + measurementIds, + dataTypes, + schemas, + times, + bitMaps, + insertTabletNode.getColumns(), + times.length); + + final Tablet tablet = + new TabletInsertionEventTreePatternParser( + nodeWithSparseColumn, new PrefixTreePattern(pattern)) + .convertToTablet(); + + Assert.assertNotNull(tablet.getBitMaps()); + Assert.assertNull(tablet.getBitMaps()[0]); + Assert.assertNotNull(tablet.getBitMaps()[1]); + Assert.assertTrue(tablet.isNull(1, 1)); + } + @Test public void convertToTabletWithFilteredRowsForTest() throws Exception { TabletInsertionEventTreePatternParser container1 = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index cdde28bce3814..07af1c09cda73 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -58,6 +58,7 @@ import org.apache.tsfile.read.common.Path; import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsFileGeneratorUtils; import org.apache.tsfile.write.TsFileWriter; @@ -79,6 +80,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Objects; @@ -276,6 +278,67 @@ public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() throws E } } + @Test + public void testQueryParserSkipsUnnecessaryBitMaps() throws Exception { + testTreeParserSkipsUnnecessaryBitMaps(true); + } + + @Test + public void testScanParserSkipsUnnecessaryBitMaps() throws Exception { + testTreeParserSkipsUnnecessaryBitMaps(false); + } + + @Test + public void testTableParserSkipsUnnecessaryBitMaps() throws Exception { + alignedTsFile = new File("table-parser-bitmap.tsfile"); + if (alignedTsFile.exists()) { + Assert.assertTrue(alignedTsFile.delete()); + } + + final List schemaList = + Arrays.asList( + new MeasurementSchema("tag0", TSDataType.STRING), + new MeasurementSchema("dense", TSDataType.INT64), + new MeasurementSchema("sparse", TSDataType.INT64)); + final List columnNameList = Arrays.asList("tag0", "dense", "sparse"); + final List dataTypeList = + Arrays.asList(TSDataType.STRING, TSDataType.INT64, TSDataType.INT64); + final List columnCategoryList = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD); + + final Tablet tablet = + new Tablet("bitmap_table", columnNameList, dataTypeList, columnCategoryList, 2); + for (int rowIndex = 0; rowIndex < 2; ++rowIndex) { + tablet.addTimestamp(rowIndex, rowIndex); + tablet.addValue(rowIndex, 0, "tag-value"); + tablet.addValue(rowIndex, 1, (long) rowIndex); + tablet.addValue("sparse", rowIndex, rowIndex == 0 ? 100L : null); + } + + try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) { + writer.registerTableSchema(new TableSchema("bitmap_table", schemaList, columnCategoryList)); + writer.writeTable(tablet); + } + + try (final TsFileInsertionEventTableParser parser = + new TsFileInsertionEventTableParser( + alignedTsFile, + new TablePattern(true, null, null), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + null, + false)) { + final Iterator iterator = parser.toTabletInsertionEvents().iterator(); + Assert.assertTrue(iterator.hasNext()); + final Tablet parsedTablet = ((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet(); + assertBitMapExistence(parsedTablet, false, false, true); + Assert.assertTrue(parsedTablet.isNull(1, 2)); + Assert.assertFalse(iterator.hasNext()); + } + } + @Test public void manualTestScanParserSplitPerformance() throws Exception { Assume.assumeTrue( @@ -970,6 +1033,63 @@ private void testPartialNullValue(final boolean isQuery) throws Exception { alignedTsFile, new PrefixTreePattern("root"), Long.MIN_VALUE, Long.MAX_VALUE, isQuery, 4); } + private void testTreeParserSkipsUnnecessaryBitMaps(final boolean isQuery) throws Exception { + alignedTsFile = new File(isQuery ? "query-parser-bitmap.tsfile" : "scan-parser-bitmap.tsfile"); + if (alignedTsFile.exists()) { + Assert.assertTrue(alignedTsFile.delete()); + } + + final List schemaList = + Arrays.asList( + new MeasurementSchema("dense", TSDataType.INT64), + new MeasurementSchema("sparse", TSDataType.INT64)); + final Tablet tablet = new Tablet("root.sg.d", schemaList, 2); + for (int rowIndex = 0; rowIndex < 2; ++rowIndex) { + tablet.addTimestamp(rowIndex, rowIndex); + tablet.addValue("dense", rowIndex, (long) rowIndex); + tablet.addValue("sparse", rowIndex, rowIndex == 0 ? 100L : null); + } + + try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) { + writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), schemaList); + writer.writeAligned(tablet); + } + + try (final TsFileInsertionEventParser parser = + isQuery + ? new TsFileInsertionEventQueryParser( + alignedTsFile, new PrefixTreePattern("root"), Long.MIN_VALUE, Long.MAX_VALUE, null) + : new TsFileInsertionEventScanParser( + alignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + final Iterator iterator = parser.toTabletInsertionEvents().iterator(); + Assert.assertTrue(iterator.hasNext()); + final Tablet parsedTablet = ((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet(); + assertBitMapExistence(parsedTablet, false, true); + Assert.assertTrue(parsedTablet.isNull(1, 1)); + Assert.assertFalse(iterator.hasNext()); + } + } + + private void assertBitMapExistence( + final Tablet tablet, final boolean... expectedColumnHasBitMap) { + final BitMap[] bitMaps = tablet.getBitMaps(); + Assert.assertNotNull(bitMaps); + Assert.assertEquals(expectedColumnHasBitMap.length, bitMaps.length); + for (int i = 0; i < expectedColumnHasBitMap.length; ++i) { + if (expectedColumnHasBitMap[i]) { + Assert.assertNotNull(bitMaps[i]); + } else { + Assert.assertNull(bitMaps[i]); + } + } + } + private void generateLargeAlignedTsFile( final File tsFile, final List schemaList, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index 38704ec7fea18..10573e5609dba 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -46,6 +46,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; @@ -54,6 +55,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.record.Tablet; @@ -494,6 +496,28 @@ public void testPipeTransferTabletBatchReq() throws IOException { Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned()); } + @Test + public void testPipeTransferTabletBatchReqInternsRepeatedMeasurementNames() throws IOException { + final List tabletBuffers = new ArrayList<>(); + tabletBuffers.add( + serializeTablet(createSingleValueTablet(new String("root.sg.d"), new String("s1")), false)); + tabletBuffers.add( + serializeTablet(createSingleValueTablet(new String("root.sg.d"), new String("s1")), false)); + + final PipeTransferTabletBatchReq deserializedReq = + PipeTransferTabletBatchReq.fromTPipeTransferReq( + PipeTransferTabletBatchReq.toTPipeTransferReq(Collections.emptyList(), tabletBuffers)); + final Pair statements = + deserializedReq.constructStatements(); + final List insertTabletStatements = + statements.getRight().getInsertTabletStatementList(); + + Assert.assertEquals(2, insertTabletStatements.size()); + Assert.assertSame( + insertTabletStatements.get(0).getMeasurements()[0], + insertTabletStatements.get(1).getMeasurements()[0]); + } + @Test public void testPipeTransferTabletBatchReqV2() throws IOException { final List insertNodeBuffers = new ArrayList<>(); @@ -770,4 +794,24 @@ public void testPipeTransferFilePieceResp() throws IOException { Assert.assertEquals(resp.getStatus(), deserializeResp.getStatus()); Assert.assertEquals(resp.getEndWritingOffset(), deserializeResp.getEndWritingOffset()); } + + private static Tablet createSingleValueTablet(final String deviceId, final String measurement) { + final List schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema(measurement, TSDataType.INT32)); + + final Tablet tablet = new Tablet(deviceId, schemaList, 8); + tablet.addTimestamp(0, 1); + tablet.addValue(measurement, 0, 1); + return tablet; + } + + private static ByteBuffer serializeTablet(final Tablet tablet, final boolean isAligned) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + tablet.serialize(outputStream); + ReadWriteIOUtils.write(isAligned, outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java index 3b8f972380962..01857cb5f8aa9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java @@ -47,6 +47,7 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -205,6 +206,9 @@ public void testSplitInsertTablet() throws IllegalPathException { insertTabletNode.setColumns( new Object[] {new int[] {-20, -10, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}}); insertTabletNode.setRowCount(insertTabletNode.getTimes().length); + final BitMap[] bitMaps = new BitMap[] {new BitMap(insertTabletNode.getRowCount())}; + bitMaps[0].mark(2); + insertTabletNode.setBitMaps(bitMaps); DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); dataPartitionQueryParam.setDeviceID( @@ -224,6 +228,12 @@ public void testSplitInsertTablet() throws IllegalPathException { Assert.assertEquals(tabletNode.getTimes().length, 2); TConsensusGroupId regionId = tabletNode.getDataRegionReplicaSet().getRegionId(); Assert.assertEquals(getRegionIdByTime(tabletNode.getMinTime()), regionId.getId()); + if (tabletNode.getTimes()[0] == 1) { + Assert.assertNotNull(tabletNode.getBitMaps()); + Assert.assertTrue(tabletNode.getBitMaps()[0].isMarked(0)); + } else { + Assert.assertNull(tabletNode.getBitMaps()); + } } insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 2")); @@ -271,6 +281,9 @@ public void testSplitRelationalInsertTablet() throws IllegalPathException { relationalInsertTabletNode.setColumnCategories( new TsTableColumnCategory[] {TsTableColumnCategory.TAG, TsTableColumnCategory.FIELD}); relationalInsertTabletNode.setRowCount(12); + final BitMap[] bitMaps = new BitMap[] {new BitMap(12), new BitMap(12)}; + bitMaps[1].mark(2); + relationalInsertTabletNode.setBitMaps(bitMaps); List dataPartitionQueryParamList = new ArrayList<>(); DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); @@ -300,6 +313,13 @@ public void testSplitRelationalInsertTablet() throws IllegalPathException { Assert.assertTrue(tabletNode.getTimes()[0] < tabletNode.getTimes()[1]); TConsensusGroupId regionId = tabletNode.getDataRegionReplicaSet().getRegionId(); Assert.assertEquals(getRegionIdByTime(tabletNode.getMinTime()), regionId.getId()); + if (tabletNode.getTimes()[0] == 1) { + Assert.assertNotNull(tabletNode.getBitMaps()); + Assert.assertNull(tabletNode.getBitMaps()[0]); + Assert.assertTrue(tabletNode.getBitMaps()[1].isMarked(0)); + } else { + Assert.assertNull(tabletNode.getBitMaps()); + } } } From c08dfae1a7c04bad9803e7e66b7e08cc278148e3 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 26 May 2026 17:36:05 +0800 Subject: [PATCH 2/7] fix --- .../event/common/tablet/PipeTabletUtils.java | 8 + .../builder/PipeTableModelTsFileBuilder.java | 4 +- .../PipeTableModelTsFileBuilderV2.java | 3 +- .../builder/PipeTreeModelTsFileBuilder.java | 4 +- .../builder/PipeTreeModelTsFileBuilderV2.java | 3 +- .../event/TsFileInsertionEventParserTest.java | 190 ++++++++++-------- 6 files changed, 129 insertions(+), 83 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java index 04f33d72a9673..11f4f187f900c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java @@ -31,6 +31,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema; import java.time.LocalDate; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -158,6 +159,13 @@ public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int rowCount return hasMarkedBitMap ? bitMaps : null; } + public static BitMap[] copyBitMapsOrCreateEmpty(final Tablet tablet) { + final BitMap[] bitMaps = tablet.getBitMaps(); + return Objects.nonNull(bitMaps) + ? Arrays.copyOf(bitMaps, bitMaps.length) + : new BitMap[getColumnCount(tablet)]; + } + public static void markNullValue(final Tablet tablet, final int rowIndex, final int columnIndex) { final BitMap[] bitMaps = ensureBitMaps(tablet, columnIndex + 1); if (Objects.isNull(bitMaps[columnIndex])) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java index c98c978988d86..161f5c07a53b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink.util.builder; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.enums.ColumnCategory; @@ -243,7 +244,8 @@ private >>> T tryBestToAggr aggregatedSchemas.addAll(tablet.getSchemas()); aggregatedColumnCategories.addAll(tablet.getColumnTypes()); aggregatedValues.addAll(Arrays.asList(tablet.getValues())); - aggregatedBitMaps.addAll(Arrays.asList(tablet.getBitMaps())); + aggregatedBitMaps.addAll( + Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet))); // Remove the aggregated tablet tablets.pollFirst(); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilderV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilderV2.java index 8c89109e1eda3..fb275a1893fed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilderV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilderV2.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; @@ -202,7 +203,7 @@ private void writeTabletsIntoOneFile( .map(schema -> (MeasurementSchema) schema) .toArray(MeasurementSchema[]::new); Object[] values = Arrays.copyOf(tablet.getValues(), tablet.getValues().length); - BitMap[] bitMaps = Arrays.copyOf(tablet.getBitMaps(), tablet.getBitMaps().length); + BitMap[] bitMaps = PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet); ColumnCategory[] columnCategory = tablet.getColumnTypes().toArray(new ColumnCategory[0]); // convert date value to int refer to diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java index 34e30c99d4a79..11350b4dff341 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink.util.builder; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.external.commons.io.FileUtils; @@ -230,7 +231,8 @@ private Tablet tryBestToAggregateTablets( // Aggregate the current tablet's data aggregatedSchemas.addAll(tablet.getSchemas()); aggregatedValues.addAll(Arrays.asList(tablet.getValues())); - aggregatedBitMaps.addAll(Arrays.asList(tablet.getBitMaps())); + aggregatedBitMaps.addAll( + Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet))); // Remove the aggregated tablet tablets.pollFirst(); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilderV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilderV2.java index 48e8982be80d8..07703695d16e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilderV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilderV2.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; @@ -146,7 +147,7 @@ private void writeTabletsIntoOneFile( .map(schema -> (MeasurementSchema) schema) .toArray(MeasurementSchema[]::new); Object[] values = Arrays.copyOf(tablet.getValues(), tablet.getValues().length); - BitMap[] bitMaps = Arrays.copyOf(tablet.getBitMaps(), tablet.getBitMaps().length); + BitMap[] bitMaps = PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet); // convert date value to int refer to // org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 07af1c09cda73..3ce07680ce689 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -105,6 +105,7 @@ public class TsFileInsertionEventParserTest { "iotdb.query.parser.performance.enabled"; private static final String MANUAL_TABLE_PARSER_PERFORMANCE_TEST = "iotdb.table.parser.performance.enabled"; + private static final long BITMAP_TEST_PIPE_MAX_READER_CHUNK_SIZE = 1024 * 1024L; private File alignedTsFile; private File nonalignedTsFile; @@ -290,52 +291,65 @@ public void testScanParserSkipsUnnecessaryBitMaps() throws Exception { @Test public void testTableParserSkipsUnnecessaryBitMaps() throws Exception { - alignedTsFile = new File("table-parser-bitmap.tsfile"); - if (alignedTsFile.exists()) { - Assert.assertTrue(alignedTsFile.delete()); - } + final long originalPipeMaxReaderChunkSize = + PipeConfig.getInstance().getPipeMaxReaderChunkSize(); + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(BITMAP_TEST_PIPE_MAX_READER_CHUNK_SIZE); - final List schemaList = - Arrays.asList( - new MeasurementSchema("tag0", TSDataType.STRING), - new MeasurementSchema("dense", TSDataType.INT64), - new MeasurementSchema("sparse", TSDataType.INT64)); - final List columnNameList = Arrays.asList("tag0", "dense", "sparse"); - final List dataTypeList = - Arrays.asList(TSDataType.STRING, TSDataType.INT64, TSDataType.INT64); - final List columnCategoryList = - Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD); - - final Tablet tablet = - new Tablet("bitmap_table", columnNameList, dataTypeList, columnCategoryList, 2); - for (int rowIndex = 0; rowIndex < 2; ++rowIndex) { - tablet.addTimestamp(rowIndex, rowIndex); - tablet.addValue(rowIndex, 0, "tag-value"); - tablet.addValue(rowIndex, 1, (long) rowIndex); - tablet.addValue("sparse", rowIndex, rowIndex == 0 ? 100L : null); - } + try { + alignedTsFile = new File("table-parser-bitmap.tsfile"); + if (alignedTsFile.exists()) { + Assert.assertTrue(alignedTsFile.delete()); + } - try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) { - writer.registerTableSchema(new TableSchema("bitmap_table", schemaList, columnCategoryList)); - writer.writeTable(tablet); - } + final List schemaList = + Arrays.asList( + new MeasurementSchema("tag0", TSDataType.STRING), + new MeasurementSchema("dense", TSDataType.INT64), + new MeasurementSchema("sparse", TSDataType.INT64)); + final List columnNameList = Arrays.asList("tag0", "dense", "sparse"); + final List dataTypeList = + Arrays.asList(TSDataType.STRING, TSDataType.INT64, TSDataType.INT64); + final List columnCategoryList = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD); + + final Tablet tablet = + new Tablet("bitmap_table", columnNameList, dataTypeList, columnCategoryList, 2); + for (int rowIndex = 0; rowIndex < 2; ++rowIndex) { + tablet.addTimestamp(rowIndex, rowIndex); + tablet.addValue(rowIndex, 0, "tag-value"); + tablet.addValue(rowIndex, 1, (long) rowIndex); + tablet.addValue("sparse", rowIndex, rowIndex == 0 ? 100L : null); + } - try (final TsFileInsertionEventTableParser parser = - new TsFileInsertionEventTableParser( - alignedTsFile, - new TablePattern(true, null, null), - Long.MIN_VALUE, - Long.MAX_VALUE, - null, - null, - null, - false)) { - final Iterator iterator = parser.toTabletInsertionEvents().iterator(); - Assert.assertTrue(iterator.hasNext()); - final Tablet parsedTablet = ((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet(); - assertBitMapExistence(parsedTablet, false, false, true); - Assert.assertTrue(parsedTablet.isNull(1, 2)); - Assert.assertFalse(iterator.hasNext()); + try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) { + writer.registerTableSchema(new TableSchema("bitmap_table", schemaList, columnCategoryList)); + writer.writeTable(tablet); + } + + try (final TsFileInsertionEventTableParser parser = + new TsFileInsertionEventTableParser( + alignedTsFile, + new TablePattern(true, null, null), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + null, + false)) { + final Iterator iterator = parser.toTabletInsertionEvents().iterator(); + Assert.assertTrue(iterator.hasNext()); + final Tablet parsedTablet = + ((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet(); + assertBitMapExistence(parsedTablet, false, false, true); + Assert.assertTrue(parsedTablet.isNull(1, 2)); + Assert.assertFalse(iterator.hasNext()); + } + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); } } @@ -1034,45 +1048,63 @@ private void testPartialNullValue(final boolean isQuery) throws Exception { } private void testTreeParserSkipsUnnecessaryBitMaps(final boolean isQuery) throws Exception { - alignedTsFile = new File(isQuery ? "query-parser-bitmap.tsfile" : "scan-parser-bitmap.tsfile"); - if (alignedTsFile.exists()) { - Assert.assertTrue(alignedTsFile.delete()); - } + final long originalPipeMaxReaderChunkSize = + PipeConfig.getInstance().getPipeMaxReaderChunkSize(); + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(BITMAP_TEST_PIPE_MAX_READER_CHUNK_SIZE); - final List schemaList = - Arrays.asList( - new MeasurementSchema("dense", TSDataType.INT64), - new MeasurementSchema("sparse", TSDataType.INT64)); - final Tablet tablet = new Tablet("root.sg.d", schemaList, 2); - for (int rowIndex = 0; rowIndex < 2; ++rowIndex) { - tablet.addTimestamp(rowIndex, rowIndex); - tablet.addValue("dense", rowIndex, (long) rowIndex); - tablet.addValue("sparse", rowIndex, rowIndex == 0 ? 100L : null); - } + try { + alignedTsFile = + new File(isQuery ? "query-parser-bitmap.tsfile" : "scan-parser-bitmap.tsfile"); + if (alignedTsFile.exists()) { + Assert.assertTrue(alignedTsFile.delete()); + } - try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) { - writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), schemaList); - writer.writeAligned(tablet); - } + final List schemaList = + Arrays.asList( + new MeasurementSchema("dense", TSDataType.INT64), + new MeasurementSchema("sparse", TSDataType.INT64)); + final Tablet tablet = new Tablet("root.sg.d", schemaList, 2); + for (int rowIndex = 0; rowIndex < 2; ++rowIndex) { + tablet.addTimestamp(rowIndex, rowIndex); + tablet.addValue("dense", rowIndex, (long) rowIndex); + tablet.addValue("sparse", rowIndex, rowIndex == 0 ? 100L : null); + } - try (final TsFileInsertionEventParser parser = - isQuery - ? new TsFileInsertionEventQueryParser( - alignedTsFile, new PrefixTreePattern("root"), Long.MIN_VALUE, Long.MAX_VALUE, null) - : new TsFileInsertionEventScanParser( - alignedTsFile, - new PrefixTreePattern("root"), - Long.MIN_VALUE, - Long.MAX_VALUE, - null, - null, - false)) { - final Iterator iterator = parser.toTabletInsertionEvents().iterator(); - Assert.assertTrue(iterator.hasNext()); - final Tablet parsedTablet = ((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet(); - assertBitMapExistence(parsedTablet, false, true); - Assert.assertTrue(parsedTablet.isNull(1, 1)); - Assert.assertFalse(iterator.hasNext()); + try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) { + writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), schemaList); + writer.writeAligned(tablet); + } + + try (final TsFileInsertionEventParser parser = + isQuery + ? new TsFileInsertionEventQueryParser( + alignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null) + : new TsFileInsertionEventScanParser( + alignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + final Iterator iterator = parser.toTabletInsertionEvents().iterator(); + Assert.assertTrue(iterator.hasNext()); + final Tablet parsedTablet = + ((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet(); + assertBitMapExistence(parsedTablet, false, true); + Assert.assertTrue(parsedTablet.isNull(1, 1)); + Assert.assertFalse(iterator.hasNext()); + } + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); } } From 6a7afa479917a93bbcac4306045afab7d282446b Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 26 May 2026 17:37:34 +0800 Subject: [PATCH 3/7] fix --- .../db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java | 3 +-- .../db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java index 161f5c07a53b4..8f81dd4303247 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java @@ -244,8 +244,7 @@ private >>> T tryBestToAggr aggregatedSchemas.addAll(tablet.getSchemas()); aggregatedColumnCategories.addAll(tablet.getColumnTypes()); aggregatedValues.addAll(Arrays.asList(tablet.getValues())); - aggregatedBitMaps.addAll( - Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet))); + aggregatedBitMaps.addAll(Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet))); // Remove the aggregated tablet tablets.pollFirst(); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java index 11350b4dff341..844fc6042d844 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java @@ -231,8 +231,7 @@ private Tablet tryBestToAggregateTablets( // Aggregate the current tablet's data aggregatedSchemas.addAll(tablet.getSchemas()); aggregatedValues.addAll(Arrays.asList(tablet.getValues())); - aggregatedBitMaps.addAll( - Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet))); + aggregatedBitMaps.addAll(Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet))); // Remove the aggregated tablet tablets.pollFirst(); } else { From 9857642257b7d5463fe2343cd10cf1d94775bc90 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 26 May 2026 18:51:54 +0800 Subject: [PATCH 4/7] fix --- .../pipe/resource/memory/PipeMemoryWeightUtil.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java index 7693c8ff51250..04eff0067e555 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java @@ -191,14 +191,14 @@ private static Pair calculateTabletRowCountAndMemoryBySize( return new Pair<>(1, 0); } - // Calculate row number according to the max size of a pipe tablet. - // "-100" is the estimated size of other data structures in a pipe tablet. + // Calculate row number according to the max size of a pipe tablet. "100" is the estimated size + // of other data structures in a pipe tablet. // "*8" converts bytes to bits, because the bitmap size is 1 bit per schema. - // Here we estimate the max use of int sizeLimit = - Math.min( - IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), - (int) (inputNum * rowBytesUsed * 1.2)); + (int) + Math.min( + IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), + Math.min(Integer.MAX_VALUE, 100 + inputNum * (double) rowBytesUsed * 1.2)); int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount); rowNumber = Math.max(1, rowNumber); From c16fa230260f3e0b76677713a80b40f15eebd6bd Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 26 May 2026 18:55:06 +0800 Subject: [PATCH 5/7] Fix lazy bitmap review issues --- .../db/pipe/event/common/row/PipeRow.java | 1 + .../event/common/row/PipeRowCollector.java | 6 +- .../event/common/tablet/PipeTabletUtils.java | 23 ++----- .../plan/node/write/InsertTabletNode.java | 21 +----- .../write/RelationalInsertTabletNode.java | 3 +- .../statement/crud/InsertTabletStatement.java | 23 +------ .../rescon/quotas/DefaultOperationQuota.java | 28 ++++++-- .../apache/iotdb/db/utils/BitMapUtils.java | 47 ++++++++++++++ .../quotas/DefaultOperationQuotaTest.java | 64 +++++++++++++++++++ 9 files changed, 151 insertions(+), 65 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java index 2c86fd0f2abbd..b0897ed396ae1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java @@ -153,6 +153,7 @@ public Type getDataType(final int columnIndex) { @Override public boolean isNull(final int columnIndex) { return bitMaps != null + && columnIndex < bitMaps.length && bitMaps[columnIndex] != null && bitMaps[columnIndex].isMarked(rowIndex); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java index 97b21695c1742..4c399b7b01800 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; @@ -77,7 +78,7 @@ public void collectRow(Row row) { Pair rowCountAndMemorySize = PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(pipeRow); tablet = new Tablet(deviceId, measurementSchemaList, rowCountAndMemorySize.getLeft()); - tablet.initBitMaps(); + PipeTabletUtils.initEmptyBitMaps(tablet); isAligned = pipeRow.isAligned(); } @@ -94,7 +95,7 @@ public void collectRow(Row row) { tablet.addValue(measurementSchemaArray[i].getMeasurementName(), rowIndex, value); } if (row.isNull(i)) { - tablet.getBitMaps()[i].mark(rowIndex); + PipeTabletUtils.markNullValue(tablet, rowIndex, i); } } @@ -105,6 +106,7 @@ public void collectRow(Row row) { private void collectTabletInsertionEvent() { if (tablet != null) { + PipeTabletUtils.compactBitMaps(tablet); // TODO: non-PipeInsertionEvent sourceEvent is not supported? final PipeInsertionEvent pipeInsertionEvent = sourceEvent instanceof PipeInsertionEvent ? ((PipeInsertionEvent) sourceEvent) : null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java index 11f4f187f900c..2fd574bdbe7c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.event.common.tablet; import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.iotdb.db.utils.BitMapUtils; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; @@ -129,6 +130,11 @@ public static Tablet internTablet( return Objects.nonNull(tabletStringInternPool) ? tabletStringInternPool.intern(tablet) : tablet; } + /** + * Initializes the bitmap array shell without allocating per-column bitmaps. Null values must be + * recorded through {@link #markNullValue(Tablet, int, int)} so the column bitmap is allocated + * lazily. + */ public static void initEmptyBitMaps(final Tablet tablet) { tablet.setBitMaps(new BitMap[getColumnCount(tablet)]); } @@ -141,22 +147,7 @@ public static void compactBitMaps(final Tablet tablet) { } public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int rowCount) { - if (Objects.isNull(bitMaps)) { - return null; - } - - boolean hasMarkedBitMap = false; - for (int i = 0; i < bitMaps.length; ++i) { - if (Objects.nonNull(bitMaps[i]) - && bitMaps[i].isAllUnmarked(Math.min(rowCount, bitMaps[i].getSize()))) { - bitMaps[i] = null; - } - if (Objects.nonNull(bitMaps[i])) { - hasMarkedBitMap = true; - } - } - - return hasMarkedBitMap ? bitMaps : null; + return BitMapUtils.compactBitMaps(bitMaps, rowCount); } public static BitMap[] copyBitMapsOrCreateEmpty(final Tablet tablet) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index 5f5ec8d42efa1..adf14a39381db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -45,6 +45,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; +import org.apache.iotdb.db.utils.BitMapUtils; import org.apache.iotdb.db.utils.QueryDataSetUtils; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -382,7 +383,7 @@ protected WritePlanNode generateOneSplit(Map.Entry generateOneSplitList( subNode.setFailedMeasurementNumber(getFailedMeasurementNumber()); subNode.setRange(locs); subNode.setDataRegionReplicaSet(entry.getKey()); - subNode.bitMaps = compactBitMaps(subNode.bitMaps, subNode.rowCount); + subNode.bitMaps = BitMapUtils.compactBitMaps(subNode.bitMaps, subNode.rowCount); result.add(subNode); return result; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index 786a9efcfc540..82f5990e0375c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -43,6 +43,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.db.utils.BitMapUtils; import org.apache.iotdb.db.utils.CommonUtils; import org.apache.tsfile.enums.ColumnCategory; @@ -390,7 +391,7 @@ public List getSplitList() { statement.setMeasurementSchemas(measurementSchemas); statement.setDataTypes(dataTypes); if (this.nullBitMaps != null) { - statement.setBitMaps(compactBitMaps(copiedBitMaps, rowCount)); + statement.setBitMaps(BitMapUtils.compactBitMaps(copiedBitMaps, rowCount)); } statement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info); insertTabletStatementList.add(statement); @@ -833,7 +834,7 @@ public Tablet convertToTablet() throws MetadataException { tabletColumnTypes, timestamps, tabletValues, - compactBitMaps(bitMaps, rowSize), + BitMapUtils.compactBitMaps(bitMaps, rowSize), rowSize); } catch (final Exception e) { throw new MetadataException( @@ -900,24 +901,6 @@ private Object convertColumnToTablet( return columnValue; } - private static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int rowCount) { - if (bitMaps == null) { - return null; - } - - boolean hasBitMap = false; - for (int i = 0; i < bitMaps.length; ++i) { - if (bitMaps[i] != null - && bitMaps[i].isAllUnmarked(Math.min(rowCount, bitMaps[i].getSize()))) { - bitMaps[i] = null; - } - if (bitMaps[i] != null) { - hasBitMap = true; - } - } - return hasBitMap ? bitMaps : null; - } - @Override public String toString() { final int size = CommonDescriptor.getInstance().getConfig().getPathLogMaxSize(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java index af83d12c3079c..764d6763184ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java @@ -117,9 +117,7 @@ protected void updateEstimateConsumeQuota(int numWrites, int numReads, Statement case BATCH_INSERT: // InsertTabletStatement InsertTabletStatement insertTabletStatement = (InsertTabletStatement) s; - for (BitMap bitMap : insertTabletStatement.getBitMaps()) { - avgSize += bitMap.getSize(); - } + avgSize += calculationWrite(insertTabletStatement.getBitMaps()); break; case BATCH_INSERT_ONE_DEVICE: // InsertRowsOfOneDeviceStatement @@ -152,10 +150,12 @@ protected void updateEstimateConsumeQuota(int numWrites, int numReads, Statement for (int i = 0; i < insertMultiTabletsStatement.getInsertTabletStatementList().size(); i++) { - for (BitMap bitMap : - insertMultiTabletsStatement.getInsertTabletStatementList().get(i).getBitMaps()) { - avgSize += bitMap.getSize(); - } + avgSize += + calculationWrite( + insertMultiTabletsStatement + .getInsertTabletStatementList() + .get(i) + .getBitMaps()); } } break; @@ -179,6 +179,20 @@ private long calculationWrite(Object[] values) { return size; } + private long calculationWrite(BitMap[] bitMaps) { + if (bitMaps == null) { + return 0; + } + + long size = 0; + for (BitMap bitMap : bitMaps) { + if (bitMap != null) { + size += bitMap.getSize(); + } + } + return size; + } + private long estimateConsume(int numReqs, long avgSize) { if (numReqs > 0) { return avgSize * numReqs; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java new file mode 100644 index 0000000000000..c6e1aecbd3d2e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.utils; + +import org.apache.tsfile.utils.BitMap; + +import java.util.Objects; + +public final class BitMapUtils { + + private BitMapUtils() {} + + public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int rowCount) { + if (Objects.isNull(bitMaps)) { + return null; + } + + boolean hasMarkedBitMap = false; + for (int i = 0; i < bitMaps.length; ++i) { + if (Objects.nonNull(bitMaps[i]) + && bitMaps[i].isAllUnmarked(Math.min(rowCount, bitMaps[i].getSize()))) { + bitMaps[i] = null; + } + if (Objects.nonNull(bitMaps[i])) { + hasMarkedBitMap = true; + } + } + return hasMarkedBitMap ? bitMaps : null; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java new file mode 100644 index 0000000000000..5f11039eeb312 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.rescon.quotas; + +import org.apache.iotdb.common.rpc.thrift.TTimedQuota; +import org.apache.iotdb.common.rpc.thrift.ThrottleType; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + +import org.apache.tsfile.utils.BitMap; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.Map; + +public class DefaultOperationQuotaTest { + + @Test + public void testCheckQuotaWithNullAndSparseBitMaps() throws Exception { + final DefaultOperationQuota quota = new DefaultOperationQuota(createQuotaLimiter()); + + final InsertTabletStatement tabletWithoutBitMaps = new InsertTabletStatement(); + tabletWithoutBitMaps.setBitMaps(null); + quota.checkQuota(1, 0, tabletWithoutBitMaps); + + final InsertTabletStatement tabletWithSparseBitMaps = new InsertTabletStatement(); + final BitMap bitMap = new BitMap(8); + bitMap.mark(0); + tabletWithSparseBitMaps.setBitMaps(new BitMap[] {null, bitMap}); + quota.checkQuota(1, 0, tabletWithSparseBitMaps); + + final InsertMultiTabletsStatement multiTabletsStatement = new InsertMultiTabletsStatement(); + multiTabletsStatement.setInsertTabletStatementList( + Arrays.asList(tabletWithoutBitMaps, tabletWithSparseBitMaps)); + quota.checkQuota(1, 0, multiTabletsStatement); + } + + private static QuotaLimiter createQuotaLimiter() { + final Map quotas = new EnumMap<>(ThrottleType.class); + for (final ThrottleType throttleType : ThrottleType.values()) { + quotas.put(throttleType, new TTimedQuota(60_000L, 1_000_000_000L)); + } + return QuotaLimiter.fromThrottle(Collections.unmodifiableMap(quotas)); + } +} From 4879c3a7c48dd837c7fbe750b930b2edec93aa46 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 26 May 2026 19:03:30 +0800 Subject: [PATCH 6/7] Remove redundant empty bitmap initialization --- .../pipe/event/common/row/PipeRowCollector.java | 17 ++++++++--------- .../event/common/tablet/PipeTabletUtils.java | 9 --------- ...InsertionEventQueryParserTabletIterator.java | 3 --- .../scan/TsFileInsertionEventScanParser.java | 4 ---- ...InsertionEventTableParserTabletIterator.java | 2 -- 5 files changed, 8 insertions(+), 27 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java index 4c399b7b01800..326d9c7d31ed6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java @@ -78,7 +78,6 @@ public void collectRow(Row row) { Pair rowCountAndMemorySize = PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(pipeRow); tablet = new Tablet(deviceId, measurementSchemaList, rowCountAndMemorySize.getLeft()); - PipeTabletUtils.initEmptyBitMaps(tablet); isAligned = pipeRow.isAligned(); } @@ -86,14 +85,14 @@ public void collectRow(Row row) { tablet.addTimestamp(rowIndex, row.getTime()); for (int i = 0; i < row.size(); i++) { final Object value = row.getObject(i); - if (value instanceof Binary) { - tablet.addValue( - measurementSchemaArray[i].getMeasurementName(), - rowIndex, - PipeBinaryTransformer.transformToBinary((Binary) value)); - } else { - tablet.addValue(measurementSchemaArray[i].getMeasurementName(), rowIndex, value); - } + PipeTabletUtils.putValue( + tablet, + rowIndex, + i, + measurementSchemaArray[i].getType(), + value instanceof Binary + ? PipeBinaryTransformer.transformToBinary((Binary) value) + : value); if (row.isNull(i)) { PipeTabletUtils.markNullValue(tablet, rowIndex, i); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java index 2fd574bdbe7c8..0175f72104ad1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java @@ -130,15 +130,6 @@ public static Tablet internTablet( return Objects.nonNull(tabletStringInternPool) ? tabletStringInternPool.intern(tablet) : tablet; } - /** - * Initializes the bitmap array shell without allocating per-column bitmaps. Null values must be - * recorded through {@link #markNullValue(Tablet, int, int)} so the column bitmap is allocated - * lazily. - */ - public static void initEmptyBitMaps(final Tablet tablet) { - tablet.setBitMaps(new BitMap[getColumnCount(tablet)]); - } - public static void compactBitMaps(final Tablet tablet) { if (Objects.isNull(tablet)) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java index 26413d93752f8..5c7c06089fe9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java @@ -156,8 +156,6 @@ private Tablet buildNextTablet() throws IOException { new Tablet( // Used for tree model deviceIdString, schemas, 1); - PipeTabletUtils.initEmptyBitMaps(tablet); - PipeTabletUtils.compactBitMaps(tablet); return tablet; } @@ -173,7 +171,6 @@ private Tablet buildNextTablet() throws IOException { new Tablet( // Used for tree model deviceIdString, schemas, rowCountAndMemorySize.getLeft()); - PipeTabletUtils.initEmptyBitMaps(tablet); if (allocatedBlockForTablet.getMemoryUsageInBytes() < rowCountAndMemorySize.getRight()) { PipeDataNodeResourceManager.memory() .forceResize(allocatedBlockForTablet, rowCountAndMemorySize.getRight()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 79f92c8d13f00..b18bf6255ab6f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -309,8 +309,6 @@ private Tablet getNextTablet() { if (!data.hasCurrent()) { tablet = new Tablet(currentDeviceString, currentMeasurements, 1); - PipeTabletUtils.initEmptyBitMaps(tablet); - PipeTabletUtils.compactBitMaps(tablet); return tablet; } @@ -325,7 +323,6 @@ private Tablet getNextTablet() { tablet = new Tablet( currentDeviceString, currentMeasurements, rowCountAndMemorySize.getLeft()); - PipeTabletUtils.initEmptyBitMaps(tablet); if (allocatedMemoryBlockForTablet.getMemoryUsageInBytes() < rowCountAndMemorySize.getRight()) { PipeDataNodeResourceManager.memory() @@ -353,7 +350,6 @@ private Tablet getNextTablet() { if (tablet == null) { tablet = new Tablet(currentDeviceString, currentMeasurements, 1); - PipeTabletUtils.initEmptyBitMaps(tablet); } // Switch chunk reader iff current chunk is all consumed diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java index 9f707a747c2d7..5b50eb166bebb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -325,7 +325,6 @@ private Tablet buildNextTablet() { new ArrayList<>(dataTypeList), new ArrayList<>(columnTypes), rowCountAndMemorySize.getLeft()); - PipeTabletUtils.initEmptyBitMaps(tablet); isFirstRow = false; } final int rowIndex = tablet.getRowSize(); @@ -346,7 +345,6 @@ private Tablet buildNextTablet() { if (isFirstRow) { tablet = new Tablet(tableName, measurementList, dataTypeList, columnTypes, 0); - PipeTabletUtils.initEmptyBitMaps(tablet); } PipeTabletUtils.compactBitMaps(tablet); From 75c37a0bf2dfd6fb7a1708bb835893dafbc9d485 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 26 May 2026 19:19:02 +0800 Subject: [PATCH 7/7] Fix pipe tablet null bitmap update --- .../event/common/tablet/PipeTabletUtils.java | 11 ++++ .../common/tablet/PipeTabletUtilsTest.java | 52 +++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java index 0175f72104ad1..097d936176d93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java @@ -195,6 +195,17 @@ public static void putValue( default: throw new UnSupportedDataTypeException(DataNodePipeMessages.UNSUPPORTED + dataType); } + unmarkNullValue(tablet, rowIndex, columnIndex); + } + + private static void unmarkNullValue( + final Tablet tablet, final int rowIndex, final int columnIndex) { + final BitMap[] bitMaps = tablet.getBitMaps(); + if (Objects.nonNull(bitMaps) + && columnIndex < bitMaps.length + && Objects.nonNull(bitMaps[columnIndex])) { + bitMaps[columnIndex].unmark(rowIndex); + } } private static BitMap[] ensureBitMaps(final Tablet tablet, final int minColumnCount) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java new file mode 100644 index 0000000000000..9ef48fe52ec4b --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tablet; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class PipeTabletUtilsTest { + + @Test + public void testPutValueUnmarksReusedNullRow() { + final List schemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.FLOAT), + new MeasurementSchema("s2", TSDataType.FLOAT)); + final Tablet tablet = new Tablet("root.sg.d1", schemas, 2); + + PipeTabletUtils.markNullValue(tablet, 0, 0); + PipeTabletUtils.markNullValue(tablet, 0, 1); + + PipeTabletUtils.putValue(tablet, 0, 0, TSDataType.FLOAT, 1.0f); + PipeTabletUtils.putTimestamp(tablet, 0, 1L); + PipeTabletUtils.compactBitMaps(tablet); + + Assert.assertFalse(tablet.isNull(0, 0)); + Assert.assertTrue(tablet.isNull(0, 1)); + } +}