diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java index 83676da47a7..8b0f5fe06b9 100644 --- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java +++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java @@ -268,11 +268,16 @@ public static int getDeltaWriterBatchSize() { return getDMLConfig().getIntValue(DMLConfig.DELTA_WRITER_BATCH_SIZE); } - /** @return target data-file size (bytes) for the native Delta writer */ + /** @return upper bound (bytes) on the native Delta writer's target data-file size */ public static long getDeltaWriterTargetFileSize() { return Long.parseLong(getDMLConfig().getTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE)); } + /** @return whether the native Delta writer adaptively sizes data files for parallel reads */ + public static boolean isDeltaWriterAdaptiveFileSize() { + return getDMLConfig().getBooleanValue(DMLConfig.DELTA_WRITER_ADAPTIVE_FILE_SIZE); + } + public static boolean isFederatedSSL(){ return getDMLConfig().getBooleanValue(DMLConfig.USE_SSL_FEDERATED_COMMUNICATION); } diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java index e06b58b07c8..d114ccf69b9 100644 --- a/src/main/java/org/apache/sysds/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java @@ -73,7 +73,8 @@ public class DMLConfig public static final String IO_COMPRESSION_CODEC = "sysds.io.compression.encoding"; public static final String DELTA_READER_BATCH_SIZE = "sysds.io.delta.reader.batchsize"; // int: rows per parquet read batch public static final String DELTA_WRITER_BATCH_SIZE = "sysds.io.delta.writer.batchsize"; // int: matrix rows materialized per columnar batch handed to the engine - public static final String DELTA_WRITER_TARGET_FILE_SIZE = "sysds.io.delta.writer.targetfilesize"; // long: target data-file size in bytes (smaller -> more files -> more parallel-read throughput) + public static final String DELTA_WRITER_TARGET_FILE_SIZE = "sysds.io.delta.writer.targetfilesize"; // long: upper bound on target data-file size in bytes; adaptive sizing may pick smaller -> more files -> more parallel-read throughput + public static final String DELTA_WRITER_ADAPTIVE_FILE_SIZE = "sysds.io.delta.writer.adaptivefilesize"; // boolean: size data files toward one per parallel reader (capped by targetfilesize) public static final String PARALLEL_ENCODE = "sysds.parallel.encode"; // boolean: enable multi-threaded transformencode and apply public static final String PARALLEL_ENCODE_STAGED = "sysds.parallel.encode.staged"; public static final String PARALLEL_ENCODE_APPLY_BLOCKS = "sysds.parallel.encode.applyBlocks"; @@ -163,7 +164,8 @@ public class DMLConfig _defaultVals.put(IO_COMPRESSION_CODEC, "none"); _defaultVals.put(DELTA_READER_BATCH_SIZE, "4096"); // rows per parquet read batch (Delta Kernel default 1024) _defaultVals.put(DELTA_WRITER_BATCH_SIZE, "4096"); // matrix rows materialized per columnar batch handed to the engine - _defaultVals.put(DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(64L * 1024 * 1024)); // 64MB target data-file size (Delta Kernel default 128MB) -> more files -> more parallel-read throughput + _defaultVals.put(DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(64L * 1024 * 1024)); // 64MB cap on target data-file size; adaptive sizing may pick smaller -> more files -> more parallel-read throughput + _defaultVals.put(DELTA_WRITER_ADAPTIVE_FILE_SIZE, "true"); // size data files toward one per parallel reader _defaultVals.put(PARALLEL_TOKENIZE, "false"); _defaultVals.put(PARALLEL_TOKENIZE_NUM_BLOCKS, "64"); _defaultVals.put(FRAME_TO_MATRIX_WARN_CAST, "false"); diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java index 7151d87211c..87d14dbf87e 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.Pair; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.FileFormat; import org.apache.sysds.common.Types.ValueType; @@ -203,13 +204,19 @@ protected FrameBlock readBlobFromHDFS(String fname, long[] dims) throws IOExcept .createFrameReader(iimd.getFileFormat(), getFileFormatProperties()) .readFrameFromHDFS(fname, lschema, dc.getRows(), dc.getCols()); - if(iimd.getFileFormat() == FileFormat.CSV) + // sanity check correct output (before dereferencing data below) + if(data == null) + throw new IOException("Unable to load frame from file: " + fname); + + //Delta and CSV discover dimensions (and Delta also schema) at read time, so + //refresh the cached metadata to reflect the materialized frame block. + if(iimd.getFileFormat() == FileFormat.CSV || iimd.getFileFormat() == FileFormat.DELTA) { _metaData = _metaData instanceof MetaDataFormat ? new MetaDataFormat(data.getDataCharacteristics(), iimd.getFileFormat()) : new MetaData(data.getDataCharacteristics()); + if(iimd.getFileFormat() == FileFormat.DELTA) + _schema = data.getSchema(); + } - // sanity check correct output - if(data == null) - throw new IOException("Unable to load frame from file: " + fname); return data; } @@ -293,6 +300,9 @@ protected void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatPro FrameWriter writer = FrameWriterFactory.createFrameWriter(fmt, fprop); writer.writeFrameToHDFS(_data, fname, getNumRows(), getNumColumns()); + + if(DMLScript.STATISTICS) + CacheStatistics.incrementHDFSWrites(); } @Override diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java index 5f2d08a122f..80a5d699dfa 100644 --- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java +++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java @@ -123,6 +123,87 @@ public static RaggedArray create(T[] col, int m) { return new RaggedArray<>(col, m); } + /** + * Wrap a fully populated raw typed column array into an {@link Array} of the given value type. The runtime type of + * {@code col} must match the primitive backing type of {@code vt} (e.g. {@code double[]} for {@link ValueType#FP64}, + * {@code String[]} for {@link ValueType#STRING}). + * + *

For {@link ValueType#BOOLEAN} this mirrors {@link #allocateBoolean(int)}: a {@code boolean[]} longer than + * {@link #bitSetSwitchPoint} is bit-packed into a compact {@link BitSetArray} (so a bulk decoder that fills a + * plain {@code boolean[]} still ends up with the same representation as every other frame allocation path), + * while shorter columns stay a plain {@link BooleanArray}.

+ * + * @param vt the value type of the column + * @param col the backing array to wrap + * @return an {@link Array} view over {@code col} (boolean columns may be bit-packed rather than wrapped in place) + */ + public static Array create(ValueType vt, Object col) { + switch(vt) { + case FP64: + return create((double[]) col); + case FP32: + return create((float[]) col); + case INT64: + return create((long[]) col); + case UINT4: + case UINT8: + case INT32: + return create((int[]) col); + case BOOLEAN: { + boolean[] b = (boolean[]) col; + return b.length > bitSetSwitchPoint ? new BitSetArray(b) : create(b); + } + case CHARACTER: + return create((char[]) col); + case HASH64: + return createHash64((long[]) col); + case HASH32: + return createHash32((int[]) col); + case UNKNOWN: + case STRING: + default: + return create((String[]) col); + } + } + + /** + * Allocate the raw backing array for a column of the given value type: the inverse of + * {@link #create(ValueType, Object)}. Returns {@code double[]} for {@link ValueType#FP64}, + * {@code int[]} for INT32/UINT/HASH32, {@code long[]} for INT64/HASH64, {@code String[]} for STRING, etc. The + * runtime array type matches what {@link #create(ValueType, Object)} expects, so a bulk decoder can fill this + * primitive array directly and then wrap it via {@code create(vt, backing)}. + * + * @param vt the value type of the column + * @param nRow the number of rows to allocate + * @return a freshly allocated raw backing array of the matching primitive/object type + */ + public static Object allocateBacking(ValueType vt, int nRow) { + switch(vt) { + case FP64: + return new double[nRow]; + case FP32: + return new float[nRow]; + case INT64: + case HASH64: + return new long[nRow]; + case UINT4: + case UINT8: + LOG.warn("Not supported allocation of UInt 4 or 8 array: defaulting to Int32"); + // fall through: UINT4/UINT8 are backed by int[] (wrapped as Int32) + case INT32: + case HASH32: + return new int[nRow]; + case BOOLEAN: + return new boolean[nRow]; + case CHARACTER: + return new char[nRow]; + case UNKNOWN: + case STRING: + default: + return new String[nRow]; + } + } + public static long getInMemorySize(ValueType type, int _numRows, boolean containsNull) { if(containsNull) { switch(type) { @@ -221,27 +302,8 @@ public static Array allocate(ValueType v, int nRow) { switch(v) { case BOOLEAN: return allocateBoolean(nRow); - case UINT4: - case UINT8: - LOG.warn("Not supported allocation of UInt 4 or 8 array: defaulting to Int32"); - case INT32: - return new IntegerArray(new int[nRow]); - case INT64: - return new LongArray(new long[nRow]); - case FP32: - return new FloatArray(new float[nRow]); - case FP64: - return new DoubleArray(new double[nRow]); - case CHARACTER: - return new CharArray(new char[nRow]); - case HASH64: - return new HashLongArray(new long[nRow]); - case HASH32: - return new HashIntegerArray(new int[nRow]); - case UNKNOWN: - case STRING: default: - return new StringArray(new String[nRow]); + return create(v, allocateBacking(v, nRow)); } } diff --git a/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java b/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java index 1e06f9acb56..bbca857a1cd 100644 --- a/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java +++ b/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java @@ -26,9 +26,12 @@ import java.util.Optional; import java.util.function.Function; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.util.HDFSTool; @@ -77,6 +80,8 @@ */ public class DeltaKernelUtils { + private static final Log LOG = LogFactory.getLog(DeltaKernelUtils.class.getName()); + private static final String ENGINE_INFO = "Apache SystemDS"; /** Reused thread-safe JSON reader for the per-file Delta stats (numRecords). */ @@ -157,6 +162,17 @@ public static int countSelected(int size, boolean[] selected) { return n; } + /** Floor on the adaptive writer target file size. Below this the per-file metadata/open + * overhead (and tiny-file proliferation) outweighs the extra read parallelism. */ + public static final long ADAPTIVE_WRITER_MIN_FILE_SIZE = 4L * 1024 * 1024; + + private static Configuration buildConf(Configuration base, int batchSize, long targetFileSize) { + Configuration c = new Configuration(base); + c.setInt(CONF_READER_BATCH_SIZE, batchSize); + c.setLong(CONF_WRITER_TARGET_FILE_SIZE, targetFileSize); + return c; + } + private static synchronized Configuration deltaConf() { Configuration base = ConfigurationManager.getCachedJobConf(); int batchSize = ConfigurationManager.getDeltaReaderBatchSize(); @@ -164,10 +180,7 @@ private static synchronized Configuration deltaConf() { if(cachedConf == null || cachedConfBase != base || cachedBatchSize != batchSize || cachedTargetFileSize != targetFileSize) { - Configuration c = new Configuration(base); - c.setInt(CONF_READER_BATCH_SIZE, batchSize); - c.setLong(CONF_WRITER_TARGET_FILE_SIZE, targetFileSize); - cachedConf = c; + cachedConf = buildConf(base, batchSize, targetFileSize); cachedConfBase = base; cachedBatchSize = batchSize; cachedTargetFileSize = targetFileSize; @@ -179,6 +192,47 @@ public static Engine createEngine() { return DefaultEngine.create(deltaConf()); } + /** + * Compute the parquet target data-file size (bytes) for writing a table of the given + * estimated size. With adaptive sizing enabled the writer aims for roughly one data + * file per expected parallel reader (so the native per-file parallel read can use all + * threads): never above the configured target, and never below + * {@code ADAPTIVE_WRITER_MIN_FILE_SIZE} unless the configured target is itself smaller + * than that floor (in which case the configured target wins). + * + * @param estimatedBytes estimate of the table's size (the block in-memory size is a fine proxy) + * @return the target max parquet data-file size in bytes + */ + public static long adaptiveWriterTargetFileSize(long estimatedBytes) { + long configured = ConfigurationManager.getDeltaWriterTargetFileSize(); + if(!ConfigurationManager.isDeltaWriterAdaptiveFileSize() || estimatedBytes <= 0) + return configured; + int par = Math.max(1, OptimizerUtils.getParallelBinaryReadParallelism()); + long perReader = Math.max(1, estimatedBytes / par); + //never above the configured cap, never below the floor (unless the cap itself is lower) + long target = Math.min(configured, Math.max(ADAPTIVE_WRITER_MIN_FILE_SIZE, perReader)); + if(LOG.isDebugEnabled()) + LOG.debug("Delta adaptive file size: est=" + estimatedBytes + "B par=" + par + " -> target=" + target + + "B (cap=" + configured + "B, floor=" + ADAPTIVE_WRITER_MIN_FILE_SIZE + "B)"); + return target; + } + + /** + * Create an engine for writing a table of the given estimated size, configured with an + * adaptive target data-file size (see {@link #adaptiveWriterTargetFileSize(long)}). A fresh + * (uncached) configuration is built since writes happen once per table, not per data file. + * + * @param estimatedBytes estimate of the table's size (the block in-memory size is a fine proxy) + * @return a Delta Kernel engine for the write + */ + public static Engine createWriteEngine(long estimatedBytes) { + //the reader batch size is irrelevant on the write path but is set to keep the + //conf shape identical to deltaConf(); only the target file size matters here. + Configuration c = buildConf(ConfigurationManager.getCachedJobConf(), + ConfigurationManager.getDeltaReaderBatchSize(), adaptiveWriterTargetFileSize(estimatedBytes)); + return DefaultEngine.create(c); + } + /** * Resolve a (possibly relative) path to a fully-qualified URI so the * kernel's default engine can locate the table on the right filesystem. diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java new file mode 100644 index 00000000000..9e8823f7ecf --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; + +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.frame.data.columns.Array; +import org.apache.sysds.runtime.frame.data.columns.ArrayFactory; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.DataType; + +/** + * Single-threaded native Delta Lake reader for frames, built on the Spark-free Delta Kernel library. It opens the + * latest snapshot of a Delta table, reads its parquet data files through the kernel's default engine (honoring deletion + * vectors), and materializes the columns into a {@link FrameBlock} whose schema and column names are derived from the + * Delta table schema. + * + *

+ * Data is extracted column-at-a-time into primitive arrays (no per-cell boxing or {@code FrameBlock.set} dispatch) and + * the frame is constructed directly from typed column {@link Array}s. Supported column types map to SystemDS value + * types: double, float, long, int, short, byte, boolean, and string. Neither the schema nor the dimensions need to be + * supplied; they are discovered from the table. + *

+ */ +public class FrameReaderDelta extends FrameReader { + + // per-column read codes (how to pull a value out of the Delta column vector); + // aliases of the shared codes in DeltaKernelUtils so the frame read dispatch stays + // in lockstep with the matrix reader's type mapping. Package visible so the parallel + // reader can reuse the same dispatch. + static final int R_DOUBLE = DeltaKernelUtils.T_DOUBLE, R_FLOAT = DeltaKernelUtils.T_FLOAT, + R_LONG = DeltaKernelUtils.T_LONG, R_INT = DeltaKernelUtils.T_INT, R_SHORT = DeltaKernelUtils.T_SHORT, + R_BYTE = DeltaKernelUtils.T_BYTE, R_BOOLEAN = DeltaKernelUtils.T_BOOLEAN, R_STRING = DeltaKernelUtils.T_STRING; + + @Override + public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) + throws IOException, DMLRuntimeException { + Engine engine = DeltaKernelUtils.createEngine(); + String tablePath = DeltaKernelUtils.qualify(fname); + DeltaKernelUtils.ScanHandle handle = DeltaKernelUtils.openScan(engine, tablePath); + return readWithHandle(fname, engine, handle); + } + + /** + * Materialize the frame from an already-opened engine and scan handle. Factored out so the parallel reader can + * reuse a handle it already opened for its single-file/single-thread fallback instead of re-opening the (expensive) + * Delta snapshot a second time. + * + * @param fname the table path (for error messages) + * @param engine the Delta Kernel engine + * @param handle the opened scan handle + * @return the materialized frame block + */ + protected FrameBlock readWithHandle(String fname, Engine engine, DeltaKernelUtils.ScanHandle handle) + throws IOException { + final ReadPlan plan = planColumns(handle); + + // fast path: exact per-file row counts are known from metadata (no deletion + // vectors) -> pre-size one typed array per column and decode each file + // straight into its row offset, avoiding the per-batch extract + concatenate. + if(useDirectPath(handle)) { + long total = 0; + for(long r : handle.numRecords) + total += r; + // empty table: the typed column arrays cannot be zero-length, so return a + // schema-only frame with the discovered schema/names and zero rows. + if(total == 0) + return new FrameBlock(plan.vt, plan.cnames, 0); + if(total <= Integer.MAX_VALUE) + return readDirect(fname, engine, handle, plan, (int) total); + } + + // fallback: row counts unknown or deletion vectors present -> decode into + // per-batch arrays and concatenate per column in file order. + return readBuffered(engine, handle, plan); + } + + /** + * Immutable per-column read plan derived once from the Delta table schema: how to pull each column out of the + * kernel column vector ({@code readCodes}), the resulting SystemDS value types, and the column names. Shared by the + * serial and parallel readers so the schema-to-column mapping lives in exactly one place. + */ + protected static final class ReadPlan { + final int ncol; + final int[] readCodes; + final ValueType[] vt; + final String[] cnames; + + private ReadPlan(int ncol, int[] readCodes, ValueType[] vt, String[] cnames) { + this.ncol = ncol; + this.readCodes = readCodes; + this.vt = vt; + this.cnames = cnames; + } + } + + /** Derive the {@link ReadPlan} (read codes, value types, names) from the opened scan handle's schema. */ + protected static ReadPlan planColumns(DeltaKernelUtils.ScanHandle handle) { + final int ncol = handle.schema.length(); + final int[] readCodes = new int[ncol]; + final ValueType[] vt = new ValueType[ncol]; + final String[] cnames = new String[ncol]; + for(int c = 0; c < ncol; c++) { + DataType dt = handle.schema.at(c).getDataType(); + readCodes[c] = readCode(dt, handle.schema.at(c).getName()); + vt[c] = valueType(readCodes[c]); + cnames[c] = handle.schema.at(c).getName(); + } + return new ReadPlan(ncol, readCodes, vt, cnames); + } + + /** + * Whether the metadata-driven direct read fast path can be used for this table (exact per-file row counts and no + * deletion vectors, so the output can be pre-sized and each file decoded straight into its row offset). Visible for + * testing: the buffered fallback is otherwise only reachable for tables lacking row statistics or carrying deletion + * vectors, which the SystemDS Delta writer never produces. + * + * @param handle the opened scan handle + * @return true if the direct path is applicable + */ + protected boolean useDirectPath(DeltaKernelUtils.ScanHandle handle) { + return handle.hasExactRowCounts(); + } + + /** + * Fast path: decode each data file straight into pre-sized typed column arrays at a metadata-derived row offset. + * One allocation per column, single pass, no intermediate per-batch buffers or serial concatenation. + */ + private FrameBlock readDirect(String fname, Engine engine, DeltaKernelUtils.ScanHandle handle, ReadPlan plan, + int nrow) throws IOException { + final int ncol = plan.ncol; + final int[] readCodes = plan.readCodes; + final Object[] dest = new Object[ncol]; + for(int c = 0; c < ncol; c++) + dest[c] = ArrayFactory.allocateBacking(plan.vt[c], nrow); + + int base = 0; + for(int i = 0; i < handle.scanFiles.size(); i++) { + // exclusive upper row bound for this file's slice; a file decoding more + // rows than its numRecords statistic would otherwise overflow into the + // next file's region or off the array + final int limit = base + (int) handle.numRecords[i]; + final int[] cur = new int[] {base}; + DeltaKernelUtils.readScanFile(engine, handle.scanState, handle.physicalReadSchema, handle.scanFiles.get(i), + (cols, size, selected) -> { + int n = DeltaKernelUtils.countSelected(size, selected); + if(cur[0] + n > limit) + throw new DMLRuntimeException("Delta file produced more rows than its " + + "numRecords statistic; refusing direct read of " + fname); + for(int c = 0; c < ncol; c++) + extractColumnInto(cols[c], size, selected, readCodes[c], dest[c], cur[0]); + cur[0] += n; + }); + // also fail loud on underflow: a file decoding fewer rows than its + // numRecords statistic would leave the tail of the slice at the array + // default (0/null) while nrow still reports the (inflated) statistic. + if(cur[0] != limit) + throw new DMLRuntimeException("Delta file produced " + (cur[0] - base) + " rows, expected " + + (limit - base) + " from its numRecords statistic; refusing direct read of " + fname); + base = limit; + } + + Array[] columns = new Array[ncol]; + for(int c = 0; c < ncol; c++) + columns[c] = ArrayFactory.create(plan.vt[c], dest[c]); + FrameBlock ret = new FrameBlock(columns); + ret.setColumnNames(plan.cnames); + return ret; + } + + /** + * Fallback path: decode each batch into per-batch typed arrays and concatenate them per column in file order. Used + * when exact per-file row counts are not available (missing statistics or deletion vectors present), so the output + * cannot be pre-sized up front. + */ + private FrameBlock readBuffered(Engine engine, DeltaKernelUtils.ScanHandle handle, ReadPlan plan) + throws IOException { + final int ncol = plan.ncol; + final int[] readCodes = plan.readCodes; + final ArrayList batchCols = new ArrayList<>(); + final ArrayList batchSizes = new ArrayList<>(); + final int[] nrowHolder = new int[1]; + for(Row scanFileRow : handle.scanFiles) { + DeltaKernelUtils.readScanFile(engine, handle.scanState, handle.physicalReadSchema, scanFileRow, + (cols, size, selected) -> { + int n = DeltaKernelUtils.countSelected(size, selected); + Object[] extracted = new Object[ncol]; + for(int c = 0; c < ncol; c++) { + // decode into a fresh per-batch array via the shared alloc + + // decode primitives (the same ones the direct path uses) + Object col = ArrayFactory.allocateBacking(plan.vt[c], n); + extractColumnInto(cols[c], size, selected, readCodes[c], col, 0); + extracted[c] = col; + } + batchCols.add(extracted); + batchSizes.add(n); + nrowHolder[0] += n; + }); + } + + int nrow = nrowHolder[0]; + // empty table: return a schema-only frame with the discovered schema/names. + if(nrow == 0) + return new FrameBlock(plan.vt, plan.cnames, 0); + Array[] columns = new Array[ncol]; + for(int c = 0; c < ncol; c++) + columns[c] = concatColumn(plan.vt[c], nrow, batchCols, batchSizes, c); + FrameBlock ret = new FrameBlock(columns); + ret.setColumnNames(plan.cnames); + return ret; + } + + /** + * Concatenate the per-batch typed arrays of one column (in file/batch order) into a single pre-sized array and wrap + * it as a frame {@link Array}. The copy is type-agnostic ({@link System#arraycopy} works on the boxed primitive or + * object arrays), so there is no per-type dispatch here: allocation and wrapping reuse + * {@link ArrayFactory#allocateBacking(ValueType, int)} and {@link ArrayFactory#create(ValueType, Object)}, the same + * primitives the single-pass direct path uses. + * + *

+ * Only the buffered fallback needs this concatenation; the default direct path decodes straight into one pre-sized + * array per column with no intermediate per-batch arrays. + *

+ */ + static Array concatColumn(ValueType vt, int nrow, ArrayList batchCols, ArrayList batchSizes, + int c) { + Object full = ArrayFactory.allocateBacking(vt, nrow); + int off = 0; + for(int b = 0; b < batchCols.size(); b++) { + int n = batchSizes.get(b); + System.arraycopy(batchCols.get(b)[c], 0, full, off, n); + off += n; + } + return ArrayFactory.create(vt, full); + } + + static int readCode(DataType dt, String name) { + // reuse the shared Delta type -> code mapping; frames additionally reject the + // types the matrix reader also cannot map (typeCode returns -1) + int code = DeltaKernelUtils.typeCode(dt); + if(code < 0) + throw new DMLRuntimeException( + "Unsupported non-mappable Delta column '" + name + "' of type " + dt + " for frame read."); + return code; + } + + static ValueType valueType(int readCode) { + switch(readCode) { + case R_DOUBLE: + return ValueType.FP64; + case R_FLOAT: + return ValueType.FP32; + case R_LONG: + return ValueType.INT64; + case R_INT: + case R_SHORT: + case R_BYTE: + return ValueType.INT32; + case R_BOOLEAN: + return ValueType.BOOLEAN; + default: + return ValueType.STRING; + } + } + + /** + * Decode the live (selected, after deletion vector) rows of one column batch directly into a pre-sized typed array + * starting at absolute row {@code destOff}. Null numeric cells keep the array default (0); string nulls are stored + * as null. + */ + static void extractColumnInto(ColumnVector col, int size, boolean[] selected, int readCode, Object dest, + int destOff) { + switch(readCode) { + case R_DOUBLE: { + double[] a = (double[]) dest; + int lr = destOff; + for(int r = 0; r < size; r++) { + if(selected != null && !selected[r]) + continue; + if(!col.isNullAt(r)) + a[lr] = col.getDouble(r); + lr++; + } + break; + } + case R_FLOAT: { + float[] a = (float[]) dest; + int lr = destOff; + for(int r = 0; r < size; r++) { + if(selected != null && !selected[r]) + continue; + if(!col.isNullAt(r)) + a[lr] = col.getFloat(r); + lr++; + } + break; + } + case R_LONG: { + long[] a = (long[]) dest; + int lr = destOff; + for(int r = 0; r < size; r++) { + if(selected != null && !selected[r]) + continue; + if(!col.isNullAt(r)) + a[lr] = col.getLong(r); + lr++; + } + break; + } + case R_INT: { + int[] a = (int[]) dest; + int lr = destOff; + for(int r = 0; r < size; r++) { + if(selected != null && !selected[r]) + continue; + if(!col.isNullAt(r)) + a[lr] = col.getInt(r); + lr++; + } + break; + } + case R_SHORT: { + int[] a = (int[]) dest; + int lr = destOff; + for(int r = 0; r < size; r++) { + if(selected != null && !selected[r]) + continue; + if(!col.isNullAt(r)) + a[lr] = col.getShort(r); + lr++; + } + break; + } + case R_BYTE: { + int[] a = (int[]) dest; + int lr = destOff; + for(int r = 0; r < size; r++) { + if(selected != null && !selected[r]) + continue; + if(!col.isNullAt(r)) + a[lr] = col.getByte(r); + lr++; + } + break; + } + case R_BOOLEAN: { + boolean[] a = (boolean[]) dest; + int lr = destOff; + for(int r = 0; r < size; r++) { + if(selected != null && !selected[r]) + continue; + if(!col.isNullAt(r)) + a[lr] = col.getBoolean(r); + lr++; + } + break; + } + default: { // R_STRING + String[] a = (String[]) dest; + int lr = destOff; + for(int r = 0; r < size; r++) { + if(selected != null && !selected[r]) + continue; + a[lr] = col.isNullAt(r) ? null : col.getString(r); + lr++; + } + break; + } + } + } + + @Override + public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen) + throws IOException, DMLRuntimeException { + throw new UnsupportedOperationException( + "Reading a Delta table from an input stream is not supported; Delta is a directory-based table format."); + } +} diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderDeltaParallel.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderDeltaParallel.java new file mode 100644 index 00000000000..106264afe6c --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderDeltaParallel.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.hops.OptimizerUtils; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.frame.data.columns.Array; +import org.apache.sysds.runtime.frame.data.columns.ArrayFactory; +import org.apache.sysds.runtime.util.CommonThreadPool; + +import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; + +/** + * Parallel native Delta Lake frame reader. Delta tables are stored as one or more parquet data files; this reader + * decodes those files concurrently (one task per data file) and assembles them into a column-major {@link FrameBlock} + * in the original file order. + * + *

+ * It mirrors {@link ReaderDeltaParallel} (the matrix variant) but produces typed column {@link Array}s instead of a + * dense {@code double[]}. As with the matrix reader, the expensive part of a Delta read is the per-file parquet decode, + * so parallelizing across data files is the natural speedup. A table backed by a single data file cannot be split this + * way, so the reader transparently falls back to the sequential {@link FrameReaderDelta}. + *

+ */ +public class FrameReaderDeltaParallel extends FrameReaderDelta { + + private final int _numThreads; + + public FrameReaderDeltaParallel() { + _numThreads = OptimizerUtils.getParallelBinaryReadParallelism(); + } + + @Override + public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) + throws IOException, DMLRuntimeException { + Engine engine = DeltaKernelUtils.createEngine(); + String tablePath = DeltaKernelUtils.qualify(fname); + DeltaKernelUtils.ScanHandle handle = DeltaKernelUtils.openScan(engine, tablePath); + + final int nfiles = handle.scanFiles.size(); + // nothing to gain from parallelism for single-file (or empty) tables: reuse + // the already-opened engine + scan handle instead of re-opening the snapshot. + if(_numThreads <= 1 || nfiles <= 1) + return readWithHandle(fname, engine, handle); + + // derive per-column read codes, value types and names once from the schema + final ReadPlan plan = planColumns(handle); + + // fast path: exact per-file row counts are known from metadata -> pre-size + // one typed array per column and let each thread decode directly into its + // row offset (no intermediate buffers, no serial concatenation). + if(useDirectPath(handle)) { + long total = 0; + for(long r : handle.numRecords) + total += r; + if(total > 0 && total <= Integer.MAX_VALUE) + return readDirect(fname, handle, plan, (int) total); + } + + return readBuffered(fname, handle, plan); + } + + /** + * Fast path: each thread decodes one data file straight into the final typed column arrays at a metadata-derived + * row offset. Single allocation per column, fully parallel. + */ + private FrameBlock readDirect(String fname, DeltaKernelUtils.ScanHandle handle, ReadPlan plan, int nrow) + throws IOException { + final int ncol = plan.ncol; + final int[] readCodes = plan.readCodes; + final int nfiles = handle.scanFiles.size(); + final int[] rowOffset = new int[nfiles]; + int acc = 0; + for(int i = 0; i < nfiles; i++) { + rowOffset[i] = acc; + acc += (int) handle.numRecords[i]; + } + + // pre-size one typed array per column for the whole table + final Object[] dest = new Object[ncol]; + for(int c = 0; c < ncol; c++) + dest[c] = ArrayFactory.allocateBacking(plan.vt[c], nrow); + + ArrayList> tasks = new ArrayList<>(nfiles); + for(int i = 0; i < nfiles; i++) { + final Row scanFileRow = handle.scanFiles.get(i); + final int base = rowOffset[i]; + // exclusive upper row bound for this file's slice; a file decoding more + // rows than its numRecords statistic would otherwise overflow into the + // next file's region (concurrent overlapping writes) or off the array + final int limit = base + (int) handle.numRecords[i]; + tasks.add(() -> { + int[] cur = new int[] {base}; + Engine eng = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow, + (cols, size, selected) -> { + int n = DeltaKernelUtils.countSelected(size, selected); + if(cur[0] + n > limit) + throw new DMLRuntimeException("Delta file produced more rows than its " + + "numRecords statistic; refusing parallel direct read of " + fname); + for(int c = 0; c < ncol; c++) + extractColumnInto(cols[c], size, selected, readCodes[c], dest[c], cur[0]); + cur[0] += n; + }); + // fail loud on underflow too: fewer decoded rows than the statistic + // would leave this slice's tail at the array default (0/null). + if(cur[0] != limit) + throw new DMLRuntimeException("Delta file produced " + (cur[0] - base) + " rows, expected " + + (limit - base) + " from its numRecords statistic; refusing parallel direct read of " + fname); + return null; + }); + } + awaitFileTasks(tasks, fname); + + Array[] columns = new Array[ncol]; + for(int c = 0; c < ncol; c++) + columns[c] = ArrayFactory.create(plan.vt[c], dest[c]); + + FrameBlock ret = new FrameBlock(columns); + ret.setColumnNames(plan.cnames); + return ret; + } + + /** + * Fallback path: decode each file in parallel into per-file per-column batch arrays (used when row counts are + * unknown or deletion vectors are present), then concatenate per column in file order via the shared + * {@link FrameReaderDelta#concatColumn} helper. + */ + private FrameBlock readBuffered(String fname, DeltaKernelUtils.ScanHandle handle, ReadPlan plan) + throws IOException { + final int ncol = plan.ncol; + final int[] readCodes = plan.readCodes; + final int nfiles = handle.scanFiles.size(); + @SuppressWarnings("unchecked") + final ArrayList[] fileCols = new ArrayList[nfiles]; + @SuppressWarnings("unchecked") + final ArrayList[] fileSizes = new ArrayList[nfiles]; + ArrayList> tasks = new ArrayList<>(nfiles); + for(int i = 0; i < nfiles; i++) { + final int fi = i; + final Row scanFileRow = handle.scanFiles.get(i); + tasks.add(() -> { + ArrayList fileBatchCols = new ArrayList<>(); + ArrayList fileBatchSizes = new ArrayList<>(); + Engine eng = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow, + (cols, size, selected) -> { + int n = DeltaKernelUtils.countSelected(size, selected); + Object[] extracted = new Object[ncol]; + for(int c = 0; c < ncol; c++) { + // decode into a fresh per-batch array via the shared alloc + + // decode primitives (the same ones the direct path uses) + Object col = ArrayFactory.allocateBacking(plan.vt[c], n); + extractColumnInto(cols[c], size, selected, readCodes[c], col, 0); + extracted[c] = col; + } + fileBatchCols.add(extracted); + fileBatchSizes.add(n); + }); + fileCols[fi] = fileBatchCols; + fileSizes[fi] = fileBatchSizes; + return null; + }); + } + awaitFileTasks(tasks, fname); + + // flatten the per-file batches in file order and concatenate per column + ArrayList batchCols = new ArrayList<>(); + ArrayList batchSizes = new ArrayList<>(); + int nrow = 0; + for(int i = 0; i < nfiles; i++) { + batchCols.addAll(fileCols[i]); + batchSizes.addAll(fileSizes[i]); + for(int n : fileSizes[i]) + nrow += n; + } + + Array[] columns = new Array[ncol]; + for(int c = 0; c < ncol; c++) + columns[c] = concatColumn(plan.vt[c], nrow, batchCols, batchSizes, c); + + FrameBlock ret = new FrameBlock(columns); + ret.setColumnNames(plan.cnames); + return ret; + } + + /** + * Run one decode task per data file on the shared common thread pool and await completion. Full parallelism is + * requested (the task count, one per data file, naturally caps concurrency); this avoids the per-thread pool-size + * caching in {@code CommonThreadPool.get(k)} that could otherwise throttle this reader to a smaller pool created + * earlier on the same thread. + */ + private void awaitFileTasks(List> tasks, String fname) throws IOException { + ExecutorService pool = CommonThreadPool.get(_numThreads); + try { + for(Future f : pool.invokeAll(tasks)) + f.get(); + } + catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted during parallel read of Delta table: " + fname, ex); + } + catch(Exception ex) { + throw new IOException("Failed parallel read of Delta table: " + fname, ex); + } + finally { + pool.shutdown(); + } + } + +} diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java index 4e21d2c3f60..5efbf80b83e 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderFactory.java @@ -51,6 +51,8 @@ public static FrameReader createFrameReader(FileFormat fmt, FileFormatProperties case PROTO: // TODO performance improvement: add parallel reader return new FrameReaderProto(); + case DELTA: + return textParallel ? new FrameReaderDeltaParallel() : new FrameReaderDelta(); default: throw new DMLRuntimeException("Failed to create frame reader for unknown format: " + fmt.toString()); } diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterDelta.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterDelta.java new file mode 100644 index 00000000000..fe66a7d195f --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterDelta.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.util.NoSuchElementException; +import java.util.Optional; + +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.frame.data.columns.Array; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; + +/** + * Single-threaded native Delta Lake writer for frames, built on the Spark-free Delta Kernel library. It creates (or + * recreates) a Delta table whose schema mirrors the frame schema (per-column {@link ValueType} mapped to a Delta type + * and the frame column names), streams the {@link FrameBlock} rows as columnar batches into parquet data files, and + * commits the add-file actions. + */ +public class FrameWriterDelta extends FrameWriter { + + @Override + public void writeFrameToHDFS(FrameBlock src, String fname, long rlen, long clen) + throws IOException, DMLRuntimeException { + if(src.getNumRows() != rlen || src.getNumColumns() != clen) + throw new IOException("Frame dimensions mismatch with metadata: (" + src.getNumRows() + "x" + + src.getNumColumns() + ") vs (" + rlen + "x" + clen + ")."); + int ncol = (int) clen; + int nrow = (int) rlen; + StructType schema = buildSchema(src.getSchema(), src.getColumnNames(), ncol); + + // snapshot the typed column arrays + per-column nullability once, so the + // hot per-cell path can read primitives directly (no boxing) and skip + // null-checks on non-nullable columns. + Array[] cols = new Array[ncol]; + boolean[] nullable = new boolean[ncol]; + for(int c = 0; c < ncol; c++) { + cols[c] = src.getColumn(c); + nullable[c] = cols[c].containsNull(); + } + + int batchRows = ConfigurationManager.getDeltaWriterBatchSize(); + // size data files adaptively (toward one file per parallel reader) for faster parallel reads + Engine engine = DeltaKernelUtils.createWriteEngine(src.getInMemorySize()); + DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(fname), schema, + new FrameBatchIterator(cols, nullable, schema, nrow, ncol, batchRows)); + } + + private static StructType buildSchema(ValueType[] vtSchema, String[] names, int ncol) { + StructType schema = new StructType(); + for(int c = 0; c < ncol; c++) + schema = schema.add(names[c], toDeltaType(vtSchema[c]), true); + return schema; + } + + static DataType toDeltaType(ValueType vt) { + switch(vt) { + case FP64: + return DoubleType.DOUBLE; + case FP32: + return FloatType.FLOAT; + case INT64: + return LongType.LONG; + case INT32: + case UINT8: + case UINT4: + return IntegerType.INTEGER; + case BOOLEAN: + return BooleanType.BOOLEAN; + default: + return StringType.STRING; // STRING/CHARACTER/HASH*/UNKNOWN + } + } + + /** Chunks the frame columns into fixed-size columnar batches for the kernel write path. */ + private static class FrameBatchIterator implements CloseableIterator { + private final Array[] _cols; + private final boolean[] _nullable; + private final StructType _schema; + private final int _nrow; + private final int _ncol; + private final int _batchRows; + private int _pos = 0; + + FrameBatchIterator(Array[] cols, boolean[] nullable, StructType schema, int nrow, int ncol, int batchRows) { + _cols = cols; + _nullable = nullable; + _schema = schema; + _nrow = nrow; + _ncol = ncol; + _batchRows = batchRows; + } + + @Override + public boolean hasNext() { + return _pos < _nrow; + } + + @Override + public FilteredColumnarBatch next() { + if(!hasNext()) + throw new NoSuchElementException(); + int size = Math.min(_batchRows, _nrow - _pos); + ColumnarBatch batch = new FrameColumnarBatch(_cols, _nullable, _schema, _pos, size, _ncol); + _pos += size; + return new FilteredColumnarBatch(batch, Optional.empty()); + } + + @Override + public void close() { + // nothing to release + } + } + + /** Read-only view of a row range of the frame columns as a Delta Kernel columnar batch. */ + private static class FrameColumnarBatch implements ColumnarBatch { + private final Array[] _cols; + private final boolean[] _nullable; + private final StructType _schema; + private final int _rowStart; + private final int _size; + private final int _ncol; + + FrameColumnarBatch(Array[] cols, boolean[] nullable, StructType schema, int rowStart, int size, int ncol) { + _cols = cols; + _nullable = nullable; + _schema = schema; + _rowStart = rowStart; + _size = size; + _ncol = ncol; + } + + @Override + public StructType getSchema() { + return _schema; + } + + @Override + public ColumnVector getColumnVector(int ordinal) { + if(ordinal < 0 || ordinal >= _ncol) + throw new IndexOutOfBoundsException("column ordinal " + ordinal); + return new FrameColumnVector(_cols[ordinal], _nullable[ordinal], _schema.at(ordinal).getDataType(), + _rowStart, _size); + } + + @Override + public int getSize() { + return _size; + } + } + + /** + * Read-only typed column view over one column {@link Array} row range. Numeric values are read through + * {@link Array#getAsDouble(int)} to avoid boxing, and non-nullable columns short-circuit {@code isNullAt} so the + * kernel never pays for a redundant boxed fetch. + */ + private static class FrameColumnVector implements ColumnVector { + private final Array _col; + private final boolean _nullable; + private final DataType _type; + private final int _rowStart; + private final int _size; + + FrameColumnVector(Array col, boolean nullable, DataType type, int rowStart, int size) { + _col = col; + _nullable = nullable; + _type = type; + _rowStart = rowStart; + _size = size; + } + + @Override + public DataType getDataType() { + return _type; + } + + @Override + public int getSize() { + return _size; + } + + @Override + public boolean isNullAt(int rowId) { + return _nullable && _col.get(_rowStart + rowId) == null; + } + + @Override + public String getString(int rowId) { + Object v = _col.get(_rowStart + rowId); + return (v == null) ? null : v.toString(); + } + + @Override + public boolean getBoolean(int rowId) { + return _col.getAsDouble(_rowStart + rowId) != 0; + } + + @Override + public double getDouble(int rowId) { + return _col.getAsDouble(_rowStart + rowId); + } + + @Override + public float getFloat(int rowId) { + return (float) _col.getAsDouble(_rowStart + rowId); + } + + @Override + public long getLong(int rowId) { + // exact for INT64 (getAsDouble would lose precision beyond 2^53). This boxes one + // Number per cell because Array exposes no primitive getAsLong; a boxing-free + // getAsLong on Array would remove this write-path allocation (follow-up). The + // kernel only calls this after isNullAt() is false, so the cell is never null here. + return ((Number) _col.get(_rowStart + rowId)).longValue(); + } + + @Override + public int getInt(int rowId) { + return (int) _col.getAsDouble(_rowStart + rowId); + } + + @Override + public void close() { + // nothing to release + } + } +} diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java index 3fb3968c96f..ff38eb395dd 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java @@ -50,6 +50,8 @@ public static FrameWriter createFrameWriter(FileFormat fmt, FileFormatProperties return binaryParallel ? new FrameWriterBinaryBlockParallel() : new FrameWriterBinaryBlock(); case PROTO: return new FrameWriterProto(); + case DELTA: + return new FrameWriterDelta(); default: throw new DMLRuntimeException("Failed to create frame writer for unknown format: " + fmt.toString()); } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterDelta.java b/src/main/java/org/apache/sysds/runtime/io/WriterDelta.java index 0f08bf5517d..55ea8a54297 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterDelta.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterDelta.java @@ -62,7 +62,11 @@ public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long cle //from the backing double[] (avoids per-cell MatrixBlock.get dispatch). double[] dense = (!src.isInSparseFormat() && src.getDenseBlock() != null && src.getDenseBlock().isContiguous()) ? src.getDenseBlockValues() : null; - Engine engine = DeltaKernelUtils.createEngine(); + //size data files adaptively (toward one file per parallel reader) for faster parallel reads. + //Delta writes every cell as a double, so size by the dense footprint rather than the (possibly + //sparse) in-memory size, which would understate the on-disk table for sparse inputs. + long estimatedBytes = (long) nrow * ncol * 8L; + Engine engine = DeltaKernelUtils.createWriteEngine(estimatedBytes); DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(fname), buildSchema(ncol), new MatrixBatchIterator(src, dense, nrow, ncol, batchRows)); } diff --git a/src/test/java/org/apache/sysds/performance/Main.java b/src/test/java/org/apache/sysds/performance/Main.java index f8d0bbea852..0622e789baa 100644 --- a/src/test/java/org/apache/sysds/performance/Main.java +++ b/src/test/java/org/apache/sysds/performance/Main.java @@ -24,6 +24,7 @@ import org.apache.sysds.performance.compression.Serialize; import org.apache.sysds.performance.compression.StreamCompress; import org.apache.sysds.performance.compression.TransformPerf; +import org.apache.sysds.performance.frame.DeltaFrameRead; import org.apache.sysds.performance.frame.Transform; import org.apache.sysds.performance.generators.ConstMatrix; import org.apache.sysds.performance.generators.FrameFile; @@ -113,6 +114,9 @@ private static void exec(int prog, String[] args) throws Exception { case 17: run17(args); break; + case 18: + run18(args); + break; case 1000: run1000(args); break; @@ -238,6 +242,21 @@ private static void run17(String[] args) throws Exception { new MatrixReplacePerf(100, g, k).run(); } + /** + * Repeatedly read the same on-disk Delta frame table (written once as setup). + * Args: {@code 18 [mode] [targetFileSizeMB]} + * where mode is one of serial|parallel|both (default parallel) and an omitted + * target file size uses the adaptive default sizing. + */ + private static void run18(String[] args) throws Exception { + int rows = Integer.parseInt(args[1]); + int k = Integer.parseInt(args[2]); + int n = Integer.parseInt(args[3]); + String mode = (args.length > 4) ? args[4] : "parallel"; + long targetFileSize = (args.length > 5) ? Long.parseLong(args[5]) * 1024 * 1024 : -1; + new DeltaFrameRead(n, DeltaFrameRead.mixedFrame(rows, 7), k, mode, targetFileSize).run(); + } + private static void run1000(String[] args) { MMSparsityPerformance perf; if (args.length < 3) { diff --git a/src/test/java/org/apache/sysds/performance/frame/DeltaFrameRead.java b/src/test/java/org/apache/sysds/performance/frame/DeltaFrameRead.java new file mode 100644 index 00000000000..ea76fef51b1 --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/frame/DeltaFrameRead.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.frame; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.conf.DMLConfig; +import org.apache.sysds.performance.compression.APerfTest; +import org.apache.sysds.performance.generators.ConstFrame; +import org.apache.sysds.performance.generators.IGenerate; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.io.FrameReaderDelta; +import org.apache.sysds.runtime.io.FrameReaderDeltaParallel; +import org.apache.sysds.runtime.io.FrameWriterDelta; +import org.apache.sysds.test.TestUtils; +import org.apache.sysds.test.component.io.DeltaFrameTestUtils; + +/** + * Reads the SAME native Delta frame table from disk repeatedly and reports read throughput. The table is written to a + * temporary directory ONCE as (untimed) setup; every timed repetition re-opens the latest snapshot and materializes a + * fresh {@link FrameBlock}, so the numbers reflect the read path only (parquet decode + column materialization), not + * the write. + * + *

+ * This is the target for an async-profiler run: launch the perf jar under the profiler agent and this loop provides a + * long, steady-state read workload to sample. See {@code src/test/java/org/apache/sysds/performance/README.md} for how + * to run this under async-profiler. + *

+ * + *

+ * Dispatched from {@link org.apache.sysds.performance.Main} (program id 18). + *

+ */ +public class DeltaFrameRead extends APerfTest { + + // the Delta reader derives schema/names from the table metadata, so the values + // passed here are placeholders (a single detect column) and are ignored. + private static final ValueType[] DETECT_SCHEMA = new ValueType[] {ValueType.STRING}; + private static final String[] DETECT_NAMES = new String[] {"x"}; + + private final int k; + private final String mode; + private final long targetFileSize; // <=0 -> adaptive default sizing + + private String tablePath; + private Path tableDir; + private long inMemSize; + private long files; + + public DeltaFrameRead(int N, IGenerate gen, int k, String mode, long targetFileSize) { + super(N, gen); + this.k = k; + this.mode = mode; + this.targetFileSize = targetFileSize; + } + + public void run() throws Exception { + try { + setup(); + System.out.println(this); + System.out.printf("table: %s%n", tablePath); + System.out.printf("layout: files=%d, in-memory=%.1f MB, target=%s%n", files, inMemSize / 1048576.0, + targetFileSize > 0 ? (targetFileSize / 1048576) + "MB(fixed)" : "adaptive"); + + if(mode.equals("serial") || mode.equals("both")) + execute(() -> readSerial(), "Delta read serial"); + if(mode.equals("parallel") || mode.equals("both")) + execute(() -> readParallel(), "Delta read parallel(k=" + k + ")"); + } + finally { + ConfigurationManager.clearLocalConfigs(); + if(tableDir != null) + FileUtils.deleteQuietly(tableDir.toFile()); + } + } + + /** Untimed: materialize the source frame and write it to a temp Delta table once. */ + private void setup() throws Exception { + FrameBlock fb = gen.take(); + inMemSize = fb.getInMemorySize(); + + DMLConfig c = new DMLConfig(); + if(targetFileSize > 0) { + c.setTextValue(DMLConfig.DELTA_WRITER_ADAPTIVE_FILE_SIZE, "false"); + c.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(targetFileSize)); + } + ConfigurationManager.setLocalConfig(c); + + tableDir = Files.createTempDirectory("sysds_delta_frame_read_"); + tablePath = new File(tableDir.toFile(), "table").getAbsolutePath(); + new FrameWriterDelta().writeFrameToHDFS(fb, tablePath, fb.getNumRows(), fb.getNumColumns()); + files = DeltaFrameTestUtils.countParquet(tablePath); + } + + private void readSerial() { + try { + FrameBlock fb = new FrameReaderDelta().readFrameFromHDFS(tablePath, DETECT_SCHEMA, DETECT_NAMES, -1, -1); + ret.add(fb.getInMemorySize()); + } + catch(Exception e) { + throw new RuntimeException(e); + } + } + + private void readParallel() { + try { + FrameBlock fb = new FrameReaderDeltaParallel().readFrameFromHDFS(tablePath, DETECT_SCHEMA, DETECT_NAMES, -1, + -1); + ret.add(fb.getInMemorySize()); + } + catch(Exception e) { + throw new RuntimeException(e); + } + } + + @Override + protected String makeResString() { + throw new UnsupportedOperationException("Use makeResString(double[]) with the timed measurements instead."); + } + + @Override + protected String makeResString(double[] times) { + double meanMs = trimmedMean(times); + double mbPerSec = (inMemSize / 1048576.0) / (meanMs / 1000.0); + return String.format("%8.1f MB/s", mbPerSec); + } + + /** 5%-trimmed mean, matching the trimming used by the framework statistics. */ + private static double trimmedMean(double[] times) { + double[] v = times.clone(); + java.util.Arrays.sort(v); + int remove = (int) Math.floor(v.length * 0.05); + double total = 0; + int el = v.length - remove * 2; + for(int i = remove; i < v.length - remove; i++) + total += v[i]; + return total / Math.max(el, 1); + } + + @Override + public String toString() { + return super.toString() + " mode: " + mode + ", threads: " + k; + } + + /** Build a representative mixed-schema frame (string + numeric columns). */ + public static IGenerate mixedFrame(int rows, long seed) { + ValueType[] schema = new ValueType[] {ValueType.STRING, ValueType.INT64, ValueType.FP64, ValueType.BOOLEAN, + ValueType.INT32, ValueType.FP32}; + return new ConstFrame(TestUtils.generateRandomFrameBlock(rows, schema, seed)); + } +} diff --git a/src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java b/src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java new file mode 100644 index 00000000000..7012be44426 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java @@ -0,0 +1,737 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Random; + +import org.apache.commons.io.FileUtils; +import org.apache.sysds.common.Types.FileFormat; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.conf.CompilerConfig; +import org.apache.sysds.conf.CompilerConfig.ConfigType; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.conf.DMLConfig; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.io.DeltaKernelUtils; +import org.apache.sysds.runtime.io.FrameReader; +import org.apache.sysds.runtime.io.FrameReaderDelta; +import org.apache.sysds.runtime.io.FrameReaderDeltaParallel; +import org.apache.sysds.runtime.io.FrameReaderFactory; +import org.apache.sysds.runtime.io.FrameWriterDelta; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; + +/** + * Direct (no DML) round-trip tests for the native Delta Kernel based frame reader/writer. Each test writes a FrameBlock + * to a fresh local Delta table directory and reads it back, asserting the discovered schema, column names, dimensions, + * and per-cell values match. Several tests additionally assert that the parallel reader + * ({@link FrameReaderDeltaParallel}) agrees with the serial reader cell-for-cell across a multi-file table (both its + * direct and buffered paths). + */ +public class DeltaFrameReadWriteTest { + + // nonsense schema/dims handed to the reader to confirm it discovers everything + private static final ValueType[] NO_SCHEMA = new ValueType[] {ValueType.STRING}; + private static final String[] NO_NAMES = new String[] {"x"}; + + // small target file size + enough random rows so the writer rolls multiple + // data files, exercising the per-file parallel read path rather than the + // single-file serial fallback. + private static final long SMALL_TARGET_FILE_SIZE = 512L * 1024; + private static final int ROWS_MULTI_FILE = 150_000; + + // mixed-type schema used by the multi-file round-trip tests; random data is + // generated via TestUtils rather than a bespoke per-test generator. + private static final ValueType[] MIXED_SCHEMA = {ValueType.STRING, ValueType.INT64, ValueType.FP64, + ValueType.BOOLEAN, ValueType.INT32, ValueType.FP32}; + + private static FrameBlock writeThenRead(FrameBlock in) throws Exception { + Path dir = Files.createTempDirectory("sysds_delta_frame_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new FrameWriterDelta().writeFrameToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns()); + // pass nonsense schema/dims: the reader must discover everything from the table + return new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + private static FrameBlock alloc(ValueType[] schema, String[] names, int nrow) { + FrameBlock fb = new FrameBlock(schema, names); + fb.ensureAllocatedColumns(nrow); + return fb; + } + + @FunctionalInterface + private interface TableTest { + void accept(FrameBlock in, String tablePath) throws Exception; + } + + /** + * Write {@code in} to a fresh temp Delta table with a small target file size (so the writer rolls multiple data + * files), assert the layout really is multi-file, then run {@code body} against the table. Local config and the + * temp directory are always cleaned up. + */ + private static void withSmallTargetTable(FrameBlock in, TableTest body) throws Exception { + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(SMALL_TARGET_FILE_SIZE)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_frame_mf_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new FrameWriterDelta().writeFrameToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns()); + assertMultiFile(tablePath); + body.accept(in, tablePath); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void roundTripMixedTypes() throws Exception { + ValueType[] schema = {ValueType.STRING, ValueType.INT64, ValueType.FP64, ValueType.BOOLEAN, ValueType.INT32, + ValueType.FP32}; + String[] names = {"name", "id", "score", "active", "count", "ratio"}; + int nrow = 5; + FrameBlock in = alloc(schema, names, nrow); + for(int r = 0; r < nrow; r++) { + in.set(r, 0, "row" + r); + in.set(r, 1, (long) (r * 1000L + 7)); + in.set(r, 2, r + 0.5); + in.set(r, 3, (r % 2 == 0)); + in.set(r, 4, r * 3); + in.set(r, 5, (float) (r / 4.0)); + } + + FrameBlock out = writeThenRead(in); + + assertEquals(nrow, out.getNumRows()); + assertEquals(schema.length, out.getNumColumns()); + // schema and names discovered from the table + for(int c = 0; c < schema.length; c++) { + assertEquals("schema col " + c, schema[c], out.getSchema()[c]); + assertEquals("name col " + c, names[c], out.getColumnNames()[c]); + } + // values (compare as strings to be type-agnostic across boxed numerics) + for(int r = 0; r < nrow; r++) + for(int c = 0; c < schema.length; c++) + assertEquals("cell (" + r + "," + c + ")", in.get(r, c).toString(), out.get(r, c).toString()); + } + + @Test + public void roundTripMultiBatch() throws Exception { + // more rows than the writer batch size (4096) to exercise chunking + ValueType[] schema = {ValueType.INT64, ValueType.STRING}; + String[] names = {"k", "v"}; + int nrow = 10000; + FrameBlock in = alloc(schema, names, nrow); + for(int r = 0; r < nrow; r++) { + in.set(r, 0, (long) r); + in.set(r, 1, "v" + r); + } + + FrameBlock out = writeThenRead(in); + assertEquals(nrow, out.getNumRows()); + assertEquals(2, out.getNumColumns()); + for(int r = 0; r < nrow; r++) { + assertEquals((long) r, ((Number) out.get(r, 0)).longValue()); + assertEquals("v" + r, out.get(r, 1).toString()); + } + } + + @Test + public void roundTripWithStringNulls() throws Exception { + // nulls are only representable in object-backed (string) columns; numeric + // frame columns store primitives and cannot carry a null. + ValueType[] schema = {ValueType.STRING, ValueType.FP64}; + String[] names = {"s", "d"}; + int nrow = 4; + FrameBlock in = alloc(schema, names, nrow); + in.set(0, 0, "a"); + in.set(0, 1, 1.0); + in.set(1, 0, null); + in.set(1, 1, 2.0); + in.set(2, 0, "c"); + in.set(2, 1, 3.0); + in.set(3, 0, null); + in.set(3, 1, 4.0); + + FrameBlock out = writeThenRead(in); + assertEquals(nrow, out.getNumRows()); + assertEquals(2, out.getNumColumns()); + assertEquals("a", out.get(0, 0).toString()); + assertEquals(1.0, ((Number) out.get(0, 1)).doubleValue(), 1e-12); + assertNull(out.get(1, 0)); + assertEquals(2.0, ((Number) out.get(1, 1)).doubleValue(), 1e-12); + assertEquals("c", out.get(2, 0).toString()); + assertEquals(3.0, ((Number) out.get(2, 1)).doubleValue(), 1e-12); + assertNull(out.get(3, 0)); + assertEquals(4.0, ((Number) out.get(3, 1)).doubleValue(), 1e-12); + } + + @Test + public void parallelReadMatchesSerialMultiFile() throws Exception { + FrameBlock in = TestUtils.generateRandomFrameBlock(ROWS_MULTI_FILE, MIXED_SCHEMA, 13); + withSmallTargetTable(in, (frame, tablePath) -> { + FrameBlock serial = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + FrameBlock parallel = new FrameReaderDeltaParallel().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, + -1); + assertFramesEqual(serial, parallel); + }); + } + + @Test + public void parallelBufferedPathMatchesSerialMultiFile() throws Exception { + // the direct fast path is always taken for SystemDS-written tables (exact + // row stats, no deletion vectors); force the buffered fallback to exercise + // its per-file decode + serial concatenation and assert it matches serial. + FrameBlock in = TestUtils.generateRandomFrameBlock(ROWS_MULTI_FILE, MIXED_SCHEMA, 23); + withSmallTargetTable(in, (frame, tablePath) -> { + FrameBlock serial = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + // subclass that always declines the direct path -> readBuffered() + FrameBlock buffered = new FrameReaderDeltaParallel() { + @Override + protected boolean useDirectPath(DeltaKernelUtils.ScanHandle h) { + return false; + } + }.readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertFramesEqual(serial, buffered); + }); + } + + @Test + public void serialBufferedPathMatchesDirectMultiFile() throws Exception { + // the direct (pre-sized, metadata-driven) path is always taken for SystemDS- + // written tables; force the serial buffered fallback (per-batch extract + + // concatenate) to exercise it and assert it matches the direct read. + FrameBlock in = TestUtils.generateRandomFrameBlock(ROWS_MULTI_FILE, MIXED_SCHEMA, 29); + withSmallTargetTable(in, (frame, tablePath) -> { + FrameBlock direct = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + // subclass that always declines the direct path -> buffered extract+concat + FrameBlock buffered = new FrameReaderDelta() { + @Override + protected boolean useDirectPath(DeltaKernelUtils.ScanHandle h) { + return false; + } + }.readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertFramesEqual(direct, buffered); + }); + } + + @Test + public void adaptiveTargetFileSizeClampsAndRespectsFlag() { + // cap chosen above the 4MB floor so both clamp directions are observable + final long cap = 64L * 1024 * 1024; + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(cap)); + conf.setTextValue(DMLConfig.DELTA_WRITER_ADAPTIVE_FILE_SIZE, "true"); + ConfigurationManager.setLocalConfig(conf); + try { + assertEquals("estimatedBytes<=0 -> configured cap", cap, DeltaKernelUtils.adaptiveWriterTargetFileSize(0)); + assertEquals("negative estimate -> configured cap", cap, DeltaKernelUtils.adaptiveWriterTargetFileSize(-1)); + assertEquals("huge table -> never above the configured cap", cap, + DeltaKernelUtils.adaptiveWriterTargetFileSize(Long.MAX_VALUE / 2)); + assertEquals("tiny table -> never below the floor", DeltaKernelUtils.ADAPTIVE_WRITER_MIN_FILE_SIZE, + DeltaKernelUtils.adaptiveWriterTargetFileSize(1)); + + conf.setTextValue(DMLConfig.DELTA_WRITER_ADAPTIVE_FILE_SIZE, "false"); + assertEquals("flag OFF -> always the configured cap regardless of size", cap, + DeltaKernelUtils.adaptiveWriterTargetFileSize(1)); + } + finally { + ConfigurationManager.clearLocalConfigs(); + } + } + + @Test + public void factoryRoutesDeltaToParallelWhenEnabled() { + // the factory must pick the parallel frame reader iff parallel CP read is enabled + CompilerConfig cc = ConfigurationManager.getCompilerConfig(); + try { + cc.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, true); + ConfigurationManager.setLocalConfig(cc); + FrameReader par = FrameReaderFactory.createFrameReader(FileFormat.DELTA); + assertTrue("expected FrameReaderDeltaParallel when parallel read enabled", + par instanceof FrameReaderDeltaParallel); + + cc.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, false); + ConfigurationManager.setLocalConfig(cc); + FrameReader ser = FrameReaderFactory.createFrameReader(FileFormat.DELTA); + assertTrue("expected serial FrameReaderDelta when parallel read disabled", + ser instanceof FrameReaderDelta && !(ser instanceof FrameReaderDeltaParallel)); + } + finally { + ConfigurationManager.clearLocalConfigs(); + } + } + + @Test + public void readerBatchSizeConfigRoundTrips() throws Exception { + // a non-default reader batch size must not change the result (more, smaller + // batches exercise the per-batch extract/concatenate loop more often). + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_READER_BATCH_SIZE, "128"); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_frame_bs_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + FrameBlock in = TestUtils.generateRandomFrameBlock(5000, MIXED_SCHEMA, 31); + new FrameWriterDelta().writeFrameToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns()); + FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertFramesEqual(in, out); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void writerTargetFileSizeConfigProducesMoreFiles() throws Exception { + // a smaller configured target file size must make the writer roll more + // data files for the same frame (the lever the parallel reader relies on); + // the multi-file layout is asserted inside withSmallTargetTable. + FrameBlock in = TestUtils.generateRandomFrameBlock(ROWS_MULTI_FILE, MIXED_SCHEMA, 41); + withSmallTargetTable(in, (frame, tablePath) -> { + // data still round-trips correctly with the custom layout + FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertFramesEqual(frame, out); + }); + } + + @Test + public void emptyFrameRoundTrip() throws Exception { + // a schema-only Delta table (no data files, 0 rows); the reader must + // rebuild empty typed columns and discover the schema/names from the table. + ValueType[] schema = {ValueType.STRING, ValueType.FP64, ValueType.INT64}; + String[] names = {"s", "d", "k"}; + DataType[] dtypes = {StringType.STRING, DoubleType.DOUBLE, LongType.LONG}; + + Path dir = Files.createTempDirectory("sysds_delta_frame_empty_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + writeEmptyTable(tablePath, names, dtypes); + FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertEquals("rows", 0, out.getNumRows()); + assertEquals("cols", schema.length, out.getNumColumns()); + for(int c = 0; c < schema.length; c++) { + assertEquals("schema col " + c, schema[c], out.getSchema()[c]); + assertEquals("name col " + c, names[c], out.getColumnNames()[c]); + } + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readDiscoversSchemaAndDims() throws Exception { + // reader handed -1 dims and a nonsense schema must discover both from the table + ValueType[] schema = {ValueType.INT32, ValueType.FP32, ValueType.BOOLEAN, ValueType.STRING}; + String[] names = {"a", "b", "c", "d"}; + int nrow = 321; + FrameBlock in = alloc(schema, names, nrow); + Random rnd = new Random(7); + for(int r = 0; r < nrow; r++) { + in.set(r, 0, rnd.nextInt()); + in.set(r, 1, rnd.nextFloat()); + in.set(r, 2, rnd.nextBoolean()); + in.set(r, 3, "s" + r); + } + + Path dir = Files.createTempDirectory("sysds_delta_frame_disc_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new FrameWriterDelta().writeFrameToHDFS(in, tablePath, nrow, schema.length); + FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertEquals("rows", nrow, out.getNumRows()); + assertEquals("cols", schema.length, out.getNumColumns()); + for(int c = 0; c < schema.length; c++) { + assertEquals("schema col " + c, schema[c], out.getSchema()[c]); + assertEquals("name col " + c, names[c], out.getColumnNames()[c]); + } + assertFramesEqual(in, out); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readNonMappableColumnRejected() throws Exception { + // a Delta column type that does not map to a frame value type (date) must + // be rejected by the reader rather than silently mis-read. + Path dir = Files.createTempDirectory("sysds_delta_frame_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + writeDateColumn(tablePath, new int[] {0, 1, 100, 18000}); + try { + new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + fail("expected a DMLRuntimeException for a non-mappable (date) Delta column"); + } + catch(DMLRuntimeException ex) { + assertTrue("message should mention the non-mappable column, got: " + ex.getMessage(), + ex.getMessage() != null && ex.getMessage().contains("non-mappable")); + } + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readShortByteColumnsCoercedToInt32() throws Exception { + // the kernel can store short/byte columns; the frame reader has no narrower + // integer value type, so both must surface as INT32 with the values intact. + short[] shorts = {0, 1, -1, Short.MAX_VALUE, Short.MIN_VALUE}; + byte[] bytes = {0, 7, -7, Byte.MAX_VALUE, Byte.MIN_VALUE}; + Path dir = Files.createTempDirectory("sysds_delta_frame_sb_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + writeShortByteColumns(tablePath, shorts, bytes); + FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertEquals("rows", shorts.length, out.getNumRows()); + assertEquals("cols", 2, out.getNumColumns()); + assertEquals("short column coerced to INT32", ValueType.INT32, out.getSchema()[0]); + assertEquals("byte column coerced to INT32", ValueType.INT32, out.getSchema()[1]); + for(int r = 0; r < shorts.length; r++) { + assertEquals("short cell (" + r + ")", shorts[r], ((Number) out.get(r, 0)).intValue()); + assertEquals("byte cell (" + r + ")", bytes[r], ((Number) out.get(r, 1)).intValue()); + } + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void writerRejectsDimensionMismatch() throws Exception { + ValueType[] schema = {ValueType.STRING, ValueType.INT64}; + String[] names = {"s", "k"}; + int nrow = 3; + FrameBlock fb = alloc(schema, names, nrow); + for(int r = 0; r < nrow; r++) { + fb.set(r, 0, "r" + r); + fb.set(r, 1, (long) r); + } + Path dir = Files.createTempDirectory("sysds_delta_frame_dim_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + // declare one more row than the frame actually has -> writer must reject + new FrameWriterDelta().writeFrameToHDFS(fb, tablePath, fb.getNumRows() + 1, fb.getNumColumns()); + fail("expected an IOException for a frame/metadata dimension mismatch"); + } + catch(IOException ex) { + assertTrue("message should mention the dimension mismatch, got: " + ex.getMessage(), + ex.getMessage() != null && ex.getMessage().contains("dimensions mismatch")); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readFromInputStreamUnsupported() throws Exception { + // Delta is a directory-based table format; stream reads are not supported + try { + new FrameReaderDelta().readFrameFromInputStream(null, NO_SCHEMA, NO_NAMES, -1, -1); + fail("expected UnsupportedOperationException for a Delta input-stream read"); + } + catch(UnsupportedOperationException ex) { + // must throw before touching the (null) stream, for the documented reason + assertTrue("message should mention input stream, got: " + ex.getMessage(), + ex.getMessage() != null && ex.getMessage().contains("input stream")); + } + } + + @Test + public void parallelReadStringNullsMatchSerialMultiFile() throws Exception { + // string nulls across a multi-file table: the parallel direct path must + // reproduce the serial read cell-for-cell (assertFramesEqual uses + // assertEquals, so nulls are compared faithfully). + ValueType[] schema = {ValueType.STRING, ValueType.INT64}; + String[] names = {"s", "k"}; + int nrow = ROWS_MULTI_FILE; + FrameBlock in = alloc(schema, names, nrow); + for(int r = 0; r < nrow; r++) { + // interspersed string nulls (every 7th row) plus a numeric column + in.set(r, 0, (r % 7 == 0) ? null : "s" + r); + in.set(r, 1, (long) r); + } + withSmallTargetTable(in, (frame, tablePath) -> { + FrameBlock serial = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + FrameBlock parallel = new FrameReaderDeltaParallel().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, + -1); + assertFramesEqual(serial, parallel); + }); + } + + private static void assertMultiFile(String tablePath) throws Exception { + long files = DeltaFrameTestUtils.countParquet(tablePath); + assertTrue("expected a multi-file Delta table to exercise the parallel path, got " + files, files > 1); + } + + private static void assertFramesEqual(FrameBlock expected, FrameBlock actual) { + assertEquals("rows", expected.getNumRows(), actual.getNumRows()); + assertEquals("cols", expected.getNumColumns(), actual.getNumColumns()); + int ncol = expected.getNumColumns(); + for(int c = 0; c < ncol; c++) { + assertEquals("schema col " + c, expected.getSchema()[c], actual.getSchema()[c]); + assertEquals("name col " + c, expected.getColumnNames()[c], actual.getColumnNames()[c]); + } + int nrow = expected.getNumRows(); + for(int r = 0; r < nrow; r++) + for(int c = 0; c < ncol; c++) + assertEquals("cell (" + r + "," + c + ")", expected.get(r, c), actual.get(r, c)); + } + + /** Commits a schema-only Delta table (no data files) to exercise the 0-row read path. */ + private static void writeEmptyTable(String tablePath, String[] names, DataType[] dtypes) throws Exception { + Engine engine = DeltaKernelUtils.createEngine(); + StructType schema = new StructType(); + for(int c = 0; c < dtypes.length; c++) + schema = schema.add(names[c], dtypes[c], true); + DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(tablePath), schema, empty()); + } + + private static CloseableIterator empty() { + return new CloseableIterator() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public FilteredColumnarBatch next() { + throw new NoSuchElementException(); + } + + @Override + public void close() { + } + }; + } + + /** + * Writes a single date column (kernel stores dates as INT32 days) used to assert the frame reader rejects a + * non-mappable column type. + */ + private static void writeDateColumn(String tablePath, int[] days) throws Exception { + Engine engine = DeltaKernelUtils.createEngine(); + final StructType schema = new StructType().add("d", DateType.DATE, false); + ColumnarBatch batch = new ColumnarBatch() { + @Override + public StructType getSchema() { + return schema; + } + + @Override + public int getSize() { + return days.length; + } + + @Override + public ColumnVector getColumnVector(int ordinal) { + return new DateVector(days); + } + }; + FilteredColumnarBatch fcb = new FilteredColumnarBatch(batch, Optional.empty()); + DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(tablePath), schema, singleton(fcb)); + } + + /** + * Writes a short column and a byte column (kernel stores these as 16/8-bit integers) used to assert the frame + * reader coerces both to INT32. + */ + private static void writeShortByteColumns(String tablePath, short[] shorts, byte[] bytes) throws Exception { + Engine engine = DeltaKernelUtils.createEngine(); + final StructType schema = new StructType().add("sh", ShortType.SHORT, false).add("by", ByteType.BYTE, false); + ColumnarBatch batch = new ColumnarBatch() { + @Override + public StructType getSchema() { + return schema; + } + + @Override + public int getSize() { + return shorts.length; + } + + @Override + public ColumnVector getColumnVector(int ordinal) { + return (ordinal == 0) ? new ShortVector(shorts) : new ByteVector(bytes); + } + }; + FilteredColumnarBatch fcb = new FilteredColumnarBatch(batch, Optional.empty()); + DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(tablePath), schema, singleton(fcb)); + } + + private static CloseableIterator singleton(FilteredColumnarBatch fcb) { + return new CloseableIterator() { + private boolean _done = false; + + @Override + public boolean hasNext() { + return !_done; + } + + @Override + public FilteredColumnarBatch next() { + if(_done) + throw new NoSuchElementException(); + _done = true; + return fcb; + } + + @Override + public void close() { + } + }; + } + + /** Column view exposing an int[] as a Delta date column. */ + private static class DateVector implements ColumnVector { + private final int[] _days; + + DateVector(int[] days) { + _days = days; + } + + @Override + public DataType getDataType() { + return DateType.DATE; + } + + @Override + public int getSize() { + return _days.length; + } + + @Override + public boolean isNullAt(int rowId) { + return false; + } + + @Override + public int getInt(int rowId) { + return _days[rowId]; + } + + @Override + public void close() { + } + } + + /** Column view exposing a short[] as a Delta short column. */ + private static class ShortVector implements ColumnVector { + private final short[] _vals; + + ShortVector(short[] vals) { + _vals = vals; + } + + @Override + public DataType getDataType() { + return ShortType.SHORT; + } + + @Override + public int getSize() { + return _vals.length; + } + + @Override + public boolean isNullAt(int rowId) { + return false; + } + + @Override + public short getShort(int rowId) { + return _vals[rowId]; + } + + @Override + public void close() { + } + } + + /** Column view exposing a byte[] as a Delta byte column. */ + private static class ByteVector implements ColumnVector { + private final byte[] _vals; + + ByteVector(byte[] vals) { + _vals = vals; + } + + @Override + public DataType getDataType() { + return ByteType.BYTE; + } + + @Override + public int getSize() { + return _vals.length; + } + + @Override + public boolean isNullAt(int rowId) { + return false; + } + + @Override + public byte getByte(int rowId) { + return _vals[rowId]; + } + + @Override + public void close() { + } + } +} diff --git a/src/test/java/org/apache/sysds/test/component/io/DeltaFrameSparkInteropTest.java b/src/test/java/org/apache/sysds/test/component/io/DeltaFrameSparkInteropTest.java new file mode 100644 index 00000000000..d17d9ae4005 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/io/DeltaFrameSparkInteropTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.conf.DMLConfig; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.io.FrameReaderDelta; +import org.apache.sysds.runtime.io.FrameReaderDeltaParallel; +import org.apache.sysds.runtime.io.FrameWriterDelta; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Cross-engine interoperability tests for the native (Delta Kernel based) frame reader/writer against the reference + * Delta implementation (Delta's Spark connector, {@code delta-spark}, pulled in test-only). + * + *

+ * The other Delta frame tests round-trip exclusively through SystemDS' own Kernel-based read/write paths, so they + * cannot catch a table that SystemDS writes in a way other Delta engines reject (or vice versa). These tests close that + * gap by routing a mixed-type (long/double/string/boolean) frame through two independent engines: + *

    + *
  • SystemDS writes -> Spark/Delta reads (our output is spec-compliant), and
  • + *
  • Spark/Delta writes -> SystemDS reads, including a multi-file layout and a table with deletion vectors / a + * second commit that the SystemDS writer never produces itself.
  • + *
+ * + *

+ * Row order is never assumed: every table carries a unique id in column 0 and comparisons are keyed by that id, since + * neither engine guarantees row order across files. + */ +@net.jcip.annotations.NotThreadSafe +public class DeltaFrameSparkInteropTest { + + // nonsense schema/dims handed to the reader to confirm it discovers everything from the table + private static final ValueType[] NO_SCHEMA = new ValueType[] {ValueType.STRING}; + private static final String[] NO_NAMES = new String[] {"x"}; + + private static SparkSession spark; + + @BeforeClass + public static void startSpark() { + // each test class runs in its own fork (surefire reuseForks=false), so this + // is the only SparkSession in the JVM and gets the Delta extensions injected. + SparkSession.clearActiveSession(); + SparkSession.clearDefaultSession(); + spark = SparkSession.builder().appName("sysds-delta-frame-interop").master("local[2]") + .config("spark.ui.enabled", "false").config("spark.sql.shuffle.partitions", "2") + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate(); + } + + @AfterClass + public static void stopSpark() { + if(spark != null) + spark.stop(); + SparkSession.clearActiveSession(); + SparkSession.clearDefaultSession(); + spark = null; + } + + @Test + public void systemdsWriteSparkReadMultiFile() throws Exception { + // SystemDS writes a (forced) multi-file mixed-type frame Delta table; the + // reference Delta engine (Spark) must read every data file back with + // matching values across all four column types. + int rows = 20_000, cols = 4; + FrameBlock in = indexedFrame(rows); + + // small target file size -> multiple parquet data files (exercise that an + // external reader stitches all of our data files, not just the first). + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(16L * 1024)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_frame_s2s_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new FrameWriterDelta().writeFrameToHDFS(in, tablePath, rows, cols); + assertTrue("writer should have produced a multi-file table", countParquet(tablePath) > 1); + + Dataset df = spark.read().format("delta").load(tablePath); + assertEquals("rows", rows, df.count()); + assertEquals("cols", cols, df.schema().fields().length); + + List read = df.collectAsList(); + assertEquals(rows, read.size()); + boolean[] seen = new boolean[rows]; + for(Row r : read) { + int id = (int) r.getLong(0); + assertTrue("id in range and unique: " + id, id >= 0 && id < rows && !seen[id]); + seen[id] = true; + assertEquals("id" + id + " c1", dval(id), r.getDouble(1), 1e-9); + assertEquals("id" + id + " c2", sval(id), r.getString(2)); + assertEquals("id" + id + " c3", Boolean.valueOf(bval(id)), Boolean.valueOf(r.getBoolean(3))); + } + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void sparkWriteSystemdsReadMultiFile() throws Exception { + // the reference Delta engine writes a multi-file mixed-type table; both the + // serial and parallel SystemDS frame readers must reconstruct it cell-for-cell. + int rows = 600; + Dataset df = indexedDataFrame(rows).repartition(3); // -> multiple data files + Path dir = Files.createTempDirectory("sysds_delta_frame_p2s_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + df.write().format("delta").save(tablePath); + assertTrue("spark should have written a multi-file table", countParquet(tablePath) > 1); + + Set expected = idRange(0, rows); + assertFrameMatchesIds(new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1), + expected, "serial"); + assertFrameMatchesIds( + new FrameReaderDeltaParallel().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1), expected, + "parallel"); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void sparkDeletionVectorsSystemdsRead() throws Exception { + // a Delta table with deletion vectors + a second commit (the DELETE) is a + // layout the SystemDS writer never emits; the frame readers must honor the DV + // and return only the surviving rows. With DVs present hasExactRowCounts() is + // false, so this drives the buffered (selection-mask) frame read path. + int rows = 400, deleteBelow = 50; + Path dir = Files.createTempDirectory("sysds_delta_frame_dv_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + // enable deletion vectors for tables created in this block, then delete a + // row range so Delta records a DV rather than rewriting the data files. + spark.conf().set(DV_DEFAULT, "true"); + indexedDataFrame(rows).write().format("delta").save(tablePath); + spark.sql("DELETE FROM delta.`" + tablePath + "` WHERE c0 < " + deleteBelow); + + Set expected = idRange(deleteBelow, rows); + + FrameBlock serial = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertEquals("surviving rows (serial)", rows - deleteBelow, serial.getNumRows()); + assertFrameMatchesIds(serial, expected, "serial-dv"); + + FrameBlock parallel = new FrameReaderDeltaParallel().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, + -1); + assertEquals("surviving rows (parallel)", rows - deleteBelow, parallel.getNumRows()); + assertFrameMatchesIds(parallel, expected, "parallel-dv"); + } + finally { + // fresh fork per test class, so simply clearing the override is enough + spark.conf().unset(DV_DEFAULT); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + private static final String DV_DEFAULT = "spark.databricks.delta.properties.defaults.enableDeletionVectors"; + + // deterministic, exactly-representable cell values keyed by the row id in column 0 + private static double dval(int id) { + return id * 0.5 - 1.0; + } + + private static String sval(int id) { + return "s" + id; + } + + private static boolean bval(int id) { + return id % 2 == 0; + } + + /** Frame whose column 0 is the row id and the remaining columns are exact per-id values. */ + private static FrameBlock indexedFrame(int rows) { + ValueType[] schema = {ValueType.INT64, ValueType.FP64, ValueType.STRING, ValueType.BOOLEAN}; + String[] names = {"c0", "c1", "c2", "c3"}; + FrameBlock fb = new FrameBlock(schema, names); + fb.ensureAllocatedColumns(rows); + for(int r = 0; r < rows; r++) { + fb.set(r, 0, (long) r); + fb.set(r, 1, dval(r)); + fb.set(r, 2, sval(r)); + fb.set(r, 3, bval(r)); + } + return fb; + } + + /** Spark DataFrame mirroring {@link #indexedFrame} with columns c0..c3 (long/double/string/boolean). */ + private Dataset indexedDataFrame(int rows) { + StructType schema = DataTypes + .createStructType(new StructField[] {DataTypes.createStructField("c0", DataTypes.LongType, false), + DataTypes.createStructField("c1", DataTypes.DoubleType, false), + DataTypes.createStructField("c2", DataTypes.StringType, false), + DataTypes.createStructField("c3", DataTypes.BooleanType, false)}); + + List data = new ArrayList<>(rows); + for(int r = 0; r < rows; r++) + data.add(RowFactory.create((long) r, dval(r), sval(r), bval(r))); + return spark.createDataFrame(data, schema); + } + + private static Set idRange(int fromInclusive, int toExclusive) { + Set ids = new LinkedHashSet<>(toExclusive - fromInclusive); + for(int id = fromInclusive; id < toExclusive; id++) + ids.add(id); + return ids; + } + + /** Asserts every row of {@code out} (keyed by its column-0 id) is expected and carries the exact per-id values. */ + private static void assertFrameMatchesIds(FrameBlock out, Set expectedIds, String tag) { + assertEquals(tag + " rows", expectedIds.size(), out.getNumRows()); + assertEquals(tag + " cols", 4, out.getNumColumns()); + // discovered types: long->INT64, double->FP64, string->STRING, boolean->BOOLEAN + assertEquals(tag + " c0 type", ValueType.INT64, out.getSchema()[0]); + assertEquals(tag + " c1 type", ValueType.FP64, out.getSchema()[1]); + assertEquals(tag + " c2 type", ValueType.STRING, out.getSchema()[2]); + assertEquals(tag + " c3 type", ValueType.BOOLEAN, out.getSchema()[3]); + Set seen = new HashSet<>(); + for(int r = 0; r < out.getNumRows(); r++) { + int id = ((Number) out.get(r, 0)).intValue(); + assertTrue(tag + ": unexpected/duplicate id " + id, expectedIds.contains(id) && seen.add(id)); + assertEquals(tag + " id" + id + " c1", dval(id), ((Number) out.get(r, 1)).doubleValue(), 1e-9); + assertEquals(tag + " id" + id + " c2", sval(id), out.get(r, 2).toString()); + assertEquals(tag + " id" + id + " c3", Boolean.valueOf(bval(id)), out.get(r, 3)); + } + } + + private static long countParquet(String tablePath) throws Exception { + return DeltaFrameTestUtils.countParquet(tablePath); + } +} diff --git a/src/test/java/org/apache/sysds/test/component/io/DeltaFrameTestUtils.java b/src/test/java/org/apache/sysds/test/component/io/DeltaFrameTestUtils.java new file mode 100644 index 00000000000..fe025c40eae --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/io/DeltaFrameTestUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.io; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.stream.Stream; + +/** Shared helpers for the native Delta frame read/write tests and benchmarks. */ +public class DeltaFrameTestUtils { + + private DeltaFrameTestUtils() { + // utility class + } + + /** Count the parquet data files under a Delta table directory. */ + public static long countParquet(String tablePath) throws Exception { + try(Stream s = Files.walk(new File(tablePath).toPath())) { + return s.filter(p -> p.toString().endsWith(".parquet")).count(); + } + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java b/src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java new file mode 100644 index 00000000000..2c4799d16f0 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.io.delta; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.HashMap; + +import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics; +import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +/** + * End-to-end DML test of the native Delta frame read/write path. + * + *

+ * As in the matrix variant, the write and the read run as two separate SystemDS executions so the read is a genuine + * disk read rather than an in-memory cache hit. We additionally assert via {@link CacheStatistics} that the write run + * wrote (delta + text reference) and the read run read (delta + text reference) from HDFS, so a short-circuited path + * would fail the test. + *

+ */ +public class FrameDeltaReadWriteTest extends AutomatedTestBase { + + private final static String TEST_DIR = "functions/io/delta/"; + private final static String TEST_CLASS_DIR = TEST_DIR + FrameDeltaReadWriteTest.class.getSimpleName() + "/"; + private final static String WRITE_NAME = "FrameDeltaWrite"; + private final static String READ_NAME = "FrameDeltaReadCompare"; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(WRITE_NAME, new TestConfiguration(TEST_CLASS_DIR, WRITE_NAME, new String[] {"ref"})); + addTestConfiguration(READ_NAME, new TestConfiguration(TEST_CLASS_DIR, READ_NAME, new String[] {"R"})); + } + + @Test + public void testDenseRoundTrip() { + runFrameDeltaRoundTrip(200, 12, 1.0); + } + + @Test + public void testSparseRoundTrip() { + runFrameDeltaRoundTrip(640, 8, 0.2); + } + + @Test + public void testMultiBatchRoundTrip() { + runFrameDeltaRoundTrip(9000, 4, 1.0); + } + + private void runFrameDeltaRoundTrip(int rows, int cols, double sparsity) { + try { + String HOME = SCRIPT_DIR + TEST_DIR; + + // ---- phase 1: write the frame as a Delta table + text reference ---- + getAndLoadTestConfiguration(WRITE_NAME); + String deltaPath = output("deltaTable"); + String refPath = output("ref"); + fullDMLScriptName = HOME + WRITE_NAME + ".dml"; + programArgs = new String[] {"-stats", "-args", String.valueOf(rows), String.valueOf(cols), + String.valueOf(sparsity), deltaPath, refPath}; + runTest(true, false, null, -1); + + // the write run must materialize two objects to disk: the frame Delta + // table under test + the matrix text reference. FrameWriterDelta genuinely + // hitting HDFS is what produces the frame-side write statistic. + long hdfsWrites = CacheStatistics.getHDFSWrites(); + assertTrue("expected >= 2 HDFS writes in the write run (delta frame + reference), got " + hdfsWrites, + hdfsWrites >= 2); + // and a real Delta table (transaction log) must have been created + assertTrue("missing Delta transaction log under " + deltaPath, + new File(deltaPath, "_delta_log").isDirectory()); + + // ---- phase 2: fresh execution reads the Delta frame and compares ---- + getAndLoadTestConfiguration(READ_NAME); + fullDMLScriptName = HOME + READ_NAME + ".dml"; + programArgs = new String[] {"-stats", "-args", deltaPath, refPath, output("R")}; + runTest(true, false, null, -1); + + long hdfsReads = CacheStatistics.getHDFSHits(); + assertTrue("expected >= 2 HDFS reads in the read run (delta + reference), got " + hdfsReads, + hdfsReads >= 2); + + HashMap R = readDMLMatrixFromOutputDir("R"); + double diff = R.getOrDefault(new CellIndex(1, 1), 0.0); + double nrow = R.getOrDefault(new CellIndex(1, 2), 0.0); + double ncol = R.getOrDefault(new CellIndex(1, 3), 0.0); + + assertEquals("reconstruction error", 0.0, diff, 1e-12); + assertEquals("discovered rows", rows, (int) nrow); + assertEquals("discovered cols", cols, (int) ncol); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + } +} diff --git a/src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml b/src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml new file mode 100644 index 00000000000..cdf1f0794fc --- /dev/null +++ b/src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml @@ -0,0 +1,35 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Reader side of the native Delta frame round-trip test. Reads the Delta table +# as a frame (schema + dimensions discovered from the transaction log) and the +# text matrix reference, both genuine HDFS reads in a fresh process, then +# reports the elementwise reconstruction error and the discovered dimensions. + +Y = read($1, data_type="frame", format="delta") +Xref = read($2, format="text") + +M = as.matrix(Y) +R = matrix(0, rows=1, cols=3) +R[1,1] = sum(abs(Xref - M)) # 0 if FrameReaderDelta reconstructed the frame exactly +R[1,2] = nrow(Y) # discovered row count +R[1,3] = ncol(Y) # discovered column count +write(R, $3) diff --git a/src/test/scripts/functions/io/delta/FrameDeltaWrite.dml b/src/test/scripts/functions/io/delta/FrameDeltaWrite.dml new file mode 100644 index 00000000000..5e152dde013 --- /dev/null +++ b/src/test/scripts/functions/io/delta/FrameDeltaWrite.dml @@ -0,0 +1,32 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Writer side of the native Delta frame round-trip test. Generates a matrix, +# converts it to a frame, and materializes it as a Delta table (under test). +# The same matrix is also written as a plain text reference. Running the +# read/compare in a SEPARATE process prevents SystemDS from short-circuiting +# the subsequent read against the in-memory frame, so FrameReaderDelta is +# actually exercised. + +X = rand(rows=$1, cols=$2, min=-5, max=5, seed=7, sparsity=$3) +F = as.frame(X) +write(F, $4, format="delta") +write(X, $5, format="text")