diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java index 33bf0a5925c39..a8579b32bb771 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java @@ -152,7 +152,10 @@ public Type getDataType(final int columnIndex) { @Override public boolean isNull(final int columnIndex) { - return bitMaps[columnIndex].isMarked(rowIndex); + return bitMaps != null + && columnIndex < bitMaps.length + && bitMaps[columnIndex] != null + && bitMaps[columnIndex].isMarked(rowIndex); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java index 42560f23a5c59..c26b05f6756b3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; @@ -66,27 +67,26 @@ public void collectRow(Row row) { Pair rowCountAndMemorySize = PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(pipeRow); tablet = new Tablet(deviceId, measurementSchemaList, rowCountAndMemorySize.getLeft()); - tablet.initBitMaps(); isAligned = pipeRow.isAligned(); } final int rowIndex = tablet.rowSize; - tablet.addTimestamp(rowIndex, row.getTime()); + PipeTabletUtils.putTimestamp(tablet, rowIndex, row.getTime()); for (int i = 0; i < row.size(); i++) { final Object value = row.getObject(i); - if (value instanceof org.apache.iotdb.pipe.api.type.Binary) { - tablet.addValue( - measurementSchemaArray[i].getMeasurementId(), - rowIndex, - PipeBinaryTransformer.transformToBinary((org.apache.iotdb.pipe.api.type.Binary) value)); - } else { - tablet.addValue(measurementSchemaArray[i].getMeasurementId(), rowIndex, value); - } + PipeTabletUtils.putValue( + tablet, + rowIndex, + i, + measurementSchemaArray[i].getType(), + value instanceof org.apache.iotdb.pipe.api.type.Binary + ? PipeBinaryTransformer.transformToBinary( + (org.apache.iotdb.pipe.api.type.Binary) value) + : value); if (row.isNull(i)) { - tablet.bitMaps[i].mark(rowIndex); + PipeTabletUtils.markNullValue(tablet, rowIndex, i); } } - tablet.rowSize++; if (tablet.rowSize == tablet.getMaxRowNumber()) { collectTabletInsertionEvent(); @@ -95,6 +95,7 @@ public void collectRow(Row row) { private void collectTabletInsertionEvent() { if (tablet != null) { + PipeTabletUtils.compactBitMaps(tablet); tabletInsertionEventList.add( new PipeRawTabletInsertionEvent( tablet, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index d322291934ffe..261e3c7a8b324 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -323,6 +323,7 @@ public boolean isAligned() { public Tablet convertToTablet() { if (!shouldParseTimeOrPattern()) { + PipeTabletUtils.compactBitMaps(tablet); return tablet; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java new file mode 100644 index 0000000000000..0a6b073b5b6ca --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tablet; + +import org.apache.iotdb.db.utils.BitMapUtils; + +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.UnSupportedDataTypeException; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; + +import java.time.LocalDate; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public final class PipeTabletUtils { + + private PipeTabletUtils() {} + + public static final class TabletStringInternPool { + + private final Map internedStrings = new HashMap<>(); + + public String intern(final String value) { + if (Objects.isNull(value)) { + return null; + } + + final String internedValue = internedStrings.get(value); + if (Objects.nonNull(internedValue)) { + return internedValue; + } + + internedStrings.put(value, value); + return value; + } + + public void intern(final String[] values) { + if (Objects.isNull(values)) { + return; + } + + for (int i = 0; i < values.length; ++i) { + values[i] = intern(values[i]); + } + } + + public void intern(final List values) { + if (Objects.isNull(values)) { + return; + } + + for (int i = 0; i < values.size(); ++i) { + values.set(i, intern(values.get(i))); + } + } + + public Tablet intern(final Tablet tablet) { + if (Objects.isNull(tablet)) { + return null; + } + + tablet.setDeviceId(intern(tablet.deviceId)); + internMeasurementSchemas(tablet.getSchemas()); + return tablet; + } + + public void internMeasurementSchemas(final List schemas) { + if (Objects.isNull(schemas)) { + return; + } + + for (final MeasurementSchema schema : schemas) { + intern(schema); + } + } + + public MeasurementSchema intern(final MeasurementSchema schema) { + if (Objects.isNull(schema)) { + return null; + } + + schema.setMeasurementId(intern(schema.getMeasurementId())); + schema.setProps(intern(schema.getProps())); + return schema; + } + + private Map intern(final Map props) { + if (Objects.isNull(props) || props.isEmpty()) { + return props; + } + + final Map internedProps = new HashMap<>(props.size()); + for (final Map.Entry entry : props.entrySet()) { + internedProps.put(intern(entry.getKey()), intern(entry.getValue())); + } + return internedProps; + } + } + + public static Tablet internTablet( + final Tablet tablet, final TabletStringInternPool tabletStringInternPool) { + return Objects.nonNull(tabletStringInternPool) ? tabletStringInternPool.intern(tablet) : tablet; + } + + public static void compactBitMaps(final Tablet tablet) { + if (Objects.isNull(tablet)) { + return; + } + tablet.bitMaps = compactBitMaps(tablet.bitMaps, tablet.rowSize); + } + + public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int rowCount) { + return BitMapUtils.compactBitMaps(bitMaps, rowCount); + } + + public static BitMap[] copyBitMapsOrCreateEmpty(final Tablet tablet) { + final BitMap[] bitMaps = tablet.bitMaps; + return Objects.nonNull(bitMaps) + ? Arrays.copyOf(bitMaps, bitMaps.length) + : new BitMap[getColumnCount(tablet)]; + } + + public static void markNullValue(final Tablet tablet, final int rowIndex, final int columnIndex) { + final BitMap[] bitMaps = ensureBitMaps(tablet, columnIndex + 1); + if (Objects.isNull(bitMaps[columnIndex])) { + bitMaps[columnIndex] = new BitMap(tablet.getMaxRowNumber()); + } + bitMaps[columnIndex].mark(rowIndex); + } + + public static void putTimestamp(final Tablet tablet, final int rowIndex, final long timestamp) { + tablet.timestamps[rowIndex] = timestamp; + tablet.rowSize = Math.max(tablet.rowSize, rowIndex + 1); + } + + public static void putValue( + final Tablet tablet, + final int rowIndex, + final int columnIndex, + final TSDataType dataType, + final Object value) { + switch (dataType) { + case BOOLEAN: + ((boolean[]) tablet.values[columnIndex])[rowIndex] = (Boolean) value; + break; + case INT32: + ((int[]) tablet.values[columnIndex])[rowIndex] = (Integer) value; + break; + case DATE: + ((LocalDate[]) tablet.values[columnIndex])[rowIndex] = (LocalDate) value; + break; + case INT64: + case TIMESTAMP: + ((long[]) tablet.values[columnIndex])[rowIndex] = (Long) value; + break; + case FLOAT: + ((float[]) tablet.values[columnIndex])[rowIndex] = (Float) value; + break; + case DOUBLE: + ((double[]) tablet.values[columnIndex])[rowIndex] = (Double) value; + break; + case TEXT: + case BLOB: + case STRING: + ((Binary[]) tablet.values[columnIndex])[rowIndex] = toBinary(value); + break; + default: + throw new UnSupportedDataTypeException("Unsupported data type: " + dataType); + } + unmarkNullValue(tablet, rowIndex, columnIndex); + } + + private static void unmarkNullValue( + final Tablet tablet, final int rowIndex, final int columnIndex) { + final BitMap[] bitMaps = tablet.bitMaps; + if (Objects.nonNull(bitMaps) + && columnIndex < bitMaps.length + && Objects.nonNull(bitMaps[columnIndex])) { + bitMaps[columnIndex].unmark(rowIndex); + } + } + + private static BitMap[] ensureBitMaps(final Tablet tablet, final int minColumnCount) { + final int columnCount = Math.max(getColumnCount(tablet), minColumnCount); + BitMap[] bitMaps = tablet.bitMaps; + if (Objects.isNull(bitMaps)) { + bitMaps = new BitMap[columnCount]; + tablet.bitMaps = bitMaps; + } else if (bitMaps.length < columnCount) { + final BitMap[] expandedBitMaps = new BitMap[columnCount]; + System.arraycopy(bitMaps, 0, expandedBitMaps, 0, bitMaps.length); + bitMaps = expandedBitMaps; + tablet.bitMaps = bitMaps; + } + return bitMaps; + } + + private static int getColumnCount(final Tablet tablet) { + if (Objects.nonNull(tablet.getSchemas())) { + return tablet.getSchemas().size(); + } + return Objects.nonNull(tablet.values) ? tablet.values.length : 0; + } + + private static Binary toBinary(final Object value) { + if (Objects.isNull(value)) { + return Binary.EMPTY_VALUE; + } + if (value instanceof Binary) { + return (Binary) value; + } + if (value instanceof byte[]) { + return new Binary((byte[]) value); + } + if (value instanceof String) { + return new Binary(((String) value).getBytes(TSFileConfig.STRING_CHARSET)); + } + throw new IllegalArgumentException( + String.format("Expected Binary, byte[] or String, but was %s.", value.getClass())); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java index 133dbb5bff8a0..d8c2bccaa97c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java @@ -198,6 +198,7 @@ private void parse(final InsertRowNode insertRowNode, final PipePattern pattern) } this.rowCount = this.timestampColumn.length; + this.nullValueColumnBitmaps = PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount); if (this.rowCount == 0 && LOGGER.isDebugEnabled()) { LOGGER.debug( "InsertRowNode({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.", @@ -217,7 +218,6 @@ private void parse(final InsertTabletNode insertTabletNode, final PipePattern pa this.isAligned = insertTabletNode.isAligned(); final long[] originTimestampColumn = insertTabletNode.getTimes(); - final int originRowSize = originTimestampColumn.length; final List rowIndexList = generateRowIndexList(originTimestampColumn); this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray(); @@ -243,18 +243,7 @@ private void parse(final InsertTabletNode insertTabletNode, final PipePattern pa final String[] originColumnNameStringList = insertTabletNode.getMeasurements(); final TSDataType[] originValueColumnTypes = insertTabletNode.getDataTypes(); final Object[] originValueColumns = insertTabletNode.getColumns(); - final BitMap[] originBitMapList = - (insertTabletNode.getBitMaps() == null - ? IntStream.range(0, originColumnSize) - .boxed() - .map(o -> new BitMap(originRowSize)) - .toArray(BitMap[]::new) - : insertTabletNode.getBitMaps()); - for (int i = 0; i < originBitMapList.length; i++) { - if (originBitMapList[i] == null) { - originBitMapList[i] = new BitMap(originRowSize); - } - } + final BitMap[] originBitMapList = insertTabletNode.getBitMaps(); for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) { @@ -277,7 +266,7 @@ private void parse(final InsertTabletNode insertTabletNode, final PipePattern pa originValueColumns[i], rowIndexList, false, - originBitMapList[i], + getBitMap(originBitMapList, i), bitMap); } this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap; @@ -285,6 +274,7 @@ private void parse(final InsertTabletNode insertTabletNode, final PipePattern pa } this.rowCount = this.timestampColumn.length; + this.nullValueColumnBitmaps = PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount); if (rowCount == 0 && LOGGER.isDebugEnabled()) { LOGGER.debug( "InsertTabletNode({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.", @@ -338,18 +328,7 @@ private void parse(final Tablet tablet, final boolean isAligned, final PipePatte } final Object[] originValueColumns = tablet.values; // we do not reduce value columns here by origin row size - final BitMap[] originBitMapList = - tablet.bitMaps == null - ? IntStream.range(0, originColumnSize) - .boxed() - .map(o -> new BitMap(tablet.getMaxRowNumber())) - .toArray(BitMap[]::new) - : tablet.bitMaps; // We do not reduce bitmaps here by origin row size - for (int i = 0; i < originBitMapList.length; i++) { - if (originBitMapList[i] == null) { - originBitMapList[i] = new BitMap(tablet.getMaxRowNumber()); - } - } + final BitMap[] originBitMapList = tablet.bitMaps; for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) { @@ -372,7 +351,7 @@ private void parse(final Tablet tablet, final boolean isAligned, final PipePatte originValueColumns[i], rowIndexList, false, - originBitMapList[i], + getBitMap(originBitMapList, i), bitMap); } this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap; @@ -380,6 +359,7 @@ private void parse(final Tablet tablet, final boolean isAligned, final PipePatte } this.rowCount = this.timestampColumn.length; + this.nullValueColumnBitmaps = PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount); if (this.rowCount == 0 && LOGGER.isDebugEnabled()) { LOGGER.debug( "Tablet({}) is parsed to zero rows according to the pattern({}) and time range [{}, {}], the corresponding source event({}) will be ignored.", @@ -471,7 +451,7 @@ private static Object filterValueColumnsByRowIndexList( : (int[]) originValueColumn; final int[] valueColumns = new int[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = 0; nullValueColumnBitmap.mark(i); } else { @@ -493,7 +473,7 @@ private static Object filterValueColumnsByRowIndexList( : (LocalDate[]) originValueColumn; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = EMPTY_LOCALDATE; nullValueColumnBitmap.mark(i); } else { @@ -507,7 +487,7 @@ private static Object filterValueColumnsByRowIndexList( ? new int[] {(int) originValueColumn} : (int[]) originValueColumn; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = EMPTY_LOCALDATE; nullValueColumnBitmap.mark(i); } else { @@ -527,7 +507,7 @@ private static Object filterValueColumnsByRowIndexList( : (long[]) originValueColumn; final long[] valueColumns = new long[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = 0L; nullValueColumnBitmap.mark(i); } else { @@ -544,7 +524,7 @@ private static Object filterValueColumnsByRowIndexList( : (float[]) originValueColumn; final float[] valueColumns = new float[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = 0F; nullValueColumnBitmap.mark(i); } else { @@ -561,7 +541,7 @@ private static Object filterValueColumnsByRowIndexList( : (double[]) originValueColumn; final double[] valueColumns = new double[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = 0D; nullValueColumnBitmap.mark(i); } else { @@ -578,7 +558,7 @@ private static Object filterValueColumnsByRowIndexList( : (boolean[]) originValueColumn; final boolean[] valueColumns = new boolean[rowIndexList.size()]; for (int i = 0; i < rowIndexList.size(); ++i) { - if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = false; nullValueColumnBitmap.mark(i); } else { @@ -599,7 +579,7 @@ private static Object filterValueColumnsByRowIndexList( for (int i = 0; i < rowIndexList.size(); ++i) { if (Objects.isNull(binaryValueColumns[rowIndexList.get(i)]) || Objects.isNull(binaryValueColumns[rowIndexList.get(i)].getValues()) - || originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) { + || isNullValue(originNullValueColumnBitmap, rowIndexList.get(i))) { valueColumns[i] = Binary.EMPTY_VALUE; nullValueColumnBitmap.mark(i); } else { @@ -659,6 +639,14 @@ private void fillNullValue( } } + private static BitMap getBitMap(final BitMap[] bitMaps, final int index) { + return Objects.nonNull(bitMaps) && index < bitMaps.length ? bitMaps[index] : null; + } + + private static boolean isNullValue(final BitMap bitMap, final int rowIndex) { + return Objects.nonNull(bitMap) && bitMap.isMarked(rowIndex); + } + //////////////////////////// process //////////////////////////// public List processRowByRow(final BiConsumer consumer) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java index 4353e4984a259..e1fa58f5a0f68 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -70,6 +71,7 @@ public class TsFileInsertionQueryDataContainer extends TsFileInsertionDataContai private final Iterator>> deviceMeasurementsMapIterator; private final Map deviceIsAlignedMap; private final Map measurementDataTypeMap; + private final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); @TestOnly public TsFileInsertionQueryDataContainer( @@ -385,7 +387,8 @@ public boolean hasNext() { entry.getValue(), timeFilterExpression, allocatedMemoryBlockForTablet, - currentModifications); + currentModifications, + tabletStringInternPool); } catch (final Exception e) { close(); throw new PipeException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java index e16c7113da354..2e81f4aa335a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.container.query; import org.apache.iotdb.commons.path.PatternTreeMap; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; @@ -37,6 +39,7 @@ import org.apache.tsfile.read.expression.IExpression; import org.apache.tsfile.read.expression.QueryExpression; import org.apache.tsfile.read.query.dataset.QueryDataSet; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -57,6 +60,7 @@ public class TsFileInsertionQueryDataTabletIterator implements Iterator private final String deviceId; private final List measurements; + private final List schemas; private final IExpression timeFilterExpression; @@ -76,20 +80,28 @@ public class TsFileInsertionQueryDataTabletIterator implements Iterator final List measurements, final IExpression timeFilterExpression, final PipeMemoryBlock allocatedBlockForTablet, - final PatternTreeMap currentModifications) + final PatternTreeMap currentModifications, + final TabletStringInternPool tabletStringInternPool) throws IOException { this.tsFileReader = tsFileReader; this.measurementDataTypeMap = measurementDataTypeMap; - this.deviceId = deviceId; + this.deviceId = tabletStringInternPool.intern(deviceId); this.measurements = measurements.stream() .filter( measurement -> // time column in aligned time-series should not be a query column measurement != null && !measurement.isEmpty()) + .map(tabletStringInternPool::intern) .sorted() .collect(Collectors.toList()); + this.schemas = new ArrayList<>(); + for (final String measurement : this.measurements) { + final TSDataType dataType = + measurementDataTypeMap.get(this.deviceId + TsFileConstant.PATH_SEPARATOR + measurement); + schemas.add(new MeasurementSchema(measurement, dataType)); + } this.timeFilterExpression = timeFilterExpression; @@ -99,7 +111,7 @@ public class TsFileInsertionQueryDataTabletIterator implements Iterator this.measurementModsList = ModsOperationUtil.initializeMeasurementMods( - deviceId, this.measurements, currentModifications); + this.deviceId, this.measurements, currentModifications); } private QueryDataSet buildQueryDataSet() throws IOException { @@ -133,18 +145,9 @@ public Tablet next() { } private Tablet buildNextTablet() throws IOException { - final List schemas = new ArrayList<>(); - for (final String measurement : measurements) { - final TSDataType dataType = - measurementDataTypeMap.get(deviceId + TsFileConstant.PATH_SEPARATOR + measurement); - schemas.add(new MeasurementSchema(measurement, dataType)); - } - Tablet tablet = null; if (!queryDataSet.hasNext()) { - tablet = new Tablet(deviceId, schemas, 1); - tablet.initBitMaps(); - return tablet; + return new Tablet(deviceId, schemas, 1); } boolean isFirstRow = true; @@ -156,7 +159,6 @@ private Tablet buildNextTablet() throws IOException { Pair rowCountAndMemorySize = PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(rowRecord); tablet = new Tablet(deviceId, schemas, rowCountAndMemorySize.getLeft()); - tablet.initBitMaps(); if (allocatedBlockForTablet.getMemoryUsageInBytes() < rowCountAndMemorySize.getRight()) { PipeDataNodeResourceManager.memory() .forceResize(allocatedBlockForTablet, rowCountAndMemorySize.getRight()); @@ -172,27 +174,30 @@ private Tablet buildNextTablet() throws IOException { final int fieldSize = fields.size(); for (int i = 0; i < fieldSize; i++) { final Field field = fields.get(i); - final String measurement = measurements.get(i); + final TSDataType dataType = schemas.get(i).getType(); // Check if this value is deleted by mods if (field == null || ModsOperationUtil.isDelete(rowRecord.getTimestamp(), measurementModsList.get(i))) { - tablet.bitMaps[i].mark(rowIndex); + if (dataType != null && dataType.isBinary()) { + PipeTabletUtils.putValue(tablet, rowIndex, i, dataType, Binary.EMPTY_VALUE); + } + PipeTabletUtils.markNullValue(tablet, rowIndex, i); } else { - tablet.addValue(measurement, rowIndex, field.getObjectValue(schemas.get(i).getType())); + PipeTabletUtils.putValue( + tablet, rowIndex, i, dataType, field.getObjectValue(schemas.get(i).getType())); isNeedFillTime = true; } } if (isNeedFillTime) { - tablet.addTimestamp(rowIndex, rowRecord.getTimestamp()); + PipeTabletUtils.putTimestamp(tablet, rowIndex, rowRecord.getTimestamp()); } - tablet.rowSize++; - if (tablet.rowSize == tablet.getMaxRowNumber()) { break; } } + PipeTabletUtils.compactBitMaps(tablet); return tablet; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java index 9366d0f62dfe2..e903c7340e4f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java @@ -26,6 +26,8 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -93,6 +95,7 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain private String currentDevice; private boolean currentIsAligned; private final List currentMeasurements = new ArrayList<>(); + private final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); private final List modsInfos = new ArrayList<>(); // Cached time chunk @@ -272,7 +275,6 @@ private Tablet getNextTablet() { if (!data.hasCurrent()) { tablet = new Tablet(currentDevice, currentMeasurements, 1); - tablet.initBitMaps(); // Ignore the memory cost of tablet PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 0); return tablet; @@ -288,7 +290,6 @@ private Tablet getNextTablet() { PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(data); tablet = new Tablet(currentDevice, currentMeasurements, rowCountAndMemorySize.getLeft()); - tablet.initBitMaps(); if (allocatedMemoryBlockForTablet.getMemoryUsageInBytes() < rowCountAndMemorySize.getRight()) { PipeDataNodeResourceManager.memory() @@ -300,10 +301,8 @@ private Tablet getNextTablet() { final int rowIndex = tablet.rowSize; if (putValueToColumns(data, tablet, rowIndex)) { - tablet.addTimestamp(rowIndex, data.currentTime()); + PipeTabletUtils.putTimestamp(tablet, rowIndex, data.currentTime()); } - - tablet.rowSize++; } data.next(); @@ -318,13 +317,13 @@ private Tablet getNextTablet() { if (tablet == null) { tablet = new Tablet(currentDevice, currentMeasurements, 1); - tablet.initBitMaps(); } // Switch chunk reader iff current chunk is all consumed if (!data.hasCurrent()) { prepareData(); } + PipeTabletUtils.compactBitMaps(tablet); return tablet; } catch (final Exception e) { close(); @@ -372,81 +371,100 @@ private void resizePageDataMemoryIfNeeded(final long estimatedMemoryUsageInBytes } private boolean putValueToColumns(final BatchData data, final Tablet tablet, final int rowIndex) { - final Object[] columns = tablet.values; boolean isNeedFillTime = false; if (data.getDataType() == TSDataType.VECTOR) { - for (int i = 0; i < columns.length; ++i) { + for (int i = 0; i < tablet.getSchemas().size(); ++i) { final TsPrimitiveType primitiveType = data.getVector()[i]; + final TSDataType type = tablet.getSchemas().get(i).getType(); if (Objects.isNull(primitiveType) || ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(i))) { - tablet.bitMaps[i].mark(rowIndex); - final TSDataType type = tablet.getSchemas().get(i).getType(); if (type == TSDataType.TEXT || type == TSDataType.BLOB || type == TSDataType.STRING) { - ((Binary[]) columns[i])[rowIndex] = Binary.EMPTY_VALUE; - } - if (type == TSDataType.DATE) { - ((LocalDate[]) columns[i])[rowIndex] = EMPTY_DATE; + PipeTabletUtils.putValue(tablet, rowIndex, i, type, Binary.EMPTY_VALUE); } + PipeTabletUtils.markNullValue(tablet, rowIndex, i); continue; } isNeedFillTime = true; - switch (tablet.getSchemas().get(i).getType()) { + switch (type) { case BOOLEAN: - ((boolean[]) columns[i])[rowIndex] = primitiveType.getBoolean(); + PipeTabletUtils.putValue(tablet, rowIndex, i, type, primitiveType.getBoolean()); break; case INT32: - ((int[]) columns[i])[rowIndex] = primitiveType.getInt(); + PipeTabletUtils.putValue(tablet, rowIndex, i, type, primitiveType.getInt()); break; case DATE: - ((LocalDate[]) columns[i])[rowIndex] = - DateUtils.parseIntToLocalDate(primitiveType.getInt()); + PipeTabletUtils.putValue( + tablet, rowIndex, i, type, DateUtils.parseIntToLocalDate(primitiveType.getInt())); break; case INT64: case TIMESTAMP: - ((long[]) columns[i])[rowIndex] = primitiveType.getLong(); + PipeTabletUtils.putValue(tablet, rowIndex, i, type, primitiveType.getLong()); break; case FLOAT: - ((float[]) columns[i])[rowIndex] = primitiveType.getFloat(); + PipeTabletUtils.putValue(tablet, rowIndex, i, type, primitiveType.getFloat()); break; case DOUBLE: - ((double[]) columns[i])[rowIndex] = primitiveType.getDouble(); + PipeTabletUtils.putValue(tablet, rowIndex, i, type, primitiveType.getDouble()); break; case TEXT: case BLOB: case STRING: - ((Binary[]) columns[i])[rowIndex] = primitiveType.getBinary(); + final Binary binary = primitiveType.getBinary(); + PipeTabletUtils.putValue( + tablet, + rowIndex, + i, + type, + Objects.isNull(binary) || Objects.isNull(binary.getValues()) + ? Binary.EMPTY_VALUE + : binary); break; default: throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType()); } } } else { + if (!modsInfos.isEmpty() + && ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(0))) { + return false; + } + isNeedFillTime = true; - switch (tablet.getSchemas().get(0).getType()) { + final TSDataType type = tablet.getSchemas().get(0).getType(); + switch (type) { case BOOLEAN: - ((boolean[]) columns[0])[rowIndex] = data.getBoolean(); + PipeTabletUtils.putValue(tablet, rowIndex, 0, type, data.getBoolean()); break; case INT32: - ((int[]) columns[0])[rowIndex] = data.getInt(); + PipeTabletUtils.putValue(tablet, rowIndex, 0, type, data.getInt()); break; case DATE: - ((LocalDate[]) columns[0])[rowIndex] = DateUtils.parseIntToLocalDate(data.getInt()); + PipeTabletUtils.putValue( + tablet, rowIndex, 0, type, DateUtils.parseIntToLocalDate(data.getInt())); break; case INT64: case TIMESTAMP: - ((long[]) columns[0])[rowIndex] = data.getLong(); + PipeTabletUtils.putValue(tablet, rowIndex, 0, type, data.getLong()); break; case FLOAT: - ((float[]) columns[0])[rowIndex] = data.getFloat(); + PipeTabletUtils.putValue(tablet, rowIndex, 0, type, data.getFloat()); break; case DOUBLE: - ((double[]) columns[0])[rowIndex] = data.getDouble(); + PipeTabletUtils.putValue(tablet, rowIndex, 0, type, data.getDouble()); break; case TEXT: case BLOB: case STRING: - ((Binary[]) columns[0])[rowIndex] = data.getBinary(); + final Binary binary = data.getBinary(); + PipeTabletUtils.putValue( + tablet, + rowIndex, + 0, + type, + Objects.isNull(binary) || Objects.isNull(binary.getValues()) + ? Binary.EMPTY_VALUE + : binary); break; default: throw new UnSupportedDataTypeException("UnSupported" + data.getDataType()); @@ -560,13 +578,13 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { ? new ChunkReader(chunk, filter) : new SinglePageWholeChunkReader(chunk); currentIsAligned = false; + final String measurementID = + tabletStringInternPool.intern(chunkHeader.getMeasurementID()); currentMeasurements.add( - new MeasurementSchema(chunkHeader.getMeasurementID(), chunkHeader.getDataType())); + new MeasurementSchema(measurementID, chunkHeader.getDataType())); modsInfos.addAll( ModsOperationUtil.initializeMeasurementMods( - currentDevice, - Collections.singletonList(chunkHeader.getMeasurementID()), - currentModifications)); + currentDevice, Collections.singletonList(measurementID), currentModifications)); return; } case MetaMarker.VALUE_CHUNK_HEADER: @@ -615,9 +633,11 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { } // Increase value index + final String measurementID = + tabletStringInternPool.intern(chunkHeader.getMeasurementID()); final int valueIndex = measurementIndexMap.compute( - chunkHeader.getMeasurementID(), + measurementID, (measurement, index) -> Objects.nonNull(index) ? index + 1 : 0); // Emit when encountered non-sequential value chunk, or the chunk size exceeds @@ -677,13 +697,13 @@ > getPageDataMemoryLimitInBytes()) { valueChunkSize += chunkHeader.getDataSize(); valueChunkPageMemorySize += currentValueChunkPageMemorySize; valueChunkList.add(chunk); + final String measurementID = + tabletStringInternPool.intern(chunkHeader.getMeasurementID()); currentMeasurements.add( - new MeasurementSchema(chunkHeader.getMeasurementID(), chunkHeader.getDataType())); + new MeasurementSchema(measurementID, chunkHeader.getDataType())); modsInfos.addAll( ModsOperationUtil.initializeMeasurementMods( - currentDevice, - Collections.singletonList(chunkHeader.getMeasurementID()), - currentModifications)); + currentDevice, Collections.singletonList(measurementID), currentModifications)); break; } case MetaMarker.CHUNK_GROUP_HEADER: @@ -702,7 +722,10 @@ > getPageDataMemoryLimitInBytes()) { timeChunkPageMemorySizeList.clear(); measurementIndexMap.clear(); - currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID : null; + currentDevice = + pattern.mayOverlapWithDevice(deviceID) + ? tabletStringInternPool.intern(deviceID) + : null; break; } case MetaMarker.OPERATION_INDEX_RANGE: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java index a707f554c5184..a22522666c2ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java @@ -179,14 +179,14 @@ private static Pair calculateTabletRowCountAndMemoryBySize( return new Pair<>(1, 0); } - // Calculate row number according to the max size of a pipe tablet. - // "-100" is the estimated size of other data structures in a pipe tablet. + // Calculate row number according to the max size of a pipe tablet. "100" is the estimated size + // of other data structures in a pipe tablet. // "*8" converts bytes to bits, because the bitmap size is 1 bit per schema. - // Here we estimate the max use of int sizeLimit = - Math.min( - IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), - (int) (inputNum * rowBytesUsed * 1.2)); + (int) + Math.min( + IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), + Math.min(Integer.MAX_VALUE, 100 + inputNum * (double) rowBytesUsed * 1.2)); int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount); rowNumber = Math.max(1, rowNumber); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java index 36ca10daa725a..2e69b23927743 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.pipe.sink.util.PipeTabletEventSorter; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -361,7 +362,7 @@ private Tablet tryBestToAggregateTablets( // Aggregate the current tablet's data aggregatedSchemas.addAll(tablet.getSchemas()); aggregatedValues.addAll(Arrays.asList(tablet.values)); - aggregatedBitMaps.addAll(Arrays.asList(tablet.bitMaps)); + aggregatedBitMaps.addAll(Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet))); // Remove the aggregated tablet tablets.pollFirst(); } else { @@ -563,7 +564,7 @@ private void writeTabletsIntoOneFile( .map(schema -> (MeasurementSchema) schema) .toArray(MeasurementSchema[]::new); Object[] values = Arrays.copyOf(tablet.values, tablet.values.length); - BitMap[] bitMaps = Arrays.copyOf(tablet.bitMaps, tablet.bitMaps.length); + BitMap[] bitMaps = PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet); // convert date value to int refer to // org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java index 94a838ee0ad69..266894060dcb8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; @@ -34,7 +35,6 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; -import org.apache.tsfile.write.record.Tablet; import java.io.DataOutputStream; import java.io.IOException; @@ -130,6 +130,7 @@ public static PipeTransferTabletBatchReq toTPipeTransferReq( public static PipeTransferTabletBatchReq fromTPipeTransferReq( final TPipeTransferReq transferReq) { final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq(); + final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); // Binary req, for rolling upgrading ReadWriteIOUtils.readInt(transferReq.body); @@ -144,8 +145,7 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq( size = ReadWriteIOUtils.readInt(transferReq.body); for (int i = 0; i < size; ++i) { batchReq.tabletReqs.add( - PipeTransferTabletRawReq.toTPipeTransferRawReq( - Tablet.deserialize(transferReq.body), ReadWriteIOUtils.readBool(transferReq.body))); + PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body, tabletStringInternPool)); } batchReq.version = transferReq.version; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java index 47bf4d4489700..60619fd426816 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java @@ -22,6 +22,8 @@ import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool; import org.apache.iotdb.db.pipe.sink.util.PipeTabletEventSorter; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; @@ -82,6 +84,17 @@ public static PipeTransferTabletRawReq toTPipeTransferRawReq( return tabletReq; } + public static PipeTransferTabletRawReq toTPipeTransferRawReq( + final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) { + final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq(); + + tabletReq.tablet = + PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool); + tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer); + + return tabletReq; + } + /////////////////////////////// Thrift /////////////////////////////// public static PipeTransferTabletRawReq toTPipeTransferReq( @@ -105,10 +118,8 @@ public static PipeTransferTabletRawReq toTPipeTransferReq( } public static PipeTransferTabletRawReq fromTPipeTransferReq(final TPipeTransferReq transferReq) { - final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq(); - - tabletReq.tablet = Tablet.deserialize(transferReq.body); - tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body); + final PipeTransferTabletRawReq tabletReq = + toTPipeTransferRawReq(transferReq.body, new TabletStringInternPool()); tabletReq.version = transferReq.version; tabletReq.type = transferReq.type; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java index 3a42ec8796916..713a87b2e350f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java @@ -374,7 +374,10 @@ private void transferTabletForPubSubModel(final Tablet tablet) throws UaExceptio for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) { // Filter null value - if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) { + if (tablet.bitMaps != null + && columnIndex < tablet.bitMaps.length + && tablet.bitMaps[columnIndex] != null + && tablet.bitMaps[columnIndex].isMarked(rowIndex)) { continue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/PipeTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/PipeTabletEventSorter.java index 17e7a7c13e263..50beb6405e96e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/PipeTabletEventSorter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/PipeTabletEventSorter.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.pipe.sink.util; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; + import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; @@ -129,6 +131,7 @@ private void deduplicateTimestamps() { // Col: [6, 1] private void sortAndMayDeduplicateValuesAndBitMaps() { int columnIndex = 0; + boolean bitMapsModified = false; for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) { final IMeasurementSchema schema = tablet.getSchemas().get(i); if (schema != null) { @@ -145,10 +148,15 @@ private void sortAndMayDeduplicateValuesAndBitMaps() { if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) { tablet.bitMaps[columnIndex] = deDuplicatedBitMap; + bitMapsModified = true; } columnIndex++; } } + + if (bitMapsModified) { + tablet.bitMaps = PipeTabletUtils.compactBitMaps(tablet.bitMaps, deDuplicatedSize); + } } private Object reorderValueListAndBitMap( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index faacc10eccded..6a1e9bf1bb48c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; +import org.apache.iotdb.db.utils.BitMapUtils; import org.apache.iotdb.db.utils.QueryDataSetUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -277,7 +278,7 @@ public List splitByPartition(IAnalysis analysis) { long[] subTimes = new long[count]; int destLoc = 0; Object[] values = initTabletValues(dataTypes.length, count, dataTypes); - BitMap[] bitMaps = this.bitMaps == null ? null : initBitmaps(dataTypes.length, count); + BitMap[] bitMaps = initBitmapsForSplit(dataTypes.length, count); System.arraycopy(times, start, subTimes, destLoc, end - start); for (int k = 0; k < values.length; k++) { if (dataTypes[k] != null) { @@ -302,6 +303,7 @@ public List splitByPartition(IAnalysis analysis) { subNode.setFailedMeasurementNumber(getFailedMeasurementNumber()); subNode.setRange(locs); subNode.setDataRegionReplicaSet(entry.getKey()); + subNode.bitMaps = BitMapUtils.compactBitMaps(subNode.bitMaps, subNode.rowCount); result.add(subNode); } } @@ -366,6 +368,23 @@ private BitMap[] initBitmaps(int columnSize, int rowSize) { return bitMaps; } + protected BitMap[] initBitmapsForSplit(int columnSize, int rowSize) { + if (this.bitMaps == null) { + return null; + } + + final int sourceRowCount = rowCount > 0 ? rowCount : times == null ? 0 : times.length; + final BitMap[] splitBitMaps = new BitMap[columnSize]; + boolean hasBitMap = false; + for (int i = 0; i < columnSize && i < this.bitMaps.length; ++i) { + if (this.bitMaps[i] != null && !this.bitMaps[i].isAllUnmarked()) { + splitBitMaps[i] = new BitMap(rowSize); + hasBitMap = true; + } + } + return hasBitMap ? splitBitMaps : null; + } + @Override public void markFailedMeasurement(int index) { if (measurements[index] == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index 2e393678c5d52..525bb48f0dde7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.db.utils.BitMapUtils; import org.apache.iotdb.db.utils.CommonUtils; import org.apache.tsfile.enums.TSDataType; @@ -326,7 +327,7 @@ public List getSplitList() { statement.setMeasurementSchemas(measurementSchemas); statement.setDataTypes(dataTypes); if (this.bitMaps != null) { - statement.setBitMaps(copiedBitMaps); + statement.setBitMaps(BitMapUtils.compactBitMaps(copiedBitMaps, rowCount)); } statement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info); insertTabletStatementList.add(statement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java index e4f1170ddb36c..3da4db4b4083a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java @@ -116,9 +116,7 @@ protected void updateEstimateConsumeQuota(int numWrites, int numReads, Statement case BATCH_INSERT: // InsertTabletStatement InsertTabletStatement insertTabletStatement = (InsertTabletStatement) s; - for (BitMap bitMap : insertTabletStatement.getBitMaps()) { - avgSize += bitMap.getSize(); - } + avgSize += calculationWrite(insertTabletStatement.getBitMaps()); break; case BATCH_INSERT_ONE_DEVICE: // InsertRowsOfOneDeviceStatement @@ -151,10 +149,12 @@ protected void updateEstimateConsumeQuota(int numWrites, int numReads, Statement for (int i = 0; i < insertMultiTabletsStatement.getInsertTabletStatementList().size(); i++) { - for (BitMap bitMap : - insertMultiTabletsStatement.getInsertTabletStatementList().get(i).getBitMaps()) { - avgSize += bitMap.getSize(); - } + avgSize += + calculationWrite( + insertMultiTabletsStatement + .getInsertTabletStatementList() + .get(i) + .getBitMaps()); } } break; @@ -178,6 +178,20 @@ private long calculationWrite(Object[] values) { return size; } + private long calculationWrite(BitMap[] bitMaps) { + if (bitMaps == null) { + return 0; + } + + long size = 0; + for (BitMap bitMap : bitMaps) { + if (bitMap != null) { + size += bitMap.getSize(); + } + } + return size; + } + private long estimateConsume(int numReqs, long avgSize) { if (numReqs > 0) { return avgSize * numReqs; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java new file mode 100644 index 0000000000000..ba30c8847b4b5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.utils; + +import org.apache.tsfile.utils.BitMap; + +import java.util.Objects; + +public final class BitMapUtils { + + private BitMapUtils() {} + + public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int rowCount) { + if (Objects.isNull(bitMaps)) { + return null; + } + + boolean hasMarkedBitMap = false; + for (int i = 0; i < bitMaps.length; ++i) { + if (Objects.nonNull(bitMaps[i]) && bitMaps[i].isAllUnmarked()) { + bitMaps[i] = null; + } + if (Objects.nonNull(bitMaps[i])) { + hasMarkedBitMap = true; + } + } + return hasMarkedBitMap ? bitMaps : null; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index 4a7d77eab8d29..308c5458dc9b9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; import org.apache.iotdb.db.pipe.event.common.tablet.TabletInsertionDataContainer; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; @@ -217,7 +218,7 @@ private void createTablet() { tabletForInsertRowNode.values = values; tabletForInsertRowNode.timestamps = new long[] {times[0]}; tabletForInsertRowNode.rowSize = 1; - tabletForInsertRowNode.bitMaps = bitMapsForInsertRowNode; + tabletForInsertRowNode.bitMaps = PipeTabletUtils.compactBitMaps(bitMapsForInsertRowNode, 1); // create tablet for insertTabletNode BitMap[] bitMapsForInsertTabletNode = new BitMap[schemas.length]; @@ -253,7 +254,8 @@ private void createTablet() { tabletForInsertTabletNode.values = values; tabletForInsertTabletNode.timestamps = times; tabletForInsertTabletNode.rowSize = times.length; - tabletForInsertTabletNode.bitMaps = bitMapsForInsertTabletNode; + tabletForInsertTabletNode.bitMaps = + PipeTabletUtils.compactBitMaps(bitMapsForInsertTabletNode, times.length); } @Test @@ -318,6 +320,36 @@ public void convertToAlignedTabletForTest() { Assert.assertTrue(isAligned4); } + @Test + public void convertToTabletSkipsUnnecessaryBitMapsForTest() throws Exception { + final BitMap[] bitMaps = new BitMap[schemas.length]; + bitMaps[0] = new BitMap(times.length); + bitMaps[1] = new BitMap(times.length); + bitMaps[1].mark(1); + + final InsertTabletNode nodeWithSparseColumn = + new InsertTabletNode( + new PlanNodeId("plannode bitmap"), + new PartialPath(deviceId), + false, + measurementIds, + dataTypes, + schemas, + times, + bitMaps, + insertTabletNode.getColumns(), + times.length); + + final Tablet tablet = + new TabletInsertionDataContainer(nodeWithSparseColumn, new PrefixPipePattern(pattern)) + .convertToTablet(); + + Assert.assertNotNull(tablet.bitMaps); + Assert.assertNull(tablet.bitMaps[0]); + Assert.assertNotNull(tablet.bitMaps[1]); + Assert.assertTrue(tablet.bitMaps[1].isMarked(1)); + } + @Test public void convertToTabletWithFilteredRowsForTest() { TabletInsertionDataContainer container1 = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java new file mode 100644 index 0000000000000..8bf32bd066ffb --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tablet; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class PipeTabletUtilsTest { + + @Test + public void testPutValueUnmarksReusedNullRow() { + final List schemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.FLOAT), + new MeasurementSchema("s2", TSDataType.FLOAT)); + final Tablet tablet = new Tablet("root.sg.d1", schemas, 2); + + PipeTabletUtils.markNullValue(tablet, 0, 0); + PipeTabletUtils.markNullValue(tablet, 0, 1); + + PipeTabletUtils.putValue(tablet, 0, 0, TSDataType.FLOAT, 1.0f); + PipeTabletUtils.putTimestamp(tablet, 0, 1L); + PipeTabletUtils.compactBitMaps(tablet); + + Assert.assertNull(tablet.bitMaps[0]); + Assert.assertTrue(tablet.bitMaps[1].isMarked(0)); + } + + @Test + public void testCopyBitMapsOrCreateEmptyWithNullBitMaps() { + final List schemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.FLOAT), + new MeasurementSchema("s2", TSDataType.FLOAT)); + final Tablet tablet = new Tablet("root.sg.d1", schemas, 2); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1.0f); + tablet.addValue("s2", 0, 2.0f); + + Assert.assertNull(tablet.bitMaps); + + final BitMap[] bitMaps = PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet); + + Assert.assertEquals(schemas.size(), bitMaps.length); + Assert.assertNull(bitMaps[0]); + Assert.assertNull(bitMaps[1]); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index ee9b7218dab35..4e4d11aacbf53 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -321,6 +321,46 @@ public void testPipeTransferTabletBatchReq() throws IOException { Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned()); } + @Test + public void testPipeTransferTabletBatchReqInternsRepeatedMeasurementNames() throws IOException { + final List tabletBuffers = new ArrayList<>(); + tabletBuffers.add( + serializeTablet(createSingleValueTablet(new String("root.sg.d"), new String("s1")), false)); + tabletBuffers.add( + serializeTablet(createSingleValueTablet(new String("root.sg.d"), new String("s1")), false)); + + final PipeTransferTabletBatchReq deserializedReq = + PipeTransferTabletBatchReq.fromTPipeTransferReq( + PipeTransferTabletBatchReq.toTPipeTransferReq(Collections.emptyList(), tabletBuffers)); + final Tablet firstTablet = deserializedReq.getTabletReqs().get(0).getTablet(); + final Tablet secondTablet = deserializedReq.getTabletReqs().get(1).getTablet(); + + Assert.assertSame(firstTablet.deviceId, secondTablet.deviceId); + Assert.assertSame( + firstTablet.getSchemas().get(0).getMeasurementId(), + secondTablet.getSchemas().get(0).getMeasurementId()); + } + + private static Tablet createSingleValueTablet(final String deviceId, final String measurement) { + final List schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema(measurement, TSDataType.INT32)); + final Tablet tablet = new Tablet(deviceId, schemaList, 1); + tablet.addTimestamp(0, 1); + tablet.addValue(measurement, 0, 1); + tablet.rowSize = 1; + return tablet; + } + + private static ByteBuffer serializeTablet(final Tablet tablet, final boolean isAligned) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + tablet.serialize(outputStream); + ReadWriteIOUtils.write(isAligned, outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + @Test public void testPipeTransferFilePieceReq() throws IOException { final byte[] body = "testPipeTransferFilePieceReq".getBytes(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java index a93a22b6e9f4a..305a3197cfebf 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.BitMap; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -201,6 +202,9 @@ public void testSplitInsertTablet() throws IllegalPathException { insertTabletNode.setColumns( new Object[] {new int[] {-20, -10, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}}); insertTabletNode.setRowCount(insertTabletNode.getTimes().length); + final BitMap[] bitMaps = new BitMap[] {new BitMap(insertTabletNode.getRowCount())}; + bitMaps[0].mark(2); + insertTabletNode.setBitMaps(bitMaps); DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); dataPartitionQueryParam.setDevicePath(insertTabletNode.getDevicePath().getFullPath()); @@ -219,6 +223,12 @@ public void testSplitInsertTablet() throws IllegalPathException { Assert.assertEquals(tabletNode.getTimes().length, 2); TConsensusGroupId regionId = tabletNode.getDataRegionReplicaSet().getRegionId(); Assert.assertEquals(getRegionIdByTime(tabletNode.getMinTime()), regionId.getId()); + if (tabletNode.getTimes()[0] == 1) { + Assert.assertNotNull(tabletNode.getBitMaps()); + Assert.assertTrue(tabletNode.getBitMaps()[0].isMarked(0)); + } else { + Assert.assertNull(tabletNode.getBitMaps()); + } } insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 2")); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java new file mode 100644 index 0000000000000..5f11039eeb312 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.rescon.quotas; + +import org.apache.iotdb.common.rpc.thrift.TTimedQuota; +import org.apache.iotdb.common.rpc.thrift.ThrottleType; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + +import org.apache.tsfile.utils.BitMap; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumMap; +import java.util.Map; + +public class DefaultOperationQuotaTest { + + @Test + public void testCheckQuotaWithNullAndSparseBitMaps() throws Exception { + final DefaultOperationQuota quota = new DefaultOperationQuota(createQuotaLimiter()); + + final InsertTabletStatement tabletWithoutBitMaps = new InsertTabletStatement(); + tabletWithoutBitMaps.setBitMaps(null); + quota.checkQuota(1, 0, tabletWithoutBitMaps); + + final InsertTabletStatement tabletWithSparseBitMaps = new InsertTabletStatement(); + final BitMap bitMap = new BitMap(8); + bitMap.mark(0); + tabletWithSparseBitMaps.setBitMaps(new BitMap[] {null, bitMap}); + quota.checkQuota(1, 0, tabletWithSparseBitMaps); + + final InsertMultiTabletsStatement multiTabletsStatement = new InsertMultiTabletsStatement(); + multiTabletsStatement.setInsertTabletStatementList( + Arrays.asList(tabletWithoutBitMaps, tabletWithSparseBitMaps)); + quota.checkQuota(1, 0, multiTabletsStatement); + } + + private static QuotaLimiter createQuotaLimiter() { + final Map quotas = new EnumMap<>(ThrottleType.class); + for (final ThrottleType throttleType : ThrottleType.values()) { + quotas.put(throttleType, new TTimedQuota(60_000L, 1_000_000_000L)); + } + return QuotaLimiter.fromThrottle(Collections.unmodifiableMap(quotas)); + } +}