Measures the performance of {@link CompressionCodecFactory.BytesInputCompressor} + * and {@link CompressionCodecFactory.BytesInputDecompressor} for each supported codec, + * comparing the heap-based {@link CodecFactory} path (what all production users take) + * against the direct-memory {@code DirectCodecFactory} path (off-heap ByteBuffers). + * + *
This benchmark isolates the codec hot path from file I/O, encoding, and other + * Parquet overhead, making it ideal for measuring compression-specific optimizations. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(1) +@Warmup(iterations = 2, time = 1) +@Measurement(iterations = 3, time = 2) +@State(Scope.Thread) +public class CompressionBenchmark { + + @Param({"SNAPPY", "ZSTD", "LZ4_RAW", "GZIP", "BROTLI", "LZO"}) + public String codec; + + @Param({"65536", "131072", "262144", "1048576"}) + public int pageSize; + + @Param({"HEAP", "DIRECT"}) + public String factoryType; + + private byte[] uncompressedData; + private byte[] compressedData; + private int decompressedSize; + + private CompressionCodecFactory.BytesInputCompressor compressor; + private CompressionCodecFactory.BytesInputDecompressor decompressor; + private CodecFactory factory; + + @Setup(Level.Trial) + public void setup() throws IOException { + uncompressedData = generatePageData(pageSize, 42L); + decompressedSize = uncompressedData.length; + + Configuration conf = new Configuration(); + if ("DIRECT".equals(factoryType)) { + factory = CodecFactory.createDirectCodecFactory(conf, DirectByteBufferAllocator.getInstance(), pageSize); + } else { + factory = new CodecFactory(conf, pageSize); + } + CompressionCodecName codecName = CompressionCodecName.valueOf(codec); + + compressor = factory.getCompressor(codecName); + decompressor = factory.getDecompressor(codecName); + + // Pre-compress for decompression benchmark; copy to a stable byte array + // since the compressor may reuse its internal buffer. + BytesInput compressed = compressor.compress(BytesInput.from(uncompressedData)); + compressedData = compressed.toByteArray(); + } + + @TearDown(Level.Trial) + public void tearDown() { + factory.release(); + } + + @Benchmark + public BytesInput compress() throws IOException { + return compressor.compress(BytesInput.from(uncompressedData)); + } + + @Benchmark + public byte[] decompress() throws IOException { + // Force materialization of the decompressed data. Without this, codecs using + // the stream-based HeapBytesDecompressor (e.g. GZIP) would return a lazy + // StreamBytesInput, deferring the actual work. toByteArray() is essentially + // free for our optimized implementations (returns the existing byte[]). + return decompressor + .decompress(BytesInput.from(compressedData), decompressedSize) + .toByteArray(); + } + + /** + * Generates byte data that approximates realistic Parquet page content. + * Mixes sequential runs, repeated values, low-range random, and full random + * to produce a realistic compression ratio (~2-4x for fast codecs). + */ + static byte[] generatePageData(int size, long seed) { + Random random = new Random(seed); + byte[] data = new byte[size]; + int i = 0; + while (i < size) { + int patternType = random.nextInt(4); + int chunkSize = Math.min(random.nextInt(256) + 64, size - i); + switch (patternType) { + case 0: // Sequential bytes (highly compressible) + for (int j = 0; j < chunkSize && i < size; j++) { + data[i++] = (byte) (j & 0xFF); + } + break; + case 1: // Repeated value (highly compressible) + byte val = (byte) random.nextInt(256); + for (int j = 0; j < chunkSize && i < size; j++) { + data[i++] = val; + } + break; + case 2: // Small range random (moderately compressible) + for (int j = 0; j < chunkSize && i < size; j++) { + data[i++] = (byte) random.nextInt(16); + } + break; + case 3: // Full random (low compressibility) + byte[] randomChunk = new byte[chunkSize]; + random.nextBytes(randomChunk); + int toCopy = Math.min(chunkSize, size - i); + System.arraycopy(randomChunk, 0, data, i, toCopy); + i += toCopy; + break; + } + } + return data; + } +} diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java new file mode 100644 index 0000000000..de94b422cf --- /dev/null +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ConcurrentReadWriteBenchmark.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.benchmarks; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.io.LocalOutputFile; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Multi-threaded benchmarks measuring independent read and write throughput under + * concurrency. Uses {@code @Threads(4)} by default (overridable via JMH {@code -t} flag). + * + *
This benchmark does not assert correctness; it measures the cost of each thread + * writing a full file to a stateless sink or reading a shared pre-generated file. + * The set of rows used by {@link #concurrentWrite(Blackhole)} is built once during + * setup and shared (read-only) across all threads, so the timed section measures + * the encoder/serializer pipeline rather than per-row data construction. + * + *
{@link Mode#SingleShotTime} is used because each invocation does enough work
+ * (a full file write or read of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows)
+ * that JIT amortization across invocations is unnecessary.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 2, batchSize = 1)
+@Measurement(iterations = 5, batchSize = 1)
+@Threads(4)
+@State(Scope.Benchmark)
+public class ConcurrentReadWriteBenchmark {
+
+ private File tempFile;
+ private Group[] rows;
+
+ @Setup(Level.Trial)
+ public void setup() throws IOException {
+ rows = TestDataFactory.generateRows(
+ TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED);
+
+ // Generate a shared file for concurrent reads
+ tempFile = File.createTempFile("parquet-concurrent-bench-", ".parquet");
+ tempFile.deleteOnExit();
+ tempFile.delete();
+
+ try (ParquetWriter Parameterized across compression codec and writer version. For end-to-end benchmarks
+ * that include filesystem I/O, see {@link FileReadBenchmark}.
+ *
+ * {@link Mode#SingleShotTime} is used because each invocation does enough work
+ * (a full read of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) that JIT
+ * amortization across invocations is unnecessary. Ten measurement iterations
+ * provide stable statistics for SS mode.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@Fork(1)
+@Warmup(iterations = 5, batchSize = 1)
+@Measurement(iterations = 10, batchSize = 1)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+public class CpuReadBenchmark {
+
+ @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP", "LZ4_RAW", "BROTLI", "LZO"})
+ public String codec;
+
+ @Param({"PARQUET_1_0", "PARQUET_2_0"})
+ public String writerVersion;
+
+ private byte[] fileBytes;
+
+ @Setup(Level.Trial)
+ public void setup() throws IOException {
+ Group[] rows = TestDataFactory.generateRows(
+ TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED);
+ InMemoryOutputFile outputFile = new InMemoryOutputFile();
+ try (ParquetWriter Writes are sent to a {@link BlackHoleOutputFile} that discards all bytes, so the
+ * results reflect pure CPU cost (encoding, compression, index generation) without any
+ * filesystem noise. For end-to-end benchmarks that include filesystem I/O, see
+ * {@link FileWriteBenchmark}.
+ *
+ * Parameterized across compression codec, writer version, dictionary encoding,
+ * row-group block size, and data page size. Block size controls how many rows accumulate
+ * before a row-group flush (triggering encoding, compression, and index generation).
+ * Page size controls the unit of encoding and compression within a column chunk. Use JMH
+ * {@code -p blockSize=...} and {@code -p pageSize=...} to select specific combinations
+ * and avoid the full cross-product when not needed.
+ *
+ * {@link Mode#SingleShotTime} is used because each invocation does enough work
+ * (a full write of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) that JIT
+ * amortization across invocations is unnecessary. Ten measurement iterations
+ * provide stable statistics for SS mode.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@Fork(1)
+@Warmup(iterations = 5, batchSize = 1)
+@Measurement(iterations = 10, batchSize = 1)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+public class CpuWriteBenchmark {
+
+ @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP", "LZ4_RAW", "BROTLI", "LZO"})
+ public String codec;
+
+ @Param({"PARQUET_1_0", "PARQUET_2_0"})
+ public String writerVersion;
+
+ @Param({"true", "false"})
+ public String dictionary;
+
+ // Row-group block size in bytes: 128 MB (default), 256 MB (common production), 512 MB (stress)
+ @Param({"134217728", "268435456", "536870912"})
+ public int blockSize;
+
+ // Data page size in bytes: 1 MB (default), 4 MB (reduced overhead), 8 MB (max throughput)
+ @Param({"1048576", "4194304", "8388608"})
+ public int pageSize;
+
+ private Group[] rows;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ rows = TestDataFactory.generateRows(
+ TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED);
+ }
+
+ @Benchmark
+ public void writeFile() throws IOException {
+ try (ParquetWriter Parameterized across compression codec and writer version. The footer parse
+ * (via {@link LocalInputFile} open) is included in the timed section so the result
+ * reflects the full open-and-read cost a typical caller would observe.
+ *
+ * {@link Mode#SingleShotTime} is used because each invocation does enough work
+ * (a full read of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) that JIT
+ * amortization across invocations is unnecessary. Ten measurement iterations
+ * provide stable statistics for SS mode.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@Fork(1)
+@Warmup(iterations = 5, batchSize = 1)
+@Measurement(iterations = 10, batchSize = 1)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+public class FileReadBenchmark {
+
+ @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP", "LZ4_RAW", "BROTLI", "LZO"})
+ public String codec;
+
+ @Param({"PARQUET_1_0", "PARQUET_2_0"})
+ public String writerVersion;
+
+ private File tempFile;
+
+ @Setup(Level.Trial)
+ public void setup() throws IOException {
+ tempFile = File.createTempFile("parquet-read-bench-", ".parquet");
+ tempFile.deleteOnExit();
+ tempFile.delete(); // remove so the writer can create it
+
+ Group[] rows = TestDataFactory.generateRows(
+ TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED);
+ try (ParquetWriter Writes are sent to a {@link BlackHoleOutputFile} to isolate CPU and encoding cost
+ * from filesystem I/O. Parameterized across compression codec, writer version, and
+ * dictionary encoding.
+ *
+ * {@link Mode#SingleShotTime} is used because each invocation does enough work
+ * (a full write of {@value TestDataFactory#DEFAULT_ROW_COUNT} rows) that JIT
+ * amortization across invocations is unnecessary. Ten measurement iterations
+ * provide stable statistics for SS mode.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@Fork(1)
+@Warmup(iterations = 5, batchSize = 1)
+@Measurement(iterations = 10, batchSize = 1)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+public class FileWriteBenchmark {
+
+ @Param({"UNCOMPRESSED", "SNAPPY", "ZSTD", "GZIP", "LZ4_RAW", "BROTLI", "LZO"})
+ public String codec;
+
+ @Param({"PARQUET_1_0", "PARQUET_2_0"})
+ public String writerVersion;
+
+ @Param({"true", "false"})
+ public String dictionary;
+
+ private Group[] rows;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ rows = TestDataFactory.generateRows(
+ TestDataFactory.newGroupFactory(), TestDataFactory.DEFAULT_ROW_COUNT, TestDataFactory.DEFAULT_SEED);
+ }
+
+ @Benchmark
+ public void writeFile() throws IOException {
+ try (ParquetWriter
Uses {@code Encoder.compress(byte[], Encoder.Parameters)} for compression and + * {@code Decoder.decompress(byte[], int, int)} for decompression — the latter returns + * {@code byte[]} directly and avoids loading {@code DirectDecompress} which references + * {@code io.netty.buffer.ByteBuf} (optional Netty dependency not on our classpath). + */ + static final class Brotli4j { + static final boolean AVAILABLE; + // Encoder.compress(byte[], Object/*Encoder.Parameters*/) -> byte[] + private static final Method COMPRESS; + // Decoder.decompress(byte[], int/*offset*/, int/*length*/) -> byte[] + private static final Method DECOMPRESS; + // Encoder.Parameters class + private static final Class> PARAMS_CLASS; + // Encoder.Parameters.setQuality(int) -> Encoder.Parameters + private static final Method SET_QUALITY; + + static { + boolean loaded = false; + Method compress = null, decompress = null, setQuality = null; + Class> paramsClass = null; + try { + // Load native library + Class> loader = Class.forName("com.aayushatharva.brotli4j.Brotli4jLoader"); + loader.getMethod("ensureAvailability").invoke(null); + + // Encoder.compress(byte[], Encoder.Parameters) -> byte[] + paramsClass = Class.forName("com.aayushatharva.brotli4j.encoder.Encoder$Parameters"); + Class> encoder = Class.forName("com.aayushatharva.brotli4j.encoder.Encoder"); + compress = encoder.getMethod("compress", byte[].class, paramsClass); + + // Decoder.decompress(byte[], int, int) -> byte[] + // This avoids loading DirectDecompress which references io.netty.buffer.ByteBuf + Class> decoder = Class.forName("com.aayushatharva.brotli4j.decoder.Decoder"); + decompress = decoder.getMethod("decompress", byte[].class, int.class, int.class); + + // Encoder.Parameters.setQuality(int) -> Encoder.Parameters + setQuality = paramsClass.getMethod("setQuality", int.class); + + loaded = true; + } catch (Throwable t) { + // brotli4j not available — BROTLI will fall through to Hadoop codec path + LOG.info("brotli4j not available, BROTLI codec will use Hadoop codec path: {}", t.toString()); + } + AVAILABLE = loaded; + COMPRESS = compress; + DECOMPRESS = decompress; + PARAMS_CLASS = paramsClass; + SET_QUALITY = setQuality; + } + + /** Create an {@code Encoder.Parameters} instance with the given quality. */ + static Object newParams(int quality) { + try { + Object params = PARAMS_CLASS.getConstructor().newInstance(); + SET_QUALITY.invoke(params, quality); + return params; + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Failed to create Brotli encoder parameters", e); + } + } + + /** Compress using {@code Encoder.compress(byte[], Encoder.Parameters)}. */ + static byte[] compress(byte[] input, Object params) throws IOException { + try { + return (byte[]) COMPRESS.invoke(null, input, params); + } catch (ReflectiveOperationException e) { + throw new IOException("Brotli compression failed", e); + } + } + + /** Decompress using {@code Decoder.decompress(byte[], offset, length)}. */ + static byte[] decompress(byte[] input) throws IOException { + try { + return (byte[]) DECOMPRESS.invoke(null, input, 0, input.length); + } catch (ReflectiveOperationException e) { + throw new IOException("Brotli decompression failed", e); + } + } + } + static final BytesDecompressor NO_OP_DECOMPRESSOR = new BytesDecompressor() { @Override public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) { @@ -170,18 +271,7 @@ public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOEx decompressor.reset(); } InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); - - // Eagerly materialize the decompressed stream for codecs that require all input in a single buffer. - // ZSTD: releases off-heap resources early to avoid fragmentation (see parquet-format#398). - // LZ4_RAW: requires one-shot decompression; the lazy StreamBytesInput.writeInto() path reads via - // Channels.newChannel() in ~8KB chunks, causing the decompressor to be called with an undersized - // output buffer (see #3478). - if (codec instanceof ZstandardCodec || codec instanceof Lz4RawCodec) { - decompressed = BytesInput.copy(BytesInput.from(is, decompressedSize)); - is.close(); - } else { - decompressed = BytesInput.from(is, decompressedSize); - } + decompressed = BytesInput.from(is, decompressedSize); return decompressed; } @@ -271,13 +361,61 @@ public BytesDecompressor getDecompressor(CompressionCodecName codecName) { } protected BytesCompressor createCompressor(CompressionCodecName codecName) { - CompressionCodec codec = getCodec(codecName); - return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); + switch (codecName) { + case UNCOMPRESSED: + return NO_OP_COMPRESSOR; + case SNAPPY: + return new SnappyBytesCompressor(); + case ZSTD: + return new ZstdBytesCompressor( + conf.getInt( + ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, + ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL), + conf.getInt( + ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, + ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS)); + case LZ4_RAW: + return new Lz4RawBytesCompressor(); + case GZIP: + int gzipLevel = conf.getInt("zlib.compress.level", Deflater.DEFAULT_COMPRESSION); + return new GzipBytesCompressor(gzipLevel, pageSize); + case LZO: + return new LzoBytesCompressor(pageSize); + case BROTLI: + if (Brotli4j.AVAILABLE) { + int brotliQuality = conf.getInt("compression.brotli.quality", 1); + return new BrotliBytesCompressor(brotliQuality); + } + // fall through to Hadoop codec path + default: + CompressionCodec codec = getCodec(codecName); + return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); + } } protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { - CompressionCodec codec = getCodec(codecName); - return codec == null ? NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec); + switch (codecName) { + case UNCOMPRESSED: + return NO_OP_DECOMPRESSOR; + case SNAPPY: + return new SnappyBytesDecompressor(); + case ZSTD: + return new ZstdBytesDecompressor(); + case LZ4_RAW: + return new Lz4RawBytesDecompressor(); + case GZIP: + return new GzipBytesDecompressor(); + case LZO: + return new LzoBytesDecompressor(); + case BROTLI: + if (Brotli4j.AVAILABLE) { + return new BrotliBytesDecompressor(); + } + // fall through to Hadoop codec path + default: + CompressionCodec codec = getCodec(codecName); + return codec == null ? NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec); + } } /** @@ -315,15 +453,9 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) { private String cacheKey(CompressionCodecName codecName) { String level = null; switch (codecName) { - case GZIP: - level = conf.get("zlib.compress.level"); - break; case BROTLI: level = conf.get("compression.brotli.quality"); break; - case ZSTD: - level = conf.get("parquet.compression.codec.zstd.level"); - break; default: // compression level is not supported; ignore it } @@ -367,4 +499,526 @@ public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer public abstract void release(); } + + // ---- Optimized Snappy compressor/decompressor using direct JNI calls ---- + + /** + * Compresses using Snappy's byte-array JNI API directly, bypassing the Hadoop + * stream abstraction. This avoids intermediate direct ByteBuffer copies and + * reduces the compression to a single native call per page. + */ + static class SnappyBytesCompressor extends BytesCompressor { + private byte[] outputBuffer; + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + byte[] input = bytes.toByteArray(); + int maxLen = Snappy.maxCompressedLength(input.length); + if (outputBuffer == null || outputBuffer.length < maxLen) { + outputBuffer = new byte[maxLen]; + } + int compressed = Snappy.compress(input, 0, input.length, outputBuffer, 0); + return BytesInput.from(outputBuffer, 0, compressed); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.SNAPPY; + } + + @Override + public void release() { + outputBuffer = null; + } + } + + /** + * Decompresses using Snappy's JNI API directly. The {@link ByteBuffer} overload uses + * {@link Snappy#uncompress(ByteBuffer, ByteBuffer)} which, for direct buffers, passes + * native memory addresses straight to the snappy library with no JNI array pinning or + * intermediate copies. + */ + static class SnappyBytesDecompressor extends BytesDecompressor { + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + byte[] input = bytes.toByteArray(); + byte[] output = new byte[decompressedSize]; + Snappy.uncompress(input, 0, input.length, output, 0); + return BytesInput.from(output); + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + int origInputLimit = input.limit(); + input.limit(input.position() + compressedSize); + int origOutputLimit = output.limit(); + output.limit(output.position() + decompressedSize); + // Use slices so native API works on independent buffers; advance positions manually. + Snappy.uncompress(input.slice(), output.slice()); + input.position(input.limit()); + input.limit(origInputLimit); + output.position(output.limit()); + output.limit(origOutputLimit); + } + + @Override + public void release() {} + } + + // ---- Optimized ZSTD compressor/decompressor using zstd-jni context API directly ---- + + /** + * Compresses using a reusable {@link ZstdCompressCtx}, bypassing the Hadoop codec + * framework ({@code ZstandardCodec}, {@code CodecPool}, {@code CompressionOutputStream} + * wrapper). The context is created once at construction and reused across calls, + * avoiding per-call JNI context creation, internal buffer allocation, and Java stream + * overhead. This is 1.5-3.4x faster than the streaming approach for typical Parquet + * page sizes (64KB-1MB). Multi-threaded compression via {@code workers > 0} is + * supported through {@link ZstdCompressCtx#setWorkers(int)}. + */ + static class ZstdBytesCompressor extends BytesCompressor { + private final ZstdCompressCtx context; + private byte[] outputBuffer; + + ZstdBytesCompressor(int level, int workers) { + this.context = new ZstdCompressCtx(); + this.context.setLevel(level); + if (workers > 0) { + this.context.setWorkers(workers); + } + } + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + byte[] input = bytes.toByteArray(); + int maxLen = (int) Zstd.compressBound(input.length); + if (outputBuffer == null || outputBuffer.length < maxLen) { + outputBuffer = new byte[maxLen]; + } + int compressed = context.compressByteArray(outputBuffer, 0, outputBuffer.length, input, 0, input.length); + return BytesInput.from(outputBuffer, 0, compressed); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.ZSTD; + } + + @Override + public void release() { + context.close(); + outputBuffer = null; + } + } + + /** + * Decompresses using a reusable {@link ZstdDecompressCtx}, bypassing the Hadoop + * codec framework. The context is created once at construction and reused across + * calls, avoiding per-call JNI context creation, internal buffer allocation, and + * Java stream overhead. The {@link ByteBuffer} overload uses + * {@link Zstd#decompress(ByteBuffer, ByteBuffer)} to pass buffers directly to the + * native library without intermediate copies. + */ + static class ZstdBytesDecompressor extends BytesDecompressor { + private final ZstdDecompressCtx context; + + ZstdBytesDecompressor() { + this.context = new ZstdDecompressCtx(); + } + + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + byte[] input = bytes.toByteArray(); + byte[] output = new byte[decompressedSize]; + int decompressed = context.decompressByteArray(output, 0, decompressedSize, input, 0, input.length); + if (decompressed != decompressedSize) { + throw new IOException("Unexpected decompressed size: " + decompressed + " != " + decompressedSize); + } + return BytesInput.from(output); + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + int origInputLimit = input.limit(); + input.limit(input.position() + compressedSize); + int origOutputLimit = output.limit(); + output.limit(output.position() + decompressedSize); + // Zstd.decompress uses (dst, src) parameter order, matching the native zstd convention. + // Use slices so native API works on independent buffers; advance positions manually. + Zstd.decompress(output.slice(), input.slice()); + input.position(input.limit()); + input.limit(origInputLimit); + output.position(output.limit()); + output.limit(origOutputLimit); + } + + @Override + public void release() { + context.close(); + } + } + + // ---- Optimized LZ4_RAW compressor/decompressor using airlift LZ4 directly ---- + + /** + * Compresses using airlift's LZ4 compressor directly with heap ByteBuffers, + * bypassing the Hadoop stream abstraction and NonBlockedCompressor's direct + * buffer copies. + */ + static class Lz4RawBytesCompressor extends BytesCompressor { + private final Lz4Compressor compressor = new Lz4Compressor(); + private ByteBuffer directInputBuf; + private ByteBuffer directOutputBuf; + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + byte[] input = bytes.toByteArray(); + int maxLen = compressor.maxCompressedLength(input.length); + + // Grow reusable direct input buffer if needed + if (directInputBuf == null || directInputBuf.capacity() < input.length) { + directInputBuf = ByteBuffer.allocateDirect(input.length); + } + directInputBuf.clear(); + directInputBuf.put(input); + directInputBuf.flip(); + + // Grow reusable direct output buffer if needed + if (directOutputBuf == null || directOutputBuf.capacity() < maxLen) { + directOutputBuf = ByteBuffer.allocateDirect(maxLen); + } + directOutputBuf.clear(); + + compressor.compress(directInputBuf, directOutputBuf); + int compressedSize = directOutputBuf.position(); + + // Copy result to heap byte array + directOutputBuf.flip(); + byte[] output = new byte[compressedSize]; + directOutputBuf.get(output); + return BytesInput.from(output, 0, compressedSize); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.LZ4_RAW; + } + + @Override + public void release() { + directInputBuf = null; + directOutputBuf = null; + } + } + + /** + * Decompresses using airlift's LZ4 decompressor with reusable direct ByteBuffers. + * Aircompressor's LZ4 native implementation is significantly faster (~25%) on direct + * (off-heap) buffers because it can use raw native pointers via Unsafe, avoiding the + * overhead of JNI array pinning required for heap-backed buffers. The cost of copying + * data to/from the reusable direct buffers is more than offset by the faster native + * decompression, especially at typical Parquet page sizes (64KB-1MB). + */ + static class Lz4RawBytesDecompressor extends BytesDecompressor { + private final Lz4Decompressor decompressor = new Lz4Decompressor(); + private ByteBuffer directInputBuf; + private ByteBuffer directOutputBuf; + + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + int inputSize = Math.toIntExact(bytes.size()); + + // Grow reusable direct input buffer if needed + if (directInputBuf == null || directInputBuf.capacity() < inputSize) { + directInputBuf = ByteBuffer.allocateDirect(inputSize); + } + directInputBuf.clear().limit(inputSize); + // toByteArray() is zero-copy for ByteArrayBytesInput (returns backing array directly) + directInputBuf.put(bytes.toByteArray(), 0, inputSize); + directInputBuf.flip(); + + // Grow reusable direct output buffer if needed + if (directOutputBuf == null || directOutputBuf.capacity() < decompressedSize) { + directOutputBuf = ByteBuffer.allocateDirect(decompressedSize); + } + directOutputBuf.clear().limit(decompressedSize); + + decompressor.decompress(directInputBuf.slice(), directOutputBuf.slice()); + + // Copy result to heap — returning a ByteArrayBytesInput allows callers to + // get the byte[] via toByteArray() without an additional copy. + byte[] output = new byte[decompressedSize]; + directOutputBuf.position(0).limit(decompressedSize); + directOutputBuf.get(output); + return BytesInput.from(output); + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + int origInputLimit = input.limit(); + input.limit(input.position() + compressedSize); + int origOutputLimit = output.limit(); + output.limit(output.position() + decompressedSize); + // Use slices so native API works on independent buffers; advance positions manually. + decompressor.decompress(input.slice(), output.slice()); + input.position(input.limit()); + input.limit(origInputLimit); + output.position(output.limit()); + output.limit(origOutputLimit); + } + + @Override + public void release() { + directInputBuf = null; + directOutputBuf = null; + } + } + + // ---- Optimized GZIP compressor/decompressor using JDK GZIPOutputStream/GZIPInputStream directly ---- + + /** + * Compresses using {@link GZIPOutputStream} directly, bypassing Hadoop's + * GzipCodec and the associated codec pool / stream wrapper overhead. + * + *
Note: this implementation always uses Java's built-in zlib via + * {@link GZIPOutputStream}. It does not use Hadoop native libraries, + * so hardware-accelerated compression via Intel ISA-L will not be used even if + * the native libraries are installed. The overhead reduction from bypassing the + * Hadoop codec framework typically outweighs the ISA-L advantage for the page + * sizes used by Parquet. + */ + static class GzipBytesCompressor extends BytesCompressor { + private final int level; + private final ByteArrayOutputStream baos; + + GzipBytesCompressor(int level, int pageSize) { + this.level = level; + this.baos = new ByteArrayOutputStream(pageSize); + } + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + baos.reset(); + try (GZIPOutputStream gos = new GZIPOutputStream(baos) { + { + def.setLevel(level); + } + }) { + bytes.writeAllTo(gos); + } + return BytesInput.from(baos); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.GZIP; + } + + @Override + public void release() {} + } + + /** + * Decompresses using {@link GZIPInputStream} directly, bypassing Hadoop's + * GzipCodec and the associated codec pool / stream wrapper overhead. + * CRC32 and size verification is handled by the JDK implementation. + * + *
Note: this implementation always uses Java's built-in zlib via + * {@link GZIPInputStream}. It does not use Hadoop native libraries, + * so hardware-accelerated decompression via Intel ISA-L will not be used even if + * the native libraries are installed. + */ + static class GzipBytesDecompressor extends BytesDecompressor { + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + try (GZIPInputStream gis = new GZIPInputStream(bytes.toInputStream())) { + byte[] output = new byte[decompressedSize]; + int offset = 0; + while (offset < decompressedSize) { + int read = gis.read(output, offset, decompressedSize - offset); + if (read < 0) { + throw new IOException( + "Unexpected end of GZIP stream at offset " + offset + " of " + decompressedSize); + } + offset += read; + } + return BytesInput.from(output); + } + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + // Wrap the input ByteBuffer slice in an InputStream to avoid allocating a temp byte array. + // GZIPInputStream is stream-based so we still need a temp output array. + ByteBuffer inputSlice = input.slice(); + inputSlice.limit(compressedSize); + try (GZIPInputStream gis = new GZIPInputStream(ByteBufferInputStream.wrap(inputSlice))) { + byte[] outputBytes = new byte[decompressedSize]; + int offset = 0; + while (offset < decompressedSize) { + int read = gis.read(outputBytes, offset, decompressedSize - offset); + if (read < 0) { + throw new IOException( + "Unexpected end of GZIP stream at offset " + offset + " of " + decompressedSize); + } + offset += read; + } + output.put(outputBytes); + } + input.position(input.position() + compressedSize); + } + + @Override + public void release() {} + } + + // ---- Optimized LZO compressor/decompressor using aircompressor's Hadoop-framed LZO directly ---- + + /** + * Compresses using aircompressor's LZO Hadoop-framed streams directly, + * bypassing the GPL-licensed {@code com.hadoop.compression.lzo.LzoCodec} and + * the associated Hadoop codec pool / stream wrapper overhead. The framing + * format (big-endian length-prefixed blocks) is wire-compatible with Hadoop's + * LzoCodec, so files produced by this compressor are readable by any standard + * Parquet reader. + */ + static class LzoBytesCompressor extends BytesCompressor { + private static final LzoHadoopStreams LZO_STREAMS = new LzoHadoopStreams(); + private final ByteArrayOutputStream baos; + + LzoBytesCompressor(int pageSize) { + this.baos = new ByteArrayOutputStream(pageSize); + } + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + baos.reset(); + try (OutputStream los = LZO_STREAMS.createOutputStream(baos)) { + bytes.writeAllTo(los); + } + return BytesInput.from(baos); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.LZO; + } + + @Override + public void release() {} + } + + /** + * Decompresses using aircompressor's LZO Hadoop-framed streams directly, + * bypassing the GPL-licensed Hadoop LzoCodec. Reads the same big-endian + * length-prefixed block framing that Hadoop's LzoCodec produces. + */ + static class LzoBytesDecompressor extends BytesDecompressor { + private static final LzoHadoopStreams LZO_STREAMS = new LzoHadoopStreams(); + + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + try (InputStream lis = LZO_STREAMS.createInputStream(bytes.toInputStream())) { + byte[] output = new byte[decompressedSize]; + int offset = 0; + while (offset < decompressedSize) { + int read = lis.read(output, offset, decompressedSize - offset); + if (read < 0) { + throw new IOException( + "Unexpected end of LZO stream at offset " + offset + " of " + decompressedSize); + } + offset += read; + } + return BytesInput.from(output); + } + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + ByteBuffer inputSlice = input.slice(); + inputSlice.limit(compressedSize); + try (InputStream lis = LZO_STREAMS.createInputStream(ByteBufferInputStream.wrap(inputSlice))) { + byte[] outputBytes = new byte[decompressedSize]; + int offset = 0; + while (offset < decompressedSize) { + int read = lis.read(outputBytes, offset, decompressedSize - offset); + if (read < 0) { + throw new IOException( + "Unexpected end of LZO stream at offset " + offset + " of " + decompressedSize); + } + offset += read; + } + output.put(outputBytes); + } + input.position(input.position() + compressedSize); + } + + @Override + public void release() {} + } + + /** + * Brotli compressor using brotli4j ({@code com.aayushatharva.brotli4j}) via reflection. + * Single-call byte-array API — no streaming overhead. Default quality=1 + * matches the old jbrotli default and gives a good speed/ratio trade-off. + */ + static class BrotliBytesCompressor extends BytesCompressor { + private final Object params; + + BrotliBytesCompressor(int quality) { + this.params = Brotli4j.newParams(quality); + } + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + byte[] input = bytes.toByteArray(); + byte[] compressed = Brotli4j.compress(input, params); + return BytesInput.from(compressed); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.BROTLI; + } + + @Override + public void release() {} + } + + /** + * Brotli decompressor using brotli4j ({@code com.aayushatharva.brotli4j}) via reflection. + * Single-call byte-array API. For the ByteBuffer overload the input slice + * is copied to a heap array, decompressed, and the result put into the + * output buffer — Brotli is slow enough that the copy overhead is negligible. + */ + static class BrotliBytesDecompressor extends BytesDecompressor { + + @Override + public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { + byte[] compressed = bytes.toByteArray(); + byte[] decompressed = Brotli4j.decompress(compressed); + return BytesInput.from(decompressed); + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) + throws IOException { + ByteBuffer inputSlice = input.slice(); + inputSlice.limit(compressedSize); + byte[] compressedBytes = new byte[compressedSize]; + inputSlice.get(compressedBytes); + + byte[] decompressed = Brotli4j.decompress(compressedBytes); + output.put(decompressed); + input.position(input.position() + compressedSize); + } + + @Override + public void release() {} + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index b2b5233eeb..e6bc6891e8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -103,8 +103,14 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName) return new SnappyCompressor(); case ZSTD: return new ZstdCompressor(); - // todo: create class similar to the SnappyCompressor for zlib and exclude it as - // snappy is above since it also generates allocateDirect calls. + case LZ4_RAW: + return new Lz4RawCompressor(); + case BROTLI: + if (Brotli4j.AVAILABLE) { + return new BrotliDirectCompressor(); + } + return super.createCompressor(codecName); + case LZO: default: return super.createCompressor(codecName); } @@ -117,6 +123,17 @@ protected BytesDecompressor createDecompressor(final CompressionCodecName codecN return new SnappyDecompressor(); case ZSTD: return new ZstdDecompressor(); + case LZ4_RAW: + return new Lz4RawDecompressor(); + case BROTLI: + if (Brotli4j.AVAILABLE) { + return new BrotliDirectDecompressor(); + } + // fall through to super (which also checks Brotli4j, then Hadoop codec) + case GZIP: + case LZO: + case UNCOMPRESSED: + return super.createDecompressor(codecName); default: CompressionCodec codec = getCodec(codecName); if (codec == null) { @@ -405,6 +422,26 @@ void closeDecompressor() { } } + /** + * Direct-memory LZ4_RAW decompressor using airlift's LZ4 decompressor with + * direct ByteBuffers, avoiding reflection-based {@link FullDirectDecompressor}. + */ + private class Lz4RawDecompressor extends BaseDecompressor { + private final io.airlift.compress.lz4.Lz4Decompressor decompressor = + new io.airlift.compress.lz4.Lz4Decompressor(); + + @Override + int decompress(ByteBuffer input, ByteBuffer output) { + decompressor.decompress(input, output); + return output.position(); + } + + @Override + void closeDecompressor() { + // no-op + } + } + private class ZstdCompressor extends BaseCompressor { private final ZstdCompressCtx context; @@ -437,6 +474,95 @@ void closeCompressor() { } } + /** + * Direct-memory LZ4_RAW compressor using airlift's LZ4 compressor with + * direct ByteBuffers, avoiding the stream-based heap path. + */ + private class Lz4RawCompressor extends BaseCompressor { + private final io.airlift.compress.lz4.Lz4Compressor compressor = new io.airlift.compress.lz4.Lz4Compressor(); + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.LZ4_RAW; + } + + @Override + int maxCompressedSize(int size) { + return compressor.maxCompressedLength(size); + } + + @Override + int compress(ByteBuffer input, ByteBuffer output) { + compressor.compress(input, output); + return output.position(); + } + + @Override + void closeCompressor() { + // no-op + } + } + + /** + * Direct-memory Brotli decompressor using brotli4j via reflection. + * brotli4j only exposes a byte-array API, so input/output are copied through heap arrays. + * Brotli is slow enough that the copy overhead is negligible. + */ + private class BrotliDirectDecompressor extends BaseDecompressor { + + @Override + int decompress(ByteBuffer input, ByteBuffer output) throws IOException { + byte[] compressedBytes = new byte[input.remaining()]; + input.get(compressedBytes); + byte[] decompressed = Brotli4j.decompress(compressedBytes); + output.put(decompressed); + return decompressed.length; + } + + @Override + void closeDecompressor() { + // no-op + } + } + + /** + * Direct-memory Brotli compressor using brotli4j via reflection. + * Uses quality=1 by default (fast compression, matching the old jbrotli default). + * brotli4j only exposes a byte-array API, so input/output are copied through heap arrays. + */ + private class BrotliDirectCompressor extends BaseCompressor { + private final Object params; + + BrotliDirectCompressor() { + this.params = Brotli4j.newParams(1); + } + + @Override + public CompressionCodecName getCodecName() { + return CompressionCodecName.BROTLI; + } + + @Override + int maxCompressedSize(int size) { + // Brotli worst case: input size + (input size >> 2) + 1K overhead for small inputs + return size + (size >> 2) + 1024; + } + + @Override + int compress(ByteBuffer input, ByteBuffer output) throws IOException { + byte[] inputBytes = new byte[input.remaining()]; + input.get(inputBytes); + byte[] compressed = Brotli4j.compress(inputBytes, params); + output.put(compressed); + return compressed.length; + } + + @Override + void closeCompressor() { + // no-op + } + } + /** * @deprecated Use {@link CodecFactory#NO_OP_COMPRESSOR} instead */ diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestCompressionInterop.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestCompressionInterop.java new file mode 100644 index 0000000000..e894ffff9d --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestCompressionInterop.java @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import io.airlift.compress.lzo.LzoCodec; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.io.LocalOutputFile; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * End-to-end interoperability tests between the new direct compression path + * (bypassing Hadoop codec abstraction) and the old Hadoop CompressionCodec path. + * + *
These tests verify that: + *
This ensures backward/forward compatibility of compressed Parquet files regardless + * of which compression implementation produced them. + * + *
For LZO and BROTLI, the original Hadoop codec classes are not on the test classpath: + *
For LZO: uses aircompressor's {@link LzoCodec} which implements Hadoop's + * {@link CompressionCodec} interface with Hadoop-compatible framing. + * + *
For BROTLI: uses a stream-based compressor/decompressor backed by brotli4j's
+ * {@code BrotliOutputStream}/{@code BrotliInputStream} (via reflection since brotli4j
+ * is a runtime-only dependency).
+ */
+ static class HadoopOnlyCodecFactory extends CodecFactory {
+ HadoopOnlyCodecFactory(Configuration conf, int pageSize) {
+ super(conf, pageSize);
+ }
+
+ @Override
+ protected BytesCompressor createCompressor(CompressionCodecName codecName) {
+ switch (codecName) {
+ case UNCOMPRESSED:
+ return NO_OP_COMPRESSOR;
+ case LZO:
+ // Use aircompressor's LzoCodec which implements Hadoop's CompressionCodec
+ return new HeapBytesCompressor(codecName, new LzoCodec());
+ case BROTLI:
+ if (CodecFactory.Brotli4j.AVAILABLE) {
+ return new BrotliStreamCompressor();
+ }
+ // fall through if brotli4j not available
+ default:
+ CompressionCodec codec = getCodec(codecName);
+ if (codec == null) {
+ return NO_OP_COMPRESSOR;
+ }
+ return new HeapBytesCompressor(codecName, codec);
+ }
+ }
+
+ @Override
+ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
+ switch (codecName) {
+ case UNCOMPRESSED:
+ return NO_OP_DECOMPRESSOR;
+ case LZO:
+ // Use aircompressor's LzoCodec which implements Hadoop's CompressionCodec
+ return new HeapBytesDecompressor(new LzoCodec());
+ case BROTLI:
+ if (CodecFactory.Brotli4j.AVAILABLE) {
+ return new BrotliStreamDecompressor();
+ }
+ // fall through if brotli4j not available
+ default:
+ CompressionCodec codec = getCodec(codecName);
+ if (codec == null) {
+ return NO_OP_DECOMPRESSOR;
+ }
+ return new HeapBytesDecompressor(codec);
+ }
+ }
+ }
+
+ /**
+ * Stream-based Brotli compressor using brotli4j's BrotliOutputStream via reflection.
+ * This mimics what a Hadoop BrotliCodec would do: wrap the output in a BrotliOutputStream.
+ */
+ static class BrotliStreamCompressor extends CodecFactory.BytesCompressor {
+ private static final Constructor> BROTLI_OS_CTOR;
+
+ static {
+ Constructor> ctor = null;
+ try {
+ Class> bosClass = Class.forName("com.aayushatharva.brotli4j.encoder.BrotliOutputStream");
+ ctor = bosClass.getConstructor(OutputStream.class);
+ } catch (Throwable t) {
+ // will be null — checked before use
+ }
+ BROTLI_OS_CTOR = ctor;
+ }
+
+ @Override
+ public BytesInput compress(BytesInput bytes) throws IOException {
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream((int) bytes.size());
+ OutputStream bos = (OutputStream) BROTLI_OS_CTOR.newInstance(baos);
+ bytes.writeAllTo(bos);
+ bos.close();
+ return BytesInput.from(baos.toByteArray());
+ } catch (ReflectiveOperationException e) {
+ throw new IOException("Brotli stream compression failed", e);
+ }
+ }
+
+ @Override
+ public CompressionCodecName getCodecName() {
+ return CompressionCodecName.BROTLI;
+ }
+
+ @Override
+ public void release() {}
+ }
+
+ /**
+ * Stream-based Brotli decompressor using brotli4j's BrotliInputStream via reflection.
+ * This mimics what a Hadoop BrotliCodec would do: wrap the input in a BrotliInputStream.
+ */
+ static class BrotliStreamDecompressor extends CodecFactory.BytesDecompressor {
+ private static final Constructor> BROTLI_IS_CTOR;
+
+ static {
+ Constructor> ctor = null;
+ try {
+ Class> bisClass = Class.forName("com.aayushatharva.brotli4j.decoder.BrotliInputStream");
+ ctor = bisClass.getConstructor(InputStream.class);
+ } catch (Throwable t) {
+ // will be null — checked before use
+ }
+ BROTLI_IS_CTOR = ctor;
+ }
+
+ @Override
+ public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException {
+ try {
+ InputStream bis = (InputStream) BROTLI_IS_CTOR.newInstance(bytes.toInputStream());
+ byte[] output = new byte[decompressedSize];
+ int offset = 0;
+ while (offset < decompressedSize) {
+ int read = bis.read(output, offset, decompressedSize - offset);
+ if (read < 0) {
+ throw new IOException(
+ "Unexpected end of Brotli stream at offset " + offset + " of " + decompressedSize);
+ }
+ offset += read;
+ }
+ bis.close();
+ return BytesInput.from(output);
+ } catch (ReflectiveOperationException e) {
+ throw new IOException("Brotli stream decompression failed", e);
+ }
+ }
+
+ @Override
+ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize)
+ throws IOException {
+ byte[] compressed = new byte[compressedSize];
+ input.get(compressed);
+ BytesInput decompressed = decompress(BytesInput.from(compressed), decompressedSize);
+ output.put(decompressed.toByteArray());
+ }
+
+ @Override
+ public void release() {}
+ }
+
+ // ---- Write with Hadoop path, read with Direct path ----
+
+ @Test
+ public void writeHadoopReadDirect_SNAPPY() throws Exception {
+ testWriteHadoopReadDirect(CompressionCodecName.SNAPPY);
+ }
+
+ @Test
+ public void writeHadoopReadDirect_GZIP() throws Exception {
+ testWriteHadoopReadDirect(CompressionCodecName.GZIP);
+ }
+
+ @Test
+ public void writeHadoopReadDirect_ZSTD() throws Exception {
+ testWriteHadoopReadDirect(CompressionCodecName.ZSTD);
+ }
+
+ @Test
+ public void writeHadoopReadDirect_LZ4_RAW() throws Exception {
+ testWriteHadoopReadDirect(CompressionCodecName.LZ4_RAW);
+ }
+
+ @Test
+ public void writeHadoopReadDirect_LZO() throws Exception {
+ testWriteHadoopReadDirect(CompressionCodecName.LZO);
+ }
+
+ @Test
+ public void writeHadoopReadDirect_BROTLI() throws Exception {
+ testWriteHadoopReadDirect(CompressionCodecName.BROTLI);
+ }
+
+ // ---- Write with Direct path, read with Hadoop path ----
+
+ @Test
+ public void writeDirectReadHadoop_SNAPPY() throws Exception {
+ testWriteDirectReadHadoop(CompressionCodecName.SNAPPY);
+ }
+
+ @Test
+ public void writeDirectReadHadoop_GZIP() throws Exception {
+ testWriteDirectReadHadoop(CompressionCodecName.GZIP);
+ }
+
+ @Test
+ public void writeDirectReadHadoop_ZSTD() throws Exception {
+ testWriteDirectReadHadoop(CompressionCodecName.ZSTD);
+ }
+
+ @Test
+ public void writeDirectReadHadoop_LZ4_RAW() throws Exception {
+ testWriteDirectReadHadoop(CompressionCodecName.LZ4_RAW);
+ }
+
+ @Test
+ public void writeDirectReadHadoop_LZO() throws Exception {
+ testWriteDirectReadHadoop(CompressionCodecName.LZO);
+ }
+
+ @Test
+ public void writeDirectReadHadoop_BROTLI() throws Exception {
+ testWriteDirectReadHadoop(CompressionCodecName.BROTLI);
+ }
+
+ // ---- Bidirectional test: both directions for all codecs ----
+
+ @Test
+ public void bidirectionalInteropAllCodecs() throws Exception {
+ for (CompressionCodecName codec : INTEROP_CODECS) {
+ LOG.info("Testing bidirectional interop for codec: {}", codec);
+ testWriteHadoopReadDirect(codec);
+ testWriteDirectReadHadoop(codec);
+ }
+ }
+
+ // ---- Multi-row-group test to validate interop across row group boundaries ----
+
+ @Test
+ public void writeHadoopReadDirect_multiRowGroup() throws Exception {
+ for (CompressionCodecName codec : INTEROP_CODECS) {
+ testInteropMultiRowGroup(codec, /* writeWithHadoop= */ true);
+ }
+ }
+
+ @Test
+ public void writeDirectReadHadoop_multiRowGroup() throws Exception {
+ for (CompressionCodecName codec : INTEROP_CODECS) {
+ testInteropMultiRowGroup(codec, /* writeWithHadoop= */ false);
+ }
+ }
+
+ // ---- Implementation ----
+
+ private void testWriteHadoopReadDirect(CompressionCodecName codec) throws Exception {
+ Configuration conf = new Configuration();
+ Path file = tempFolder.newFolder().toPath().resolve("hadoop_write_" + codec.name() + ".parquet");
+
+ // Write using Hadoop codec path
+ CompressionCodecFactory hadoopFactory = new HadoopOnlyCodecFactory(conf, PAGE_SIZE);
+ List