Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/java/org/apache/sysds/parser/DMLTranslator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,7 @@ public void constructHops(StatementBlock sb) {
case LIBSVM:
case HDF5:
case DELTA:
case PARQUET:
// columnar/text formats: no block layout (blocksize -1)
ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public static FrameReader createFrameReader(FileFormat fmt, FileFormatProperties
case PROTO:
// TODO performance improvement: add parallel reader
return new FrameReaderProto();
case PARQUET:
return binaryParallel ? new FrameReaderParquetParallel() : new FrameReaderParquet();
default:
throw new DMLRuntimeException("Failed to create frame reader for unknown format: " + fmt.toString());
}
Expand Down
192 changes: 141 additions & 51 deletions src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,23 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.sysds.common.Types.ValueType;
Expand Down Expand Up @@ -78,7 +86,7 @@ public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] n
/**
* Reads data from a Parquet file on HDFS and fills the provided FrameBlock.
* The method retrieves the Parquet schema from the file footer, maps the required column names
* to their corresponding indices, and then uses a ParquetReader to iterate over each row.
* to their corresponding indices, and then uses the column API to iterate over each column.
* Data is extracted based on the column type and set into the output FrameBlock.
*
* @param path The HDFS path to the Parquet file.
Expand All @@ -89,65 +97,147 @@ public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] n
* @param clen The expected number of columns.
*/
protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException {
// Retrieve schema from Parquet footer
ParquetMetadata metadata = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf)).getFooter();
MessageType parquetSchema = metadata.getFileMetaData().getSchema();
int row = readSingleParquetFile(path, conf, dest, clen, 0);

// Map column names to Parquet schema indices
String[] columnNames = dest.getColumnNames();
int[] columnIndices = new int[columnNames.length];
for (int i = 0; i < columnNames.length; i++) {
columnIndices[i] = parquetSchema.getFieldIndex(columnNames[i]);
// Check frame dimensions
if (row != rlen) {
throw new IOException("Mismatch in row count: expected " + rlen + ", but got " + row);
}
}

// Constants for decoding legacy INT96 timestamps (used by the INT96 branch of readColumn()).
// Value subtracted from the Julian day stored in an INT96 value to obtain a day count relative
// to the epoch; presumably the Julian day number of 1970-01-01 (Unix epoch) - confirm.
private static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
// Number of milliseconds in one day; scales the epoch-relative day count to milliseconds.
private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
// Number of nanoseconds in one millisecond; divides the nanos-of-day part down to milliseconds.
private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);

/**
* Reads a single Parquet file into the destination FrameBlock using the column API.
* Iterates row groups; within each row group reads each requested column and writes the
* value into the FrameBlock.
*
* @param path The HDFS path to the Parquet file.
* @param conf The Hadoop configuration.
* @param dest The FrameBlock to populate.
* @param clen The number of columns.
* @param rowOffset The starting row offset in the destination FrameBlock.
* @return The number of rows read.
*/
protected int readSingleParquetFile(Path path, Configuration conf, FrameBlock dest,
long clen, long rowOffset) throws IOException
{
String[] columnNames = dest.getColumnNames();

try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
ParquetMetadata metadata = reader.getFooter();
MessageType parquetSchema = metadata.getFileMetaData().getSchema();
String createdBy = metadata.getFileMetaData().getCreatedBy();

// Map each requested frame column (by name) to its Parquet column descriptor.
ColumnDescriptor[] descriptors = new ColumnDescriptor[(int) clen];
for (int col = 0; col < clen; col++) {
ColumnDescriptor desc = parquetSchema.getColumnDescription(new String[]{ columnNames[col] });
// Nested columns cannot be represented by a flat FrameBlock column
if (desc.getMaxRepetitionLevel() > 0)
throw new IOException("Nested Parquet columns are not supported: " + columnNames[col]);
descriptors[col] = desc;
}

// no-op converter tree, only used by ColumnReadStoreImpl to resolve a converter per column
GroupConverter rootConverter = newNoOpRootConverter(parquetSchema);

int row = (int) rowOffset;
PageReadStore pages;

while (true) {
pages = reader.readNextRowGroup();

// Read data using ParquetReader
try (ParquetReader<Group> rowReader = ParquetReader.builder(new GroupReadSupport(), path)
.withConf(conf)
.build()) {
if (pages == null)
break;

int rowsInGroup = (int) pages.getRowCount();
ColumnReadStoreImpl colStore = new ColumnReadStoreImpl(pages, rootConverter, parquetSchema, createdBy);

Group group;
int row = 0;
while ((group = rowReader.read()) != null) {
for (int col = 0; col < clen; col++) {
int colIndex = columnIndices[col];
if (group.getFieldRepetitionCount(colIndex) > 0) {
PrimitiveType.PrimitiveTypeName type = parquetSchema.getType(columnNames[col]).asPrimitiveType().getPrimitiveTypeName();
switch (type) {
case INT32:
dest.set(row, col, group.getInteger(colIndex, 0));
break;
case INT64:
dest.set(row, col, group.getLong(colIndex, 0));
break;
case FLOAT:
dest.set(row, col, group.getFloat(colIndex, 0));
break;
case DOUBLE:
dest.set(row, col, group.getDouble(colIndex, 0));
break;
case BOOLEAN:
dest.set(row, col, group.getBoolean(colIndex, 0));
break;
case BINARY:
dest.set(row, col, group.getBinary(colIndex, 0).toStringUsingUTF8());
break;
default:
throw new IOException("Unsupported data type: " + type);
}
} else {
dest.set(row, col, null);
}
ColumnDescriptor desc = descriptors[col];
ColumnReader creader = colStore.getColumnReader(desc);
int maxDef = desc.getMaxDefinitionLevel();
PrimitiveType.PrimitiveTypeName ptype = desc.getPrimitiveType().getPrimitiveTypeName();
readColumn(dest, creader, col, row, rowsInGroup, maxDef, ptype);
}
row++;
row += rowsInGroup;
}
return row - (int) rowOffset;
}
}

// Check frame dimensions
if (row != rlen) {
throw new IOException("Mismatch in row count: expected " + rlen + ", but got " + row);
/**
 * Transfers the values of one column of a single row group into the destination FrameBlock,
 * one cell per row. A position whose definition level differs from {@code maxDef} is stored
 * as null; every other position is read through the typed accessor that matches {@code ptype}.
 *
 * @param dest        The FrameBlock receiving the values.
 * @param creader     The column reader for this column of the current row group.
 * @param col         The destination column index in the FrameBlock.
 * @param rowStart    The destination row index of the first value of this row group.
 * @param rowsInGroup The number of values to read from the column reader.
 * @param maxDef      The definition level that marks a value as present.
 * @param ptype       The Parquet primitive type of the column.
 * @throws IOException if a present value has a primitive type that is not handled here.
 */
private void readColumn(FrameBlock dest, ColumnReader creader, int col, int rowStart,
	int rowsInGroup, int maxDef, PrimitiveType.PrimitiveTypeName ptype) throws IOException
{
	for (int offset = 0; offset < rowsInGroup; offset++) {
		final int target = rowStart + offset;

		// Value is absent at this position: record null and move on to the next one.
		if (creader.getCurrentDefinitionLevel() != maxDef) {
			dest.set(target, col, null);
			creader.consume();
			continue;
		}

		switch (ptype) {
			case BOOLEAN:
				dest.set(target, col, creader.getBoolean());
				break;
			case INT32:
				dest.set(target, col, creader.getInteger());
				break;
			case INT64:
				dest.set(target, col, creader.getLong());
				break;
			case INT96:
				// Legacy INT96 timestamp, narrowed to epoch millis.
				// See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
				dest.set(target, col, int96ToEpochMillis(creader.getBinary()));
				break;
			case FLOAT:
				dest.set(target, col, creader.getFloat());
				break;
			case DOUBLE:
				dest.set(target, col, creader.getDouble());
				break;
			case BINARY:
				dest.set(target, col, creader.getBinary().toStringUsingUTF8());
				break;
			default:
				throw new IOException("Unsupported data type: " + ptype);
		}

		// Advance the reader only after the current value has been stored.
		creader.consume();
	}
}

/**
 * Converts a legacy INT96 timestamp into epoch milliseconds. The bytes are read in
 * little-endian order as an 8-byte nanoseconds-of-day value followed by a 4-byte Julian day.
 * Precision below one millisecond is dropped by the integer division.
 *
 * @param int96 The raw INT96 value as returned by the column reader.
 * @return The timestamp in milliseconds relative to the epoch.
 */
private static long int96ToEpochMillis(Binary int96) {
	ByteBuffer raw = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
	long nanosOfDay = raw.getLong();
	int julianDay = raw.getInt();
	long dayMillis = (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
	return dayMillis + nanosOfDay / NANOS_PER_MILLISECOND;
}

/**
 * Creates a converter tree of do-nothing converters for a flat Parquet schema: one
 * PrimitiveConverter per top-level field, wrapped in a root GroupConverter.
 * The tree exists only so that a ColumnReadStoreImpl can be constructed; the values
 * themselves are read through the typed creader.getX accessors in readColumn(),
 * so the converter callbacks are not expected to be invoked.
 *
 * @param schema The Parquet message schema whose field count determines the number of leaves.
 * @return A root GroupConverter that hands out one no-op leaf converter per field index.
 */
private static GroupConverter newNoOpRootConverter(MessageType schema) {
	final PrimitiveConverter[] noOpLeaves = new PrimitiveConverter[schema.getFieldCount()];
	for (int field = 0; field < noOpLeaves.length; field++) {
		// Anonymous subclass with no overrides: a leaf that carries no conversion logic.
		noOpLeaves[field] = new PrimitiveConverter() {};
	}

	final GroupConverter root = new GroupConverter() {
		@Override
		public void start() {
			// nothing to do
		}

		@Override
		public void end() {
			// nothing to do
		}

		@Override
		public Converter getConverter(int fieldIndex) {
			return noOpLeaves[fieldIndex];
		}
	};
	return root;
}

//not implemented
@Override
public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
Expand Down Expand Up @@ -59,14 +62,35 @@ public class FrameReaderParquetParallel extends FrameReaderParquet {
protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException, DMLRuntimeException {
FileSystem fs = IOUtilFunctions.getFileSystem(path);
Path[] files = IOUtilFunctions.getSequenceFilePaths(fs, path);
Arrays.sort(files, Comparator.comparing(Path::getName));
int numThreads = Math.min(OptimizerUtils.getParallelBinaryReadParallelism(), files.length);

long[] offsets = new long[files.length];
long cumulative = 0;

for (int i = 0; i < files.length; i++) {
long fileRows = 0;

try (ParquetFileReader reader =
ParquetFileReader.open(HadoopInputFile.fromPath(files[i], conf))) {

ParquetMetadata meta = reader.getFooter();

for (BlockMetaData block : meta.getBlocks()) {
fileRows += block.getRowCount();
}
}

offsets[i] = cumulative;
cumulative += fileRows;
}

// Create and execute read tasks
ExecutorService pool = CommonThreadPool.get(numThreads);
try {
List<ReadFileTask> tasks = new ArrayList<>();
for (Path file : files) {
tasks.add(new ReadFileTask(file, conf, dest, schema, clen));
for (int i = 0; i < files.length; i++) {
tasks.add(new ReadFileTask(files[i], conf, dest, clen, offsets[i]));
}

for (Future<Object> task : pool.invokeAll(tasks)) {
Expand All @@ -83,35 +107,21 @@ private class ReadFileTask implements Callable<Object> {
private Path path;
private Configuration conf;
private FrameBlock dest;
@SuppressWarnings("unused")
private ValueType[] schema;
private long clen;
private long rowOffset;

public ReadFileTask(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long clen) {
public ReadFileTask(Path path, Configuration conf, FrameBlock dest, long clen, long rowOffset) {
this.path = path;
this.conf = conf;
this.dest = dest;
this.schema = schema;
this.clen = clen;
this.rowOffset = rowOffset;
}

// When executed, a ParquetReader for the assigned file opens and iterates over each row processing every column.
@Override
public Object call() throws Exception {
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).withConf(conf).build()) {
Group group;
int row = 0;
while ((group = reader.read()) != null) {
for (int col = 0; col < clen; col++) {
if (group.getFieldRepetitionCount(col) > 0) {
dest.set(row, col, group.getValueToString(col, 0));
} else {
dest.set(row, col, null);
}
}
row++;
}
}
FrameReaderParquetParallel.super.readSingleParquetFile(path, conf, dest, clen, rowOffset);
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public static FrameWriter createFrameWriter(FileFormat fmt, FileFormatProperties
return binaryParallel ? new FrameWriterBinaryBlockParallel() : new FrameWriterBinaryBlock();
case PROTO:
return new FrameWriterProto();
case PARQUET:
return binaryParallel ? new FrameWriterParquetParallel() : new FrameWriterParquet();
default:
throw new DMLRuntimeException("Failed to create frame writer for unknown format: " + fmt.toString());
}
Expand Down
Loading
Loading