Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,16 @@ public static int getDeltaWriterBatchSize() {
return getDMLConfig().getIntValue(DMLConfig.DELTA_WRITER_BATCH_SIZE);
}

/** @return target data-file size (bytes) for the native Delta writer */
/** @return upper bound (bytes) on the native Delta writer's target data-file size */
public static long getDeltaWriterTargetFileSize() {
return Long.parseLong(getDMLConfig().getTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE));
}

/** @return whether the native Delta writer adaptively sizes data files for parallel reads */
public static boolean isDeltaWriterAdaptiveFileSize() {
return getDMLConfig().getBooleanValue(DMLConfig.DELTA_WRITER_ADAPTIVE_FILE_SIZE);
}

public static boolean isFederatedSSL(){
return getDMLConfig().getBooleanValue(DMLConfig.USE_SSL_FEDERATED_COMMUNICATION);
}
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/org/apache/sysds/conf/DMLConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public class DMLConfig
public static final String IO_COMPRESSION_CODEC = "sysds.io.compression.encoding";
public static final String DELTA_READER_BATCH_SIZE = "sysds.io.delta.reader.batchsize"; // int: rows per parquet read batch
public static final String DELTA_WRITER_BATCH_SIZE = "sysds.io.delta.writer.batchsize"; // int: matrix rows materialized per columnar batch handed to the engine
public static final String DELTA_WRITER_TARGET_FILE_SIZE = "sysds.io.delta.writer.targetfilesize"; // long: target data-file size in bytes (smaller -> more files -> more parallel-read throughput)
public static final String DELTA_WRITER_TARGET_FILE_SIZE = "sysds.io.delta.writer.targetfilesize"; // long: upper bound on target data-file size in bytes; adaptive sizing may pick smaller -> more files -> more parallel-read throughput
public static final String DELTA_WRITER_ADAPTIVE_FILE_SIZE = "sysds.io.delta.writer.adaptivefilesize"; // boolean: size data files toward one per parallel reader (capped by targetfilesize)
public static final String PARALLEL_ENCODE = "sysds.parallel.encode"; // boolean: enable multi-threaded transformencode and apply
public static final String PARALLEL_ENCODE_STAGED = "sysds.parallel.encode.staged";
public static final String PARALLEL_ENCODE_APPLY_BLOCKS = "sysds.parallel.encode.applyBlocks";
Expand Down Expand Up @@ -163,7 +164,8 @@ public class DMLConfig
_defaultVals.put(IO_COMPRESSION_CODEC, "none");
_defaultVals.put(DELTA_READER_BATCH_SIZE, "4096"); // rows per parquet read batch (Delta Kernel default 1024)
_defaultVals.put(DELTA_WRITER_BATCH_SIZE, "4096"); // matrix rows materialized per columnar batch handed to the engine
_defaultVals.put(DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(64L * 1024 * 1024)); // 64MB target data-file size (Delta Kernel default 128MB) -> more files -> more parallel-read throughput
_defaultVals.put(DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(64L * 1024 * 1024)); // 64MB cap on target data-file size; adaptive sizing may pick smaller -> more files -> more parallel-read throughput
_defaultVals.put(DELTA_WRITER_ADAPTIVE_FILE_SIZE, "true"); // size data files toward one per parallel reader
_defaultVals.put(PARALLEL_TOKENIZE, "false");
_defaultVals.put(PARALLEL_TOKENIZE_NUM_BLOCKS, "64");
_defaultVals.put(FRAME_TO_MATRIX_WARN_CAST, "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

package org.apache.sysds.runtime.controlprogram.caching;


import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
Expand Down Expand Up @@ -54,13 +54,11 @@
import java.util.List;
import java.util.concurrent.Future;


public class FrameObject extends CacheableData<FrameBlock>
{
public class FrameObject extends CacheableData<FrameBlock> {
private static final long serialVersionUID = 1755082174281927785L;

private ValueType[] _schema = null;

protected FrameObject() {
super(DataType.FRAME, ValueType.STRING);
}
Expand All @@ -82,89 +80,88 @@ public FrameObject(String fname, MetaData meta, ValueType[] schema) {
setMetaData(meta);
setSchema(schema);
}

/**
* Copy constructor that copies meta data but NO data.
*
*
* @param fo frame object
*/
public FrameObject(FrameObject fo) {
super(fo);

MetaDataFormat metaOld = (MetaDataFormat) fo.getMetaData();
_metaData = new MetaDataFormat(
new MatrixCharacteristics(metaOld.getDataCharacteristics()),
_metaData = new MetaDataFormat(new MatrixCharacteristics(metaOld.getDataCharacteristics()),
metaOld.getFileFormat());
_schema = fo._schema.clone();
}

@Override
public ValueType[] getSchema() {
return _schema;
}

/**
* Obtain schema of value types
*
*
* @param cl column lower bound, inclusive
* @param cu column upper bound, inclusive
* @return schema of value types
*/
public ValueType[] getSchema(int cl, int cu) {
return (_schema!=null && _schema.length>cu) ? Arrays.copyOfRange(_schema, cl, cu+1) :
UtilFunctions.nCopies(cu-cl+1, ValueType.STRING);
return (_schema != null && _schema.length > cu) ? Arrays.copyOfRange(_schema, cl, cu + 1) : UtilFunctions
.nCopies(cu - cl + 1, ValueType.STRING);
}

/**
* Creates a new collection which contains the schema of the current
* frame object concatenated with the schema of the passed frame object.
*
* Creates a new collection which contains the schema of the current frame object concatenated with the schema of
* the passed frame object.
*
* @param fo frame object
* @return schema of value types
*/
public ValueType[] mergeSchemas(FrameObject fo) {
return ArrayUtils.addAll(
(_schema!=null) ? _schema : UtilFunctions.nCopies((int)getNumColumns(), ValueType.STRING),
(fo._schema!=null) ? fo._schema : UtilFunctions.nCopies((int)fo.getNumColumns(), ValueType.STRING));
}
(_schema != null) ? _schema : UtilFunctions.nCopies((int) getNumColumns(), ValueType.STRING),
(fo._schema != null) ? fo._schema : UtilFunctions.nCopies((int) fo.getNumColumns(), ValueType.STRING));
}

public void setSchema(String schema) {
if( schema.equals("*") ) {
//populate default schema
if(schema.equals("*")) {
// populate default schema
int clen = (int) getNumColumns();
if( clen >= 0 ) //known number of cols
if(clen >= 0) // known number of cols
_schema = UtilFunctions.nCopies(clen, ValueType.STRING);
}
else
else
_schema = parseSchema(schema);
}

public static ValueType[] parseSchema(String schema) {
if(schema == null)
return new ValueType[]{ValueType.STRING};
return new ValueType[] {ValueType.STRING};
// parse given schema
String[] parts = schema.split(DataExpression.DEFAULT_DELIM_DELIMITER);
ValueType[] ret = new ValueType[parts.length];
for(int i = 0; i < parts.length; i++)
ret[i] = ValueType.fromExternalString(parts[i].toUpperCase());
return ret;
}

public void setSchema(ValueType[] schema) {
_schema = schema;
}

@Override
public void refreshMetaData() {
if ( _data == null || _metaData ==null ) //refresh only for existing data
throw new DMLRuntimeException("Cannot refresh meta data because there is no data or meta data. ");
if(_data == null || _metaData == null) // refresh only for existing data
throw new DMLRuntimeException("Cannot refresh meta data because there is no data or meta data. ");

//update matrix characteristics
// update matrix characteristics
DataCharacteristics dc = _metaData.getDataCharacteristics();
dc.setDimension( _data.getNumRows(),_data.getNumColumns() );
dc.setNonZeros(_data.getNumRows()*_data.getNumColumns());
//update schema information
dc.setDimension(_data.getNumRows(), _data.getNumColumns());
dc.setNonZeros(_data.getNumRows() * _data.getNumColumns());

// update schema information
_schema = _data.getSchema();
}

Expand All @@ -177,14 +174,14 @@ public long getNumColumns() {
DataCharacteristics dc = getDataCharacteristics();
return dc.getCols();
}

@Override
protected FrameBlock readBlobFromCache(String fname) throws IOException {
FrameBlock fb = null;
if (OptimizerUtils.isUMMEnabled())
if(OptimizerUtils.isUMMEnabled())
fb = (FrameBlock) UnifiedMemoryManager.readBlock(fname, false);
else
fb = (FrameBlock)LazyWriteBuffer.readBlock(fname, false);
fb = (FrameBlock) LazyWriteBuffer.readBlock(fname, false);
return fb;
}

Expand All @@ -203,61 +200,63 @@ protected FrameBlock readBlobFromHDFS(String fname, long[] dims) throws IOExcept
.createFrameReader(iimd.getFileFormat(), getFileFormatProperties())
.readFrameFromHDFS(fname, lschema, dc.getRows(), dc.getCols());

if(iimd.getFileFormat() == FileFormat.CSV)
// sanity check correct output (before dereferencing data below)
if(data == null)
throw new IOException("Unable to load frame from file: " + fname);

// Delta and CSV discover dimensions (and Delta also schema) at read time, so
// refresh the cached metadata to reflect the materialized frame block.
if(iimd.getFileFormat() == FileFormat.CSV || iimd.getFileFormat() == FileFormat.DELTA) {
_metaData = _metaData instanceof MetaDataFormat ? new MetaDataFormat(data.getDataCharacteristics(),
iimd.getFileFormat()) : new MetaData(data.getDataCharacteristics());
if(iimd.getFileFormat() == FileFormat.DELTA)
_schema = data.getSchema();
}

// sanity check correct output
if(data == null)
throw new IOException("Unable to load frame from file: " + fname);
return data;
}

@Override
protected FrameBlock readBlobFromRDD(RDDObject rdd, MutableBoolean status)
throws IOException
{
//note: the read of a frame block from an RDD might trigger
//lazy evaluation of pending transformations.
protected FrameBlock readBlobFromRDD(RDDObject rdd, MutableBoolean status) throws IOException {
// note: the read of a frame block from an RDD might trigger
// lazy evaluation of pending transformations.
RDDObject lrdd = rdd;

//prepare return status (by default only collect)
// prepare return status (by default only collect)
status.setValue(false);

MetaDataFormat iimd = (MetaDataFormat) _metaData;
DataCharacteristics dc = iimd.getDataCharacteristics();
int rlen = (int)dc.getRows();
int clen = (int)dc.getCols();
//handle missing schema if necessary
ValueType[] lschema = (_schema!=null) ? _schema :
UtilFunctions.nCopies(clen>=1 ? (int)clen : 1, ValueType.STRING);
int rlen = (int) dc.getRows();
int clen = (int) dc.getCols();

// handle missing schema if necessary
ValueType[] lschema = (_schema != null) ? _schema : UtilFunctions.nCopies(clen >= 1 ? (int) clen : 1,
ValueType.STRING);

FrameBlock fb = null;
try {
//prevent unnecessary collect through rdd checkpoint
if( rdd.allowsShortCircuitCollect() ) {
lrdd = (RDDObject)rdd.getLineageChilds().get(0);
try {
// prevent unnecessary collect through rdd checkpoint
if(rdd.allowsShortCircuitCollect()) {
lrdd = (RDDObject) rdd.getLineageChilds().get(0);
}
//collect frame block from binary block RDD
fb = SparkExecutionContext.toFrameBlock(lrdd, lschema, rlen, clen);

// collect frame block from binary block RDD
fb = SparkExecutionContext.toFrameBlock(lrdd, lschema, rlen, clen);
}
catch(DMLRuntimeException ex) {
throw new IOException(ex);
}
//sanity check correct output
if( fb == null )

// sanity check correct output
if(fb == null)
throw new IOException("Unable to load frame from rdd.");

return fb;
}

@Override
protected FrameBlock readBlobFromFederated(FederationMap fedMap, long[] dims)
throws IOException
{
protected FrameBlock readBlobFromFederated(FederationMap fedMap, long[] dims) throws IOException {
FrameBlock ret = new FrameBlock(_schema);
// provide long support?
ret.ensureAllocatedColumns((int) dims[0]);
Expand All @@ -268,8 +267,8 @@ protected FrameBlock readBlobFromFederated(FederationMap fedMap, long[] dims)
FederatedResponse response = readResponse.getRight().get();
// add result
FrameBlock multRes = (FrameBlock) response.getData()[0];
for (int r = 0; r < multRes.getNumRows(); r++) {
for (int c = 0; c < multRes.getNumColumns(); c++) {
for(int r = 0; r < multRes.getNumRows(); r++) {
for(int c = 0; c < multRes.getNumColumns(); c++) {
int destRow = range.getBeginDimsInt()[0] + r;
int destCol = range.getBeginDimsInt()[1] + c;
ret.set(destRow, destCol, multRes.get(r, c));
Expand All @@ -280,38 +279,37 @@ protected FrameBlock readBlobFromFederated(FederationMap fedMap, long[] dims)
catch(Exception e) {
throw new DMLRuntimeException("Federated Frame read failed.", e);
}

return ret;
}

@Override
protected void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop)
throws IOException, DMLRuntimeException
{
throws IOException, DMLRuntimeException {
MetaDataFormat iimd = (MetaDataFormat) _metaData;
FileFormat fmt = (ofmt != null ? FileFormat.safeValueOf(ofmt) : iimd.getFileFormat());

FrameWriter writer = FrameWriterFactory.createFrameWriter(fmt, fprop);
writer.writeFrameToHDFS(_data, fname, getNumRows(), getNumColumns());

if(DMLScript.STATISTICS)
CacheStatistics.incrementHDFSWrites();
}

@Override
protected long writeStreamToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop)
throws IOException, DMLRuntimeException
{
throws IOException, DMLRuntimeException {
throw new UnsupportedOperationException();
}


@Override
protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt)
throws IOException, DMLRuntimeException
{
//prepare output info
throws IOException, DMLRuntimeException {
// prepare output info
MetaDataFormat iimd = (MetaDataFormat) _metaData;

//note: the write of an RDD to HDFS might trigger
//lazy evaluation of pending transformations.
// note: the write of an RDD to HDFS might trigger
// lazy evaluation of pending transformations.
SparkExecutionContext.writeFrameRDDtoHDFS(rdd, fname, iimd.getFileFormat());
}

Expand All @@ -320,11 +318,9 @@ protected FrameBlock readBlobFromStream(OOCStream<IndexedMatrixValue> stream) th
// TODO Auto-generated method stub
return null;
}

@Override
protected FrameBlock reconstructByLineage(LineageItem li) throws IOException {
return ((FrameObject) LineageRecomputeUtils
.parseNComputeLineageTrace(li.getData()))
.acquireReadAndRelease();
return ((FrameObject) LineageRecomputeUtils.parseNComputeLineageTrace(li.getData())).acquireReadAndRelease();
}
}
Loading
Loading