diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java b/src/main/java/org/apache/sysds/parser/DMLTranslator.java index a8e1667d049..0db739cc901 100644 --- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java +++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java @@ -1058,6 +1058,7 @@ public void constructHops(StatementBlock sb) { case LIBSVM: case HDF5: case DELTA: + case PARQUET: // columnar/text formats: no block layout (blocksize -1) ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1); break; diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java index 4e21d2c3f60..6afd3836a15 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java @@ -51,6 +51,8 @@ public static FrameReader createFrameReader(FileFormat fmt, FileFormatProperties case PROTO: // TODO performance improvement: add parallel reader return new FrameReaderProto(); + case PARQUET: + return binaryParallel ? 
new FrameReaderParquetParallel() : new FrameReaderParquet(); default: throw new DMLRuntimeException("Failed to create frame reader for unknown format: " + fmt.toString()); } diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java index ff23e9ea316..504602ef713 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java @@ -20,15 +20,23 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.parquet.example.data.Group; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.ColumnReader; +import org.apache.parquet.column.impl.ColumnReadStoreImpl; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.ParquetReader; -import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.sysds.common.Types.ValueType; @@ -78,7 +86,7 @@ public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] n /** * Reads data from a Parquet file on HDFS and fills the provided FrameBlock. * The method retrieves the Parquet schema from the file footer, maps the required column names - * to their corresponding indices, and then uses a ParquetReader to iterate over each row. 
+ * to their corresponding indices, and then uses the column API to iterate over each column. * Data is extracted based on the column type and set into the output FrameBlock. * * @param path The HDFS path to the Parquet file. @@ -89,65 +97,147 @@ public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] n * @param clen The expected number of columns. */ protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException { - // Retrieve schema from Parquet footer - ParquetMetadata metadata = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf)).getFooter(); - MessageType parquetSchema = metadata.getFileMetaData().getSchema(); + int row = readSingleParquetFile(path, conf, dest, clen, 0); - // Map column names to Parquet schema indices - String[] columnNames = dest.getColumnNames(); - int[] columnIndices = new int[columnNames.length]; - for (int i = 0; i < columnNames.length; i++) { - columnIndices[i] = parquetSchema.getFieldIndex(columnNames[i]); + // Check frame dimensions + if (row != rlen) { + throw new IOException("Mismatch in row count: expected " + rlen + ", but got " + row); } + } + + // Constants for decoding legacy INT96 timestamps + private static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588; + private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1); + private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1); + + /** + * Reads a single Parquet file into the destination FrameBlock using the column API. + * Iterates row groups; within each row group reads each requested column and writes the + * value into the FrameBlock. + * + * @param path The HDFS path to the Parquet file. + * @param conf The Hadoop configuration. + * @param dest The FrameBlock to populate. + * @param clen The number of columns. + * @param rowOffset The starting row offset in the destination FrameBlock. + * @return The number of rows read. 
+ */ + protected int readSingleParquetFile(Path path, Configuration conf, FrameBlock dest, + long clen, long rowOffset) throws IOException + { + String[] columnNames = dest.getColumnNames(); + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) { + ParquetMetadata metadata = reader.getFooter(); + MessageType parquetSchema = metadata.getFileMetaData().getSchema(); + String createdBy = metadata.getFileMetaData().getCreatedBy(); + + // Map each requested frame column (by name) to its Parquet column descriptor. + ColumnDescriptor[] descriptors = new ColumnDescriptor[(int) clen]; + for (int col = 0; col < clen; col++) { + ColumnDescriptor desc = parquetSchema.getColumnDescription(new String[]{ columnNames[col] }); + // Nested columns cannot be represented by a flat FrameBlock column + if (desc.getMaxRepetitionLevel() > 0) + throw new IOException("Nested Parquet columns are not supported: " + columnNames[col]); + descriptors[col] = desc; + } + + // no-op converter tree, only used by ColumnReadStoreImpl to resolve a converter per column + GroupConverter rootConverter = newNoOpRootConverter(parquetSchema); + + int row = (int) rowOffset; + PageReadStore pages; + + while (true) { + pages = reader.readNextRowGroup(); - // Read data usind ParquetReader - try (ParquetReader rowReader = ParquetReader.builder(new GroupReadSupport(), path) - .withConf(conf) - .build()) { + if (pages == null) + break; + + int rowsInGroup = (int) pages.getRowCount(); + ColumnReadStoreImpl colStore = new ColumnReadStoreImpl(pages, rootConverter, parquetSchema, createdBy); - Group group; - int row = 0; - while ((group = rowReader.read()) != null) { for (int col = 0; col < clen; col++) { - int colIndex = columnIndices[col]; - if (group.getFieldRepetitionCount(colIndex) > 0) { - PrimitiveType.PrimitiveTypeName type = parquetSchema.getType(columnNames[col]).asPrimitiveType().getPrimitiveTypeName(); - switch (type) { - case INT32: - dest.set(row, col, 
group.getInteger(colIndex, 0)); - break; - case INT64: - dest.set(row, col, group.getLong(colIndex, 0)); - break; - case FLOAT: - dest.set(row, col, group.getFloat(colIndex, 0)); - break; - case DOUBLE: - dest.set(row, col, group.getDouble(colIndex, 0)); - break; - case BOOLEAN: - dest.set(row, col, group.getBoolean(colIndex, 0)); - break; - case BINARY: - dest.set(row, col, group.getBinary(colIndex, 0).toStringUsingUTF8()); - break; - default: - throw new IOException("Unsupported data type: " + type); - } - } else { - dest.set(row, col, null); - } + ColumnDescriptor desc = descriptors[col]; + ColumnReader creader = colStore.getColumnReader(desc); + int maxDef = desc.getMaxDefinitionLevel(); + PrimitiveType.PrimitiveTypeName ptype = desc.getPrimitiveType().getPrimitiveTypeName(); + readColumn(dest, creader, col, row, rowsInGroup, maxDef, ptype); } - row++; + row += rowsInGroup; } + return row - (int) rowOffset; + } + } - // Check frame dimensions - if (row != rlen) { - throw new IOException("Mismatch in row count: expected " + rlen + ", but got " + row); + /** + * Reads one column of a row group, writing each value (or null) into the destination FrameBlock. + */ + private void readColumn(FrameBlock dest, ColumnReader creader, int col, int rowStart, + int rowsInGroup, int maxDef, PrimitiveType.PrimitiveTypeName ptype) throws IOException + { + for (int i = 0; i < rowsInGroup; i++) { + int row = rowStart + i; + if (creader.getCurrentDefinitionLevel() == maxDef) { + switch (ptype) { + case INT32: + dest.set(row, col, creader.getInteger()); + break; + case INT64: + dest.set(row, col, creader.getLong()); + break; + case FLOAT: + dest.set(row, col, creader.getFloat()); + break; + case DOUBLE: + dest.set(row, col, creader.getDouble()); + break; + case BOOLEAN: + dest.set(row, col, creader.getBoolean()); + break; + case INT96: { + // Legacy INT96 timestamp, narrowed to epoch millis. 
+ // See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp + Binary binary = creader.getBinary(); + ByteBuffer buf = ByteBuffer.wrap(binary.getBytes()).order(ByteOrder.LITTLE_ENDIAN); + long nanosOfDay = buf.getLong(); + int julianDay = buf.getInt(); + long millis = (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY + + nanosOfDay / NANOS_PER_MILLISECOND; + dest.set(row, col, millis); + break; + } + case BINARY: + dest.set(row, col, creader.getBinary().toStringUsingUTF8()); + break; + default: + throw new IOException("Unsupported data type: " + ptype); + } + } + else { + dest.set(row, col, null); } + creader.consume(); } } + /** + * Builds a no-op converter tree matching the (flat) Parquet schema. The converter + * callbacks are never invoked because values are read through the typed creader.getX accessors in readColumn(). + * The tree merely serves to satisfy the ColumnReadStoreImpl constructor. + */ + private static GroupConverter newNoOpRootConverter(MessageType schema) { + final int n = schema.getFieldCount(); + final PrimitiveConverter[] leaves = new PrimitiveConverter[n]; + for (int i = 0; i < n; i++) + leaves[i] = new PrimitiveConverter() {}; + return new GroupConverter() { + @Override public Converter getConverter(int fieldIndex) { return leaves[fieldIndex]; } + @Override public void start() {} + @Override public void end() {} + }; + } + //not implemented @Override public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen) diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java index 3d40f53c626..da25157512a 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.ArrayList; +import
java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -27,9 +29,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.parquet.example.data.Group; -import org.apache.parquet.hadoop.ParquetReader; -import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.sysds.common.Types.ValueType; import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.DMLRuntimeException; @@ -59,14 +62,40 @@ public class FrameReaderParquetParallel extends FrameReaderParquet { protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException, DMLRuntimeException { FileSystem fs = IOUtilFunctions.getFileSystem(path); Path[] files = IOUtilFunctions.getSequenceFilePaths(fs, path); + Arrays.sort(files, Comparator.comparing(Path::getName)); int numThreads = Math.min(OptimizerUtils.getParallelBinaryReadParallelism(), files.length); + long[] offsets = new long[files.length]; + long cumulative = 0; + + for (int i = 0; i < files.length; i++) { + long fileRows = 0; + + try (ParquetFileReader reader = + ParquetFileReader.open(HadoopInputFile.fromPath(files[i], conf))) { + + ParquetMetadata meta = reader.getFooter(); + + for (BlockMetaData block : meta.getBlocks()) { + fileRows += block.getRowCount(); + } + } + + offsets[i] = cumulative; + cumulative += fileRows; + } + + // Check frame dimensions up front (as the sequential reader does) so that no task writes outside dest + if (cumulative != rlen) { + throw new IOException("Mismatch in row count: expected " + rlen + ", but got " + cumulative); + } + // Create and execute read tasks ExecutorService pool = CommonThreadPool.get(numThreads); try { List tasks = new ArrayList<>(); - for (Path file : files) { - tasks.add(new ReadFileTask(file, conf, dest, schema,
clen)); + for (int i = 0; i < files.length; i++) { + tasks.add(new ReadFileTask(files[i], conf, dest, clen, offsets[i])); } for (Future task : pool.invokeAll(tasks)) { @@ -83,35 +112,21 @@ private class ReadFileTask implements Callable { private Path path; private Configuration conf; private FrameBlock dest; - @SuppressWarnings("unused") - private ValueType[] schema; private long clen; + private long rowOffset; - public ReadFileTask(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long clen) { + public ReadFileTask(Path path, Configuration conf, FrameBlock dest, long clen, long rowOffset) { this.path = path; this.conf = conf; this.dest = dest; - this.schema = schema; this.clen = clen; + this.rowOffset = rowOffset; } - // When executed, a ParquetReader for the assigned file opens and iterates over each row processing every column. + // When executed, reads the assigned Parquet file through the column API and writes its rows into dest starting at rowOffset. @Override public Object call() throws Exception { - try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path).withConf(conf).build()) { - Group group; - int row = 0; - while ((group = reader.read()) != null) { - for (int col = 0; col < clen; col++) { - if (group.getFieldRepetitionCount(col) > 0) { - dest.set(row, col, group.getValueToString(col, 0)); - } else { - dest.set(row, col, null); - } - } - row++; - } - } + FrameReaderParquetParallel.super.readSingleParquetFile(path, conf, dest, clen, rowOffset); return null; } } diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java index 3fb3968c96f..6e60af8288b 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java @@ -50,6 +50,8 @@ public static FrameWriter createFrameWriter(FileFormat fmt, FileFormatProperties return binaryParallel ?
new FrameWriterBinaryBlockParallel() : new FrameWriterBinaryBlock(); case PROTO: return new FrameWriterProto(); + case PARQUET: + return binaryParallel ? new FrameWriterParquetParallel() : new FrameWriterParquet(); default: throw new DMLRuntimeException("Failed to create frame writer for unknown format: " + fmt.toString()); } diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java index ccaeeb56d51..35e457e334d 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquet.java @@ -19,19 +19,23 @@ package org.apache.sysds.runtime.io; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; -import org.apache.parquet.example.data.Group; -import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetOutputFormat; import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.frame.data.FrameBlock; @@ -44,6 +48,26 @@ */ public class FrameWriterParquet extends FrameWriter { + 
public enum DictEncoding { ALL_ON, ALL_OFF, STRING_ONLY } + + private final CompressionCodecName codec; + private final DictEncoding dictEncoding; + private final long rowGroupSize; + + public FrameWriterParquet() { + this(CompressionCodecName.ZSTD, DictEncoding.STRING_ONLY, ParquetWriter.DEFAULT_BLOCK_SIZE); + } + + public FrameWriterParquet(CompressionCodecName codec, DictEncoding dictEncoding) { + this(codec, dictEncoding, ParquetWriter.DEFAULT_BLOCK_SIZE); + } + + public FrameWriterParquet(CompressionCodecName codec, DictEncoding dictEncoding, long rowGroupSize) { + this.codec = codec; + this.dictEncoding = dictEncoding; + this.rowGroupSize = rowGroupSize; + } + /** * Writes a FrameBlock to a Parquet file on HDFS. * @@ -73,7 +97,7 @@ public final void writeFrameToHDFS(FrameBlock src, String fname, long rlen, long /** * Writes the FrameBlock data to a Parquet file using a ParquetWriter. * The method generates a Parquet schema based on the metadata of the FrameBlock, initializes a ParquetWriter with specified configurations, - * iterates over each row and column, adding values (in batches for improved performance) using type-specific conversions. + * iterates over each row and column, writing directly to the RecordConsumer, using type-specific conversions. * * @param path The HDFS path where the Parquet file will be written. * @param conf The Hadoop configuration. @@ -87,70 +111,25 @@ protected void writeParquetFrameToHDFS(Path path, Configuration conf, FrameBlock // Create schema based on frame block metadata MessageType schema = createParquetSchema(src); - // TODO:Experiment with different batch sizes? - int batchSize = 1000; - int rowCount = 0; + String[] columnNames = src.getColumnNames(); + ValueType[] columnTypes = src.getSchema(); - // Write data using ParquetWriter //FIXME replace example writer? 
- try (ParquetWriter writer = ExampleParquetWriter.builder(path) + FrameParquetWriterBuilder writerBuilder = new FrameParquetWriterBuilder(path, schema, src) .withConf(conf) - .withType(schema) - .withCompressionCodec(ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME) - .withRowGroupSize((long) ParquetWriter.DEFAULT_BLOCK_SIZE) - .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE) - .withDictionaryEncoding(true) - .build()) - { - - SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); - - List rowBuffer = new ArrayList<>(batchSize); - - for (int i = 0; i < src.getNumRows(); i++) { - Group group = groupFactory.newGroup(); - for (int j = 0; j < src.getNumColumns(); j++) { - Object value = src.get(i, j); - if (value != null) { - ValueType type = src.getSchema()[j]; - switch (type) { - case STRING: - group.add(src.getColumnNames()[j], value.toString()); - break; - case INT32: - group.add(src.getColumnNames()[j], (int) value); - break; - case INT64: - group.add(src.getColumnNames()[j], (long) value); - break; - case FP32: - group.add(src.getColumnNames()[j], (float) value); - break; - case FP64: - group.add(src.getColumnNames()[j], (double) value); - break; - case BOOLEAN: - group.add(src.getColumnNames()[j], (boolean) value); - break; - default: - throw new IOException("Unsupported value type: " + type); - } - } - } - rowBuffer.add(group); - rowCount++; + .withCompressionCodec(CompressionCodecName.fromConf(conf.get(ParquetOutputFormat.COMPRESSION, codec.name()))) + .withRowGroupSize(conf.getLong(ParquetOutputFormat.BLOCK_SIZE, rowGroupSize)) + .withPageSize(conf.getInt(ParquetOutputFormat.PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE)) + .withDictionaryPageSize(conf.getInt(ParquetOutputFormat.DICTIONARY_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE)) + .withDictionaryEncoding(conf.getBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, dictEncoding == DictEncoding.ALL_ON)); - if (rowCount >= batchSize) { - for (Group g : rowBuffer) { - writer.write(g); - } - rowBuffer.clear(); - 
rowCount = 0; - } - } - - for (Group g : rowBuffer) { - writer.write(g); - } + if (dictEncoding == DictEncoding.STRING_ONLY) + for (int j = 0; j < src.getNumColumns(); j++) + if (columnTypes[j] == ValueType.STRING) + writerBuilder = writerBuilder.withDictionaryEncoding(columnNames[j], true); + + try (ParquetWriter writer = writerBuilder.build()) { + for (int i = 0; i < src.getNumRows(); i++) + writer.write(i); } // Delete CRC files created by Hadoop if necessary @@ -164,36 +143,128 @@ protected void writeParquetFrameToHDFS(Path path, Configuration conf, FrameBlock * @return The generated Parquet MessageType schema. */ protected MessageType createParquetSchema(FrameBlock src) { - StringBuilder schemaBuilder = new StringBuilder("message FrameSchema {"); String[] columnNames = src.getColumnNames(); ValueType[] columnTypes = src.getSchema(); + Types.MessageTypeBuilder builder = Types.buildMessage(); for (int i = 0; i < src.getNumColumns(); i++) { - schemaBuilder.append("optional "); switch (columnTypes[i]) { case STRING: - schemaBuilder.append("binary ").append(columnNames[i]).append(" (UTF8);"); + builder.optional(PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()) + .named(columnNames[i]); break; case INT32: - schemaBuilder.append("int32 ").append(columnNames[i]).append(";"); + builder.optional(PrimitiveTypeName.INT32).named(columnNames[i]); break; case INT64: - schemaBuilder.append("int64 ").append(columnNames[i]).append(";"); + builder.optional(PrimitiveTypeName.INT64).named(columnNames[i]); break; case FP32: - schemaBuilder.append("float ").append(columnNames[i]).append(";"); + builder.optional(PrimitiveTypeName.FLOAT).named(columnNames[i]); break; case FP64: - schemaBuilder.append("double ").append(columnNames[i]).append(";"); + builder.optional(PrimitiveTypeName.DOUBLE).named(columnNames[i]); break; case BOOLEAN: - schemaBuilder.append("boolean ").append(columnNames[i]).append(";"); + 
builder.optional(PrimitiveTypeName.BOOLEAN).named(columnNames[i]); break; default: throw new IllegalArgumentException("Unsupported data type: " + columnTypes[i]); } } - schemaBuilder.append("}"); - return MessageTypeParser.parseMessageType(schemaBuilder.toString()); + return builder.named("FrameSchema"); + } + + /** + * WriteSupport implementation that writes rows from a FrameBlock directly to the + * Parquet RecordConsumer. + */ + private static class FrameWriteSupport extends WriteSupport { + private final MessageType schema; + private final FrameBlock src; + private RecordConsumer recordConsumer; + // constant across all rows + private String[] colNames; + private ValueType[] colTypes; + private int numCols; + + FrameWriteSupport(MessageType schema, FrameBlock src) { + this.schema = schema; + this.src = src; + } + + @Override + public WriteContext init(Configuration configuration) { + Map metadata = new HashMap<>(); + return new WriteContext(schema, metadata); + } + + @Override + public void prepareForWrite(RecordConsumer consumer) { + this.recordConsumer = consumer; + this.colNames = src.getColumnNames(); + this.colTypes = src.getSchema(); + this.numCols = src.getNumColumns(); + } + + @Override + public void write(Integer rowIndex) { + recordConsumer.startMessage(); + for (int j = 0; j < numCols; j++) { + Object value = src.get(rowIndex, j); + if (value != null) { + recordConsumer.startField(colNames[j], j); + switch (colTypes[j]) { + case STRING: + recordConsumer.addBinary(Binary.fromString(value.toString())); + break; + case INT32: + recordConsumer.addInteger((int) value); + break; + case INT64: + recordConsumer.addLong((long) value); + break; + case FP32: + recordConsumer.addFloat((float) value); + break; + case FP64: + recordConsumer.addDouble((double) value); + break; + case BOOLEAN: + recordConsumer.addBoolean((boolean) value); + break; + default: + throw new IllegalArgumentException("Unsupported value type: " + colTypes[j]); + } + 
recordConsumer.endField(colNames[j], j); + } + } + recordConsumer.endMessage(); + } + } + + /** + * ParquetWriter builder wired to FrameWriteSupport. + */ + private static class FrameParquetWriterBuilder extends ParquetWriter.Builder { + private final MessageType schema; + private final FrameBlock src; + + FrameParquetWriterBuilder(Path path, MessageType schema, FrameBlock src) { + super(path); + this.schema = schema; + this.src = src; + } + + @Override + protected FrameParquetWriterBuilder self() { + return this; + } + + @Override + protected WriteSupport getWriteSupport(Configuration conf) { + return new FrameWriteSupport(schema, src); + } } } diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java index 0ef4431ef47..441ec2db550 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterParquetParallel.java @@ -35,14 +35,14 @@ import org.apache.sysds.utils.stats.InfrastructureAnalyzer; /** - * Multi-threaded frame parquet reader. + * Multi-threaded frame parquet writer. * */ public class FrameWriterParquetParallel extends FrameWriterParquet { /** * Writes the FrameBlock data to HDFS in parallel. - * The method estimates the number of output partitions by comparing the total number of cells in the FrameBlock with the + * The method estimates the number of output partitions by comparing the estimated output size of the FrameBlock with the * HDFS block size. It then determines the number of threads to use based on the parallelism configuration and the * number of partitions. In case of parallelism, it divides the FrameBlock into chunks and a thread pool is created to * execute a write task for each partition concurrently. 
@@ -55,14 +55,14 @@ public class FrameWriterParquetParallel extends FrameWriterParquet { protected void writeParquetFrameToHDFS(Path path, Configuration conf, FrameBlock src) throws IOException, DMLRuntimeException { - // Estimate number of output partitions - int numPartFiles = Math.max((int) (src.getNumRows() * src.getNumColumns() / InfrastructureAnalyzer.getHDFSBlockSize()), 1); - + // Estimate output partitions from output size in bytes + int numPartFiles = Math.max((int) (OptimizerUtils.estimateSizeExactFrame(src.getNumRows(), src.getNumColumns()) + / InfrastructureAnalyzer.getHDFSBlockSize()), 1); + // Determine parallelism int numThreads = Math.min(OptimizerUtils.getParallelBinaryWriteParallelism(), numPartFiles); - // Fall back to sequential write if numThreads <= 1 - if (numThreads <= 1) { + if (!_forcedParallel && numThreads <= 1) { super.writeParquetFrameToHDFS(path, conf, src); return; } diff --git a/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java index 1e4334891ed..54945f1689a 100644 --- a/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java +++ b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameParquetSchemaTest.java @@ -22,6 +22,7 @@ import org.junit.Assert; import org.junit.Test; +import org.apache.sysds.test.TestUtils; import org.apache.sysds.common.Types.ValueType; import org.apache.sysds.runtime.frame.data.FrameBlock; import org.apache.sysds.runtime.io.FrameReader; @@ -76,11 +77,11 @@ public void testParquetWriteReadAllSchemaTypes() { // Populate frame block Object[][] rows = new Object[][] { - { 1.0, 1.1f, 10, 100L, true, "A" }, - { 2.0, 2.1f, 20, 200L, false, "B" }, - { 3.0, 3.1f, 30, 300L, true, "C" }, - { 4.0, 4.1f, 40, 400L, false, "D" }, - { 5.0, 5.1f, 50, 500L, true, "E" } + { 1.0, 1.1f, 10, 100L, true, "A" }, + { 2.0, 2.1f, 20, 200L, false, "B" }, + { 3.0, 3.1f, 30, 300L, 
true, "C" }, + { 4.0, 4.1f, 40, 400L, false, "D" }, + { 5.0, 5.1f, 50, 500L, true, "E" } }; for (Object[] row : rows) { @@ -115,7 +116,7 @@ public void testParquetWriteReadAllSchemaTypes() { } // Compare the original and the read frame blocks - compareFrameBlocks(fb, fbRead, 1e-6); + TestUtils.compareFrames(fb, fbRead, false); } /** @@ -138,11 +139,11 @@ public void testParquetWriteReadAllSchemaTypesParallel() { FrameBlock fb = new FrameBlock(schema); Object[][] rows = new Object[][] { - { 1.0, 1.1f, 10, 100L, true, "A" }, - { 2.0, 2.1f, 20, 200L, false, "B" }, - { 3.0, 3.1f, 30, 300L, true, "C" }, - { 4.0, 4.1f, 40, 400L, false, "D" }, - { 5.0, 5.1f, 50, 500L, true, "E" } + { 1.0, 1.1f, 10, 100L, true, "A" }, + { 2.0, 2.1f, 20, 200L, false, "B" }, + { 3.0, 3.1f, 30, 300L, true, "C" }, + { 4.0, 4.1f, 40, 400L, false, "D" }, + { 5.0, 5.1f, 50, 500L, true, "E" } }; for (Object[] row : rows) { @@ -172,52 +173,6 @@ public void testParquetWriteReadAllSchemaTypesParallel() { Assert.fail("Failed to read frame block from Parquet (parallel): " + e.getMessage()); } - compareFrameBlocks(fb, fbRead, 1e-6); - } - - private void compareFrameBlocks(FrameBlock expected, FrameBlock actual, double eps) { - Assert.assertEquals("Number of rows mismatch", expected.getNumRows(), actual.getNumRows()); - Assert.assertEquals("Number of columns mismatch", expected.getNumColumns(), actual.getNumColumns()); - - int rows = expected.getNumRows(); - int cols = expected.getNumColumns(); - - for (int i = 0; i < rows; i++) { - for (int j = 0; j < cols; j++) { - Object expVal = expected.get(i, j); - Object actVal = actual.get(i, j); - ValueType vt = expected.getSchema()[j]; - - // Handle nulls first - if(expVal == null || actVal == null) { - Assert.assertEquals("Mismatch at (" + i + "," + j + ")", expVal, actVal); - } else { - switch(vt) { - case FP64: - case FP32: - double dExp = ((Number) expVal).doubleValue(); - double dAct = ((Number) actVal).doubleValue(); - Assert.assertEquals("Mismatch at (" 
+ i + "," + j + ")", dExp, dAct, eps); - break; - case INT32: - case INT64: - long lExp = ((Number) expVal).longValue(); - long lAct = ((Number) actVal).longValue(); - Assert.assertEquals("Mismatch at (" + i + "," + j + ")", lExp, lAct); - break; - case BOOLEAN: - boolean bExp = (Boolean) expVal; - boolean bAct = (Boolean) actVal; - Assert.assertEquals("Mismatch at (" + i + "," + j + ")", bExp, bAct); - break; - case STRING: - Assert.assertEquals("Mismatch at (" + i + "," + j + ")", expVal.toString(), actVal.toString()); - break; - default: - Assert.fail("Unsupported type in comparison: " + vt); - } - } - } - } + TestUtils.compareFrames(fb, fbRead, false); } } diff --git a/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameReaderParquetLegacy.java b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameReaderParquetLegacy.java new file mode 100644 index 00000000000..0b2510e4508 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameReaderParquetLegacy.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sysds.test.functions.io.parquet; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.io.FrameReader; +import org.apache.sysds.runtime.util.HDFSTool; +import org.apache.parquet.io.api.Binary; + +/** + * Single-threaded frame parquet reader using the {@code Group} record API (legacy baseline). + * + */ +public class FrameReaderParquetLegacy extends FrameReader { + + /** + * Reads a Parquet file from HDFS and converts it into a FrameBlock. + * + * @param fname The HDFS file path to the Parquet file. + * @param schema The expected data types of the columns. + * @param names The names of the columns. + * @param rlen The expected number of rows. + * @param clen The expected number of columns. + * @return A FrameBlock containing the data read from the Parquet file. 
+ */ + @Override + public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) throws IOException, DMLRuntimeException { + // Prepare file access + Configuration conf = ConfigurationManager.getCachedJobConf(); + Path path = new Path(fname); + + // Check existence and non-empty file + if (!HDFSTool.existsFileOnHDFS(path.toString())) { + throw new IOException("File does not exist on HDFS: " + fname); + } + + // Allocate output frame block + ValueType[] lschema = createOutputSchema(schema, clen); + String[] lnames = createOutputNames(names, clen); + FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen); + + // Read Parquet file + readParquetFrameFromHDFS(path, conf, ret, lschema, rlen, clen); + + return ret; + } + + /** + * Reads data from a Parquet file on HDFS and fills the provided FrameBlock. + * The method retrieves the Parquet schema from the file footer, maps the required column names + * to their corresponding indices, and then uses a ParquetReader to iterate over each row. + * Data is extracted based on the column type and set into the output FrameBlock. + * + * @param path The HDFS path to the Parquet file. + * @param conf The Hadoop configuration. + * @param dest The FrameBlock to populate with data. + * @param schema The expected value types for the output columns. + * @param rlen The expected number of rows. + * @param clen The expected number of columns. 
+ */ + protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException { + int row = readSingleParquetFile(path, conf, dest, clen, 0); + + // Check frame dimensions + if (row != rlen) { + throw new IOException("Mismatch in row count: expected " + rlen + ", but got " + row); + } + } + + // Constants for decoding legacy INT96 timestamps + private static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588; + private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1); + private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1); + + /** + * Reads all rows of one Parquet file into {@code dest}, starting at row {@code rowOffset}. + * + * @param path The path of the Parquet file. + * @param conf The Hadoop configuration. + * @param dest The FrameBlock to populate; its column names select the Parquet fields. + * @param clen The number of columns to read. + * @param rowOffset The first row index in {@code dest} to write to. + * @return The number of rows read from the file. + */ + protected int readSingleParquetFile(Path path, Configuration conf, FrameBlock dest, + long clen, long rowOffset) throws IOException { + // Retrieve schema from Parquet footer + ParquetMetadata metadata; + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {metadata = reader.getFooter();} + MessageType parquetSchema = metadata.getFileMetaData().getSchema(); + + // Map column names to Parquet schema indices + String[] columnNames = dest.getColumnNames(); + int[] columnIndices = new int[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + columnIndices[i] = parquetSchema.getFieldIndex(columnNames[i]); + } + + // Read data using ParquetReader + try (ParquetReader<Group> rowReader = ParquetReader.builder(new GroupReadSupport(), path) + .withConf(conf) + .build()) { + + Group group; + int row = (int) rowOffset; + while ((group = rowReader.read()) != null) { + for (int col = 0; col < clen; col++) { + int colIndex = columnIndices[col]; + if (group.getFieldRepetitionCount(colIndex) > 0) { + PrimitiveType.PrimitiveTypeName type = parquetSchema.getType(columnNames[col]).asPrimitiveType().getPrimitiveTypeName(); + switch (type) { + case INT32: + dest.set(row, col, group.getInteger(colIndex, 0)); + break; + case INT64: + dest.set(row, col, group.getLong(colIndex, 0)); + break; + case 
FLOAT: + dest.set(row, col, group.getFloat(colIndex, 0)); + break; + case DOUBLE: + dest.set(row, col, group.getDouble(colIndex, 0)); + break; + case BOOLEAN: + dest.set(row, col, group.getBoolean(colIndex, 0)); + break; + case INT96: { + // Legacy INT96 timestamp (Spark/Impala), narrowed to epoch millis. + // See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp + Binary binary = group.getInt96(colIndex, 0); + ByteBuffer buf = ByteBuffer.wrap(binary.getBytes()).order(ByteOrder.LITTLE_ENDIAN); + long nanosOfDay = buf.getLong(); + int julianDay = buf.getInt(); + long millis = (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY + + nanosOfDay / NANOS_PER_MILLISECOND; + dest.set(row, col, millis); + break; + } + case BINARY: + dest.set(row, col, group.getBinary(colIndex, 0).toStringUsingUTF8()); + break; + default: + throw new IOException("Unsupported data type: " + type); + } + } else { + dest.set(row, col, null); + } + } + row++; + } + return row - (int) rowOffset; + } + } + + //not implemented + @Override + public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen) + throws IOException, DMLRuntimeException { + throw new UnsupportedOperationException("Unimplemented method 'readFrameFromInputStream'"); + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameReaderWriterParquetTest.java b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameReaderWriterParquetTest.java new file mode 100644 index 00000000000..b4d87e9f905 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameReaderWriterParquetTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.io.parquet; + +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.io.FrameReaderParquet; +import org.apache.sysds.runtime.io.FrameWriterParquet; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +/** + * Random-frame write/read round-trip test for the parquet frame reader/writer. 
+ */ +public class FrameReaderWriterParquetTest { + + private static final String FILENAME = "target/testTemp/functions/io/parquet/FrameReaderWriterParquetTest/frame.parquet"; + + // Parquet-supported value types + private static final ValueType[] SCHEMA = { + ValueType.FP64, ValueType.FP32, ValueType.INT32, ValueType.INT64, ValueType.BOOLEAN, ValueType.STRING + }; + + @Test + public void testSingleRowSingleCol() throws IOException { + runWriteReadRoundTrip(1, 1, 4669201); + } + + @Test + public void testSingleRowMultiCol() throws IOException { + runWriteReadRoundTrip(1, 6, 4669201); + } + + @Test + public void testMultiRowSingleCol() throws IOException { + runWriteReadRoundTrip(21, 1, 4669201); + } + + @Test + public void testMultiRowMultiCol() throws IOException { + runWriteReadRoundTrip(42, 5, 4669201); + } + + @Test + public void testLargerFrame() throws IOException { + runWriteReadRoundTrip(694, 6, 4669201); + } + + @Test + public void testValueTypeEdgeCases() throws IOException { + // type min/max, empty string, special chars (comma/quote/newline/unicode) + ValueType[] schema = { ValueType.FP32, ValueType.FP64, ValueType.INT32, ValueType.INT64, + ValueType.BOOLEAN, ValueType.STRING }; + String[] names = { "f32", "f64", "i32", "i64", "b", "s" }; + String[][] data = { + { String.valueOf(Float.MAX_VALUE), String.valueOf(Double.MAX_VALUE), + String.valueOf(Integer.MAX_VALUE), String.valueOf(Long.MAX_VALUE), "true", "" }, + { String.valueOf(-Float.MAX_VALUE), String.valueOf(-Double.MAX_VALUE), + String.valueOf(Integer.MIN_VALUE), String.valueOf(Long.MIN_VALUE), "false", "a,b\"c\nd" }, + { "0.0", "0.0", "0", "0", "true", "unicode_é中" } + }; + FrameBlock in = new FrameBlock(schema, names, data); + + new FrameWriterParquet().writeFrameToHDFS(in, FILENAME, 3, 6); + FrameBlock out = new FrameReaderParquet().readFrameFromHDFS(FILENAME, schema, names, 3, 6); + + TestUtils.compareFrames(in, out, false); + } + + @Test + public void testNullsInStringColumn() throws 
IOException { + // Numeric columns from String[][] convert null to 0. Only test string nulls here. + // Numeric null round-trips are tested in ReadParquetTest. + ValueType[] schema = { ValueType.STRING, ValueType.STRING }; + String[] names = { "a", "b" }; + String[][] data = { + { "x", "y" }, + { null, null }, + { "p", null } + }; + FrameBlock in = new FrameBlock(schema, names, data); + + new FrameWriterParquet().writeFrameToHDFS(in, FILENAME, 3, 2); + FrameBlock out = new FrameReaderParquet().readFrameFromHDFS(FILENAME, schema, names, 3, 2); + + org.junit.Assert.assertNull(out.get(1, 0)); + org.junit.Assert.assertNull(out.get(1, 1)); + org.junit.Assert.assertNull(out.get(2, 1)); + org.junit.Assert.assertNotNull(out.get(0, 0)); + org.junit.Assert.assertNotNull(out.get(2, 0)); + TestUtils.compareFrames(in, out, false); + } + + private void runWriteReadRoundTrip(int rows, int cols, long seed) throws IOException { + try { + ValueType[] schema = new ValueType[cols]; + for(int i = 0; i < cols; i++) + schema[i] = SCHEMA[i % SCHEMA.length]; + + FrameBlock writeBlock = TestUtils.generateRandomFrameBlock(rows, schema, seed); + + new FrameWriterParquet().writeFrameToHDFS(writeBlock, FILENAME, rows, cols); + FrameBlock readBlock = new FrameReaderParquet() + .readFrameFromHDFS(FILENAME, schema, writeBlock.getColumnNames(), rows, cols); + + TestUtils.compareFrames(writeBlock, readBlock, false); + } + catch(Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameWriterParquetLegacy.java b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameWriterParquetLegacy.java new file mode 100755 index 00000000000..e57e2c99846 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/parquet/FrameWriterParquetLegacy.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sysds.test.functions.io.parquet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.io.FrameWriter; +import org.apache.sysds.runtime.io.IOUtilFunctions; +import org.apache.sysds.runtime.util.HDFSTool; +import org.apache.sysds.common.Types.ValueType; + +/** + * Single-threaded frame parquet writer. 
+ * + */ +public class FrameWriterParquetLegacy extends FrameWriter { + + private final int batchSizeOverride; + + public FrameWriterParquetLegacy() { + this.batchSizeOverride = 1000; + } + + public FrameWriterParquetLegacy(int batchSize) { + this.batchSizeOverride = batchSize; + } + + /** + * Writes a FrameBlock to a Parquet file on HDFS. + * + * @param src The FrameBlock containing the data to write. + * @param fname The HDFS file path where the Parquet file will be stored. + * @param rlen The expected number of rows. + * @param clen The expected number of columns. + */ + @Override + public final void writeFrameToHDFS(FrameBlock src, String fname, long rlen, long clen) throws IOException, DMLRuntimeException { + // Prepare file access + JobConf conf = ConfigurationManager.getCachedJobConf(); + Path path = new Path(fname); + + // If the file already exists on HDFS, remove it + HDFSTool.deleteFileIfExistOnHDFS(path, conf); + + // Check frame dimensions + if (src.getNumRows() != rlen || src.getNumColumns() != clen) { + throw new IOException("Frame dimensions mismatch with metadata: " + src.getNumRows() + "x" + src.getNumColumns() + " vs " + rlen + "x" + clen + "."); + } + + // Write parquet file + writeParquetFrameToHDFS(path, conf, src); + } + + /** + * Writes the FrameBlock data to a Parquet file using a ParquetWriter. + * The method generates a Parquet schema based on the metadata of the FrameBlock, initializes a ParquetWriter with specified configurations, + * iterates over each row and column, adding values (in batches for improved performance) using type-specific conversions. + * + * @param path The HDFS path where the Parquet file will be written. + * @param conf The Hadoop configuration. + * @param src The FrameBlock containing the data to write. 
+ */ + protected void writeParquetFrameToHDFS(Path path, Configuration conf, FrameBlock src) + throws IOException + { + FileSystem fs = IOUtilFunctions.getFileSystem(path, conf); + + // Create schema based on frame block metadata + MessageType schema = createParquetSchema(src); + + int batchSize = batchSizeOverride; + int rowCount = 0; + + // Write data using ParquetWriter //FIXME replace example writer? + try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path) + .withConf(conf) + .withType(schema) + .withCompressionCodec(ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME) + .withRowGroupSize((long) ParquetWriter.DEFAULT_BLOCK_SIZE) + .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE) + .withDictionaryEncoding(true) + .build()) + { + + SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); + + List<Group> rowBuffer = new ArrayList<>(batchSize); + + for (int i = 0; i < src.getNumRows(); i++) { + Group group = groupFactory.newGroup(); + for (int j = 0; j < src.getNumColumns(); j++) { + Object value = src.get(i, j); + if (value != null) { + ValueType type = src.getSchema()[j]; + switch (type) { + case STRING: + group.add(src.getColumnNames()[j], value.toString()); + break; + case INT32: + group.add(src.getColumnNames()[j], (int) value); + break; + case INT64: + group.add(src.getColumnNames()[j], (long) value); + break; + case FP32: + group.add(src.getColumnNames()[j], (float) value); + break; + case FP64: + group.add(src.getColumnNames()[j], (double) value); + break; + case BOOLEAN: + group.add(src.getColumnNames()[j], (boolean) value); + break; + default: + throw new IOException("Unsupported value type: " + type); + } + } + } + rowBuffer.add(group); + rowCount++; + + // Flush a full batch to the writer + if (rowCount >= batchSize) { + for (Group g : rowBuffer) { + writer.write(g); + } + rowBuffer.clear(); + rowCount = 0; + } + } + + // Flush the rows left over from the last, partially filled batch + for (Group g : rowBuffer) { + writer.write(g); + } + } + + // Delete CRC files created by Hadoop if necessary + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); 
+ } + + /** + * Creates a Parquet schema based on the metadata of a FrameBlock. + * + * @param src The FrameBlock whose metadata is used to create the Parquet schema. + * @return The generated Parquet MessageType schema. + */ + protected MessageType createParquetSchema(FrameBlock src) { + String[] columnNames = src.getColumnNames(); + ValueType[] columnTypes = src.getSchema(); + Types.MessageTypeBuilder builder = Types.buildMessage(); + + for (int i = 0; i < src.getNumColumns(); i++) { + switch (columnTypes[i]) { + case STRING: + builder.optional(PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()) + .named(columnNames[i]); + break; + case INT32: + builder.optional(PrimitiveTypeName.INT32).named(columnNames[i]); + break; + case INT64: + builder.optional(PrimitiveTypeName.INT64).named(columnNames[i]); + break; + case FP32: + builder.optional(PrimitiveTypeName.FLOAT).named(columnNames[i]); + break; + case FP64: + builder.optional(PrimitiveTypeName.DOUBLE).named(columnNames[i]); + break; + case BOOLEAN: + builder.optional(PrimitiveTypeName.BOOLEAN).named(columnNames[i]); + break; + default: + throw new IllegalArgumentException("Unsupported data type: " + columnTypes[i]); + } + } + return builder.named("FrameSchema"); + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/io/parquet/ParquetReaderBenchmark.java b/src/test/java/org/apache/sysds/test/functions/io/parquet/ParquetReaderBenchmark.java new file mode 100644 index 00000000000..592e610185f --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/parquet/ParquetReaderBenchmark.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sysds.test.functions.io.parquet; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.io.FrameReaderParquet; +import org.apache.sysds.runtime.io.FrameWriterParquet; +import org.junit.After; +import org.junit.Assume; + +/** + * Parquet reader benchmark comparing the column-API reader against + * the Group-based reader. + * + * Results report median and min/max across runs and are appended to + * temp/benchmark_results.csv for plotting. + * + * If a dataset is not present the corresponding test is skipped with instructions. + */ +public class ParquetReaderBenchmark { + + private static final String TPCH_FILE = "temp/lineitem.tbl"; + private static final String RESULTS_CSV = "temp/benchmark_results.csv"; + private static final String TEMP_FILE = System.getProperty("java.io.tmpdir") + "/systemds_read_bench.parquet"; + private static final int RUNS = 7; + // override on larger machine with -DmaxRows=... (e.g. -DmaxRows=20000000) to test larger row groups. 
+ private static final int MAX_ROWS = Integer.getInteger("maxRows", 2_000_000); + + // TPC-H lineitem schema + private static final ValueType[] LINEITEM_SCHEMA = { + ValueType.INT64, ValueType.INT64, ValueType.INT64, ValueType.INT32, + ValueType.FP64, ValueType.FP64, ValueType.FP64, ValueType.FP64, + ValueType.STRING, ValueType.STRING, ValueType.STRING, ValueType.STRING, + ValueType.STRING, ValueType.STRING, ValueType.STRING, ValueType.STRING + }; + private static final String[] LINEITEM_NAMES = { + "orderkey", "partkey", "suppkey", "linenumber", + "quantity", "extendedprice", "discount", "tax", + "returnflag", "linestatus", "shipdate", "commitdate", + "receiptdate", "shipinstruct", "shipmode", "comment" + }; + + private PrintWriter csv; + + @After + public void cleanup() { + new File(TEMP_FILE).delete(); + if (csv != null) csv.close(); + } + + // @Test + public void benchmarkReadTpch() throws Exception { + ReadSpec spec = writeTempParquet(loadLineitemOrSkip()); + runReadBenchmark("read_tpch", spec); + } + + private static final class ReadSpec { + final ValueType[] schema; final String[] names; final int rows; final int cols; + ReadSpec(ValueType[] schema, String[] names, int rows, int cols) { + this.schema = schema; this.names = names; this.rows = rows; this.cols = cols; + } + } + + /** + * Writes a frame to Parquet, then times reading it back with both sequential and parallel readers. 
+ */ + private ReadSpec writeTempParquet(FrameBlock data) throws Exception { + int rows = data.getNumRows(), cols = data.getNumColumns(); + ReadSpec spec = new ReadSpec(data.getSchema(), data.getColumnNames(), rows, cols); + new File(TEMP_FILE).delete(); + new FrameWriterParquet().writeFrameToHDFS(data, TEMP_FILE, rows, cols); + return spec; + } + + /** + * Reads TEMP_FILE once with the column-API reader to check the expected dimensions, then + * times the legacy Group-based reader and the column-API reader on that same file. + */ + private void runReadBenchmark(String category, ReadSpec spec) throws Exception { + final ValueType[] schema = spec.schema; + final String[] names = spec.names; + final int rows = spec.rows, cols = spec.cols; + + FrameBlock probe = new FrameReaderParquet().readFrameFromHDFS(TEMP_FILE, schema, names, rows, cols); + org.junit.Assert.assertEquals("Row count mismatch", rows, probe.getNumRows()); + org.junit.Assert.assertEquals("Column count mismatch", cols, probe.getNumColumns()); + probe = null; + + openCsv(); + System.out.println("\n=== Parquet Read Benchmark [" + category + "] (" + rows + " rows, median of " + RUNS + " runs) ===\n"); + // header columns mirror the five values printed per reader in timeRead + System.out.printf("%-24s %12s %15s %14s %s%n", "Reader", "Median (ms)", "Rows/sec", "Mean+-Std", "[runs]"); + System.out.println("-".repeat(72)); + + timeRead(category, "Legacy (Group)", () -> + new FrameReaderParquetLegacy().readFrameFromHDFS(TEMP_FILE, schema, names, rows, cols), rows); + timeRead(category, "Column API", () -> + new FrameReaderParquet().readFrameFromHDFS(TEMP_FILE, schema, names, rows, cols), rows); + System.out.println(); + } + + private interface ReadAction { FrameBlock run() throws Exception; } + + private void timeRead(String category, String label, ReadAction action, int rows) throws Exception { + action.run(); // warmup + + long[] times = new long[RUNS]; + for (int run = 0; run < RUNS; run++) { + long start = System.currentTimeMillis(); + action.run(); + times[run] = System.currentTimeMillis() - start; + } + long med = median(times); + long min = Arrays.stream(times).min().orElse(med); + long 
max = Arrays.stream(times).max().orElse(med); + System.out.printf("%-24s %12d %15.0f %14s %s%n", label, med, rows * 1000.0 / med, meanStd(times), Arrays.toString(times)); + // columns: benchmark,label,time_ms(median),rows_per_sec,min_ms,max_ms + csv.printf("%s,%s,%d,%.0f,%d,%d%n", category, label, med, rows * 1000.0 / med, min, max); + } + + private static long median(long[] times) { + long[] sorted = times.clone(); + Arrays.sort(sorted); + return sorted[sorted.length / 2]; + } + + private static String meanStd(long[] times) { + double mean = Arrays.stream(times).average().orElse(0); + double var = Arrays.stream(times).mapToDouble(t -> (t - mean) * (t - mean)).average().orElse(0); + return String.format("%.0f+-%.0f ms", mean, Math.sqrt(var)); + } + + private void openCsv() throws Exception { + new File("temp").mkdirs(); + boolean exists = new File(RESULTS_CSV).exists(); + csv = new PrintWriter(new FileWriter(RESULTS_CSV, true)); + // write the header only for a new file; it must name the six columns that timeRead appends + if (!exists) + csv.println("benchmark,label,time_ms,rows_per_sec,min_ms,max_ms"); + csv.flush(); + } + + private FrameBlock loadLineitemOrSkip() throws Exception { + File f = new File(TPCH_FILE); + if (!f.exists()) { + System.out.println("=== TPC-H read benchmark skipped, dataset not found at " + TPCH_FILE + " ==="); + Assume.assumeTrue("TPC-H dataset not found at " + TPCH_FILE, false); + } + System.out.print("Loading " + f.getPath() + " ... 
"); + List<String[]> rows = new ArrayList<>(); + try (BufferedReader br = new BufferedReader(new FileReader(f))) { + String line; + while ((line = br.readLine()) != null && rows.size() < MAX_ROWS) { + if (line.isEmpty()) continue; + if (line.endsWith("|")) line = line.substring(0, line.length() - 1); + rows.add(line.split("\\|", -1)); + } + } + String[][] arr = rows.toArray(new String[0][]); + System.out.println(arr.length + " rows loaded."); + return new FrameBlock(LINEITEM_SCHEMA, LINEITEM_NAMES, arr); + } + +} diff --git a/src/test/java/org/apache/sysds/test/functions/io/parquet/ParquetTestUtils.java b/src/test/java/org/apache/sysds/test/functions/io/parquet/ParquetTestUtils.java new file mode 100644 index 00000000000..9ce52887cee --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/parquet/ParquetTestUtils.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysds.test.functions.io.parquet;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.test.AutomatedTestBase;

/**
 * Helpers shared by the Parquet reader/writer tests: footer-based metadata inference
 * and generation of small Parquet fixtures through Spark's DataFrameWriter.
 */
class ParquetTestUtils {

	/** Column names, SystemDS value types and dimensions derived from a Parquet footer. */
	static class ParquetMetadataInfo {
		String[] names;
		ValueType[] schema;
		long rlen;
		long clen;
	}

	/**
	 * Reads the footer of the Parquet file {@code fname} and derives the frame metadata
	 * needed to read it: one name and value type per top-level field, the total row
	 * count summed over all row groups, and the field count.
	 *
	 * <p>Type mapping: INT32, INT64, FLOAT, DOUBLE and BOOLEAN map to the value type of
	 * the same width, BINARY maps to STRING, and INT96 maps to INT64.
	 *
	 * @param fname path of a single Parquet file
	 * @return the inferred metadata
	 * @throws IOException if the footer cannot be read, or a column is nested or has a
	 *                     primitive type outside the mapping above
	 */
	static ParquetMetadataInfo inferMetadata(String fname) throws IOException {
		Configuration conf = ConfigurationManager.getCachedJobConf();
		Path path = new Path(fname);

		ParquetMetadata metadata;
		try (ParquetFileReader r = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
			metadata = r.getFooter();
		}
		MessageType parquetSchema = metadata.getFileMetaData().getSchema();

		int fieldCount = parquetSchema.getFieldCount();
		String[] names = new String[fieldCount];
		ValueType[] schema = new ValueType[fieldCount];

		for (int i = 0; i < fieldCount; i++) {
			names[i] = parquetSchema.getFieldName(i);
			// asPrimitiveType() throws ClassCastException on group types; report nested
			// columns the same way as any other unsupported type
			if (!parquetSchema.getType(i).isPrimitive())
				throw new IOException("Unsupported nested parquet type in column " + names[i]);
			PrimitiveType.PrimitiveTypeName type = parquetSchema.getType(i).asPrimitiveType().getPrimitiveTypeName();
			switch (type) {
				case INT32: schema[i] = ValueType.INT32; break;
				case INT64: schema[i] = ValueType.INT64; break;
				case FLOAT: schema[i] = ValueType.FP32; break;
				case DOUBLE: schema[i] = ValueType.FP64; break;
				case BOOLEAN: schema[i] = ValueType.BOOLEAN; break;
				case BINARY: schema[i] = ValueType.STRING; break;
				case INT96: schema[i] = ValueType.INT64; break;
				default:
					throw new IOException("Unsupported parquet type: " + type + " in column " + names[i]);
			}
		}

		long rlen = 0;
		for (BlockMetaData block : metadata.getBlocks())
			rlen += block.getRowCount();

		ParquetMetadataInfo info = new ParquetMetadataInfo();
		info.names = names;
		info.schema = schema;
		info.rlen = rlen;
		info.clen = fieldCount;
		return info;
	}

	/**
	 * Generates the public test files (userdata1, alltypes_plain, all) with Spark's
	 * DataFrameWriter. userdata1 and alltypes_plain each include a TimestampType column,
	 * which Spark 3.5 encodes as INT96 by default.
	 *
	 * @param outDir directory the generated files are written into
	 * @return map from file name (e.g. "userdata1") to its generated file path, in
	 *         generation order
	 * @throws Exception if the Spark session cannot be created or a file cannot be written
	 */
	static Map<String, String> generatePublicTestFiles(File outDir) throws Exception {
		SparkSession spark = AutomatedTestBase.createSystemDSSparkSession("parquet-test-files", "local[1]");

		Map<String, String> files = new LinkedHashMap<>();
		files.put("userdata1", writeTestFile(spark, outDir, "userdata1", userdata1Rows(), userdata1Schema()));
		files.put("alltypes_plain", writeTestFile(spark, outDir, "alltypes_plain", alltypesPlainRows(), alltypesPlainSchema()));
		files.put("all", writeTestFile(spark, outDir, "all", allRows(), allSchema()));
		return files;
	}

	/** Schema of the userdata1 fixture: a timestamp, an int id, a name and a salary. */
	private static StructType userdata1Schema() {
		return new StructType(new StructField[] {
			new StructField("registration_dttm", DataTypes.TimestampType, true, Metadata.empty()),
			new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
			new StructField("first_name", DataTypes.StringType, true, Metadata.empty()),
			new StructField("salary", DataTypes.DoubleType, true, Metadata.empty()),
		});
	}

	/** Rows of the userdata1 fixture; row 4 has a null timestamp. */
	private static List<Row> userdata1Rows() {
		return Arrays.asList(
			RowFactory.create(Timestamp.valueOf("2016-02-03 07:55:29"), 1, "Amanda", 49756.53),
			RowFactory.create(Timestamp.valueOf("2016-02-03 17:04:03"), 2, "Albert", 150280.17),
			RowFactory.create(Timestamp.valueOf("2016-02-03 01:09:31"), 3, "Evelyn", 144972.51),
			RowFactory.create((Timestamp) null, 4, "Denise", 90263.05),
			RowFactory.create(Timestamp.valueOf("2016-02-03 05:07:25"), 5, "Carlos", 75500.34)
		);
	}

	/** Schema of the alltypes_plain fixture: one column per supported primitive type plus a timestamp. */
	private static StructType alltypesPlainSchema() {
		return new StructType(new StructField[] {
			new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
			new StructField("bool_col", DataTypes.BooleanType, true, Metadata.empty()),
			new StructField("tinyint_col", DataTypes.IntegerType, true, Metadata.empty()),
			new StructField("smallint_col", DataTypes.IntegerType, true, Metadata.empty()),
			new StructField("bigint_col", DataTypes.LongType, true, Metadata.empty()),
			new StructField("float_col", DataTypes.FloatType, true, Metadata.empty()),
			new StructField("double_col", DataTypes.DoubleType, true, Metadata.empty()),
			new StructField("date_string_col", DataTypes.StringType, true, Metadata.empty()),
			new StructField("string_col", DataTypes.StringType, true, Metadata.empty()),
			new StructField("timestamp_col", DataTypes.TimestampType, true, Metadata.empty()),
		});
	}

	/** Rows of the alltypes_plain fixture (8 rows, no nulls). */
	private static List<Row> alltypesPlainRows() {
		return Arrays.asList(
			RowFactory.create(1, true, 1, 10, 100L, 1.5f, 2.25, "03/01/09", "row-1", Timestamp.valueOf("2009-03-01 00:00:00")),
			RowFactory.create(2, false, 2, 20, 200L, 2.5f, 4.5, "03/02/09", "row-2", Timestamp.valueOf("2009-03-02 05:15:30")),
			RowFactory.create(3, true, 3, 30, 300L, 3.5f, 6.75, "03/03/09", "row-3", Timestamp.valueOf("2009-03-03 10:30:00")),
			RowFactory.create(4, false, 4, 40, 400L, 4.5f, 9.0, "03/04/09", "row-4", Timestamp.valueOf("2009-03-04 15:45:15")),
			RowFactory.create(5, true, 5, 50, 500L, 5.5f, 11.25, "03/05/09", "row-5", Timestamp.valueOf("2009-03-05 21:00:45")),
			RowFactory.create(6, false, 6, 60, 600L, 6.5f, 13.5, "03/06/09", "row-6", Timestamp.valueOf("2009-03-06 02:20:10")),
			RowFactory.create(7, true, 7, 70, 700L, 7.5f, 15.75, "03/07/09", "row-7", Timestamp.valueOf("2009-03-07 08:35:50")),
			RowFactory.create(8, false, 8, 80, 800L, 8.5f, 18.0, "03/08/09", "row-8", Timestamp.valueOf("2009-03-08 13:50:25"))
		);
	}

	/** Schema of the "all" fixture: a small Titanic-style passenger table. */
	private static StructType allSchema() {
		return new StructType(new StructField[] {
			new StructField("PassengerId", DataTypes.IntegerType, true, Metadata.empty()),
			new StructField("Survived", DataTypes.IntegerType, true, Metadata.empty()),
			new StructField("Pclass", DataTypes.IntegerType, true, Metadata.empty()),
			new StructField("Name", DataTypes.StringType, true, Metadata.empty()),
			new StructField("Sex", DataTypes.StringType, true, Metadata.empty()),
			new StructField("Age", DataTypes.DoubleType, true, Metadata.empty()),
			new StructField("Fare", DataTypes.DoubleType, true, Metadata.empty()),
			new StructField("Embarked", DataTypes.StringType, true, Metadata.empty()),
		});
	}

	/** Rows of the "all" fixture; rows 5 and 6 have a null Age. */
	private static List<Row> allRows() {
		return Arrays.asList(
			RowFactory.create(1, 0, 3, "Braund, Mr. Owen Harris", "male", 22.0, 7.25, "S"),
			RowFactory.create(2, 1, 1, "Cumings, Mrs. John Bradley", "female", 38.0, 71.2833, "C"),
			RowFactory.create(3, 1, 3, "Heikkinen, Miss. Laina", "female", 26.0, 7.925, "S"),
			RowFactory.create(4, 1, 1, "Futrelle, Mrs. Jacques Heath", "female", 35.0, 53.1, "S"),
			RowFactory.create(5, 0, 3, "Allen, Mr. William Henry", "male", null, 8.05, "S"),
			RowFactory.create(6, 0, 3, "Moran, Mr. James", "male", null, 8.4583, "Q"),
			RowFactory.create(7, 0, 1, "McCarthy, Mr. Timothy J", "male", 54.0, 51.8625, "S"),
			RowFactory.create(8, 0, 3, "Palsson, Master. Gosta Leonard", "male", 2.0, 21.075, "S")
		);
	}

	/**
	 * Writes {@code rows} as a single Parquet file {@code <outDir>/<name>.parquet}.
	 *
	 * <p>Spark writes a directory of part files, so the data is coalesced to one
	 * partition, written to a temporary directory, and the single part file is copied
	 * to its final name. The temporary directory is removed even when writing fails.
	 *
	 * @return path of the resulting Parquet file
	 * @throws IOException if Spark did not produce exactly one part file or the copy fails
	 */
	private static String writeTestFile(SparkSession spark, File outDir, String name, List<Row> rows, StructType schema) throws IOException {
		Dataset<Row> df = spark.createDataFrame(rows, schema);
		File tmpDir = new File(outDir, "_tmp_" + name);
		try {
			df.coalesce(1).write().mode(SaveMode.Overwrite).parquet(tmpDir.getPath());

			File[] parts = tmpDir.listFiles((d, n) -> n.startsWith("part-") && n.endsWith(".parquet"));
			if (parts == null || parts.length != 1)
				throw new IOException("expected exactly 1 part file in " + tmpDir);

			File dest = new File(outDir, name + ".parquet");
			Files.copy(parts[0].toPath(), dest.toPath(), StandardCopyOption.REPLACE_EXISTING);
			return dest.getPath();
		}
		finally {
			deleteRecursive(tmpDir);
		}
	}

	/** Best-effort recursive delete of {@code f}; failures to delete are ignored. */
	private static void deleteRecursive(File f) {
		File[] children = f.listFiles();
		if (children != null)
			for (File c : children)
				deleteRecursive(c);
		f.delete();
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.sysds.test.functions.io.parquet;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.io.FrameWriterParquet;
import org.apache.sysds.runtime.io.FrameWriterParquet.DictEncoding;
import org.junit.After;
import org.junit.Assume;

/**
 * Parquet writer benchmark using the TPC-H lineitem dataset.
 * Results are appended to temp/benchmark_results.csv for plotting.
 *
 * <p>The benchmark methods are disabled by default (their {@code @Test} annotations are
 * commented out); re-enable them to run manually. If the dataset is not present inside
 * the temp dir, the benchmark is skipped and instructions for generating it are printed.
 *
 * <p>At most {@code maxRows} rows are loaded (default 2_000_000). To benchmark row-group
 * sizes properly, run on a machine with enough RAM for a larger frame:
 * <pre>
 * # 1. Generate a larger TPC-H lineitem into temp/lineitem.tbl
 * # 2. Raise the maxRows cap, then run the benchmark:
 * mvn test -Dtest=ParquetWriterBenchmark#benchmarkRowGroupSizes \
 *     -DmaxRows=60000000 -DargLine="-Xms24g -Xmx24g" -DfailIfNoTests=false
 * </pre>
 */
public class ParquetWriterBenchmark {

	private static final String TPCH_FILE = "temp/lineitem.tbl";
	private static final String RESULTS_CSV = "temp/benchmark_results.csv";
	private static final String TEMP_FILE = System.getProperty("java.io.tmpdir") + "/systemds_tpch_bench.parquet";
	private static final int RUNS = 3;
	private static final int MAX_ROWS = Integer.getInteger("maxRows", 2_000_000);
	private static final long NANOS_PER_MILLI = 1_000_000L;
	private static final int[] BATCH_SIZES = { 1, 100, 500, 1000, 5000, 10_000, 50_000, 100_000, 200_000 };
	private static final long[] ROW_GROUP_SIZES = {
		1024 * 1024,         // 1 MB
		8L * 1024 * 1024,    // 8 MB
		16L * 1024 * 1024,   // 16 MB
		32L * 1024 * 1024,   // 32 MB
		64L * 1024 * 1024,   // 64 MB
		128L * 1024 * 1024,  // 128 MB (Parquet default)
		256L * 1024 * 1024,  // 256 MB
		512L * 1024 * 1024   // 512 MB
	};

	// TPC-H lineitem schema
	private static final ValueType[] LINEITEM_SCHEMA = {
		ValueType.INT64,  // orderkey
		ValueType.INT64,  // partkey
		ValueType.INT64,  // suppkey
		ValueType.INT32,  // linenumber
		ValueType.FP64,   // quantity
		ValueType.FP64,   // extendedprice
		ValueType.FP64,   // discount
		ValueType.FP64,   // tax
		ValueType.STRING, // returnflag (3 unique values)
		ValueType.STRING, // linestatus (2 unique values)
		ValueType.STRING, // shipdate
		ValueType.STRING, // commitdate
		ValueType.STRING, // receiptdate
		ValueType.STRING, // shipinstruct (4 unique values)
		ValueType.STRING, // shipmode (7 unique values)
		ValueType.STRING  // comment
	};

	private static final String[] LINEITEM_NAMES = {
		"orderkey", "partkey", "suppkey", "linenumber",
		"quantity", "extendedprice", "discount", "tax",
		"returnflag", "linestatus", "shipdate", "commitdate",
		"receiptdate", "shipinstruct", "shipmode", "comment"
	};

	/** Appending writer for {@code RESULTS_CSV}; null until {@link #openCsv()} is called. */
	private PrintWriter csv;

	/** Removes the scratch Parquet file and closes the results CSV, if it was opened. */
	@After
	public void cleanup() {
		new File(TEMP_FILE).delete();
		if (csv != null) {
			csv.close();
			csv = null;
		}
	}

	/**
	 * Compares the legacy batching writer at each of {@code BATCH_SIZES} against the
	 * new writer (uncompressed, dictionary encoding on for all columns).
	 */
	// @Test
	public void benchmarkWriters() throws Exception {
		final FrameBlock data = loadOrSkip();
		final int rows = data.getNumRows();
		final int cols = data.getNumColumns();

		openCsv();
		System.out.println("\n=== TPC-H Writer Benchmark (" + rows + " rows, median of " + RUNS + " runs) ===\n");
		System.out.printf("%-38s %12s %15s%n", "Configuration", "Time (ms)", "Rows/sec");
		System.out.println("-".repeat(70));

		final FrameWriterParquet newWriter = new FrameWriterParquet(CompressionCodecName.UNCOMPRESSED, DictEncoding.ALL_ON);

		// Warmup: one untimed write per implementation
		new File(TEMP_FILE).delete();
		new FrameWriterParquetLegacy(1000).writeFrameToHDFS(data, TEMP_FILE, rows, cols);
		new File(TEMP_FILE).delete();
		newWriter.writeFrameToHDFS(data, TEMP_FILE, rows, cols);

		for (int batchSize : BATCH_SIZES) {
			// writer construction stays inside the timed region, as in the direct comparison below
			long med = medianWriteMillis(
				() -> new FrameWriterParquetLegacy(batchSize).writeFrameToHDFS(data, TEMP_FILE, rows, cols));
			String label = "Legacy batchSize=" + batchSize;
			System.out.printf("%-38s %12d %15.0f%n", label, med, rows * 1000.0 / med);
			csv.printf("batch_sizes,%s,%d,%.0f,,%n", label, med, rows * 1000.0 / med);
		}

		System.out.println("-".repeat(70));

		long med = medianWriteMillis(() -> newWriter.writeFrameToHDFS(data, TEMP_FILE, rows, cols));
		System.out.printf("%-38s %12d %15.0f%n", "New WriteSupport", med, rows * 1000.0 / med);
		csv.printf("batch_sizes,New WriteSupport,%d,%.0f,,%n", med, rows * 1000.0 / med);
		csv.flush();
		System.out.println();
	}

	/** Compares the three dictionary-encoding strategies of the new writer, uncompressed. */
	// @Test
	public void benchmarkDictionaryEncoding() throws Exception {
		FrameBlock data = loadOrSkip();
		int rows = data.getNumRows();

		openCsv();
		System.out.println("\n=== TPC-H Dictionary Encoding Benchmark (" + rows + " rows, median of " + RUNS + " runs) ===\n");
		System.out.printf("%-20s %12s %15s%n", "Strategy", "Time (ms)", "Rows/sec");
		System.out.println("-".repeat(52));

		time("encoding", "ALL_ON", new FrameWriterParquet(CompressionCodecName.UNCOMPRESSED, DictEncoding.ALL_ON), data, rows);
		time("encoding", "ALL_OFF", new FrameWriterParquet(CompressionCodecName.UNCOMPRESSED, DictEncoding.ALL_OFF), data, rows);
		time("encoding", "STRING_ONLY", new FrameWriterParquet(CompressionCodecName.UNCOMPRESSED, DictEncoding.STRING_ONLY), data, rows);
		System.out.println();
	}

	/** Times the new writer (ZSTD, dictionary encoding on) at each of {@code ROW_GROUP_SIZES}. */
	// @Test
	public void benchmarkRowGroupSizes() throws Exception {
		FrameBlock data = loadOrSkip();
		int rows = data.getNumRows();

		openCsv();
		System.out.println("\n=== TPC-H Row Group Size Benchmark (" + rows + " rows, median of " + RUNS + " runs) ===\n");
		System.out.printf("%-20s %12s %15s%n", "Row Group Size", "Time (ms)", "Rows/sec");
		System.out.println("-".repeat(52));

		for (long rowGroupSize : ROW_GROUP_SIZES) {
			String label = (rowGroupSize / (1024 * 1024)) + "MB";
			time("row_group_sizes", label,
				new FrameWriterParquet(CompressionCodecName.ZSTD, DictEncoding.ALL_ON, rowGroupSize), data, rows);
		}
		System.out.println();
	}

	/**
	 * Loads the lineitem table from {@code TPCH_FILE}, or prints how to generate it and
	 * skips the calling benchmark (via a failed JUnit assumption) when it is missing.
	 */
	private FrameBlock loadOrSkip() throws Exception {
		File f = new File(TPCH_FILE);
		if (!f.exists()) {
			System.out.println();
			System.out.println("===================================================");
			System.out.println("TPC-H benchmark skipped, dataset not found");
			System.out.println("To reproduce:");
			System.out.println(" 1. Install DuckDB: https://duckdb.org");
			System.out.println(" 2. Open shell: duckdb");
			System.out.println(" 3. Run in DuckDB: INSTALL tpch;");
			System.out.println(" LOAD tpch;");
			System.out.println(" CALL dbgen(sf=1);");
			// print the path this benchmark actually reads; the previous hard-coded
			// '/temp/lineitem.tbl' pointed at the filesystem root instead
			System.out.println(" COPY lineitem TO '" + TPCH_FILE + "'");
			System.out.println(" (DELIMITER '|', HEADER false);");
			System.out.println("===================================================");
			Assume.assumeTrue("TPC-H dataset not found at " + TPCH_FILE, false);
		}
		return loadLineitem(f);
	}

	/**
	 * Opens the results CSV in append mode, writing the header row if the file did not
	 * exist yet. A repeated call while the writer is open is a no-op.
	 */
	private void openCsv() throws Exception {
		if (csv != null)
			return;
		new File("temp").mkdirs();
		boolean exists = new File(RESULTS_CSV).exists();
		csv = new PrintWriter(new FileWriter(RESULTS_CSV, true));
		if (!exists)
			csv.println("benchmark,label,time_ms,rows_per_sec,size_mb,compression_ratio");
		csv.flush();
	}

	/** A single write to be timed. */
	private interface WriteAction { void run() throws Exception; }

	/**
	 * Runs {@code action} {@code RUNS} times, deleting {@code TEMP_FILE} before each run
	 * (outside the timed region), and returns the median elapsed time in milliseconds.
	 */
	private static long medianWriteMillis(WriteAction action) throws Exception {
		long[] times = new long[RUNS];
		for (int run = 0; run < RUNS; run++) {
			new File(TEMP_FILE).delete();
			// nanoTime is monotonic; the wall clock can jump while a run is in flight
			long start = System.nanoTime();
			action.run();
			times[run] = (System.nanoTime() - start) / NANOS_PER_MILLI;
		}
		return median(times);
	}

	/** One untimed warmup write with {@code writer}, then the median of {@code RUNS} timed writes. */
	private static long warmupAndMeasure(FrameWriterParquet writer, FrameBlock data, int rows) throws Exception {
		final int cols = data.getNumColumns();
		new File(TEMP_FILE).delete();
		writer.writeFrameToHDFS(data, TEMP_FILE, rows, cols); // warmup
		return medianWriteMillis(() -> writer.writeFrameToHDFS(data, TEMP_FILE, rows, cols));
	}

	/**
	 * Times {@code writer} and reports median time and rows/sec to the console and the
	 * results CSV; the size and ratio CSV columns are left empty.
	 */
	private void time(String category, String label, FrameWriterParquet writer, FrameBlock data, int rows) throws Exception {
		long med = warmupAndMeasure(writer, data, rows);
		System.out.printf("%-20s %12d %15.0f%n", label, med, rows * 1000.0 / med);
		csv.printf("%s,%s,%d,%.0f,,%n", category, label, med, rows * 1000.0 / med);
		csv.flush();
	}

	/**
	 * Like {@link #time}, but also reports the size of the written file in MB and its
	 * compression ratio relative to {@code baseSize}.
	 *
	 * @param baseSize size in bytes of the reference file, or a value &lt;= 0 to mark
	 *                 this run as the baseline
	 * @return size in bytes of the file written by the last timed run
	 */
	private long timeWithSize(String category, String label, FrameWriterParquet writer, FrameBlock data, int rows, long baseSize) throws Exception {
		long med = warmupAndMeasure(writer, data, rows);
		long size = new File(TEMP_FILE).length();
		double mb = size / (1024.0 * 1024.0);
		String ratio = baseSize > 0 ? String.format("%.2fx", (double) baseSize / size) : "baseline";
		System.out.printf("%-20s %12d %15.0f %12.2f %10s%n", label, med, rows * 1000.0 / med, mb, ratio);
		csv.printf("%s,%s,%d,%.0f,%.2f,%s%n", category, label, med, rows * 1000.0 / med, mb, ratio);
		csv.flush();
		return size;
	}

	/**
	 * Returns the median of {@code times} without modifying the argument. For an even
	 * number of samples the upper of the two middle values is returned.
	 */
	private static long median(long[] times) {
		long[] sorted = times.clone();
		Arrays.sort(sorted);
		return sorted[sorted.length / 2];
	}

	/**
	 * Reads up to {@code MAX_ROWS} rows of the pipe-delimited lineitem file {@code f}
	 * into a frame. Empty lines are ignored and one trailing {@code |} per line is
	 * stripped before splitting.
	 */
	private static FrameBlock loadLineitem(File f) throws Exception {
		System.out.print("Loading " + f.getPath() + " ... ");
		List<String[]> rows = new ArrayList<>();
		try (BufferedReader br = new BufferedReader(new FileReader(f))) {
			String line;
			while ((line = br.readLine()) != null && rows.size() < MAX_ROWS) {
				if (line.isEmpty()) continue;
				if (line.endsWith("|")) line = line.substring(0, line.length() - 1);
				// limit -1 keeps trailing empty fields
				rows.add(line.split("\\|", -1));
			}
		}
		String[][] data = rows.toArray(new String[0][]);
		System.out.println(data.length + " rows loaded.");
		return new FrameBlock(LINEITEM_SCHEMA, LINEITEM_NAMES, data);
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysds.test.functions.io.parquet;

import java.io.File;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.io.FrameReaderParquet;
import org.apache.sysds.runtime.io.FrameReaderParquetParallel;
import org.apache.sysds.runtime.io.FrameWriterParquet;
import org.apache.sysds.runtime.io.FrameWriterParquetParallel;
import org.apache.sysds.test.functions.io.parquet.ParquetTestUtils.ParquetMetadataInfo;
import org.apache.sysds.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for the sequential and parallel Parquet frame readers, using Spark-generated
 * fixtures and small round trips through the SystemDS Parquet writers.
 */
public class ReadParquetTest {

	// Generated once per test class with Spark's DataFrameWriter
	private static File testFileDir;
	private static String[] FILENAMES;

	/** Generates the fixtures; FILENAMES is ordered userdata1, alltypes_plain, all. */
	@BeforeClass
	public static void generateTestFiles() throws Exception {
		testFileDir = Files.createTempDirectory("systemds_parquet_public_test_files").toFile();
		Map<String, String> files = ParquetTestUtils.generatePublicTestFiles(testFileDir);
		FILENAMES = new String[] { files.get("userdata1"), files.get("alltypes_plain"), files.get("all") };
	}

	@AfterClass
	public static void cleanupTestFiles() {
		// testFileDir is still null if fixture generation failed before creating it
		deleteRecursively(testFileDir);
	}

	/**
	 * Best-effort recursive delete. Tolerates a null argument and a path that is not a
	 * listable directory, so cleanup never masks the failure that preceded it.
	 */
	private static void deleteRecursively(File f) {
		if (f == null)
			return;
		File[] children = f.listFiles();
		if (children != null)
			for (File c : children)
				deleteRecursively(c);
		f.delete();
	}

	/** Every fixture is read with the dimensions recorded in its footer. */
	@Test
	public void testReadParquet() throws Exception {
		for (String filename : FILENAMES) {
			ParquetMetadataInfo info = ParquetTestUtils.inferMetadata(filename);

			FrameReaderParquet reader = new FrameReaderParquet();
			FrameBlock frame = reader.readFrameFromHDFS(filename, info.schema, info.names, info.rlen, info.clen);

			Assert.assertEquals("Row count mismatch for " + filename, info.rlen, frame.getNumRows());
			Assert.assertEquals("Column count mismatch for " + filename, info.clen, frame.getNumColumns());
		}
	}

	@Test
	public void testInt96ColumnsDecodedCorrectly() throws Exception {
		assertColumnIsEpochMillis(FILENAMES[0], 0); // userdata1: registration_dttm
		assertColumnIsEpochMillis(FILENAMES[1], 9); // alltypes_plain: timestamp_col
	}

	/**
	 * Asserts that every non-null value in column {@code colIdx} of {@code filename} is
	 * read as a {@code Long}, and that at least one such value exists. Only the Java
	 * type is checked, not the numeric value.
	 */
	private void assertColumnIsEpochMillis(String filename, int colIdx) throws Exception {
		ParquetMetadataInfo info = ParquetTestUtils.inferMetadata(filename);
		FrameReaderParquet reader = new FrameReaderParquet();
		FrameBlock frame = reader.readFrameFromHDFS(filename, info.schema, info.names, info.rlen, info.clen);
		int decoded = 0;
		for (int r = 0; r < frame.getNumRows(); r++) {
			Object val = frame.get(r, colIdx);
			if (val == null) continue;
			decoded++;
			Assert.assertTrue(
				"Expected Long (epoch millis) at row " + r + " col " + colIdx + ", got: " + val.getClass().getSimpleName(),
				val instanceof Long
			);
		}
		Assert.assertTrue("No INT96 values were decoded in " + filename, decoded > 0);
	}

	/** Null strings written by the SystemDS writer are read back as null, not as empty values. */
	@Test
	public void testNullHandling() throws Exception {
		File temp = Files.createTempFile("systemds_null_parquet", ".parquet").toFile();
		try {
			ValueType[] schema = { ValueType.STRING, ValueType.STRING };
			String[] names = { "a", "b" };
			FrameBlock original = new FrameBlock(schema, names,
				new String[][] { { "x", "y" }, { null, null }, { "p", "q" } });

			new FrameWriterParquet().writeFrameToHDFS(original, temp.getPath(), 3, 2);

			FrameBlock result = new FrameReaderParquet()
				.readFrameFromHDFS(temp.getPath(), schema, names, 3, 2);

			Assert.assertNotNull("Row 0 col 0 should be non-null", result.get(0, 0));
			Assert.assertNotNull("Row 0 col 1 should be non-null", result.get(0, 1));
			Assert.assertNull("Row 1 col 0 should be null", result.get(1, 0));
			Assert.assertNull("Row 1 col 1 should be null", result.get(1, 1));
			Assert.assertNotNull("Row 2 col 0 should be non-null", result.get(2, 0));
			Assert.assertNotNull("Row 2 col 1 should be non-null", result.get(2, 1));
		} finally {
			temp.delete();
		}
	}

	/** The column-API reader produces the same frames as the legacy Group-based reader. */
	@Test
	public void testColumnReaderMatchesLegacy() throws Exception {
		for (String filename : FILENAMES) {
			ParquetMetadataInfo info = ParquetTestUtils.inferMetadata(filename);

			FrameBlock legacy = new FrameReaderParquetLegacy()
				.readFrameFromHDFS(filename, info.schema, info.names, info.rlen, info.clen);
			FrameBlock current = new FrameReaderParquet()
				.readFrameFromHDFS(filename, info.schema, info.names, info.rlen, info.clen);

			TestUtils.compareFrames(legacy, current, false);
		}
	}

	@Test
	public void testColumnSubsetProjection() throws Exception {
		// request a reordered subset of columns (d, a, c; skip b)
		File temp = Files.createTempFile("systemds_subset_parquet", ".parquet").toFile();
		try {
			ValueType[] fullSchema = { ValueType.INT64, ValueType.STRING, ValueType.FP64, ValueType.BOOLEAN };
			String[] fullNames = { "a", "b", "c", "d" };
			FrameBlock original = new FrameBlock(fullSchema, fullNames, new String[][] {
				{ "10", "x", "1.5", "true" },
				{ "20", "y", "2.5", "false" },
				{ "30", "z", "3.5", "true" }
			});
			new FrameWriterParquet().writeFrameToHDFS(original, temp.getPath(), 3, 4);

			ValueType[] subSchema = { ValueType.BOOLEAN, ValueType.INT64, ValueType.FP64 };
			String[] subNames = { "d", "a", "c" };

			FrameBlock legacy = new FrameReaderParquetLegacy()
				.readFrameFromHDFS(temp.getPath(), subSchema, subNames, 3, 3);
			FrameBlock current = new FrameReaderParquet()
				.readFrameFromHDFS(temp.getPath(), subSchema, subNames, 3, 3);

			TestUtils.compareFrames(legacy, current, false);

			Assert.assertEquals(true, current.get(0, 0)); // d
			Assert.assertEquals(10L, current.get(0, 1)); // a
			Assert.assertEquals(3.5, ((Number) current.get(2, 2)).doubleValue(), 0.0); // c
		} finally {
			temp.delete();
		}
	}

	/** A file written from a zero-row frame reads back as zero rows with both readers. */
	@Test
	public void testEmptyFile() throws Exception {
		File temp = Files.createTempFile("systemds_empty_parquet", ".parquet").toFile();
		try {
			ValueType[] schema = { ValueType.INT64, ValueType.STRING };
			String[] names = { "a", "b" };
			FrameBlock empty = new FrameBlock(schema, names, new String[0][]);
			new FrameWriterParquet().writeFrameToHDFS(empty, temp.getPath(), 0, 2);

			FrameBlock legacy = new FrameReaderParquetLegacy()
				.readFrameFromHDFS(temp.getPath(), schema, names, 0, 2);
			FrameBlock current = new FrameReaderParquet()
				.readFrameFromHDFS(temp.getPath(), schema, names, 0, 2);

			Assert.assertEquals("Empty file should yield 0 rows (legacy)", 0, legacy.getNumRows());
			Assert.assertEquals("Empty file should yield 0 rows (column API)", 0, current.getNumRows());
			Assert.assertEquals(2, current.getNumColumns());
		} finally {
			temp.delete();
		}
	}

	/** On single files the parallel reader returns the same frame as the sequential one. */
	@Test
	public void testParallelReaderMatchesSequential() throws Exception {
		for (String filename : FILENAMES) {
			ParquetMetadataInfo info = ParquetTestUtils.inferMetadata(filename);

			FrameReaderParquet sequential = new FrameReaderParquet();
			FrameBlock expected = sequential.readFrameFromHDFS(filename, info.schema, info.names, info.rlen, info.clen);

			FrameReaderParquetParallel parallel = new FrameReaderParquetParallel();
			FrameBlock actual = parallel.readFrameFromHDFS(filename, info.schema, info.names, info.rlen, info.clen);

			TestUtils.compareFrames(expected, actual, false);
		}
	}

	@Test
	public void testParallelReaderMultiFileOffsets() throws Exception {
		// each file must use its own row range, else the parallel reader overwrites/loses rows
		File tempDir = Files.createTempDirectory("systemds_parallel_parquet").toFile();
		try {
			ValueType[] schema = { ValueType.STRING, ValueType.INT32 };
			String[] names = { "label", "value" };

			FrameBlock block1 = new FrameBlock(schema, names,
				new String[][] { { "a", "1" }, { "b", "2" }, { "c", "3" } });
			FrameBlock block2 = new FrameBlock(schema, names,
				new String[][] { { "d", "4" }, { "e", "5" }, { "f", "6" } });

			String path1 = tempDir + "/part-0.parquet";
			String path2 = tempDir + "/part-1.parquet";
			FrameWriterParquet writer = new FrameWriterParquet();
			writer.writeFrameToHDFS(block1, path1, 3, 2);
			writer.writeFrameToHDFS(block2, path2, 3, 2);

			// ground truth: labels collected by reading each part file sequentially
			FrameReaderParquet seq = new FrameReaderParquet();
			Set<String> expectedLabels = new HashSet<>();
			String[] parts = { path1, path2 };
			for (String p : parts) {
				FrameBlock fb = seq.readFrameFromHDFS(p, schema, names, 3, 2);
				for (int r = 0; r < fb.getNumRows(); r++)
					expectedLabels.add((String) fb.get(r, 0));
			}

			FrameReaderParquetParallel parallel = new FrameReaderParquetParallel();
			FrameBlock result = parallel.readFrameFromHDFS(tempDir.toString(), schema, names, 6, 2);

			Assert.assertEquals("Expected 6 total rows", 6, result.getNumRows());

			Set<String> actualLabels = new HashSet<>();
			for (int r = 0; r < result.getNumRows(); r++) {
				Object label = result.get(r, 0);
				Assert.assertNotNull("Row " + r + " is null, row-offset bug suspected", label);
				actualLabels.add((String) label);
			}
			Assert.assertEquals("Parallel result does not match sequential ground truth", expectedLabels, actualLabels);
		} finally {
			deleteRecursively(tempDir);
		}
	}

	@Test
	public void testParallelWriterRoundTrip() throws Exception {
		File tempDir = Files.createTempDirectory("systemds_parallel_write").toFile();
		try {
			ValueType[] schema = { ValueType.INT64, ValueType.STRING, ValueType.FP64 };
			String[] names = { "id", "name", "val" };
			String[][] data = new String[20][];
			for (int i = 0; i < 20; i++)
				data[i] = new String[] { String.valueOf(i), "row" + i, String.valueOf(i + 0.5) };
			FrameBlock original = new FrameBlock(schema, names, data);

			FrameWriterParquetParallel writer = new FrameWriterParquetParallel();
			writer.setForcedParallel(true);
			writer.writeFrameToHDFS(original, tempDir.getPath(), 20, 3);

			FrameBlock result = new FrameReaderParquetParallel()
				.readFrameFromHDFS(tempDir.getPath(), schema, names, 20, 3);

			Assert.assertEquals("Row count mismatch after parallel write", 20, result.getNumRows());
			// Rows may be out of order in parallel reads, validate by comparing tuples, not row positions
			Set<String> expected = new HashSet<>();
			for (int i = 0; i < 20; i++)
				expected.add(i + "|row" + i + "|" + (i + 0.5));
			Set<String> actual = new HashSet<>();
			for (int r = 0; r < result.getNumRows(); r++)
				actual.add(result.get(r, 0) + "|" + result.get(r, 1) + "|" + result.get(r, 2));
			Assert.assertEquals("Parallel-written data does not round-trip", expected, actual);
		} finally {
			deleteRecursively(tempDir);
		}
	}
}
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.io.parquet;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Map;
+
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.io.FrameReaderParquet;
+import org.apache.sysds.runtime.io.FrameReaderParquetParallel;
+import org.apache.sysds.runtime.io.FrameWriterParquet;
+import org.apache.sysds.runtime.io.FrameWriterParquetParallel;
+import org.apache.sysds.test.functions.io.parquet.ParquetTestUtils.ParquetMetadataInfo;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for the Parquet frame writers: each test writes a
+ * {@link FrameBlock} with {@link FrameWriterParquet} or
+ * {@link FrameWriterParquetParallel}, reads it back with the sequential or
+ * parallel reader, and compares the result to the original frame.
+ */
+public class WriteParquetTest {
+
+	private static final String TEMP_FILE = System.getProperty("java.io.tmpdir") + "/systemds_write_parquet_test.parquet";
+	private static final String TEMP_PAR_PATH = System.getProperty("java.io.tmpdir") + "/systemds_write_parquet_test_par";
+
+	// See ParquetTestUtils.generatePublicTestFiles(): these are generated with Spark's DataFrameWriter
+	private static File testFileDir;
+	private static String[] PUBLIC_FILES;
+
+	/**
+	 * Generates the shared input files once per class into a fresh temp directory
+	 * and records the paths of the three files the tests read.
+	 *
+	 * @throws Exception if the temp directory or the test files cannot be created
+	 */
+	@BeforeClass
+	public static void generateTestFiles() throws Exception {
+		testFileDir = Files.createTempDirectory("systemds_parquet_public_test_files").toFile();
+		// NOTE(review): assumes generatePublicTestFiles returns Map<String, String>
+		// (logical name -> path); the values are used as Strings below.
+		Map<String, String> files = ParquetTestUtils.generatePublicTestFiles(testFileDir);
+		PUBLIC_FILES = new String[] { files.get("userdata1"), files.get("alltypes_plain"), files.get("all") };
+	}
+
+	/**
+	 * Removes the generated input files. Delegates to {@link #deleteRecursive(File)}
+	 * so nested output is removed as well, and so a {@code null} directory (when
+	 * {@link #generateTestFiles()} failed before assigning it) does not raise an NPE.
+	 */
+	@AfterClass
+	public static void cleanupTestFiles() {
+		deleteRecursive(testFileDir);
+	}
+
+	/** Deletes the single-file and multi-part outputs after every test. */
+	@After
+	public void cleanup() {
+		new File(TEMP_FILE).delete();
+		deleteRecursive(new File(TEMP_PAR_PATH));
+	}
+
+	/**
+	 * Best-effort recursive delete; failures of individual deletes are ignored.
+	 *
+	 * @param f file or directory to delete, may be {@code null} or non-existent
+	 */
+	private static void deleteRecursive(File f) {
+		if (f == null)
+			return;
+		// listFiles() returns null for non-directories and on I/O errors
+		File[] children = f.listFiles();
+		if (children != null)
+			for (File c : children)
+				deleteRecursive(c);
+		f.delete();
+	}
+
+	/**
+	 * For each public test file: reads it sequentially, writes the frame to
+	 * {@code TEMP_FILE}, reads that file back and compares it to the first read.
+	 *
+	 * @throws Exception if metadata inference, reading, or writing fails
+	 */
+	@Test
+	public void testRoundtripPublicFiles() throws Exception {
+		for (String filename : PUBLIC_FILES) {
+			ParquetMetadataInfo info = ParquetTestUtils.inferMetadata(filename);
+
+			FrameReaderParquet reader = new FrameReaderParquet();
+			FrameBlock original = reader.readFrameFromHDFS(filename, info.schema, info.names, info.rlen, info.clen);
+
+			FrameWriterParquet writer = new FrameWriterParquet();
+			writer.writeFrameToHDFS(original, TEMP_FILE, original.getNumRows(), original.getNumColumns());
+
+			FrameBlock result = reader.readFrameFromHDFS(TEMP_FILE, info.schema, info.names, info.rlen, info.clen);
+
+			TestUtils.compareFrames(original, result, false);
+		}
+	}
+
+	/**
+	 * Writes a 6-row frame as two part files into one directory and checks that
+	 * the parallel reader returns the combined frame.
+	 *
+	 * @throws Exception if writing or reading fails
+	 */
+	@Test
+	public void testMultiPartFileRoundtrip() throws Exception {
+		// Create two parquet part files and verify that the parallel reader
+		// reads both files correctly and combines them into the expected result.
+		ValueType[] schema = {
+			ValueType.STRING,
+			ValueType.INT32,
+			ValueType.INT64,
+			ValueType.FP32,
+			ValueType.FP64,
+			ValueType.BOOLEAN
+		};
+		String[] names = { "name", "age", "id", "score", "ratio", "active" };
+		String[][] data = {
+			{ "Alice", "30", "1000", "1.5", "0.75", "true" },
+			{ "Bob", "25", "2000", "2.5", "0.50", "false" },
+			{ "Carol", "40", "3000", "3.5", "0.25", "true" },
+			{ "Dave", "35", "4000", "4.5", "0.10", "false" },
+			{ "Eve", "28", "5000", "5.5", "0.90", "true" },
+			{ "Frank", "45", "6000", "6.5", "0.60", "false" }
+		};
+		FrameBlock original = new FrameBlock(schema, names, data);
+
+		new File(TEMP_PAR_PATH).mkdir();
+		FrameWriterParquet writer = new FrameWriterParquet();
+		// presumably slice bounds are inclusive (rows 0..2 and 3..5), matching the rlen of 3 — TODO confirm
+		writer.writeFrameToHDFS(original.slice(0, 2), TEMP_PAR_PATH + "/part-0.parquet", 3, schema.length);
+		writer.writeFrameToHDFS(original.slice(3, 5), TEMP_PAR_PATH + "/part-1.parquet", 3, schema.length);
+
+		FrameBlock result = new FrameReaderParquetParallel()
+			.readFrameFromHDFS(TEMP_PAR_PATH, schema, names, 6, schema.length);
+
+		TestUtils.compareFrames(original, result, false);
+	}
+
+	/**
+	 * Writes a 3-row frame with the parallel writer and reads it back with the
+	 * parallel reader.
+	 *
+	 * @throws Exception if writing or reading fails
+	 */
+	@Test
+	public void testParallelRoundtrip() throws Exception {
+		ValueType[] schema = {
+			ValueType.STRING,
+			ValueType.INT32,
+			ValueType.INT64,
+			ValueType.FP32,
+			ValueType.FP64,
+			ValueType.BOOLEAN
+		};
+		String[] names = { "name", "age", "id", "score", "ratio", "active" };
+		String[][] data = {
+			{ "Alice", "30", "1000", "1.5", "0.75", "true" },
+			{ "Bob", "25", "2000", "2.5", "0.50", "false" },
+			{ "Carol", "40", "3000", "3.5", "0.25", "true" }
+		};
+		FrameBlock original = new FrameBlock(schema, names, data);
+
+		new FrameWriterParquetParallel().writeFrameToHDFS(original, TEMP_PAR_PATH, original.getNumRows(), original.getNumColumns());
+		FrameBlock result = new FrameReaderParquetParallel().readFrameFromHDFS(TEMP_PAR_PATH, schema, names, original.getNumRows(), original.getNumColumns());
+
+		TestUtils.compareFrames(original, result, false);
+	}
+
+	/**
+	 * Writes a 3-row frame with the sequential writer and reads it back with the
+	 * sequential reader; the first column name contains a space.
+	 *
+	 * @throws Exception if writing or reading fails
+	 */
+	@Test
+	public void testRoundtrip() throws Exception {
+		ValueType[] schema = {
+			ValueType.STRING,
+			ValueType.INT32,
+			ValueType.INT64,
+			ValueType.FP32,
+			ValueType.FP64,
+			ValueType.BOOLEAN
+		};
+		String[] names = { "name test", "age", "id", "score", "ratio", "active" };
+		String[][] data = {
+			{ "Alice", "30", "1000", "1.5", "0.75", "true" },
+			{ "Bob", "25", "2000", "2.5", "0.50", "false" },
+			{ "Carol", "40", "3000", "3.5", "0.25", "true" }
+		};
+
+		FrameBlock original = new FrameBlock(schema, names, data);
+
+		// Write
+		FrameWriterParquet writer = new FrameWriterParquet();
+		writer.writeFrameToHDFS(original, TEMP_FILE, original.getNumRows(), original.getNumColumns());
+
+		// Read back
+		FrameReaderParquet reader = new FrameReaderParquet();
+		FrameBlock result = reader.readFrameFromHDFS(TEMP_FILE, schema, names, original.getNumRows(), original.getNumColumns());
+
+		TestUtils.compareFrames(original, result, false);
+	}
+}