Skip to content
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
365d2b8
Basic Structure for ColumnDecoder
Isso-W May 17, 2025
d5dc7c1
Basic Structure for ColumnDecoder
Isso-W Jun 9, 2025
3712b9c
Main logic in ColumnDecoderComposite done, ColumnDecoderBin as exampl…
Isso-W Jun 9, 2025
ca836a4
update two encode methods for PassThrough and Recode
EmilyHongYT Jun 10, 2025
f43a853
Main logic in ColumnDecoderComposite done, ColumnDecoderBin as exampl…
Isso-W Jun 12, 2025
84077b0
Mixed test and optimization on Pass through
Isso-W Jun 13, 2025
8fcbbcc
Middle version
Isso-W Jun 15, 2025
c02fa76
Middle version
Isso-W Jun 15, 2025
3c66fcc
Middle version
Isso-W Jun 15, 2025
8da2e0d
Middle version
Isso-W Jun 16, 2025
19a4437
Middle version
Isso-W Jun 16, 2025
34a4341
Bin, Recode, Passthrough done
Isso-W Jun 16, 2025
0c5acdb
update dummy funtion for Processing columns in parallel
EmilyHongYT Jun 18, 2025
98865e7
Middle version
Isso-W Jun 18, 2025
2886537
Middle version
Isso-W Jun 18, 2025
b75b7d1
Middle version
Isso-W Jun 18, 2025
61f6d39
Update pom.xml
Isso-W Jun 19, 2025
839b050
Add Licenses to new classes
Isso-W Jun 30, 2025
595f4b7
Merge remote-tracking branch 'origin/main'
Isso-W Jun 30, 2025
bfcb133
Update ColumnDecoderMixedMethodsTest.java
Isso-W Jun 30, 2025
0e67c0a
little fix to multi-thread, update time measure methode
Isso-W Jun 30, 2025
91fbc54
Merge remote-tracking branch 'origin/main'
Isso-W Jun 30, 2025
ec7fe78
multi-thread fix, bin optimization
Isso-W Jul 1, 2025
6dfa925
Huge refactor: Changed input structure for decoders. Now working prop…
Isso-W Jul 5, 2025
b8c4f88
update record funtion for Processing columns and test it
EmilyHongYT Jul 9, 2025
d62f358
small fixes on bin
Isso-W Jul 10, 2025
56e7cd9
Merge remote-tracking branch 'origin/main'
Isso-W Jul 10, 2025
d7a3228
update recode & dummy funtion for Processing columns and test it
EmilyHongYT Jul 15, 2025
0a35c28
update Factory & PassThrough part for Processing columns and test it
EmilyHongYT Jul 15, 2025
81c405f
print runtime for dummy and recode when processing single column
EmilyHongYT Jul 15, 2025
91cc343
fixed dummy decoder
Isso-W Jul 16, 2025
8448990
update recode & dummy funtion for Processing columns and test it
EmilyHongYT Jul 15, 2025
dd14314
update Factory & PassThrough part for Processing columns and test it
EmilyHongYT Jul 15, 2025
41cb69b
print runtime for dummy and recode when processing single column
EmilyHongYT Jul 15, 2025
ba3dc04
fixed dummy decoder
Isso-W Jul 16, 2025
bb92e76
little improvement
Isso-W Jul 16, 2025
abafe03
Bug fixes and replace decoder
Isso-W Jul 17, 2025
9b67321
Update pom.xml
Isso-W Jul 21, 2025
64651ec
Merge branch 'apache:main' into main
Isso-W Jul 21, 2025
808cba2
cleaned code and add documentations
Isso-W Jul 28, 2025
0156c58
Merge remote-tracking branch 'origin/main'
Isso-W Jul 28, 2025
337334e
remove internal time counter, add test class using FT Benchmark adult…
Isso-W Aug 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.runtime.transform.decode;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;


import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

public abstract class ColumnDecoder implements Externalizable {
protected static final Log LOG = LogFactory.getLog(Decoder.class.getName());
private static final long serialVersionUID = -1732411001366177787L;

protected ValueType[] _schema;
protected int[] _colList;
protected String[] _colnames = null;
protected ColumnDecoder(ValueType[] schema, int[] colList) {
_schema = schema;
_colList = colList;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a column list? A column encoder should work on a single column.


public ValueType[] getSchema() {
return _schema;
}

public void setColnames(String[] colnames) {
_colnames = colnames;
}

public String[] getColnames() {
return _colnames;
}

public int[] getColList() {return _colList;}
/**
* Block decode API converting a matrix block into a frame block.
*
* @param in Input matrix block
* @param out Output frame block
* @return returns given output frame block for convenience
*/
public abstract FrameBlock columnDecode(MatrixBlock in, FrameBlock out);

/**
* Block decode API converting a matrix block into a frame block in parallel.
*
* @param in Input matrix block
* @param out Output frame block
* @param k Parallelization degree
* @return returns the given output frame block for convenience
*/
public FrameBlock columnDecode(MatrixBlock in, FrameBlock out, int k) {
return columnDecode(in, out);
}

/**
* Block decode row block
*
* @param in input Matrix Block
* @param out output FrameBlock
* @param rl row start to decode
* @param ru row end to decode (not inclusive)
*/
public abstract void columnDecode(MatrixBlock in, FrameBlock out, int rl, int ru);

/**
* Returns a new Decoder that only handles a sub range of columns. The sub-range refers to the columns after
* decoding.
*
* @param colStart the start index of the sub-range (1-based, inclusive)
* @param colEnd the end index of the sub-range (1-based, exclusive)
* @param dummycodedOffset the offset of dummycoded segments before colStart
* @return a decoder of the same type, just for the sub-range
*/
public ColumnDecoder subRangeDecoder(int colStart, int colEnd, int dummycodedOffset) {
throw new DMLRuntimeException(
getClass().getSimpleName() + " does not support the creation of a sub-range decoder");
}

/**
* Update index-ranges to after decoding. Note that only Dummycoding changes the ranges.
*
* @param beginDims the begin indexes before encoding
* @param endDims the end indexes before encoding
*/
public void updateIndexRanges(long[] beginDims, long[] endDims) {
// do nothing - default
}

public abstract void initMetaData(FrameBlock meta);

/**
* Redirects the default java serialization via externalizable to our default
* hadoop writable serialization for efficient broadcast/rdd serialization.
*
* @param os object output
* @throws IOException if IOException occurs
*/
@Override
public void writeExternal(ObjectOutput os)
throws IOException
{
int size1 = (_colList == null) ? 0 : _colList.length;
os.writeInt(size1);
for(int i = 0; i < size1; i++)
os.writeInt(_colList[i]);

int size2 = (_colnames == null) ? 0 : _colnames.length;
os.writeInt(size2);
for(int j = 0; j < size2; j++)
os.writeUTF(_colnames[j]);

int size3 = (_schema == null) ? 0 : _schema.length;
os.writeInt(size3);
for(int j = 0; j < size3; j++)
os.writeByte(_schema[j].ordinal());
}

/**
* Redirects the default java serialization via externalizable to our default
* hadoop writable serialization for efficient broadcast/rdd deserialization.
*
* @param in object input
* @throws IOException if IOException occur
*/
@Override
public void readExternal(ObjectInput in)
throws IOException
{
int size1 = in.readInt();
_colList = (size1 == 0) ? null : new int[size1];
for(int i = 0; i < size1; i++)
_colList[i] = in.readInt();

int size2 = in.readInt();
_colnames = (size2 == 0) ? null : new String[size2];
for(int j = 0; j < size2; j++) {
_colnames[j] = in.readUTF();
}

int size3 = in.readInt();
_schema = (size3 == 0) ? null : new ValueType[size3];
for(int j = 0; j < size3; j++) {
_schema[j] = ValueType.values()[in.readByte()];
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.runtime.transform.decode;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.frame.data.columns.Array;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.UtilFunctions;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

public class ColumnDecoderBin extends ColumnDecoder {
private static final long serialVersionUID = -3784249774608228805L;

private int[] _numBins;
private double[][] _binMins = null;
private double[][] _binMaxs = null;

public ColumnDecoderBin() {
super(null, null);
}

protected ColumnDecoderBin(ValueType[] schema, int[] binCols) {
super(schema, binCols);
}


//@Override
//public FrameBlock columnDecode(MatrixBlock in, FrameBlock out) {
//
// long b1 = System.nanoTime();
// out.ensureAllocatedColumns(in.getNumRows());
// for (int i = 0; i < in.getNumRows(); i++) {
// for (int j = 0; j < _colList.length; j++) {
// double val = in.get(i, j);
// if (!Double.isNaN(val)) {
// int key = (int) Math.round(val);
// double bmin = _binMins[j][key - 1];
// double bmax = _binMaxs[j][key - 1];
// double oval = bmin + (bmax - bmin) / 2 + (val - key) * (bmax - bmin);
// out.getColumn(_colList[j] - 1).set(i, oval);
// } else {
// out.getColumn(_colList[j] - 1).set(i, val);
// }
// }
// }
// //columnDecode(in, out, 0, in.getNumRows());
// long b2 = System.nanoTime();
// System.out.println(this.getClass() + "time: " + (b2 - b1) / 1e6 + " ms");
// return out;
//}

@Override
public FrameBlock columnDecode(MatrixBlock in, FrameBlock out) {
long b1 = System.nanoTime();
out.ensureAllocatedColumns(in.getNumRows());

final int outColIndex = _colList[0] - 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the outColIndex is always _colList[0] - 1?

final double[] binMins = _binMins[0];
final double[] binMaxs = _binMaxs[0];
final int nRows = in.getNumRows();
Array<?> a = out.getColumn(0);
for (int i = 0; i < nRows; i++) {
double val = in.get(i, 0);
double decoded;
if (!Double.isNaN(val)) {
int key = (int) Math.round(val);
double bmin = binMins[key - 1];
double bmax = binMaxs[key - 1];
decoded = bmin + (bmax - bmin) / 2
+ (val - key) * (bmax - bmin);
a.set(i, decoded);
} else {
a.set(i, val);
}
}
long b2 = System.nanoTime();
System.out.println(this.getClass() +": "+ (b2 - b1) / 1e6 + " ms");
return out;
}


@Override
public void columnDecode(MatrixBlock in, FrameBlock out, int rl, int ru) {
for (int i = rl; i < ru; i++) {
for (int j = 0; j < _colList.length; j++) {
double val = in.get(i, j);
if (!Double.isNaN(val)) {
int key = (int) Math.round(val);
double bmin = _binMins[j][key - 1];
double bmax = _binMaxs[j][key - 1];
double oval = bmin + (bmax - bmin) / 2 + (val - key) * (bmax - bmin);
out.getColumn(_colList[j] - 1).set(i, oval);
} else {
out.getColumn(_colList[j] - 1).set(i, val);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why you are iterating all columns in a column decoder.

}
}

//@Override
//public ColumnDecoder subRangeDecoder(int colStart, int colEnd, int dummycodedOffset) {
// if (colEnd - colStart != 1)
// throw new NotImplementedException();
//
// for (int i = 0; i < _colList.length; i++) {
// if (_colList[i] == colStart) {
// ValueType[] schema = (_schema != null) ? new ValueType[]{_schema[colStart - 1]} : null;
// ColumnDecoderBin sub = new ColumnDecoderBin(schema, new int[]{colStart});
// sub._numBins = new int[]{_numBins[i]};
// sub._binMins = new double[][]{_binMins[i]};
// sub._binMaxs = new double[][]{_binMaxs[i]};
// return sub;
// }
// }
// return null;
//}

@Override
public ColumnDecoder subRangeDecoder(int colStart, int colEnd, int dummycodedOffset) {

for (int i = 0; i < _colList.length; i++) {
long b1 = System.nanoTime();
ValueType[] schema = (_schema != null) ? new ValueType[]{_schema[colStart - 1]} : null;
if (_colList[i] == colStart) {
ColumnDecoderBin sub = new ColumnDecoderBin(schema, new int[]{colStart});
sub._numBins = new int[]{_numBins[i]};
sub._binMins = new double[][]{_binMins[i]};
sub._binMaxs = new double[][]{_binMaxs[i]};
return sub;
}
long b2 = System.nanoTime();
System.out.println("time: " + (b2 - b1) / 1e6 + " ms");
}
return null;
}

@Override
public void initMetaData(FrameBlock meta) {
//initialize bin boundaries
_numBins = new int[_colList.length];
_binMins = new double[_colList.length][];
_binMaxs = new double[_colList.length][];

//parse and insert bin boundaries
for( int j=0; j<_colList.length; j++ ) {
int numBins = (int)meta.getColumnMetadata(_colList[j]-1).getNumDistinct();
_binMins[j] = new double[numBins];
_binMaxs[j] = new double[numBins];
for( int i=0; i<meta.getNumRows() & i<numBins; i++ ) {
if( meta.get(i, _colList[j]-1)==null ) {
if( i+1 < numBins )
throw new DMLRuntimeException("Did not reach number of bins: "+(i+1)+"/"+numBins);
break; //reached end of bins
}
String[] parts = UtilFunctions.splitRecodeEntry(
meta.get(i, _colList[j]-1).toString());
_binMins[j][i] = Double.parseDouble(parts[0]);
_binMaxs[j][i] = Double.parseDouble(parts[1]);
}
}
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
for( int i=0; i<_colList.length; i++ ) {
int len = _numBins[i];
out.writeInt(len);
for(int j=0; j<len; j++) {
out.writeDouble(_binMins[i][j]);
out.writeDouble(_binMaxs[i][j]);
}
}
}

@Override
public void readExternal(ObjectInput in) throws IOException {
super.readExternal(in);
_numBins = new int[_colList.length];
_binMins = new double[_colList.length][];
_binMaxs = new double[_colList.length][];
for( int i=0; i<_colList.length; i++ ) {
int len = in.readInt();
_numBins[i] = len;
for(int j=0; j<len; j++) {
_binMins[i][j] = in.readDouble();
_binMaxs[i][j] = in.readDouble();
}
}
}
}
Loading
Loading