diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java deleted file mode 100644 index 64cf3c596d..0000000000 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.parquet.hadoop.util; - -import java.io.IOException; -import java.util.List; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.parquet.bytes.ByteBufferAllocator; -import org.apache.parquet.hadoop.util.wrapped.io.VectorIoBridge; -import org.apache.parquet.io.DelegatingSeekableInputStream; -import org.apache.parquet.io.ParquetFileRange; - -/** - * SeekableInputStream implementation that implements read(ByteBuffer) for - * Hadoop 1 FSDataInputStream. - * It implements {@link #readVectored(List, ByteBufferAllocator)}) by - * handing off to VectorIoBridge which uses reflection to offer the API if it is - * found. - * The return value of {@link #readVectoredAvailable(ByteBufferAllocator)} - * reflects the availability of the API. - */ -class H1SeekableInputStream extends DelegatingSeekableInputStream { - - private final FSDataInputStream stream; - - public H1SeekableInputStream(FSDataInputStream stream) { - super(stream); - this.stream = stream; - } - - @Override - public long getPos() throws IOException { - return stream.getPos(); - } - - @Override - public void seek(long newPos) throws IOException { - stream.seek(newPos); - } - - @Override - public void readFully(byte[] bytes) throws IOException { - stream.readFully(bytes, 0, bytes.length); - } - - @Override - public void readFully(byte[] bytes, int start, int len) throws IOException { - stream.readFully(bytes, start, len); - } - - @Override - public boolean readVectoredAvailable(final ByteBufferAllocator allocator) { - return VectorIoBridge.instance().readVectoredAvailable(stream, allocator); - } - - @Override - public void readVectored(List ranges, ByteBufferAllocator allocator) throws IOException { - VectorIoBridge.instance().readVectoredRanges(stream, ranges, allocator); - } -} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java index fc84729c72..b739dfc524 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java @@ -19,24 +19,16 @@ package org.apache.parquet.hadoop.util; -import java.io.InputStream; import java.util.Objects; -import java.util.function.Function; -import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.parquet.io.PositionOutputStream; import org.apache.parquet.io.SeekableInputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Convenience methods to get Parquet abstractions for Hadoop data streams. */ public class HadoopStreams { - - private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class); - /** * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream} * implementation for Parquet readers. @@ -46,79 +38,7 @@ public class HadoopStreams { */ public static SeekableInputStream wrap(FSDataInputStream stream) { Objects.requireNonNull(stream, "Cannot wrap a null input stream"); - - // Try to check using hasCapabilities(str) - Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadable(stream); - - // If it is null, then fall back to the old method - if (hasCapabilitiesResult != null) { - if (hasCapabilitiesResult) { - return new H2SeekableInputStream(stream); - } else { - return new H1SeekableInputStream(stream); - } - } - - return unwrapByteBufferReadableLegacy(stream).apply(stream); - } - - /** - * Is the inner stream byte buffer readable? - * The test is 'the stream is not FSDataInputStream - * and implements ByteBufferReadable' - *

- * This logic is only used for Hadoop <2.9.x, and <3.x.x - * - * @param stream stream to probe - * @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable - */ - private static Function unwrapByteBufferReadableLegacy( - FSDataInputStream stream) { - InputStream wrapped = stream.getWrappedStream(); - if (wrapped instanceof FSDataInputStream) { - LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream); - return unwrapByteBufferReadableLegacy(((FSDataInputStream) wrapped)); - } - if (stream.getWrappedStream() instanceof ByteBufferReadable) { - return H2SeekableInputStream::new; - } else { - return H1SeekableInputStream::new; - } - } - - /** - * Is the inner stream byte buffer readable? - * The test is 'the stream is not FSDataInputStream - * and implements ByteBufferReadable' - *

- * That is: all streams which implement ByteBufferReadable - * other than FSDataInputStream successfully support read(ByteBuffer). - * This is true for all filesystem clients the hadoop codebase. - *

- * In hadoop 3.3.0+, the StreamCapabilities probe can be used to - * check this: only those streams which provide the read(ByteBuffer) - * semantics MAY return true for the probe "in:readbytebuffer"; - * FSDataInputStream will pass the probe down to the underlying stream. - * - * @param stream stream to probe - * @return true if it is safe to a H2SeekableInputStream to access - * the data, null when it cannot be determined because of missing hasCapabilities - */ - private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) { - boolean isByteBufferReadable = stream.hasCapability("in:readbytebuffer"); - - if (isByteBufferReadable) { - // stream is issuing the guarantee that it implements the - // API. Holds for all implementations in hadoop-* - // since Hadoop 3.3.0 (HDFS-14111). - return true; - } - InputStream wrapped = stream.getWrappedStream(); - if (wrapped instanceof FSDataInputStream) { - LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream); - return isWrappedStreamByteBufferReadable(((FSDataInputStream) wrapped)); - } - return wrapped instanceof ByteBufferReadable; + return new H2SeekableInputStream(stream); } /** diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java index 0232ccf984..ca53d87c14 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java @@ -381,18 +381,6 @@ public void testDirectReadFullyPositionAndLimit() throws Exception { Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); } - @Test - public void testCreateStreamNoByteBufferReadable() { - final SeekableInputStream s = wrap(new FSDataInputStream(new MockHadoopInputStream())); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); - } - - @Test - public void testDoubleWrapNoByteBufferReadable() { - final SeekableInputStream s = wrap(new FSDataInputStream(new FSDataInputStream(new MockHadoopInputStream()))); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); - } - @Test public void testCreateStreamWithByteBufferReadable() { final SeekableInputStream s = wrap(new FSDataInputStream(new MockByteBufferInputStream()));