From 57b155d52991a431e4f5f4da263af9aebf594e20 Mon Sep 17 00:00:00 2001 From: Alkis Evlogimenos Date: Sat, 23 May 2026 00:22:41 +0200 Subject: [PATCH 1/2] Add Approach 2 (micro-row-group) reader prototype Prototype for a format extension that lets multiple BlockMetaData entries share a single physical column chunk. Sharing is signaled by the sentinel data_page_offset == -1 (ColumnChunkMetaData.SENTINEL_OFFSET); per-block page locations come from each block's OffsetIndex with absolute first_row_index values, and SynchronizingColumnReader trims rows that spill across block boundaries via an absolute RowRanges window. ParquetFileReader.readNextRowGroup() dispatches to a new internalReadApproach2RowGroup() when BlockMetaData.isApproach2() is true; legacy contiguous chunks take the existing path unchanged. Dictionary pages are read out-of-band (size not in OffsetIndex). Known prototype limitations (no cross-block dict/page cache, boundary pages re-read) are documented at the call sites. Co-authored-by: Isaac --- .../filter2/columnindex/RowRanges.java | 16 ++ .../filter2/columnindex/TestRowRanges.java | 34 +++ .../parquet/hadoop/ParquetFileReader.java | 212 ++++++++++++++- .../hadoop/PhysicalChunkPageSource.java | 246 ++++++++++++++++++ .../hadoop/metadata/BlockMetaData.java | 24 +- .../hadoop/metadata/ColumnChunkMetaData.java | 40 ++- .../metadata/TestApproach2BlockMetaData.java | 80 ++++++ .../metadata/TestColumnChunkMetaData.java | 18 ++ 8 files changed, 667 insertions(+), 3 deletions(-) create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PhysicalChunkPageSource.java create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestApproach2BlockMetaData.java diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index 0b2257a6bc..4ea630fd88 100644 --- 
a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -118,6 +118,22 @@ public static RowRanges createSingle(long rowCount) { return new RowRanges(new Range(0L, rowCount - 1L)); } + /** + * Creates an immutable {@link RowRanges} with the single closed range {@code [from, to]}. + * Used by the Approach 2 (micro-row-group) reader path to express a logical micro-row-group's + * absolute row range against a shared physical column chunk. + * + * @param from inclusive first row index (must be non-negative) + * @param to inclusive last row index (must be {@code >= from}) + * @return an immutable {@link RowRanges} representing {@code [from, to]} + */ + public static RowRanges createBetween(long from, long to) { + if (from < 0 || to < from) { + throw new IllegalArgumentException("Invalid row range [" + from + ", " + to + ']'); + } + return new RowRanges(new Range(from, to)); + } + /** * Creates a mutable RowRanges object with the following ranges: *
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
index 9c6b9f737c..41ec4edc56 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestRowRanges.java
@@ -152,4 +152,38 @@ public void testIntersection() {
     assertAllRowsEqual(intersection(empty, ranges2).iterator());
     assertAllRowsEqual(intersection(empty, empty).iterator());
   }
+
+  @Test
+  public void testCreateBetween() {
+    // Degenerate window: from == to yields exactly one row
+    final RowRanges oneRow = RowRanges.createBetween(42L, 42L);
+    assertEquals(1L, oneRow.rowCount());
+    assertAllRowsEqual(oneRow.iterator(), 42L);
+
+    // Window anchored at row 0 must look exactly like createSingle(rowCount)
+    final RowRanges zeroBased = RowRanges.createBetween(0L, 4L);
+    assertEquals(5L, zeroBased.rowCount());
+    assertAllRowsEqual(zeroBased.iterator(), 0L, 1L, 2L, 3L, 4L);
+    assertEquals(
+        RowRanges.createSingle(5L).getRanges().toString(), // reference: rows [0, 4]
+        zeroBased.getRanges().toString());
+
+    // Window with a non-zero, file-absolute start: the Approach 2 (micro-row-group) case
+    final RowRanges fileAbsolute = RowRanges.createBetween(100_000L, 100_004L);
+    assertEquals(5L, fileAbsolute.rowCount());
+    assertAllRowsEqual(fileAbsolute.iterator(), 100_000L, 100_001L, 100_002L, 100_003L, 100_004L);
+    assertTrue(fileAbsolute.isOverlapping(100_002L, 100_003L)); // strictly inside the window
+    assertFalse(fileAbsolute.isOverlapping(99_000L, 99_999L)); // ends one row before the window
+    assertFalse(fileAbsolute.isOverlapping(100_005L, 100_010L)); // starts one row after the window
+  }
+
+  @Test(expected = IllegalArgumentException.class) // createBetween must throw when from < 0
+  public void testCreateBetweenRejectsNegativeFrom() {
+    RowRanges.createBetween(-1L, 0L); // from = -1 violates the non-negative lower bound
+  }
+
+  @Test(expected = IllegalArgumentException.class) // createBetween must throw when to < from
+  public void testCreateBetweenRejectsInvertedRange() {
+    RowRanges.createBetween(10L, 5L); // to = 5 lies below from = 10
+  }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index e0b0d76e0e..c3bba9a989 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1140,7 +1140,14 @@ public PageReadStore readRowGroup(int blockIndex) throws IOException {
   public PageReadStore readNextRowGroup() throws IOException {
     ColumnChunkPageReadStore rowGroup = null;
     try {
-      rowGroup = internalReadRowGroup(currentBlock);
+      // Approach 2 (micro-row-group) dispatch: if this block's column chunks are physically
+      // shared with other blocks, use a path that locates pages via OffsetIndex and slices
+      // rows via an absolute RowRanges.
+      if (currentBlock < blocks.size() && blocks.get(currentBlock).isApproach2()) {
+        rowGroup = internalReadApproach2RowGroup(currentBlock);
+      } else {
+        rowGroup = internalReadRowGroup(currentBlock);
+      }
     } catch (ParquetEmptyBlockException e) {
       LOG.warn("Read empty block at index {} from {}", currentBlock, getFile());
       advanceToNextBlock();
@@ -1199,6 +1206,209 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
     return rowGroup;
   }
 
+  /**
+   * Reads a row group whose column chunks are physically shared with other blocks via the
+   * Approach 2 (micro-row-group) format extension. See
+   * {@link org.apache.parquet.hadoop.metadata.BlockMetaData#isApproach2()}.
+   *
+   * <p>For each column the page list is taken straight from the block's OffsetIndex sidecar
+   * — the writer is responsible for emitting an OffsetIndex that contains exactly the
+   * pages whose absolute row range overlaps this block's {@code [rowIndexOffset,
+   * rowIndexOffset + rowCount)} window, with {@code PageLocation.first_row_index} as a
+   * file-absolute row index. The dictionary page (if any) is located via
+   * {@link org.apache.parquet.hadoop.metadata.ColumnChunkMetaData#getDictionaryPageOffset()}
+   * (which the spec leaves valid even when {@code data_page_offset == -1}), and its body is
+   * sized by reading its page header in-band.
+   *
+   * <p>The returned {@link ColumnChunkPageReadStore} carries a {@link RowRanges} covering
+   * {@code [rowIndexOffset, rowIndexOffset + rowCount - 1]} in absolute coordinates, so the
+   * downstream {@link org.apache.parquet.column.impl.SynchronizingColumnReader} discards
+   * rows that fall outside this block's window when a physical page spans into the next
+   * block.
+   *
+   * <p>Prototype-grade limitations:
+   * <ul>
+   *   <li>No cross-block dictionary/page cache: every block re-reads the shared dictionary
+   *       page of each column.</li>
+   *   <li>Boundary pages are re-read: a physical page that spans two blocks is fetched once
+   *       per block.</li>
+   * </ul>
+ * These are acceptable for a correctness-first prototype and are called out in the + * accompanying design plan. + * + * @param blockIndex the index of the Approach 2 block to read + * @return a {@link ColumnChunkPageReadStore} whose pages cover the block's absolute row + * range, or {@code null} if {@code blockIndex} is out of range + */ + private ColumnChunkPageReadStore internalReadApproach2RowGroup(int blockIndex) throws IOException { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; + } + BlockMetaData block = blocks.get(blockIndex); + if (block.getRowCount() == 0) { + throw new ParquetEmptyBlockException("Illegal row group of 0 rows"); + } + if (block.getRowIndexOffset() < 0) { + throw new IOException( + "Approach 2 block must declare an absolute rowIndexOffset; block " + blockIndex + " has none"); + } + + final long absFrom = block.getRowIndexOffset(); + final long absTo = absFrom + block.getRowCount() - 1L; + final RowRanges rowRanges = RowRanges.createBetween(absFrom, absTo); + final ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges, absFrom); + + // Use the absolute upper bound (absTo + 1) as the ChunkListBuilder's rowCount: the + // resulting Chunk passes it to ColumnChunkPageReader which uses it as the upper bound + // for OffsetIndex#getLastRowIndex on the final page. Under Approach 2 the OffsetIndex + // stores absolute first_row_index values, so the upper bound must also be absolute. 
+ final long absoluteUpperBound = absTo + 1L; + final ChunkListBuilder builder = new ChunkListBuilder(absoluteUpperBound); + final List allParts = new ArrayList<>(); + final Map dictPagesByDescriptor = new HashMap<>(); + + final ColumnIndexStore ciStore = getColumnIndexStore(blockIndex); + for (ColumnChunkMetaData mc : block.getColumns()) { + ColumnPath pathKey = mc.getPath(); + ColumnDescriptor columnDescriptor = paths.get(pathKey); + if (columnDescriptor == null) { + continue; + } + if (!mc.isPhysicallyShared()) { + throw new IOException("Approach 2 block " + blockIndex + " mixes physically-shared and legacy columns; " + + "mixed-mode blocks are not supported by this prototype"); + } + + OffsetIndex offsetIndex = ciStore.getOffsetIndex(pathKey); + if (offsetIndex == null) { + throw new IOException("Approach 2 column missing OffsetIndex: " + pathKey); + } + + // Build per-page byte ranges. Each page gets its own ChunkDescriptor; consecutive + // pages collapse into a single ConsecutivePartList for vectored IO. + ConsecutivePartList currentParts = null; + for (int i = 0, n = offsetIndex.getPageCount(); i < n; i++) { + long off = offsetIndex.getOffset(i); + int len = offsetIndex.getCompressedPageSize(i); + BenchmarkCounter.incrementTotalBytes(len); + if (currentParts == null || currentParts.endPos() != off) { + currentParts = new ConsecutivePartList(off); + allParts.add(currentParts); + } + ChunkDescriptor cd = new ChunkDescriptor(columnDescriptor, mc, off, len); + currentParts.addChunk(cd); + builder.setOffsetIndex(cd, offsetIndex); + } + + // Dictionary page is located via getDictionaryPageOffset() (still valid under the + // sentinel per spec); read its bytes out-of-band because its compressed size is not + // in the OffsetIndex. 
+ long dictOffset = mc.getDictionaryPageOffset(); + if (dictOffset > 0) { + DictionaryPage dictPage = readDictionaryPageDirect(dictOffset); + if (dictPage != null) { + // Stash by a synthetic descriptor that won't clash with data-page descriptors + // (descriptors equal by ColumnDescriptor; we encode a unique fileOffset/size + // pair to keep the map well-formed). + dictPagesByDescriptor.put(new ChunkDescriptor(columnDescriptor, mc, -dictOffset, 0), dictPage); + } + } + } + + if (!allParts.isEmpty()) { + readAllPartsVectoredOrNormal(allParts, builder); + } + rowGroup.setReleaser(builder.releaser); + + // Each Chunk now wraps one column's data-page bytes (concatenated in OffsetIndex order) + // plus its OffsetIndex; readAllPages parses the bytes and returns a ColumnChunkPageReader. + // For Approach 2 we splice in the separately-read dictionary page if present. + for (Chunk chunk : builder.build()) { + ColumnChunkPageReader dataPagesReader = chunk.readAllPages(); + DictionaryPage dictPage = null; + for (Map.Entry e : dictPagesByDescriptor.entrySet()) { + if (e.getKey().col.equals(chunk.descriptor.col)) { + dictPage = e.getValue(); + break; + } + } + if (dictPage == null) { + rowGroup.addColumn(chunk.descriptor.col, dataPagesReader); + } else { + rowGroup.addColumn( + chunk.descriptor.col, + new ColumnChunkPageReader( + options.getCodecFactory().getDecompressor(chunk.descriptor.metadata.getCodec()), + drainDataPagesQueue(dataPagesReader), + dictPage, + chunk.offsetIndex, + chunk.rowCount, + null /* blockDecryptor */, + null /* fileAAD */, + block.getOrdinal(), + 0 /* columnOrdinal — not used in non-encrypted prototype */, + options)); + } + } + + return rowGroup; + } + + /** + * Reads a single dictionary page directly from {@code f} at {@code dictOffset}. Used by + * the Approach 2 reader path because the compressed dictionary page size is not in the + * OffsetIndex; we discover it by parsing the page header in-band. 
+ * + * @param dictOffset absolute file offset of the dictionary page header + * @return the compressed {@link DictionaryPage}, or {@code null} if the page header + * indicates this is not a dictionary page (shouldn't happen for a well-formed file) + */ + private DictionaryPage readDictionaryPageDirect(long dictOffset) throws IOException { + f.seek(dictOffset); + PageHeader header = Util.readPageHeader(f); + if (header.type != org.apache.parquet.format.PageType.DICTIONARY_PAGE) { + LOG.warn("Expected DICTIONARY_PAGE at offset {} but got {}", dictOffset, header.type); + return null; + } + int compressedSize = header.getCompressed_page_size(); + int uncompressedSize = header.getUncompressed_page_size(); + DictionaryPageHeader dicHeader = header.getDictionary_page_header(); + ByteBuffer buf = options.getAllocator().allocate(compressedSize); + try { + f.readFully(buf); + buf.flip(); + DictionaryPage page = new DictionaryPage( + org.apache.parquet.bytes.BytesInput.from(buf), + uncompressedSize, + dicHeader.getNum_values(), + converter.getEncoding(dicHeader.getEncoding())); + if (header.isSetCrc()) { + page.setCrc(header.getCrc()); + } + return page; + } catch (RuntimeException | IOException e) { + options.getAllocator().release(buf); + throw e; + } + } + + /** + * Drains all data pages out of a {@link ColumnChunkPageReader} returned by + * {@code Chunk.readAllPages()} so we can rebuild it with a different dictionary page. + * The returned list preserves page order. The {@code dataPagesReader} should not be + * used after this call. + */ + private static List drainDataPagesQueue(ColumnChunkPageReader dataPagesReader) { + List drained = new ArrayList<>(); + DataPage p; + while ((p = dataPagesReader.readPage()) != null) { + drained.add(p); + } + return drained; + } + /** * Reads all the columns requested from the specified row group. It may skip specific pages based on the column * indexes according to the actual filter. 
As the rows are not aligned among the pages of the different columns row diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PhysicalChunkPageSource.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PhysicalChunkPageSource.java new file mode 100644 index 0000000000..86a5551c61 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PhysicalChunkPageSource.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import java.util.ArrayList; +import java.util.List; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; + +/** + * Cached, per-physical-column-chunk page state used by the Approach 2 (micro-row-group) + * reader path. + * + *

<p>An Approach 2 file represents the row groups in its footer as logical
+ * micro-row-groups that share a common physical column chunk per column. Multiple
+ * {@link org.apache.parquet.hadoop.metadata.BlockMetaData} entries point at overlapping
+ * page ranges in their {@link OffsetIndex} sidecars, and a single physical {@link DataPage}
+ * may straddle the boundary between adjacent micro-row-groups.
+ *
+ * <p>This class is the in-memory model of that physical column chunk after IO + page-header
+ * parse. It holds:
+ * <ul>
+ *   <li>The compressed {@link DictionaryPage} (or {@code null} if the column has none).</li>
+ *   <li>The full, ordered list of compressed {@link DataPage}s for the chunk.</li>
+ *   <li>The OffsetIndex that describes those pages with file-absolute
+ *       {@code first_row_index} values.</li>
+ *   <li>The {@link ColumnChunkMetaData} from the first physical row group in the group,
+ *       used for codec lookup and other shared properties.</li>
+ * </ul>
+ *
+ * <p>Sharing across micro-row-groups: each visiting {@code readNextRowGroup()} call hands
+ * the caller a fresh {@link org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader}
+ * built from a slice of this source's page list (only the pages that overlap the
+ * visiting block's row range). The {@code DataPage} objects themselves are shared by
+ * reference; their {@link org.apache.parquet.bytes.BytesInput} payloads are re-readable
+ * because they wrap underlying {@code ByteBuffer}s rather than consuming a single stream.
+ *
+ * <p>The compressed dictionary page is shared by reference too. Each micro-row-group's
+ * column reader re-decompresses and re-decodes it independently — that is the
+ * known prototype-grade inefficiency called out in the Approach 2 plan; a production
+ * implementation would memoize the decoded {@link org.apache.parquet.column.Dictionary}.
+ *
+ * <p>
Instances are constructed at file open by {@link ParquetFileReader} when an + * Approach 2 file is detected (any block returns {@code true} from + * {@link org.apache.parquet.hadoop.metadata.BlockMetaData#isApproach2()}). + */ +final class PhysicalChunkPageSource { + + /** + * Column chunk metadata from the first physical row group in the group. Used by the + * caller to resolve codec, encoding stats, and (when present) the compressed + * dictionary page's location during IO planning. Note that + * {@link ColumnChunkMetaData#isPhysicallyShared()} returns {@code true} on this + * instance; callers must not use {@link ColumnChunkMetaData#getStartingPos()} or + * {@link ColumnChunkMetaData#getTotalSize()} for IO planning. + */ + private final ColumnChunkMetaData metadata; + + /** + * The full ordered list of compressed data pages for this physical column chunk. + * Each {@link DataPage} carries a re-readable {@link org.apache.parquet.bytes.BytesInput}. + */ + private final List compressedPages; + + /** + * The compressed dictionary page, or {@code null} if this column chunk has none. + */ + private final DictionaryPage compressedDictionaryPage; + + /** + * OffsetIndex covering every page in {@link #compressedPages}. {@code firstRowIndex(i)} + * values are file-absolute under Approach 2. 
+ */ + private final OffsetIndex absoluteOffsetIndex; + + PhysicalChunkPageSource( + ColumnChunkMetaData metadata, + List compressedPages, + DictionaryPage compressedDictionaryPage, + OffsetIndex absoluteOffsetIndex) { + this.metadata = metadata; + this.compressedPages = List.copyOf(compressedPages); + this.compressedDictionaryPage = compressedDictionaryPage; + this.absoluteOffsetIndex = absoluteOffsetIndex; + } + + ColumnChunkMetaData getMetadata() { + return metadata; + } + + DictionaryPage getCompressedDictionaryPage() { + return compressedDictionaryPage; + } + + OffsetIndex getAbsoluteOffsetIndex() { + return absoluteOffsetIndex; + } + + int getPageCount() { + return compressedPages.size(); + } + + /** + * A slice of {@link PhysicalChunkPageSource} pages restricted to those that overlap an + * absolute row range. The {@link OffsetIndex} returned by {@link #getOffsetIndex()} is + * indexed 0..{@code pages.size()-1} and matches {@link #getPages()} entry-for-entry, so + * a {@code ColumnChunkPageReader} built from this slice can index either by ordinal + * without confusion. + */ + static final class PageSlice { + private final List pages; + private final OffsetIndex offsetIndex; + + private PageSlice(List pages, OffsetIndex offsetIndex) { + this.pages = pages; + this.offsetIndex = offsetIndex; + } + + List getPages() { + return pages; + } + + OffsetIndex getOffsetIndex() { + return offsetIndex; + } + } + + /** + * Returns the slice of pages whose absolute row span intersects the closed range + * {@code [fromAbsoluteRow, toAbsoluteRow]}. The returned slice's {@link OffsetIndex} + * is a 0-based view into the surviving subset, so callers can pass it to + * {@code ColumnChunkPageReader} without further translation. + * + *

A page is included if any row index in its {@code [firstRowIndex, lastRowIndex]} + * span falls inside {@code [fromAbsoluteRow, toAbsoluteRow]}. The OffsetIndex must be + * non-null and its entries must be sorted by {@code firstRowIndex}. + * + * @param fromAbsoluteRow inclusive lower bound, file-absolute + * @param toAbsoluteRow inclusive upper bound, file-absolute + * @return the filtered page slice, possibly empty but never {@code null} + */ + PageSlice sliceForRowRange(long fromAbsoluteRow, long toAbsoluteRow) { + if (toAbsoluteRow < fromAbsoluteRow) { + return new PageSlice(List.of(), new SlicedOffsetIndex(absoluteOffsetIndex, new int[0])); + } + if (absoluteOffsetIndex == null) { + throw new IllegalStateException( + "Approach 2 column chunk has no OffsetIndex; cannot slice pages by row range"); + } + final int n = absoluteOffsetIndex.getPageCount(); + final long upperRowSentinel = toAbsoluteRow + 1L; + final List resultPages = new ArrayList<>(); + final List keptIndexes = new ArrayList<>(); + for (int i = 0; i < n; i++) { + final long pageFirst = absoluteOffsetIndex.getFirstRowIndex(i); + // Last row of page i is firstRow(i+1) - 1 when there is a next page; for the final + // page the source has no easy upper bound, so we treat it as open-ended and trust + // the SynchronizingColumnReader to stop at toAbsoluteRow. + final long pageLast = (i + 1 < n) ? absoluteOffsetIndex.getFirstRowIndex(i + 1) - 1L : Long.MAX_VALUE; + if (pageLast < fromAbsoluteRow) { + continue; + } + if (pageFirst >= upperRowSentinel) { + break; + } + resultPages.add(compressedPages.get(i)); + keptIndexes.add(i); + } + int[] indexMap = new int[keptIndexes.size()]; + for (int k = 0; k < indexMap.length; k++) { + indexMap[k] = keptIndexes.get(k); + } + return new PageSlice(resultPages, new SlicedOffsetIndex(absoluteOffsetIndex, indexMap)); + } + + /** + * A read-only view onto another {@link OffsetIndex} that exposes only the entries at + * positions listed in {@code indexMap}. 
Used to align the sliced page list with a fresh + * {@code ColumnChunkPageReader}, which addresses its OffsetIndex by sequential page + * ordinal. + */ + private static final class SlicedOffsetIndex implements OffsetIndex { + private final OffsetIndex source; + private final int[] indexMap; + + SlicedOffsetIndex(OffsetIndex source, int[] indexMap) { + this.source = source; + this.indexMap = indexMap; + } + + @Override + public int getPageCount() { + return indexMap.length; + } + + @Override + public long getOffset(int pageIndex) { + return source.getOffset(indexMap[pageIndex]); + } + + @Override + public int getCompressedPageSize(int pageIndex) { + return source.getCompressedPageSize(indexMap[pageIndex]); + } + + @Override + public long getFirstRowIndex(int pageIndex) { + return source.getFirstRowIndex(indexMap[pageIndex]); + } + + @Override + public long getLastRowIndex(int pageIndex, long totalRowCount) { + int srcIdx = indexMap[pageIndex]; + int nextIdx = srcIdx + 1; + return (nextIdx >= source.getPageCount() ? 
totalRowCount : source.getFirstRowIndex(nextIdx)) - 1L; + } + + @Override + public int getPageOrdinal(int pageIndex) { + return source.getPageOrdinal(indexMap[pageIndex]); + } + + @Override + public java.util.Optional getUnencodedByteArrayDataBytes(int pageIndex) { + return source.getUnencodedByteArrayDataBytes(indexMap[pageIndex]); + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java index e1fe9b894c..5f3b5a801a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java @@ -107,12 +107,34 @@ public List getColumns() { } /** - * @return the starting pos of first column + * @return the starting pos of first column, or {@link ColumnChunkMetaData#SENTINEL_OFFSET} + * if this block is part of an Approach 2 (micro-row-group) physical group and + * its columns are physically shared with other blocks. */ public long getStartingPos() { return getColumns().get(0).getStartingPos(); } + /** + * @return {@code true} if every column in this block is physically shared with other + * blocks via the Approach 2 (micro-row-group) format extension. When this + * returns {@code true}, the block's pages must be located via per-column + * {@code OffsetIndex} sidecars and a single physical IO covers multiple blocks + * of the same physical group. Returns {@code false} for legacy (contiguous) + * column chunks. Empty blocks return {@code false}. 
+ */ + public boolean isApproach2() { + if (columns.isEmpty()) { + return false; + } + for (ColumnChunkMetaData col : columns) { + if (!col.isPhysicallyShared()) { + return false; + } + } + return true; + } + @Override public String toString() { String rowIndexOffsetStr = ""; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java index 4ba52dec2c..85d9ed9a0a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java @@ -48,6 +48,17 @@ * Column meta data for a block stored in the file footer and passed in the InputSplit */ public abstract class ColumnChunkMetaData { + /** + * Sentinel value used for {@link #getFirstDataPageOffset()} and the on-disk + * {@code data_page_offset} / {@code ColumnChunk.file_offset} fields to mark + * a column chunk as physically shared with other logical row groups + * (Approach 2 of the micro-row-group format extension). When set, the + * column chunk's pages are owned by a physical group that spans multiple + * {@link BlockMetaData} entries, and page locations must be looked up via + * the {@code OffsetIndex} sidecar rather than derived from this metadata. + */ + public static final long SENTINEL_OFFSET = -1L; + protected int rowGroupOrdinal = -1; @Deprecated @@ -308,12 +319,18 @@ public int getRowGroupOrdinal() { } /** - * @return the offset of the first byte in the chunk + * @return the offset of the first byte in the chunk, or {@link #SENTINEL_OFFSET} if this + * column chunk is physically shared (see {@link #isPhysicallyShared()}). Callers + * that need an actual byte offset must consult the column's {@code OffsetIndex} + * in that case. 
*/ public long getStartingPos() { decryptIfNeeded(); long dictionaryPageOffset = getDictionaryPageOffset(); long firstDataPageOffset = getFirstDataPageOffset(); + if (firstDataPageOffset == SENTINEL_OFFSET) { + return SENTINEL_OFFSET; + } if (dictionaryPageOffset > 0 && dictionaryPageOffset < firstDataPageOffset) { // if there's a dictionary and it's before the first data page, start from there return dictionaryPageOffset; @@ -321,6 +338,27 @@ public long getStartingPos() { return firstDataPageOffset; } + /** + * @return {@code true} if this column chunk's pages are not contiguous in the file + * and must be located via the {@code OffsetIndex} sidecar (Approach 2 of the + * micro-row-group format extension). When {@code true}, callers must not use + * {@link #getStartingPos()} or {@link #getTotalSize()} to plan a single IO over + * the column chunk. + * + *

This probe must not trigger decryption: Approach 2 interacts with the + * encrypted-metadata path in ways that are out of scope for this prototype, so + * encrypted column chunks always return {@code false}. This also keeps the cheap + * dispatch in {@link org.apache.parquet.hadoop.ParquetFileReader#readNextRowGroup()} + * from throwing on plaintext-footer-encrypted-columns files when no decryptor is + * configured. + */ + public boolean isPhysicallyShared() { + if (isEncrypted()) { + return false; + } + return getFirstDataPageOffset() == SENTINEL_OFFSET; + } + /** * checks that a positive long value fits in an int. * (reindexed on Integer.MIN_VALUE) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestApproach2BlockMetaData.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestApproach2BlockMetaData.java new file mode 100644 index 0000000000..a3b3b2d2fb --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestApproach2BlockMetaData.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop.metadata; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.HashSet; +import org.apache.parquet.column.statistics.BinaryStatistics; +import org.junit.Test; + +/** + * Tests for {@link BlockMetaData#isApproach2()} sentinel detection. Approach 2 (micro-row-group) + * blocks are detected by checking that every {@link ColumnChunkMetaData} in the block reports + * {@link ColumnChunkMetaData#isPhysicallyShared()} as {@code true}. + */ +public class TestApproach2BlockMetaData { + + @Test + public void testEmptyBlockIsNotApproach2() { + BlockMetaData block = new BlockMetaData(); + assertFalse(block.isApproach2()); + } + + @Test + public void testAllSentinelBlockIsApproach2() { + BlockMetaData block = new BlockMetaData(); + block.addColumn(buildColumn("a", ColumnChunkMetaData.SENTINEL_OFFSET)); + block.addColumn(buildColumn("b", ColumnChunkMetaData.SENTINEL_OFFSET)); + assertTrue(block.isApproach2()); + } + + @Test + public void testMixedSentinelBlockIsNotApproach2() { + BlockMetaData block = new BlockMetaData(); + block.addColumn(buildColumn("a", ColumnChunkMetaData.SENTINEL_OFFSET)); + block.addColumn(buildColumn("b", 100L)); + assertFalse(block.isApproach2()); + } + + @Test + public void testLegacyBlockIsNotApproach2() { + BlockMetaData block = new BlockMetaData(); + block.addColumn(buildColumn("a", 100L)); + block.addColumn(buildColumn("b", 200L)); + assertFalse(block.isApproach2()); + } + + private static ColumnChunkMetaData buildColumn(String name, long firstDataPage) { + return ColumnChunkMetaData.get( + ColumnPath.get(name), + BINARY, + CompressionCodecName.UNCOMPRESSED, + new HashSet<>(Collections.emptySet()), + new BinaryStatistics(), + firstDataPage, + 0L, + 0L, + 0L, + 0L); + } +} diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java index 99da0fa7fb..9850736f92 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java @@ -20,6 +20,7 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.HashSet; @@ -67,6 +68,23 @@ public void testConversionNeg() { assertEquals(neg, md.getFirstDataPageOffset()); } + @Test + public void testSentinelIsPhysicallyShared() { + // Approach 2 (micro-row-group) sentinel: firstDataPage == -1 marks the column as + // physically shared with other blocks. + ColumnChunkMetaData md = newMD(ColumnChunkMetaData.SENTINEL_OFFSET); + assertTrue(md.isPhysicallyShared()); + assertEquals(ColumnChunkMetaData.SENTINEL_OFFSET, md.getStartingPos()); + } + + @Test + public void testLegacyIsNotPhysicallyShared() { + // A normal column chunk must not look like the Approach 2 sentinel. 
+ ColumnChunkMetaData md = newMD(100L); + assertFalse(md.isPhysicallyShared()); + assertEquals(100L, md.getStartingPos()); + } + private ColumnChunkMetaData newMD(long big) { Set e = new HashSet(); PrimitiveTypeName t = BINARY; From c6cd039f7568312d9dbc887dc4c3643bd098fc59 Mon Sep 17 00:00:00 2001 From: Alkis Evlogimenos Date: Wed, 27 May 2026 10:07:07 +0200 Subject: [PATCH 2/2] Add Approach 2 (micro-row-group) writer prototype MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Public knob `parquet.writer.micro-row-group.row-count` (and `ParquetWriter.Builder.withMicroRowGroupRowCount(long)`) opts in to emitting K logical BlockMetaData entries per physical column chunk on each record-batch flush, sized by this target row count. When unset (default 0), the legacy single-block-per-flush path runs unchanged. `InternalParquetRecordWriter.flushRowGroupToStore()` routes through a new `ParquetFileWriter.writeMicroRowGroups(...)` which writes one physical column chunk per logical column and emits K BlockMetaData entries with `data_page_offset == ColumnChunkMetaData.SENTINEL_OFFSET` plus per-block OffsetIndex sidecars carrying file-absolute `first_row_index` values — exactly the shape consumed by the reader's `internalReadApproach2RowGroup`. A round-trip test exercises the path via `ExampleParquetWriter`. Prototype limitations (documented in PR body): per-block stats, per-block ColumnIndex, and bloom filters are intentionally omitted; encryption falls back to the legacy path; per-block valueCount is exact only for non-repeated columns. 
Co-authored-by: Isaac --- .../parquet/column/ParquetProperties.java | 48 +++++ .../columnindex/OffsetIndexBuilder.java | 30 +++ .../hadoop/ColumnChunkPageWriteStore.java | 45 ++++ .../hadoop/InternalParquetRecordWriter.java | 41 +++- .../hadoop/MicroRowGroupColumnData.java | 90 ++++++++ .../parquet/hadoop/ParquetFileWriter.java | 199 ++++++++++++++++++ .../parquet/hadoop/ParquetOutputFormat.java | 22 +- .../apache/parquet/hadoop/ParquetWriter.java | 18 ++ .../hadoop/TestApproach2WriterReadWrite.java | 168 +++++++++++++++ 9 files changed, 654 insertions(+), 7 deletions(-) create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MicroRowGroupColumnData.java create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestApproach2WriterReadWrite.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index f29214b458..17f5758e54 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -69,6 +69,14 @@ public class ParquetProperties { public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true; + /** + * Default for the Approach 2 (micro-row-group) writer knob: 0 means the legacy writer + * path is used and each {@code ParquetWriter} record-batch flush produces exactly one + * {@code BlockMetaData}. A positive value opts in to producing K logical + * micro-row-groups per physical column chunk on every flush, sized by this value. + */ + public static final long DEFAULT_MICRO_ROW_GROUP_ROW_COUNT = 0L; + /** * @deprecated This shared instance can cause thread safety issues when used by multiple builders concurrently. * Use {@code new DefaultValuesWriterFactory()} instead to create individual instances. 
@@ -135,6 +143,7 @@ public static WriterVersion fromString(String name) { private final Map extraMetaData; private final ColumnProperty statistics; private final ColumnProperty sizeStatistics; + private final long microRowGroupRowCount; private ParquetProperties(Builder builder) { this.pageSizeThreshold = builder.pageSize; @@ -167,6 +176,7 @@ private ParquetProperties(Builder builder) { this.extraMetaData = builder.extraMetaData; this.statistics = builder.statistics.build(); this.sizeStatistics = builder.sizeStatistics.build(); + this.microRowGroupRowCount = builder.microRowGroupRowCount; } public static Builder builder() { @@ -322,6 +332,18 @@ public boolean getPageWriteChecksumEnabled() { return pageWriteChecksumEnabled; } + /** + * @return the Approach 2 (micro-row-group) target row count per logical block, or + * {@code 0} if disabled. When positive, every record-batch flush in + * {@code InternalParquetRecordWriter} produces {@code ceil(flushRowCount / + * microRowGroupRowCount)} logical {@code BlockMetaData} entries that share one + * physical column chunk; readers consume them via + * {@code ParquetFileReader.readNextRowGroup()}'s Approach 2 dispatch. + */ + public long getMicroRowGroupRowCount() { + return microRowGroupRowCount; + } + public OptionalLong getBloomFilterNDV(ColumnDescriptor column) { Long ndv = bloomFilterNDVs.getValue(column); return ndv == null ? 
OptionalLong.empty() : OptionalLong.of(ndv); @@ -419,6 +441,7 @@ public static class Builder { private Map extraMetaData = new HashMap<>(); private final ColumnProperty.Builder statistics; private final ColumnProperty.Builder sizeStatistics; + private long microRowGroupRowCount = DEFAULT_MICRO_ROW_GROUP_ROW_COUNT; private Builder() { enableDict = ColumnProperty.builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED); @@ -460,6 +483,7 @@ private Builder(ParquetProperties toCopy) { this.extraMetaData = toCopy.extraMetaData; this.statistics = ColumnProperty.builder(toCopy.statistics); this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics); + this.microRowGroupRowCount = toCopy.microRowGroupRowCount; } /** @@ -756,6 +780,30 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) { return this; } + /** + * Opt in to the Approach 2 (micro-row-group) writer path: each record-batch flush + * produces one physical column chunk whose pages are split into multiple logical + * {@code BlockMetaData} entries of approximately this row count. Set to {@code 0} to + * use the legacy single-block-per-flush behavior (the default). + * + *

Prototype limitations apply (see {@link + * org.apache.parquet.hadoop.metadata.ColumnChunkMetaData#SENTINEL_OFFSET}): + * encryption is unsupported and per-block statistics / ColumnIndex / bloom filters + * are not emitted. + * + * @param microRowGroupRowCount target row count per logical micro-row-group, or + * {@code 0} to disable. Must be non-negative. + * @return this builder for method chaining + */ + public Builder withMicroRowGroupRowCount(long microRowGroupRowCount) { + Preconditions.checkArgument( + microRowGroupRowCount >= 0, + "Invalid micro-row-group row count (negative): %s", + microRowGroupRowCount); + this.microRowGroupRowCount = microRowGroupRowCount; + return this; + } + public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(this); // we pass a constructed but uninitialized factory to ParquetProperties above as currently diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java index bd729ad97b..ab9d48ca00 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java @@ -111,6 +111,10 @@ public OffsetIndex build(long shift) { private final IntList compressedPageSizes = new IntArrayList(); private final LongList firstRowIndexes = new LongArrayList(); private final LongList unencodedDataBytes = new LongArrayList(); + // Per-page row counts captured by the two-arg add(compressedPageSize, rowCount) call. + // Kept alongside firstRowIndexes so the micro-row-group writer path can slice pages + // by absolute row range without recomputing row deltas from the builder's running state. 
+ private final LongList rowCounts = new LongArrayList(); private long previousOffset; private int previousPageSize; private long previousRowIndex; @@ -161,6 +165,7 @@ public void add(int compressedPageSize, long rowCount, Optional unencodedD previousRowIndex + previousRowCount, unencodedDataBytes); previousRowCount = rowCount; + rowCounts.add(rowCount); } /** @@ -257,4 +262,29 @@ public OffsetIndex build(long shift) { return offsetIndex; } + + /** + * @return the number of pages added via the two-arg {@link #add(int, long)} variant + * (used by the micro-row-group writer path). Returns 0 if pages were added via + * the explicit-offset variant only. + */ + public int getPageCount() { + return rowCounts.size(); + } + + /** + * @param pageIndex page ordinal in [0, {@link #getPageCount()}) + * @return the compressed page size recorded for page {@code pageIndex} + */ + public int getCompressedPageSize(int pageIndex) { + return compressedPageSizes.getInt(pageIndex); + } + + /** + * @param pageIndex page ordinal in [0, {@link #getPageCount()}) + * @return the row count recorded for page {@code pageIndex} via {@link #add(int, long)} + */ + public long getRowCount(int pageIndex) { + return rowCounts.getLong(pageIndex); + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index c0cf216cc9..9a1753c2d4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -426,6 +426,35 @@ public long getMemSize() { return buf.size(); } + /** + * Snapshot this page writer's accumulated state as a {@link MicroRowGroupColumnData} + * for the Approach 2 (micro-row-group) writer path. 
Returns the SAME {@code buf} and + * {@code offsetIndexBuilder} references this writer owns — the caller (typically + * {@link ParquetFileWriter#writeMicroRowGroups}) will write the buffer to disk once + * and reuse the builder; this writer must not be touched again after the drain. + * + * @throws IllegalStateException if the writer is configured for encrypted output — + * Approach 2 does not support encryption in this prototype. + */ + MicroRowGroupColumnData drainForMicroRowGroups() { + if (headerBlockEncryptor != null) { + throw new IllegalStateException( + "drainForMicroRowGroups (Approach 2) does not support encryption"); + } + return new MicroRowGroupColumnData( + path, + compressor.getCodecName(), + dictionaryPage, + buf, + uncompressedLength, + compressedLength, + totalValueCount, + offsetIndexBuilder, + rlEncodings, + dlEncodings, + dataEncodings); + } + public void writeToFileWriter(ParquetFileWriter writer) throws IOException { if (null == headerBlockEncryptor) { writer.writeColumnChunk( @@ -700,4 +729,20 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException { pageWriter.writeToFileWriter(writer); } } + + /** + * Snapshot every column's accumulated pages as {@link MicroRowGroupColumnData} entries + * (in {@link #schema} column order) for the Approach 2 (micro-row-group) writer path. + * After this call, the underlying page writers must not be used to write any more pages; + * the typical caller closes this store immediately afterwards. 
+ * + * @throws IllegalStateException if any column writer was configured for encrypted output + */ + List drainForMicroRowGroups() { + List result = new ArrayList<>(schema.getColumns().size()); + for (ColumnDescriptor path : schema.getColumns()) { + result.add(writers.get(path).drainForMicroRowGroups()); + } + return result; + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index dd51d1ef09..90074feb21 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -225,12 +225,25 @@ private void flushRowGroupToStore() throws IOException { if (recordCount > 0) { rowGroupOrdinal++; - parquetFileWriter.startBlock(recordCount); - columnStore.flush(); - pageStore.flushToFileWriter(parquetFileWriter); - recordCount = 0; - parquetFileWriter.endBlock(); - this.nextRowGroupSize = Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold); + long microRowGroupRowCount = props.getMicroRowGroupRowCount(); + if (microRowGroupRowCount > 0 && fileEncryptor == null && recordCount > microRowGroupRowCount) { + // Approach 2 path: flush all pages, then write one physical column chunk per + // column whose pages are sliced into K logical micro-row-groups, each marked + // with data_page_offset == SENTINEL_OFFSET. Encryption short-circuits to the + // legacy path because writeMicroRowGroups does not support encrypted columns. 
+ columnStore.flush(); + long[] microRowGroupRowCounts = splitIntoMicroRowGroupCounts(recordCount, microRowGroupRowCount); + parquetFileWriter.writeMicroRowGroups(pageStore.drainForMicroRowGroups(), microRowGroupRowCounts); + recordCount = 0; + this.nextRowGroupSize = Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold); + } else { + parquetFileWriter.startBlock(recordCount); + columnStore.flush(); + pageStore.flushToFileWriter(parquetFileWriter); + recordCount = 0; + parquetFileWriter.endBlock(); + this.nextRowGroupSize = Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold); + } } } finally { AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore); @@ -240,6 +253,22 @@ private void flushRowGroupToStore() throws IOException { } } + /** + * Split a total record count into K logical micro-row-group row counts of + * approximately {@code target} rows each, with the final entry absorbing the remainder. + */ + private static long[] splitIntoMicroRowGroupCounts(long total, long target) { + int k = Math.toIntExact((total + target - 1) / target); + long[] counts = new long[k]; + long remaining = total; + for (int i = 0; i < k - 1; i++) { + counts[i] = target; + remaining -= target; + } + counts[k - 1] = remaining; + return counts; + } + long getRowGroupSizeThreshold() { return rowGroupSizeThreshold; } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MicroRowGroupColumnData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MicroRowGroupColumnData.java new file mode 100644 index 0000000000..823316250b --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MicroRowGroupColumnData.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import java.util.List; +import java.util.Set; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; + +/** + * Description of one physical column chunk for the Approach 2 (micro-row-group) writer + * path. Carries the buffered, page-encoded bytes for the entire physical chunk plus + * everything {@link ParquetFileWriter#writeMicroRowGroups} needs to (a) write the chunk + * to disk in one shot and (b) slice it into K logical {@link + * org.apache.parquet.hadoop.metadata.BlockMetaData} entries with sentinel + * {@code data_page_offset == -1} and per-block absolute-row-index OffsetIndex sidecars. + * + *

Package-private and shallowly immutable. Produced by {@link + * ColumnChunkPageWriteStore#drainForMicroRowGroups()} once the in-memory column + * store has finished accumulating pages for a logical record-batch flush. + */ +final class MicroRowGroupColumnData { + final ColumnDescriptor descriptor; + final CompressionCodecName codec; + /** Dictionary page, or {@code null} if this column chunk has none. */ + final DictionaryPage dictionaryPage; + /** Page headers + compressed bodies concatenated in page order. Written verbatim to disk. */ + final BytesInput concatenatedPageBytes; + final long totalUncompressedSize; + final long totalCompressedSize; + final long totalValueCount; + /** + * Pre-populated builder (via the two-arg {@code add(compressedPageSize, rowCount)} variant) + * carrying per-page sizes and row counts. {@link ParquetFileWriter#writeMicroRowGroups} + * feeds it to {@code writeColumnChunk} for the shared chunk and then walks the + * {@code OffsetIndex} built from it ({@code getPageCount()}, {@code getFirstRowIndex(int)}, + * {@code getOffset(int)}, {@code getCompressedPageSize(int)}) to compute per-micro-block + * page slices.
+ */ + final OffsetIndexBuilder offsetIndexBuilder; + + final Set rlEncodings; + final Set dlEncodings; + final List dataEncodings; + + MicroRowGroupColumnData( + ColumnDescriptor descriptor, + CompressionCodecName codec, + DictionaryPage dictionaryPage, + BytesInput concatenatedPageBytes, + long totalUncompressedSize, + long totalCompressedSize, + long totalValueCount, + OffsetIndexBuilder offsetIndexBuilder, + Set rlEncodings, + Set dlEncodings, + List dataEncodings) { + this.descriptor = descriptor; + this.codec = codec; + this.dictionaryPage = dictionaryPage; + this.concatenatedPageBytes = concatenatedPageBytes; + this.totalUncompressedSize = totalUncompressedSize; + this.totalCompressedSize = totalCompressedSize; + this.totalValueCount = totalValueCount; + this.offsetIndexBuilder = offsetIndexBuilder; + this.rlEncodings = rlEncodings; + this.dlEncodings = dlEncodings; + this.dataEncodings = dataEncodings; + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 82f4577b83..bec8a9a377 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -1563,6 +1563,205 @@ void writeColumnChunk( }); } + /** + * Writes one physical column-chunk group as a sequence of {@code K} logical + * {@link BlockMetaData} entries that share the chunk's physical bytes — the writer side + * of the Approach 2 (micro-row-group) format extension consumed by + * {@link ParquetFileReader#readNextRowGroup()} via + * {@code BlockMetaData.isApproach2()} dispatch. + * + *

What this method emits on disk: + *

    + *
  • One row-group-aligned region per call (mirroring {@link #startBlock}'s alignment).
  • + *
  • For each column, an optional dictionary page at a real file offset, followed by all + * data pages concatenated in page order (one physical column chunk per logical column).
  • + *
  • {@code K} entries in {@link #blocks} where each block's columns carry + * {@code firstDataPageOffset == ColumnChunkMetaData.SENTINEL_OFFSET} and a real + * {@code dictionaryPageOffset}, and each per-column {@link OffsetIndex} sidecar lists + * exactly the pages whose absolute row range overlaps that micro-block's window, with + * file-absolute {@code first_row_index} values.
  • + *
+ * + *

Encryption is not supported: this method throws if a {@code fileEncryptor} is + * configured, matching {@link + * org.apache.parquet.hadoop.metadata.ColumnChunkMetaData#isPhysicallyShared()} returning + * {@code false} for encrypted columns on the reader side. + * + *

Per-block statistics, per-block ColumnIndex, and bloom filters are intentionally + * empty/absent (prototype scope) — those become follow-ups for predicate pushdown over + * micro-row-groups. Per-block {@code valueCount} is approximated as the sum of per-page + * row counts in the slice. This is exact only for non-repeated columns whose pages align with + * block boundaries; straddling pages count in both blocks and repeated columns report row counts (known prototype limitations). + * + *

Boundary pages that straddle two adjacent micro-blocks are listed in both blocks' + * OffsetIndex; the reader's {@code SynchronizingColumnReader} trims rows that fall outside + * each block's absolute {@link + * org.apache.parquet.internal.filter2.columnindex.RowRanges} window. + * + * @param perColumn one entry per column in {@link #schema} order; column ordering is + * the caller's responsibility + * @param microRowGroupRowCounts row counts of the K micro-row-groups; must sum to the + * total row count of the shared physical chunk and all be positive + * @throws IOException on IO error + * @throws IllegalArgumentException if {@code microRowGroupRowCounts} is empty or contains + * non-positive entries + * @throws IllegalStateException if the writer has an encryptor configured + */ + public void writeMicroRowGroups(List perColumn, long[] microRowGroupRowCounts) + throws IOException { + if (fileEncryptor != null) { + throw new IllegalStateException( + "writeMicroRowGroups (Approach 2) does not support encryption in this prototype"); + } + if (microRowGroupRowCounts == null || microRowGroupRowCounts.length == 0) { + throw new IllegalArgumentException("microRowGroupRowCounts must be non-empty"); + } + long totalRows = 0; + for (long r : microRowGroupRowCounts) { + if (r <= 0) { + throw new IllegalArgumentException("micro-row-group row count must be positive: " + r); + } + totalRows += r; + } + + // Absolute row index at the start of this physical group — derived from preceding + // blocks just like the reader does in ParquetMetadataConverter.generateRowGroupOffsets. + long absRowStart = 0; + for (BlockMetaData prev : blocks) { + absRowStart += prev.getRowCount(); + } + + // Step 1: open a transient block, write each column's bytes via writeColumnChunk, then + // close the block. 
This produces one "wrong" BlockMetaData with real firstDataPageOffset + // values and one per-column OffsetIndex carrying file-absolute page byte offsets but + // block-relative first_row_index values. We pop and replace it below. + startBlock(totalRows); + for (MicroRowGroupColumnData col : perColumn) { + // SizeStatistics / GeospatialStatistics are constructed fresh and left invalid/empty + // — they're not used by the reader's Approach 2 path. + writeColumnChunk( + col.descriptor, + col.totalValueCount, + col.codec, + col.dictionaryPage, + col.concatenatedPageBytes, + col.totalUncompressedSize, + col.totalCompressedSize, + Statistics.getBuilderForReading(col.descriptor.getPrimitiveType()).build(), + SizeStatistics.newBuilder( + col.descriptor.getPrimitiveType(), + col.descriptor.getMaxRepetitionLevel(), + col.descriptor.getMaxDefinitionLevel()) + .build(), + GeospatialStatistics.newBuilder(col.descriptor.getPrimitiveType()) + .build(), + ColumnIndexBuilder.getNoOpBuilder(), + col.offsetIndexBuilder, + null /* bloomFilter */, + col.rlEncodings, + col.dlEncodings, + col.dataEncodings); + } + endBlock(); + + // Pop the transient block; its data feeds the K replacements. + BlockMetaData sharedBlock = blocks.remove(blocks.size() - 1); + List sharedOffsetIndexes = offsetIndexes.remove(offsetIndexes.size() - 1); + columnIndexes.remove(columnIndexes.size() - 1); + bloomFilters.remove(bloomFilters.size() - 1); + + List sharedColumns = sharedBlock.getColumns(); + int numColumns = sharedColumns.size(); + + // Step 2: emit K micro-blocks. For each, per column: slice pages whose [absRowStart + + // pageFirstRowIndex, ...) overlaps the micro-block's absolute row window, build a fresh + // OffsetIndex with absolute first_row_index values, and construct a ColumnChunkMetaData + // with firstDataPageOffset = SENTINEL_OFFSET and the real shared dictionaryPageOffset. 
+ long blockRelStart = 0; + for (int m = 0; m < microRowGroupRowCounts.length; m++) { + long rowCountM = microRowGroupRowCounts[m]; + long blockRelEnd = blockRelStart + rowCountM; + long absStartM = absRowStart + blockRelStart; + + BlockMetaData microBlock = new BlockMetaData(); + microBlock.setRowCount(rowCountM); + microBlock.setOrdinal(blocks.size()); + + List microOIList = new ArrayList<>(numColumns); + List microCIList = new ArrayList<>(numColumns); + long microBlockTotalByteSize = 0; + + for (int c = 0; c < numColumns; c++) { + ColumnChunkMetaData sharedCcm = sharedColumns.get(c); + OffsetIndex sharedOI = sharedOffsetIndexes.get(c); + MicroRowGroupColumnData colDesc = perColumn.get(c); + + OffsetIndexBuilder microOIB = OffsetIndexBuilder.getBuilder(); + long valueCountSlice = 0; + long compressedSlice = 0; + int pageCount = sharedOI.getPageCount(); + for (int p = 0; p < pageCount; p++) { + long pageRelStart = sharedOI.getFirstRowIndex(p); + long pageRelEnd = + (p + 1 < pageCount) ? sharedOI.getFirstRowIndex(p + 1) : totalRows; + if (pageRelEnd <= blockRelStart) { + continue; + } + if (pageRelStart >= blockRelEnd) { + break; + } + long absoluteFirstRowIndex = absRowStart + pageRelStart; + long pageOffset = sharedOI.getOffset(p); + int pageSize = sharedOI.getCompressedPageSize(p); + microOIB.add(pageOffset, pageSize, absoluteFirstRowIndex); + valueCountSlice += (pageRelEnd - pageRelStart); + compressedSlice += pageSize; + } + OffsetIndex microOI = microOIB.build(); + microOIList.add(microOI); + microCIList.add(null); + + // Distribute uncompressed total proportionally by compressed bytes (a rough + // estimate — these fields are informational on the Approach 2 read path). + long uncompressedSlice = sharedCcm.getTotalSize() == 0 + ? 
0 + : Math.round(((double) compressedSlice / sharedCcm.getTotalSize()) + * sharedCcm.getTotalUncompressedSize()); + + ColumnChunkMetaData microCcm = ColumnChunkMetaData.get( + sharedCcm.getPath(), + sharedCcm.getPrimitiveType(), + sharedCcm.getCodec(), + sharedCcm.getEncodingStats(), + sharedCcm.getEncodings(), + Statistics.getBuilderForReading(sharedCcm.getPrimitiveType()).build(), + ColumnChunkMetaData.SENTINEL_OFFSET, + sharedCcm.getDictionaryPageOffset(), + valueCountSlice, + compressedSlice, + uncompressedSlice, + SizeStatistics.newBuilder( + colDesc.descriptor.getPrimitiveType(), + colDesc.descriptor.getMaxRepetitionLevel(), + colDesc.descriptor.getMaxDefinitionLevel()) + .build(), + GeospatialStatistics.newBuilder(colDesc.descriptor.getPrimitiveType()) + .build()); + microCcm.setRowGroupOrdinal(microBlock.getOrdinal()); + microBlock.addColumn(microCcm); + microBlockTotalByteSize += uncompressedSlice; + } + microBlock.setTotalByteSize(microBlockTotalByteSize); + + blocks.add(microBlock); + offsetIndexes.add(microOIList); + columnIndexes.add(microCIList); + bloomFilters.add(new HashMap<>()); + + blockRelStart = blockRelEnd; + } + } + /** * Overwrite the column total statistics. This special used when the column total statistics * is known while all the page statistics are invalid, for example when rewriting the column. 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 868ae634c1..fc5b07be8e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -161,6 +161,13 @@ public static enum JobSummaryLevel { public static final String BLOCK_ROW_COUNT_LIMIT = "parquet.block.row.count.limit"; public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit"; public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; + /** + * Approach 2 (micro-row-group) writer knob: target row count per logical + * {@code BlockMetaData} when emitting K logical blocks per physical column chunk. + * A value of {@code 0} (the default) selects the legacy single-block-per-flush path. + * See {@link org.apache.parquet.column.ParquetProperties#getMicroRowGroupRowCount()}. 
+ */ + public static final String MICRO_ROW_GROUP_ROW_COUNT = "parquet.writer.micro-row-group.row-count"; public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled"; public static final String SIZE_STATISTICS_ENABLED = "parquet.size.statistics.enabled"; @@ -408,6 +415,18 @@ public static void setPageWriteChecksumEnabled(Configuration conf, boolean val) conf.setBoolean(PAGE_WRITE_CHECKSUM_ENABLED, val); } + public static void setMicroRowGroupRowCount(JobContext jobContext, long rowCount) { + setMicroRowGroupRowCount(getConfiguration(jobContext), rowCount); + } + + public static void setMicroRowGroupRowCount(Configuration conf, long rowCount) { + conf.setLong(MICRO_ROW_GROUP_ROW_COUNT, rowCount); + } + + static long getMicroRowGroupRowCount(Configuration conf) { + return conf.getLong(MICRO_ROW_GROUP_ROW_COUNT, ParquetProperties.DEFAULT_MICRO_ROW_GROUP_ROW_COUNT); + } + public static boolean getPageWriteChecksumEnabled(Configuration conf) { return conf.getBoolean(PAGE_WRITE_CHECKSUM_ENABLED, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); } @@ -526,7 +545,8 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withRowGroupRowCountLimit(getBlockRowCountLimit(conf)) .withPageRowCountLimit(getPageRowCountLimit(conf)) .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf)) - .withStatisticsEnabled(getStatisticsEnabled(conf)); + .withStatisticsEnabled(getStatisticsEnabled(conf)) + .withMicroRowGroupRowCount(getMicroRowGroupRowCount(conf)); new ColumnConfigParser() .withColumnConfig( ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 8eb5f7f17b..8eeef27044 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -649,6 +649,24 @@ public SELF withPageRowCountLimit(int rowCount) { return self(); } + /** + * Opt in to the Approach 2 (micro-row-group) writer path: each record-batch flush + * produces one physical column chunk whose pages are split into multiple logical + * {@code BlockMetaData} entries of approximately this row count, all marked with + * {@code data_page_offset == ColumnChunkMetaData.SENTINEL_OFFSET}. Set to {@code 0} + * (the default) to use the legacy single-block-per-flush behavior. + * + *

<p>Prototype limitations: encryption is unsupported, and per-block statistics / + * ColumnIndex / bloom filters are not emitted. + * + * @param rowCount target row count per logical micro-row-group, or {@code 0} to disable + * @return this builder for method chaining + */ + public SELF withMicroRowGroupRowCount(long rowCount) { + encodingPropsBuilder.withMicroRowGroupRowCount(rowCount); + return self(); + } + /** * Set the Parquet format dictionary page size used by the constructed * writer. diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestApproach2WriterReadWrite.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestApproach2WriterReadWrite.java new file mode 100644 index 0000000000..7692108dc9 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestApproach2WriterReadWrite.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.Preconditions; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * End-to-end round-trip test for the Approach 2 (micro-row-group) writer path. + * + *

<p>Writes a small Parquet file via the public {@link ExampleParquetWriter} builder with + * {@code withMicroRowGroupRowCount(MICRO_SIZE)} set, which routes the flush through + * {@link InternalParquetRecordWriter}'s Approach 2 path and ultimately through + * {@link ParquetFileWriter#writeMicroRowGroups}. Then reopens the file and asserts: + *

<ul>
+ *   <li>The footer carries {@code K} blocks, each with {@link BlockMetaData#isApproach2()} + * returning {@code true} and {@link BlockMetaData#getRowIndexOffset()} equal to the + * cumulative row offset.</li>
+ *   <li>Each column chunk in each block has + * {@code firstDataPageOffset == ColumnChunkMetaData.SENTINEL_OFFSET}.</li>
+ *   <li>Each per-block per-column {@link OffsetIndex} reports + * {@link OffsetIndex#getFirstRowIndex(int)} values in absolute (file-global) row + * coordinates whose union covers the block's window.</li>
+ *   <li>Reading records back via the high-level {@link ParquetReader} returns all rows + * in order with the expected INT32 values, which exercises the reader's Approach 2 + * dispatch in {@link ParquetFileReader#readNextRowGroup()}.</li>
+ * </ul>
+ */ +public class TestApproach2WriterReadWrite { + + /** Single-column INT32 schema is enough; multi-column is a follow-up. */ + private static final MessageType SCHEMA = Types.buildMessage() + .required(PrimitiveTypeName.INT32) + .named("value") + .named("test"); + + private static final int TOTAL_ROWS = 3500; + private static final long MICRO_SIZE = 1000L; + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testApproach2RoundTrip() throws IOException { + Path path = newTempPath(); + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(SCHEMA, conf); + + SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA); + // rowGroupSize is set high so the size-based flush never triggers within TOTAL_ROWS — + // the flush we exercise is the close()-triggered final flush, which gives us + // exactly one physical group of TOTAL_ROWS rows split into K micro-blocks. + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withConf(conf) + .withRowGroupSize(1L << 24) + .withMicroRowGroupRowCount(MICRO_SIZE) + .build()) { + for (int i = 0; i < TOTAL_ROWS; i++) { + writer.write(factory.newGroup().append("value", i)); + } + } + + // Footer assertions + try (ParquetFileReader reader = + ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + ParquetMetadata footer = reader.getFooter(); + // ceil(3500 / 1000) = 4 micro-blocks: 1000 / 1000 / 1000 / 500 + assertEquals("expected 4 micro-row-groups", 4, footer.getBlocks().size()); + + long[] expectedRowCounts = {1000L, 1000L, 1000L, 500L}; + long expectedAbsRowOffset = 0L; + for (int b = 0; b < footer.getBlocks().size(); b++) { + BlockMetaData block = footer.getBlocks().get(b); + assertEquals("block " + b + " rowCount", expectedRowCounts[b], block.getRowCount()); + assertTrue("block " + b + " isApproach2", block.isApproach2()); + assertEquals( + "block " + b + " rowIndexOffset", + expectedAbsRowOffset, + block.getRowIndexOffset()); + + 
assertEquals("block " + b + " column count", 1, block.getColumns().size()); + ColumnChunkMetaData col = block.getColumns().get(0); + assertEquals( + "block " + b + " column firstDataPageOffset is SENTINEL", + ColumnChunkMetaData.SENTINEL_OFFSET, + col.getFirstDataPageOffset()); + assertTrue( + "block " + b + " column isPhysicallyShared", + col.isPhysicallyShared()); + + // Each micro-block's OffsetIndex must list at least one page, and every page + // listed must start strictly before the block's absolute end (otherwise the page + // could not contribute any rows to this block). Boundary pages from the previous + // block may start before this block's absolute start — that is intentional. + OffsetIndex oi = reader.readOffsetIndex(col); + assertNotNull("block " + b + " offset index", oi); + assertTrue("block " + b + " has pages", oi.getPageCount() >= 1); + long blockAbsEnd = expectedAbsRowOffset + expectedRowCounts[b]; + for (int p = 0; p < oi.getPageCount(); p++) { + long absFirstRow = oi.getFirstRowIndex(p); + assertTrue( + "block " + b + " page " + p + " first_row_index < block end", + absFirstRow < blockAbsEnd); + } + + expectedAbsRowOffset += expectedRowCounts[b]; + } + } + + // Record-level round-trip via the high-level reader: this exercises + // ParquetFileReader.readNextRowGroup()'s Approach 2 dispatch transparently. + int seen = 0; + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), path).withConf(conf).build()) { + Group g; + while ((g = reader.read()) != null) { + assertEquals("row " + seen, seen, g.getInteger("value", 0)); + seen++; + } + } + assertEquals("total rows", TOTAL_ROWS, seen); + } + + private Path newTempPath() throws IOException { + File file = temp.newFile(); + Preconditions.checkArgument(file.delete(), "Could not remove temp file"); + return new Path(file.getAbsolutePath()); + } +}