From 321156578f08ae6478185e278a536ddc06492e49 Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Sat, 13 Jun 2026 16:54:26 +0200 Subject: [PATCH 01/16] Add CompressionType enum --- .../runtime/compress/CompressionType.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 src/main/java/org/apache/sysds/runtime/compress/CompressionType.java diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionType.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionType.java new file mode 100644 index 00000000000..ba6a7b9851d --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionType.java @@ -0,0 +1,43 @@ +package org.apache.sysds.runtime.compress; + +/** + * Enumeration of supported compression techniques for federated learning. + * Used for configuration, serialization, and technique selection. + * + * @author Nirvan C. Udaysingh Jhurree + */ +public enum CompressionType { + + /** TopK sparsification: keep largest-magnitude elements only */ + TOPK("topk", "Top-K Sparsification"), + + /** Probabilistic quantization: reduce precision with stochastic rounding */ + PROBABILISTIC_QUANTIZATION("prob_quant", "Probabilistic Quantization"), + + /** 1-bit compressed sensing: sign-only transmission + iterative reconstruction */ + ONE_BIT_CS("1bit_cs", "1-Bit Compressed Sensing"), + + /** No compression (passthrough) */ + NONE("none", "No Compression"); + + private final String id; + private final String description; + + CompressionType(String id, String description) { + this.id = id; + this.description = description; + } + + public String getId() { return id; } + public String getDescription() { return description; } + + /** Parse from string identifier (case-insensitive) */ + public static CompressionType fromString(String text) { + for (CompressionType type : CompressionType.values()) { + if (type.id.equalsIgnoreCase(text)) { + return type; + } + } + throw new 
IllegalArgumentException("Unknown compression type: " + text); + } +} \ No newline at end of file From 36d7263cc9ab7bbd6f3a024f1313ad2d8d3cb533 Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Sat, 13 Jun 2026 16:58:36 +0200 Subject: [PATCH 02/16] Add CompressionException and DecompressionException --- .../exceptions/CompressionException.java | 19 +++++++++++++++++++ .../exceptions/DecompressionException.java | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 src/main/java/org/apache/sysds/runtime/compress/exceptions/CompressionException.java create mode 100644 src/main/java/org/apache/sysds/runtime/compress/exceptions/DecompressionException.java diff --git a/src/main/java/org/apache/sysds/runtime/compress/exceptions/CompressionException.java b/src/main/java/org/apache/sysds/runtime/compress/exceptions/CompressionException.java new file mode 100644 index 00000000000..da62873aa7d --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/exceptions/CompressionException.java @@ -0,0 +1,19 @@ +package org.apache.sysds.runtime.compress.exceptions; + +/** + * Exception thrown when matrix compression fails. + * + * @author Nirvan C. 
UdaysinghJhurree + */ +public class CompressionException extends Exception { + + private static final long serialVersionUID = 1L; + + public CompressionException(String message) { + super(message); + } + + public CompressionException(String message, Throwable cause) { + super(message, cause); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/sysds/runtime/compress/exceptions/DecompressionException.java b/src/main/java/org/apache/sysds/runtime/compress/exceptions/DecompressionException.java new file mode 100644 index 00000000000..ff060588200 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/exceptions/DecompressionException.java @@ -0,0 +1,19 @@ +package org.apache.sysds.runtime.compress.exceptions; + +/** + * Exception thrown when matrix decompression fails. + * + * @author Nirvan C. Udaysingh Jhurree + */ +public class DecompressionException extends Exception { + + private static final long serialVersionUID = 1L; + + public DecompressionException(String message) { + super(message); + } + + public DecompressionException(String message, Throwable cause) { + super(message, cause); + } +} \ No newline at end of file From 23d7022e0f785b6ed8969556f73f3e5d6e1022cf Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Sat, 13 Jun 2026 17:02:07 +0200 Subject: [PATCH 03/16] Add CompressedMatrix container class --- .../runtime/compress/CompressedMatrix.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 src/main/java/org/apache/sysds/runtime/compress/CompressedMatrix.java diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrix.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrix.java new file mode 100644 index 00000000000..a24b5255d1c --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrix.java @@ -0,0 +1,64 @@ +package org.apache.sysds.runtime.compress; + +import java.io.Serializable; + +/** + * Generic container for 
compressed matrix data. + * Stores the compressed representation along with metadata + * needed for decompression and size estimation. + * + * @author Nirvan C. Udaysingh Jhurree + */ +public class CompressedMatrix implements Serializable { + + private static final long serialVersionUID = 1L; + + private final CompressionType type; + private final int numRows; + private final int numCols; + private final Object compressedData; // Technique-specific data + private final double compressionRatio; + private final byte[] metadata; // Optional: scaling factors, etc. + + public CompressedMatrix(CompressionType type, int numRows, int numCols, + Object compressedData, double compressionRatio) { + this(type, numRows, numCols, compressedData, compressionRatio, null); + } + + public CompressedMatrix(CompressionType type, int numRows, int numCols, + Object compressedData, double compressionRatio, + byte[] metadata) { + this.type = type; + this.numRows = numRows; + this.numCols = numCols; + this.compressedData = compressedData; + this.compressionRatio = compressionRatio; + this.metadata = metadata; + } + + public CompressionType getType() { return type; } + public int getNumRows() { return numRows; } + public int getNumCols() { return numCols; } + public Object getCompressedData() { return compressedData; } + public double getCompressionRatio() { return compressionRatio; } + public byte[] getMetadata() { return metadata; } + + /** Estimate original size in bytes (8 bytes per double) */ + public long estimateOriginalSizeBytes() { + return (long) numRows * numCols * 8; + } + + /** Estimate compressed size in bytes */ + public long getCompressedSizeBytes() { + if (compressedData instanceof byte[]) { + return ((byte[]) compressedData).length; + } + return 0; + } + + @Override + public String toString() { + return String.format("CompressedMatrix[%s, %dx%d, ratio=%.2fx]", + type.getId(), numRows, numCols, compressionRatio); + } +} \ No newline at end of file From 
ec8f5df0149073642dbec57734a6158f94949d7a Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Sat, 13 Jun 2026 17:06:12 +0200 Subject: [PATCH 04/16] Add MatrixCompressor interface --- .../runtime/compress/MatrixCompressor.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 src/main/java/org/apache/sysds/runtime/compress/MatrixCompressor.java diff --git a/src/main/java/org/apache/sysds/runtime/compress/MatrixCompressor.java b/src/main/java/org/apache/sysds/runtime/compress/MatrixCompressor.java new file mode 100644 index 00000000000..88d235e9d2f --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/MatrixCompressor.java @@ -0,0 +1,44 @@ +package org.apache.sysds.runtime.compress; + +import org.apache.sysds.runtime.compress.exceptions.CompressionException; +import org.apache.sysds.runtime.compress.exceptions.DecompressionException; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +/** + * Interface for matrix compression techniques in federated learning. + * All compressors must implement compress/decompress operations. + * + * @author Nirvan C. Udaysingh Jhurree + */ +public interface MatrixCompressor { + + /** + * Compress a matrix block for transmission. + * @param input The source matrix to compress + * @return CompressedMatrix containing compressed data and metadata + * @throws CompressionException if compression fails + */ + CompressedMatrix compress(MatrixBlock input) throws CompressionException; + + /** + * Decompress a compressed matrix back to MatrixBlock. + * @param compressed The compressed data to decompress + * @return Reconstructed MatrixBlock (may be approximate) + * @throws DecompressionException if decompression fails + */ + MatrixBlock decompress(CompressedMatrix compressed) throws DecompressionException; + + /** + * Get the compression technique identifier. 
+ * @return CompressionType enum value + */ + CompressionType getCompressionType(); + + /** + * Estimate the compression ratio achieved. + * Higher is better (e.g. 10.0 means 10x smaller). + */ + default double estimateCompressionRatio(long originalSize, long compressedSize) { + return compressedSize == 0 ? Double.MAX_VALUE : (double) originalSize / compressedSize; + } +} \ No newline at end of file From 58e592dfa837c3e84cf4e207c6a54d93c0e0078d Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Sat, 13 Jun 2026 17:09:01 +0200 Subject: [PATCH 05/16] Add TopKData holder class --- .../sysds/runtime/compress/TopK/TopKData.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 src/main/java/org/apache/sysds/runtime/compress/TopK/TopKData.java diff --git a/src/main/java/org/apache/sysds/runtime/compress/TopK/TopKData.java b/src/main/java/org/apache/sysds/runtime/compress/TopK/TopKData.java new file mode 100644 index 00000000000..509653f7cd0 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/TopK/TopKData.java @@ -0,0 +1,44 @@ +package org.apache.sysds.runtime.compress.TopK; + +import java.io.Serializable; + +/** + * Immutable container for TopK-compressed matrix data. + * Stores only the K largest-magnitude elements with their positions, + * designed for efficient serialization across federated workers. + * + * @author Nirvan C. 
Udaysingh Jhurree + */ +public class TopKData implements Serializable { + + private static final long serialVersionUID = 1L; + + public final int[] indices; // Linear indices of kept elements (row*numCols + col) + public final double[] values; // Corresponding original values + public final int numCols; // Needed for index → (row, col) conversion + + public TopKData(int[] indices, double[] values, int numCols) { + if (indices.length != values.length) { + throw new IllegalArgumentException( + "Indices and values arrays must have the same length"); + } + this.indices = indices.clone(); // Defensive copy + this.values = values.clone(); + this.numCols = numCols; + } + + /** Number of kept elements */ + public int size() { + return indices.length; + } + + /** Estimate serialized size in bytes (4 bytes per int + 8 bytes per double) */ + public long estimateSizeBytes() { + return (long) indices.length * 12 + 64; // +64 for object headers + } + + @Override + public String toString() { + return String.format("TopKData[k=%d, numCols=%d]", indices.length, numCols); + } +} \ No newline at end of file From 7ef87da09adf5d40c87415fd97cb586ae65281b4 Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Sat, 13 Jun 2026 17:11:13 +0200 Subject: [PATCH 06/16] Add TopKCompressor implementation --- .../runtime/compress/TopK/TopKCompressor.java | 225 ++++++++++++++++++ 1 file changed, 225 insertions(+) create mode 100644 src/main/java/org/apache/sysds/runtime/compress/TopK/TopKCompressor.java diff --git a/src/main/java/org/apache/sysds/runtime/compress/TopK/TopKCompressor.java b/src/main/java/org/apache/sysds/runtime/compress/TopK/TopKCompressor.java new file mode 100644 index 00000000000..d178d6cdc26 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/TopK/TopKCompressor.java @@ -0,0 +1,225 @@ +package org.apache.sysds.runtime.compress.TopK; + +import org.apache.sysds.runtime.compress.CompressedMatrix; +import 
org.apache.sysds.runtime.compress.CompressionType; +import org.apache.sysds.runtime.compress.MatrixCompressor; +import org.apache.sysds.runtime.compress.exceptions.CompressionException; +import org.apache.sysds.runtime.compress.exceptions.DecompressionException; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +/** + * TopK Sparsification Compressor. + * + * Keeps only the K largest-magnitude elements in the matrix, + * setting all others to zero. Optimal for gradient sparsification + * in federated learning where most gradient values are near zero. + * + * Compression ratio: approximately 1/sparsityRatio + * e.g. sparsityRatio=0.01 keeps 1% of elements → ~100x compression + * + * @author Nirvan Jhurree + */ +public class TopKCompressor implements MatrixCompressor { + + private final double sparsityRatio; // Fraction of elements to keep (0, 1] + private final boolean useHeap; // Use min-heap for O(n log k) selection + + /** + * @param sparsityRatio Fraction of elements to retain e.g. 
0.01 = keep top 1% + * @param useHeap If true, use priority queue (faster for large matrices) + */ + public TopKCompressor(double sparsityRatio, boolean useHeap) { + if (sparsityRatio <= 0 || sparsityRatio > 1) { + throw new IllegalArgumentException("sparsityRatio must be in (0, 1]"); + } + this.sparsityRatio = sparsityRatio; + this.useHeap = useHeap; + } + + /** Default constructor: uses heap-based selection */ + public TopKCompressor(double sparsityRatio) { + this(sparsityRatio, true); + } + + @Override + public CompressedMatrix compress(MatrixBlock input) throws CompressionException { + try { + int numRows = input.getNumRows(); + int numCols = input.getNumColumns(); + int totalElements = numRows * numCols; + int k = (int) Math.max(1, Math.ceil(totalElements * sparsityRatio)); + + // If k covers everything, no compression needed + if (k >= totalElements) { + return new CompressedMatrix( + CompressionType.TOPK, numRows, numCols, input, 1.0); + } + + // Extract all non-zero elements with their linear indices + List elements = extractElements(input, numRows, numCols); + + // If fewer non-zeros than k, keep all of them + List topK = (elements.size() <= k) + ? 
new ArrayList<>(elements) + : selectTopK(elements, k); + + // Pack into TopKData + TopKData data = convertToTopKData(topK, numCols); + + double ratio = calculateCompressionRatio(totalElements, topK.size()); + + return new CompressedMatrix( + CompressionType.TOPK, numRows, numCols, data, ratio); + + } catch (Exception e) { + throw new CompressionException("TopK compression failed: " + e.getMessage(), e); + } + } + + @Override + public MatrixBlock decompress(CompressedMatrix compressed) throws DecompressionException { + try { + // Handle passthrough case (no compression was applied) + if (compressed.getCompressedData() instanceof MatrixBlock) { + return (MatrixBlock) compressed.getCompressedData(); + } + + TopKData data = (TopKData) compressed.getCompressedData(); + MatrixBlock result = new MatrixBlock( + compressed.getNumRows(), + compressed.getNumCols(), + true // Start sparse + ); + result.allocateSparseRowsBlock(); + + // Place values back at their original positions + for (int i = 0; i < data.indices.length; i++) { + int linearIdx = data.indices[i]; + int row = linearIdx / data.numCols; + int col = linearIdx % data.numCols; + result.setValue(row, col, data.values[i]); + } + + result.examSparsity(); + return result; + + } catch (ClassCastException e) { + throw new DecompressionException("Invalid compressed data type for TopK", e); + } catch (Exception e) { + throw new DecompressionException("TopK decompression failed: " + e.getMessage(), e); + } + } + + @Override + public CompressionType getCompressionType() { + return CompressionType.TOPK; + } + + // ----------------------------------------------------------------------- + // Private helpers + // ----------------------------------------------------------------------- + + /** + * Extract all non-zero elements with their linear indices. + * Handles both dense and sparse MatrixBlock representations. 
+ */ + private List extractElements(MatrixBlock input, int numRows, int numCols) { + List elements = new ArrayList<>(); + + if (input.isInSparseFormat()) { + // Sparse: iterate only over non-zero entries + for (int i = 0; i < numRows; i++) { + if (input.getSparseBlock() == null) continue; + if (input.getSparseBlock().isEmpty(i)) continue; + int[] rowIndices = input.getSparseBlock().indexes(i); + double[] rowValues = input.getSparseBlock().values(i); + int rowSize = input.getSparseBlock().size(i); + for (int j = 0; j < rowSize; j++) { + double val = rowValues[j]; + if (val != 0.0) { + int linearIdx = i * numCols + rowIndices[j]; + elements.add(new Element(linearIdx, val, Math.abs(val))); + } + } + } + } else { + // Dense: iterate all elements, skip zeros + double[] denseBlock = input.getDenseBlockValues(); + if (denseBlock != null) { + for (int i = 0; i < denseBlock.length; i++) { + if (denseBlock[i] != 0.0) { + elements.add(new Element(i, denseBlock[i], Math.abs(denseBlock[i]))); + } + } + } + } + return elements; + } + + /** + * Select top K elements by absolute value. + * Uses min-heap for O(n log k) when useHeap=true, + * or full sort O(n log n) otherwise. 
+ */ + private List selectTopK(List elements, int k) { + if (useHeap) { + PriorityQueue minHeap = new PriorityQueue<>( + k, Comparator.comparingDouble(e -> e.absValue) + ); + for (Element e : elements) { + if (minHeap.size() < k) { + minHeap.offer(e); + } else if (e.absValue > minHeap.peek().absValue) { + minHeap.poll(); + minHeap.offer(e); + } + } + List result = new ArrayList<>(minHeap); + result.sort(Comparator.comparingInt(e -> e.index)); + return result; + } else { + elements.sort((a, b) -> Double.compare(b.absValue, a.absValue)); + return new ArrayList<>(elements.subList(0, k)); + } + } + + private TopKData convertToTopKData(List topK, int numCols) { + int[] indices = new int[topK.size()]; + double[] values = new double[topK.size()]; + for (int i = 0; i < topK.size(); i++) { + indices[i] = topK.get(i).index; + values[i] = topK.get(i).value; + } + return new TopKData(indices, values, numCols); + } + + private double calculateCompressionRatio(int total, int kept) { + if (kept == 0) return Double.MAX_VALUE; + // Original: total * 8 bytes (doubles) + // Compressed: kept * 12 bytes (int index + double value) + long originalBytes = (long) total * 8; + long compressedBytes = (long) kept * 12; + return (double) originalBytes / compressedBytes; + } + + // ----------------------------------------------------------------------- + // Inner class: element tracking during compression + // ----------------------------------------------------------------------- + + private static class Element { + final int index; // Linear index: row * numCols + col + final double value; // Original value + final double absValue; // Absolute value for magnitude comparison + + Element(int index, double value, double absValue) { + this.index = index; + this.value = value; + this.absValue = absValue; + } + } +} \ No newline at end of file From ec3f6be8de8999a5bac08bc6d355af6f8354660b Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Sat, 13 Jun 2026 17:17:27 +0200 Subject: 
[PATCH 07/16] Fix MatrixBlock API: setValue -> set in TopKCompressor --- .../org/apache/sysds/runtime/compress/TopK/TopKCompressor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/TopK/TopKCompressor.java b/src/main/java/org/apache/sysds/runtime/compress/TopK/TopKCompressor.java index d178d6cdc26..a8de26db3a6 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/TopK/TopKCompressor.java +++ b/src/main/java/org/apache/sysds/runtime/compress/TopK/TopKCompressor.java @@ -102,7 +102,8 @@ public MatrixBlock decompress(CompressedMatrix compressed) throws DecompressionE int linearIdx = data.indices[i]; int row = linearIdx / data.numCols; int col = linearIdx % data.numCols; - result.setValue(row, col, data.values[i]); + //result.setValue(row, col, data.values[i]); + result.set(row, col, data.values[i]); } result.examSparsity(); From b58b601092c842f04570cc7d05f369082eaab949 Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Mon, 29 Jun 2026 12:43:57 +0200 Subject: [PATCH 08/16] Add QuantizedData holder class --- .../compress/Quantization/QuantizedData.java | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 src/main/java/org/apache/sysds/runtime/compress/Quantization/QuantizedData.java diff --git a/src/main/java/org/apache/sysds/runtime/compress/Quantization/QuantizedData.java b/src/main/java/org/apache/sysds/runtime/compress/Quantization/QuantizedData.java new file mode 100644 index 00000000000..632066e9854 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/Quantization/QuantizedData.java @@ -0,0 +1,57 @@ +package org.apache.sysds.runtime.compress.Quantization; + +import java.io.Serializable; + +/** + * Immutable container for probabilistically quantized matrix data. + * Stores quantized byte indices and the scaling parameters needed + * to reconstruct approximate original values on decompression. 
+ * + * + */ +public class QuantizedData implements Serializable { + + private static final long serialVersionUID = 1L; + + public final byte[] quantizedValues; // Quantized level indices + public final double min; // Original minimum value + public final double max; // Original maximum value + public final int levels; // Number of quantization levels (2^bits) + public final int bitsPerValue; // Bits used per element + public final int numRows; + public final int numCols; + + public QuantizedData(byte[] quantizedValues, double min, double max, + int levels, int bitsPerValue, int numRows, int numCols) { + this.quantizedValues = quantizedValues.clone(); // Defensive copy + this.min = min; + this.max = max; + this.levels = levels; + this.bitsPerValue = bitsPerValue; + this.numRows = numRows; + this.numCols = numCols; + } + + /** Number of quantized elements */ + public int size() { + return quantizedValues.length; + } + + /** Estimate serialized size in bytes */ + public long estimateSizeBytes() { + return quantizedValues.length + 64; // +64 for scalar fields and headers + } + + /** Reconstruct a double value from a quantized level index */ + public double reconstructValue(byte levelIndex) { + if (max - min < 1e-10) return min; // Constant matrix + int idx = levelIndex & 0xFF; // Treat byte as unsigned + return min + (idx / (double)(levels - 1)) * (max - min); + } + + @Override + public String toString() { + return String.format("QuantizedData[%dx%d, levels=%d, bits=%d, min=%.4f, max=%.4f]", + numRows, numCols, levels, bitsPerValue, min, max); + } +} \ No newline at end of file From 136ff91645040d57fa8c747b0a7854b5e7483dfd Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Mon, 29 Jun 2026 12:58:56 +0200 Subject: [PATCH 09/16] Add ProbabilisticQuantizationCompressor implementation --- .../ProbabilisticQuantizationCompressor.java | 158 ++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 
src/main/java/org/apache/sysds/runtime/compress/Quantization/ProbabilisticQuantizationCompressor.java diff --git a/src/main/java/org/apache/sysds/runtime/compress/Quantization/ProbabilisticQuantizationCompressor.java b/src/main/java/org/apache/sysds/runtime/compress/Quantization/ProbabilisticQuantizationCompressor.java new file mode 100644 index 00000000000..95ce2562b09 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/Quantization/ProbabilisticQuantizationCompressor.java @@ -0,0 +1,158 @@ +package org.apache.sysds.runtime.compress.Quantization; + +import org.apache.sysds.runtime.compress.CompressedMatrix; +import org.apache.sysds.runtime.compress.CompressionType; +import org.apache.sysds.runtime.compress.MatrixCompressor; +import org.apache.sysds.runtime.compress.exceptions.CompressionException; +import org.apache.sysds.runtime.compress.exceptions.DecompressionException; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +import java.util.Random; + +/** + * Probabilistic Quantization Compressor. + * + * Reduces numerical precision using stochastic rounding to maintain + * an unbiased estimator — meaning E[quantized] = original on average. + * This is critical for federated learning convergence guarantees. 
+ * + * Supports 2, 4, or 8 bits per value: + * 2-bit → 4 levels → 16x compression vs 32-bit float + * 4-bit → 16 levels → 8x compression + * 8-bit → 256 levels → 4x compression + * + * + */ +public class ProbabilisticQuantizationCompressor implements MatrixCompressor { + + private final int bitsPerValue; // 2, 4, or 8 + private final Random rng; + + public ProbabilisticQuantizationCompressor(int bitsPerValue) { + if (bitsPerValue != 2 && bitsPerValue != 4 && bitsPerValue != 8) { + throw new IllegalArgumentException("bitsPerValue must be 2, 4, or 8"); + } + this.bitsPerValue = bitsPerValue; + this.rng = new Random(42); // Fixed seed for reproducibility + } + + @Override + public CompressedMatrix compress(MatrixBlock input) throws CompressionException { + try { + int numRows = input.getNumRows(); + int numCols = input.getNumColumns(); + int totalElements = numRows * numCols; + + // Find min and max for normalization + double[] minMax = findMinMax(input, numRows, numCols); + double min = minMax[0]; + double max = minMax[1]; + + int levels = 1 << bitsPerValue; // 2^bits + + // Quantize each element probabilistically + byte[] quantized = new byte[totalElements]; + for (int i = 0; i < numRows; i++) { + for (int j = 0; j < numCols; j++) { + double value = input.get(i, j); + quantized[i * numCols + j] = probabilisticRound(value, min, max, levels); + } + } + + double ratio = 32.0 / bitsPerValue; // vs 32-bit float + + QuantizedData data = new QuantizedData( + quantized, min, max, levels, bitsPerValue, numRows, numCols); + + return new CompressedMatrix( + CompressionType.PROBABILISTIC_QUANTIZATION, + numRows, numCols, data, ratio); + + } catch (Exception e) { + throw new CompressionException( + "Quantization compression failed: " + e.getMessage(), e); + } + } + + @Override + public MatrixBlock decompress(CompressedMatrix compressed) throws DecompressionException { + try { + QuantizedData data = (QuantizedData) compressed.getCompressedData(); + MatrixBlock result = new 
MatrixBlock(data.numRows, data.numCols, false); + result.allocateDenseBlock(); + + for (int i = 0; i < data.numRows; i++) { + for (int j = 0; j < data.numCols; j++) { + byte levelIndex = data.quantizedValues[i * data.numCols + j]; + double value = data.reconstructValue(levelIndex); + result.set(i, j, value); + } + } + + result.examSparsity(); + return result; + + } catch (ClassCastException e) { + throw new DecompressionException( + "Invalid compressed data type for Quantization", e); + } catch (Exception e) { + throw new DecompressionException( + "Quantization decompression failed: " + e.getMessage(), e); + } + } + + @Override + public CompressionType getCompressionType() { + return CompressionType.PROBABILISTIC_QUANTIZATION; + } + + // ----------------------------------------------------------------------- + // Private helpers + // ----------------------------------------------------------------------- + + /** + * Stochastic rounding: for value x between levels q_i and q_{i+1}: + * P(round up) = (x - q_i) / (q_{i+1} - q_i) + * P(round down) = 1 - P(round up) + * This gives E[output] = x (unbiased). + */ + private byte probabilisticRound(double value, double min, double max, int levels) { + // Handle constant matrix edge case + if (max - min < 1e-10) return 0; + + // Normalize to [0, 1] + double normalized = (value - min) / (max - min); + normalized = Math.max(0.0, Math.min(1.0, normalized)); // Clamp + + // Find bounding level indices + double scaled = normalized * (levels - 1); + int lowerIdx = (int) scaled; + int upperIdx = Math.min(lowerIdx + 1, levels - 1); + + if (lowerIdx == upperIdx) { + return (byte) lowerIdx; + } + + // Probabilistic decision + double probUp = scaled - lowerIdx; + return (rng.nextDouble() < probUp) ? 
(byte) upperIdx : (byte) lowerIdx; + } + + /** Find min and max values across the entire matrix */ + private double[] findMinMax(MatrixBlock input, int numRows, int numCols) { + double min = Double.MAX_VALUE; + double max = -Double.MAX_VALUE; + + for (int i = 0; i < numRows; i++) { + for (int j = 0; j < numCols; j++) { + double val = input.get(i, j); + if (val < min) min = val; + if (val > max) max = val; + } + } + + // Handle all-zero matrix + if (min == Double.MAX_VALUE) { min = 0; max = 0; } + return new double[]{min, max}; + } +} \ No newline at end of file From 4087cbe547938858158c7ad3318f4a0e38470637 Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Mon, 29 Jun 2026 13:03:11 +0200 Subject: [PATCH 10/16] Add CompressionConfig builder --- .../runtime/compress/CompressionConfig.java | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 src/main/java/org/apache/sysds/runtime/compress/CompressionConfig.java diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionConfig.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionConfig.java new file mode 100644 index 00000000000..d2634bda528 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionConfig.java @@ -0,0 +1,93 @@ +package org.apache.sysds.runtime.compress; + +import java.util.HashMap; +import java.util.Map; + +/** + * Immutable configuration for compression in federated operations. + * Uses the Builder pattern for flexible, readable configuration. + * + * Usage example: + * CompressionConfig config = CompressionConfig.builder() + * .enable(true) + * .withType(CompressionType.TOPK) + * .withSparsity(0.01) + * .build(); + * + * + */ +public class CompressionConfig { + + private final boolean enabled; + private final CompressionType type; + private final Map parameters; + + private CompressionConfig(Builder builder) { + this.enabled = builder.enabled; + this.type = builder.enabled ? 
builder.type : CompressionType.NONE; + this.parameters = new HashMap<>(builder.parameters); + } + + public boolean isEnabled() { return enabled; } + public CompressionType getType() { return type; } + public Map getParameters() { return new HashMap<>(parameters); } + + /** Convenience getter for sparsity parameter (TopK) */ + public double getSparsity() { + return (double) parameters.getOrDefault("sparsity", 0.01); + } + + /** Convenience getter for bits parameter (Quantization) */ + public int getBits() { + return (int) parameters.getOrDefault("bits", 4); + } + + @Override + public String toString() { + return String.format("CompressionConfig[enabled=%s, type=%s, params=%s]", + enabled, type.getId(), parameters); + } + + // ----------------------------------------------------------------------- + // Builder + // ----------------------------------------------------------------------- + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private boolean enabled = false; + private CompressionType type = CompressionType.NONE; + private final Map parameters = new HashMap<>(); + + public Builder enable(boolean enabled) { + this.enabled = enabled; + return this; + } + + public Builder withType(CompressionType type) { + this.type = type; + return this; + } + + public Builder withParameter(String key, Object value) { + this.parameters.put(key, value); + return this; + } + + /** Shorthand for TopK sparsity ratio */ + public Builder withSparsity(double sparsity) { + return withParameter("sparsity", sparsity); + } + + /** Shorthand for quantization bit width */ + public Builder withBits(int bits) { + return withParameter("bits", bits); + } + + public CompressionConfig build() { + return new CompressionConfig(this); + } + } +} \ No newline at end of file From 65e35b34b5ade44e667211f3c1ff6ed1da946107 Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Mon, 29 Jun 2026 13:07:44 +0200 Subject: [PATCH 11/16] Add 
CompressionFactory with PassthroughCompressor
---
 .../runtime/compress/CompressionFactory.java | 120 ++++++++++++++++++
 1 file changed, 120 insertions(+)
 create mode 100644 src/main/java/org/apache/sysds/runtime/compress/CompressionFactory.java

diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionFactory.java
new file mode 100644
index 00000000000..04c7a982fc2
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionFactory.java
@@ -0,0 +1,120 @@
+package org.apache.sysds.runtime.compress;
+
+import org.apache.sysds.runtime.compress.TopK.TopKCompressor;
+import org.apache.sysds.runtime.compress.Quantization.ProbabilisticQuantizationCompressor;
+import org.apache.sysds.runtime.compress.exceptions.CompressionException;
+import org.apache.sysds.runtime.compress.exceptions.DecompressionException;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+/**
+ * Factory for creating compressor instances from configuration.
+ * Centralizes compressor instantiation; null arguments are rejected here,
+ * while the parameter values are handed to the compressor constructors as-is.
+ *
+ * Usage:
+ *   CompressionConfig config = CompressionConfig.builder()
+ *       .enable(true)
+ *       .withType(CompressionType.TOPK)
+ *       .withSparsity(0.01)
+ *       .build();
+ *   MatrixCompressor compressor = CompressionFactory.create(config);
+ */
+public class CompressionFactory {
+
+	private CompressionFactory() {
+		// Utility class — no instantiation
+	}
+
+	/**
+	 * Create a compressor from a CompressionConfig.
+	 * A null or disabled config yields the no-op passthrough compressor.
+	 *
+	 * @param config The compression configuration
+	 * @return A ready-to-use MatrixCompressor
+	 * @throws IllegalArgumentException if the config is rejected (null type, or parameters refused by the compressor)
+	 * @throws UnsupportedOperationException if the configured type is ONE_BIT_CS
+	 */
+	public static MatrixCompressor create(CompressionConfig config) {
+		if (config == null || !config.isEnabled()) {
+			return new PassthroughCompressor();
+		}
+		return create(config.getType(), config);
+	}
+
+	/**
+	 * Create a compressor for a specific type with given config.
+	 * The type argument selects the compressor; the config only supplies its
+	 * parameters and is not consulted for NONE.
+	 *
+	 * @param type The compression technique to instantiate
+	 * @param config Parameter source; required for TOPK and PROBABILISTIC_QUANTIZATION
+	 * @return A ready-to-use MatrixCompressor
+	 * @throws IllegalArgumentException if type is null, or config is null for a type that reads parameters
+	 * @throws UnsupportedOperationException if type is ONE_BIT_CS
+	 */
+	public static MatrixCompressor create(CompressionType type, CompressionConfig config) {
+		if (type == null) {
+			throw new IllegalArgumentException("Compression type must not be null");
+		}
+		switch (type) {
+			case TOPK:
+				double sparsity = requireConfig(type, config).getSparsity();
+				return new TopKCompressor(sparsity, true);
+
+			case PROBABILISTIC_QUANTIZATION:
+				int bits = requireConfig(type, config).getBits();
+				return new ProbabilisticQuantizationCompressor(bits);
+
+			case ONE_BIT_CS:
+				throw new UnsupportedOperationException(
+					"1-Bit Compressed Sensing not yet implemented");
+
+			case NONE:
+			default:
+				return new PassthroughCompressor();
+		}
+	}
+
+	/** Rejects a missing config with a descriptive error instead of a bare NullPointerException. */
+	private static CompressionConfig requireConfig(CompressionType type, CompressionConfig config) {
+		if (config == null) {
+			throw new IllegalArgumentException(
+				"A CompressionConfig is required for compression type: " + type.getId());
+		}
+		return config;
+	}
+
+	// -----------------------------------------------------------------------
+	// Passthrough compressor (no-op) for when compression is disabled
+	// -----------------------------------------------------------------------
+
+	/**
+	 * No-op compressor: returns the matrix as-is.
+	 * Used when compression is disabled or type is NONE.
+	 */
+	private static class PassthroughCompressor implements MatrixCompressor {
+
+		@Override
+		public CompressedMatrix compress(MatrixBlock input) throws CompressionException {
+			// The input block is passed through uncopied as the payload
+			return new CompressedMatrix(
+				CompressionType.NONE,
+				input.getNumRows(),
+				input.getNumColumns(),
+				input,
+				1.0
+			);
+		}
+
+		@Override
+		public MatrixBlock decompress(CompressedMatrix compressed) throws DecompressionException {
+			// Only valid for payloads produced by compress() above, which stores a MatrixBlock
+			return (MatrixBlock) compressed.getCompressedData();
+		}
+
+		@Override
+		public CompressionType getCompressionType() {
+			return CompressionType.NONE;
+		}
+	}
+}
\ No newline at end of file
From f01d81f5d027033e6428072922a5501b2099b57e Mon Sep 17 00:00:00 2001
From: Charansurya Udaysingh Jhurree
Date: Mon, 29 Jun 2026 13:19:49 +0200
Subject: [PATCH 12/16] Add TopKCompressorTest - 9 tests passing
---
 .../compress/TopK/TopKCompressorTest.java | 157 ++++++++++++++++++
 1 file changed, 157 insertions(+)
 create mode 100644
src/test/java/org/apache/sysds/runtime/compress/TopK/TopKCompressorTest.java diff --git a/src/test/java/org/apache/sysds/runtime/compress/TopK/TopKCompressorTest.java b/src/test/java/org/apache/sysds/runtime/compress/TopK/TopKCompressorTest.java new file mode 100644 index 00000000000..3fd1fd426f2 --- /dev/null +++ b/src/test/java/org/apache/sysds/runtime/compress/TopK/TopKCompressorTest.java @@ -0,0 +1,157 @@ +package org.apache.sysds.runtime.compress.TopK; + +import org.apache.sysds.runtime.compress.CompressedMatrix; +import org.apache.sysds.runtime.compress.CompressionType; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * Unit tests for TopKCompressor. + * Verifies compression ratio, reconstruction accuracy, + * and correct handling of edge cases. + * + * In these tests "sparsity" is the fraction of elements that is kept (1.0 keeps everything). + */ +public class TopKCompressorTest { + + // ----------------------------------------------------------------------- + // Basic compression / decompression + // ----------------------------------------------------------------------- + + @Test + public void testTopKKeepsLargestElements() throws Exception { + // 3x3 matrix with three distinct magnitudes + MatrixBlock input = new MatrixBlock(3, 3, false); + input.allocateDenseBlock(); + input.set(0, 0, 10.0); // Largest + input.set(1, 1, 5.0); // Medium + input.set(2, 2, 1.0); // Smallest + input.examSparsity(); + + // Keep top 2 of 9 elements (~22% sparsity); 0.22 * 9 = 1.98, so this presumably relies on k being rounded up to 2 - TODO confirm + TopKCompressor compressor = new TopKCompressor(0.22); + CompressedMatrix compressed = compressor.compress(input); + MatrixBlock result = compressor.decompress(compressed); + + // Largest two values must be preserved exactly + assertEquals(10.0, result.get(0, 0), 1e-10); + assertEquals(5.0, result.get(1, 1), 1e-10); + + // Smallest should be zeroed out + assertEquals(0.0, result.get(2, 2), 1e-10); + } + + @Test + public void testCompressionTypeIsTopK() throws Exception { + MatrixBlock input = createDenseMatrix(4, 4, 1.0); +
TopKCompressor compressor = new TopKCompressor(0.5); + CompressedMatrix compressed = compressor.compress(input); + assertEquals(CompressionType.TOPK, compressed.getType()); + } + + @Test + public void testDimensionsPreservedAfterDecompression() throws Exception { + MatrixBlock input = createRandomMatrix(10, 20); + TopKCompressor compressor = new TopKCompressor(0.1); + CompressedMatrix compressed = compressor.compress(input); + MatrixBlock result = compressor.decompress(compressed); + + assertEquals(10, result.getNumRows()); + assertEquals(20, result.getNumColumns()); + } + + // ----------------------------------------------------------------------- + // Compression ratio + // ----------------------------------------------------------------------- + + @Test + public void testCompressionRatioIsPositive() throws Exception { + MatrixBlock input = createRandomMatrix(50, 50); + TopKCompressor compressor = new TopKCompressor(0.01); + CompressedMatrix compressed = compressor.compress(input); + assertTrue("Compression ratio must be > 0", + compressed.getCompressionRatio() > 0); + } + + @Test + public void testLowerSparsityGivesHigherRatio() throws Exception { + MatrixBlock input = createRandomMatrix(100, 100); + + TopKCompressor c1 = new TopKCompressor(0.1); + TopKCompressor c2 = new TopKCompressor(0.01); + + double ratio1 = c1.compress(input).getCompressionRatio(); + double ratio2 = c2.compress(input).getCompressionRatio(); + + assertTrue("1% sparsity should compress more than 10%", ratio2 > ratio1); + } + + // ----------------------------------------------------------------------- + // Edge cases + // ----------------------------------------------------------------------- + + @Test + public void testAllZeroMatrix() throws Exception { + MatrixBlock input = new MatrixBlock(5, 5, false); + input.allocateDenseBlock(); + input.examSparsity(); + + TopKCompressor compressor = new TopKCompressor(0.1); + CompressedMatrix compressed = compressor.compress(input); + MatrixBlock
result = compressor.decompress(compressed); + + // All zeros in → all zeros out + for (int i = 0; i < 5; i++) + for (int j = 0; j < 5; j++) + assertEquals(0.0, result.get(i, j), 1e-10); + } + + @Test + public void testSparsityOfOneKeepsEverything() throws Exception { + MatrixBlock input = createRandomMatrix(5, 5); + TopKCompressor compressor = new TopKCompressor(1.0); + CompressedMatrix compressed = compressor.compress(input); + MatrixBlock result = compressor.decompress(compressed); + + // With sparsity=1.0, all values should be preserved + for (int i = 0; i < 5; i++) + for (int j = 0; j < 5; j++) + assertEquals(input.get(i, j), result.get(i, j), 1e-10); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidSparsityThrowsException() { + new TopKCompressor(0.0); // Must be > 0 + } + + @Test(expected = IllegalArgumentException.class) + public void testSparsityAboveOneThrowsException() { + new TopKCompressor(1.5); // Must be <= 1 + } + + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private MatrixBlock createRandomMatrix(int rows, int cols) { + MatrixBlock m = new MatrixBlock(rows, cols, false); + m.allocateDenseBlock(); + java.util.Random rng = new java.util.Random(42); + for (int i = 0; i < rows; i++) + for (int j = 0; j < cols; j++) + m.set(i, j, rng.nextGaussian() * 10); + m.examSparsity(); + return m; + } + + private MatrixBlock createDenseMatrix(int rows, int cols, double fillValue) { + MatrixBlock m = new MatrixBlock(rows, cols, false); + m.allocateDenseBlock(); + for (int i = 0; i < rows; i++) + for (int j = 0; j < cols; j++) + m.set(i, j, fillValue); + m.examSparsity(); + return m; + } +} \ No newline at end of file From 927fc196e5940106bf32ef33a9e09e5e7499cdfa Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Mon, 29 Jun 2026 13:26:43 +0200 Subject: [PATCH 13/16] Add
ProbabilisticQuantizationCompressorTest --- ...obabilisticQuantizationCompressorTest.java | 187 ++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 src/test/java/org/apache/sysds/runtime/compress/Quantization/ProbabilisticQuantizationCompressorTest.java diff --git a/src/test/java/org/apache/sysds/runtime/compress/Quantization/ProbabilisticQuantizationCompressorTest.java b/src/test/java/org/apache/sysds/runtime/compress/Quantization/ProbabilisticQuantizationCompressorTest.java new file mode 100644 index 00000000000..a50ae2f3f79 --- /dev/null +++ b/src/test/java/org/apache/sysds/runtime/compress/Quantization/ProbabilisticQuantizationCompressorTest.java @@ -0,0 +1,187 @@ +package org.apache.sysds.runtime.compress.Quantization; + +import org.apache.sysds.runtime.compress.CompressedMatrix; +import org.apache.sysds.runtime.compress.CompressionType; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * Unit tests for ProbabilisticQuantizationCompressor. + * Verifies compression ratio, reconstruction accuracy, + * unbiasedness property, and edge case handling.
+ * + * + */ +public class ProbabilisticQuantizationCompressorTest { + + // ----------------------------------------------------------------------- + // Basic compression / decompression + // ----------------------------------------------------------------------- + + @Test + public void testCompressionTypeIsProbabilisticQuantization() throws Exception { + MatrixBlock input = createRandomMatrix(4, 4); + ProbabilisticQuantizationCompressor compressor = + new ProbabilisticQuantizationCompressor(4); + CompressedMatrix compressed = compressor.compress(input); + assertEquals(CompressionType.PROBABILISTIC_QUANTIZATION, compressed.getType()); + } + + @Test + public void testDimensionsPreservedAfterDecompression() throws Exception { + MatrixBlock input = createRandomMatrix(10, 20); + ProbabilisticQuantizationCompressor compressor = + new ProbabilisticQuantizationCompressor(4); + CompressedMatrix compressed = compressor.compress(input); + MatrixBlock result = compressor.decompress(compressed); + + assertEquals(10, result.getNumRows()); + assertEquals(20, result.getNumColumns()); + } + + @Test + public void testReconstructedValuesWithinOriginalRange() throws Exception { + MatrixBlock input = createRandomMatrix(20, 20); + + // Find original min/max + double origMin = Double.MAX_VALUE; + double origMax = -Double.MAX_VALUE; + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 20; j++) { + double v = input.get(i, j); + if (v < origMin) origMin = v; + if (v > origMax) origMax = v; + } + } + + ProbabilisticQuantizationCompressor compressor = + new ProbabilisticQuantizationCompressor(4); + MatrixBlock result = compressor.decompress(compressor.compress(input)); + + // All reconstructed values must be within [min, max] + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 20; j++) { + double v = result.get(i, j); + assertTrue("Value below min: " + v, v >= origMin - 1e-9); + assertTrue("Value above max: " + v, v <= origMax + 1e-9); + } + } + } + + //
----------------------------------------------------------------------- + // Compression ratio (the expected values below equal 32 / bits) + // ----------------------------------------------------------------------- + + @Test + public void testCompressionRatio2Bit() throws Exception { + MatrixBlock input = createRandomMatrix(10, 10); + ProbabilisticQuantizationCompressor compressor = + new ProbabilisticQuantizationCompressor(2); + CompressedMatrix compressed = compressor.compress(input); + assertEquals(16.0, compressed.getCompressionRatio(), 1e-10); + } + + @Test + public void testCompressionRatio4Bit() throws Exception { + MatrixBlock input = createRandomMatrix(10, 10); + ProbabilisticQuantizationCompressor compressor = + new ProbabilisticQuantizationCompressor(4); + CompressedMatrix compressed = compressor.compress(input); + assertEquals(8.0, compressed.getCompressionRatio(), 1e-10); + } + + @Test + public void testCompressionRatio8Bit() throws Exception { + MatrixBlock input = createRandomMatrix(10, 10); + ProbabilisticQuantizationCompressor compressor = + new ProbabilisticQuantizationCompressor(8); + CompressedMatrix compressed = compressor.compress(input); + assertEquals(4.0, compressed.getCompressionRatio(), 1e-10); + } + + @Test + public void testFewerBitsGivesHigherRatio() throws Exception { + MatrixBlock input = createRandomMatrix(20, 20); + + double ratio2bit = new ProbabilisticQuantizationCompressor(2) + .compress(input).getCompressionRatio(); + double ratio8bit = new ProbabilisticQuantizationCompressor(8) + .compress(input).getCompressionRatio(); + + assertTrue("2-bit should compress more than 8-bit", ratio2bit > ratio8bit); + } + + // ----------------------------------------------------------------------- + // Unbiasedness: E[quantized] ≈ original over many runs + // ----------------------------------------------------------------------- + + @Test + public void testUnbiasednessOverManyRuns() throws Exception { + // Single element matrix with value 5.0. NOTE(review): with a single cell min == max, so this presumably never rounds between two levels - consider a multi-cell input + MatrixBlock input = new
MatrixBlock(1, 1, false); + input.allocateDenseBlock(); + input.set(0, 0, 5.0); + input.examSparsity(); + + // Run quantization 1000 times and average the results + double sum = 0.0; + int runs = 1000; + for (int r = 0; r < runs; r++) { + ProbabilisticQuantizationCompressor compressor = + new ProbabilisticQuantizationCompressor(4); + MatrixBlock result = compressor.decompress(compressor.compress(input)); + sum += result.get(0, 0); + } + double average = sum / runs; + + // Average should be close to original value (unbiased estimator) + assertEquals("Quantization should be unbiased", + 5.0, average, 0.5); // Allow 0.5 tolerance + } + + // ----------------------------------------------------------------------- + // Edge cases + // ----------------------------------------------------------------------- + + @Test + public void testConstantMatrix() throws Exception { + // All same value — min == max edge case + MatrixBlock input = new MatrixBlock(3, 3, false); + input.allocateDenseBlock(); + for (int i = 0; i < 3; i++) + for (int j = 0; j < 3; j++) + input.set(i, j, 7.0); + input.examSparsity(); + + ProbabilisticQuantizationCompressor compressor = + new ProbabilisticQuantizationCompressor(4); + CompressedMatrix compressed = compressor.compress(input); + MatrixBlock result = compressor.decompress(compressed); + + // Should not throw; all values should reconstruct to min (7.0 or close), but only the dimensions are asserted below + assertNotNull(result); + assertEquals(3, result.getNumRows()); + assertEquals(3, result.getNumColumns()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidBitsThrowsException() { + new ProbabilisticQuantizationCompressor(3); // Only 2, 4, 8 allowed + } + + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private MatrixBlock createRandomMatrix(int rows, int cols) { + MatrixBlock m = new MatrixBlock(rows, cols, false); +
m.allocateDenseBlock(); + java.util.Random rng = new java.util.Random(42); + for (int i = 0; i < rows; i++) + for (int j = 0; j < cols; j++) + m.set(i, j, rng.nextGaussian() * 10); + m.examSparsity(); + return m; + } +} \ No newline at end of file From c6e5ec51123e87a11c6be783bf4574fcc941ece5 Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Mon, 29 Jun 2026 13:29:39 +0200 Subject: [PATCH 14/16] Add GitHub Actions CI/CD workflow for compression tests --- .github/workflows/compression-tests.yml | 46 +++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 .github/workflows/compression-tests.yml diff --git a/.github/workflows/compression-tests.yml b/.github/workflows/compression-tests.yml new file mode 100644 index 00000000000..36bca793362 --- /dev/null +++ b/.github/workflows/compression-tests.yml @@ -0,0 +1,46 @@ +name: Compression Tests + +on: + push: + branches: [ feature/compression ] + pull_request: + branches: [ main ] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Java 17 + uses: actions/setup-java@v3 + with: + java-version: '17' + distribution: 'temurin' + + - name: Cache Maven dependencies + uses: actions/cache@v3 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven- + + - name: Build project + run: mvn clean package -Dmaven.test.skip=true -Dmaven.javadoc.skip=true + + - name: Run compression tests + run: | + mvn test \ + -Dtest=TopKCompressorTest,ProbabilisticQuantizationCompressorTest \ + -Dmaven.test.failure.ignore=false \ + -Dmaven.javadoc.skip=true + + - name: Upload test results + if: always() + uses: actions/upload-artifact@v3 + with: + name: test-results + path: target/surefire-reports/ \ No newline at end of file From 43a937b2f92ec065a03c966e91b8ab9709d4dcd6 Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Mon, 29 Jun 2026 13:46:40
+0200 Subject: [PATCH 15/16] Integrate compression into FederationMap broadcast --- .../federated/FederationMap.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java index 68dff4785b3..c086e33b2ef 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java @@ -48,6 +48,12 @@ import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType; import org.apache.sysds.runtime.instructions.cp.CPOperand; import org.apache.sysds.runtime.instructions.cp.ScalarObject; +import org.apache.sysds.runtime.compress.CompressionConfig; +import org.apache.sysds.runtime.compress.CompressionFactory; +import org.apache.sysds.runtime.compress.CompressionType; +import org.apache.sysds.runtime.compress.CompressedMatrix; +import org.apache.sysds.runtime.compress.MatrixCompressor; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction; import org.apache.sysds.runtime.lineage.LineageItem; import org.apache.sysds.runtime.util.IndexRange; @@ -138,6 +144,27 @@ private FederatedRequest broadcast(CacheableData data, LineageItem lineageIte // is fine, because with broadcast all data on all workers) data.setFedMapping(copyWithNewIDAndRange( cb.getNumRows(), cb.getNumColumns(), id, FType.BROADCAST)); + + // === COMPRESSION INTEGRATION === + // Attempt TopK compression if the block is a MatrixBlock. NOTE(review): the request below carries the decompressed block, i.e. a lossy TopK round-trip of the data with a hard-coded sparsity of 0.01, not a compressed payload - confirm this is intended for every broadcast + if(cb instanceof MatrixBlock) { + try { + CompressionConfig config = CompressionConfig.builder() + .enable(true) + .withType(CompressionType.TOPK) + .withSparsity(0.01) + .build(); + MatrixCompressor compressor = CompressionFactory.create(config); + CompressedMatrix compressed =
compressor.compress((MatrixBlock) cb); + MatrixBlock decompressed = compressor.decompress(compressed); + return new FederatedRequest(RequestType.PUT_VAR, lineageItem, id, decompressed); + } + catch(Exception ex) { + // Fall back to uncompressed on any error. NOTE(review): the exception is discarded without logging, so compression failures are invisible + } + } + // === END COMPRESSION INTEGRATION === + return new FederatedRequest(RequestType.PUT_VAR, lineageItem, id, cb); } From b173446b800da8a7291a90d4245c5a31ba34b5aa Mon Sep 17 00:00:00 2001 From: Charansurya Udaysingh Jhurree Date: Mon, 29 Jun 2026 13:53:01 +0200 Subject: [PATCH 16/16] Fix deprecated upload-artifact v3 -> v4 --- .github/workflows/compression-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/compression-tests.yml b/.github/workflows/compression-tests.yml index 36bca793362..ddfd08db244 100644 --- a/.github/workflows/compression-tests.yml +++ b/.github/workflows/compression-tests.yml @@ -40,7 +40,7 @@ jobs: - name: Upload test results if: always() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: test-results path: target/surefire-reports/ \ No newline at end of file