Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private long readLongSlowPath() {
*
* @param fieldType the element type of the row
*/
static FieldReader createFieldReader(DataType fieldType) {
public static FieldReader createFieldReader(DataType fieldType) {
final FieldReader fieldReader;
// ordered by type root definition
switch (fieldType.getTypeRoot()) {
Expand Down Expand Up @@ -325,7 +325,7 @@ static FieldReader createFieldReader(DataType fieldType) {
*
* @see #createFieldReader(DataType)
*/
interface FieldReader extends Serializable {
public interface FieldReader extends Serializable {
Object readField(CompactedRowReader reader, int pos);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,10 @@ private void writeDouble(double value) {
}

private void writeTimestampNtz(TimestampNtz value, int precision) {
if (TimestampNtz.isCompact(precision)) {
if (precision == 0) {
// truncate to seconds to keep consistence with ArrowTimestampNtzWriter
writeLong(value.getMillisecond() / 1000 * 1000);
} else if (TimestampNtz.isCompact(precision)) {
writeLong(value.getMillisecond());
} else {
writeLong(value.getMillisecond());
Expand All @@ -260,7 +263,10 @@ private void writeTimestampNtz(TimestampNtz value, int precision) {
}

private void writeTimestampLtz(TimestampLtz value, int precision) {
if (TimestampLtz.isCompact(precision)) {
if (precision == 0) {
// truncate to seconds to keep consistence with ArrowTimestampLtzWriter
writeLong(value.getEpochMillisecond() / 1000 * 1000);
} else if (TimestampLtz.isCompact(precision)) {
writeLong(value.getEpochMillisecond());
} else {
writeLong(value.getEpochMillisecond());
Expand Down Expand Up @@ -332,9 +338,20 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
break;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
break;
case TIME_WITHOUT_TIME_ZONE:
final int timePrecision = getPrecision(fieldType);
fieldWriter =
(writer, pos, value) -> {
if (timePrecision == 0) {
// truncate to seconds to keep consistence with ArrowTimeWriter
writer.writeInt((int) value / 1000 * 1000);
} else {
writer.writeInt((int) value);
}
};
break;
case BIGINT:
fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.apache.fluss.types.StringType;
import org.apache.fluss.utils.MurmurHashUtils;

import java.util.Arrays;

import static org.apache.fluss.types.DataTypeChecks.getPrecision;

/**
Expand Down Expand Up @@ -252,11 +250,11 @@ public static boolean isFixedLength(DataType dataType) {
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case CHAR:
case BINARY:
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return true;
case STRING:
case BINARY:
case BYTES:
return false;
case DECIMAL:
Expand Down Expand Up @@ -414,17 +412,7 @@ public TimestampLtz getTimestampLtz(int pos, int precision) {

@Override
public byte[] getBinary(int pos, int length) {
byte[] bytes = new byte[length];
segment.get(getFieldOffset(pos), bytes, 0, length);

int newLen = 0;
for (int i = length - 1; i >= 0; i--) {
if (bytes[i] != (byte) 0) {
newLen = i + 1;
break;
}
}
return Arrays.copyOfRange(bytes, 0, newLen);
return getBytes(pos);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.fluss.types.DataType;

import java.io.Serializable;
import java.util.Arrays;

import static org.apache.fluss.types.DataTypeChecks.getLength;
import static org.apache.fluss.types.DataTypeChecks.getPrecision;
Expand Down Expand Up @@ -157,8 +156,8 @@ public TimestampNtz readTimestampNtz(int precision) {
return TimestampNtz.fromMillis(milliseconds, nanosOfMillisecond);
}

public byte[] readBinary(int length) {
return readBytesInternal(length);
public byte[] readBinary() {
return readBytes();
}

public byte[] readBytes() {
Expand Down Expand Up @@ -186,25 +185,16 @@ private BinaryString readStringInternal(int length) {
private byte[] readBytesInternal(int length) {
byte[] bytes = new byte[length];
segment.get(position, bytes, 0, length);

int newLen = 0;
for (int i = length - 1; i >= 0; i--) {
if (bytes[i] != (byte) 0) {
newLen = i + 1;
break;
}
}

position += length;
return Arrays.copyOfRange(bytes, 0, newLen);
return bytes;
}

/**
* Creates an accessor for reading elements.
*
* @param fieldType the element type of the row
*/
static FieldReader createFieldReader(DataType fieldType) {
public static FieldReader createFieldReader(DataType fieldType) {
final FieldReader fieldReader;
// ordered by type root definition
switch (fieldType.getTypeRoot()) {
Expand All @@ -219,8 +209,7 @@ static FieldReader createFieldReader(DataType fieldType) {
fieldReader = (reader, pos) -> reader.readBoolean();
break;
case BINARY:
final int binaryLength = getLength(fieldType);
fieldReader = (reader, pos) -> reader.readBinary(binaryLength);
fieldReader = (reader, pos) -> reader.readBinary();
break;
case BYTES:
fieldReader = (reader, pos) -> reader.readBytes();
Expand Down Expand Up @@ -277,7 +266,7 @@ static FieldReader createFieldReader(DataType fieldType) {
*
* @see #createFieldReader(DataType)
*/
interface FieldReader extends Serializable {
public interface FieldReader extends Serializable {
Object readField(IndexedRowReader reader, int pos);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,8 @@ public void writeBinary(byte[] value, int length) {
if (value.length > length) {
throw new IllegalArgumentException();
}
byte[] newByte = new byte[length];
System.arraycopy(value, 0, newByte, 0, value.length);
write(newByte, 0, length);
writeVarLengthToVarLengthList(value.length);
write(value, 0, value.length);
}

public void writeBytes(byte[] value) {
Expand All @@ -173,7 +172,10 @@ public void writeDecimal(Decimal value, int precision) {
}

public void writeTimestampNtz(TimestampNtz value, int precision) {
if (TimestampNtz.isCompact(precision)) {
if (precision == 0) {
// truncate to seconds to keep consistence with ArrowTimestampNtzWriter
writeLong(value.getMillisecond() / 1000 * 1000);
} else if (TimestampNtz.isCompact(precision)) {
writeLong(value.getMillisecond());
} else {
writeLong(value.getMillisecond());
Expand All @@ -182,7 +184,10 @@ public void writeTimestampNtz(TimestampNtz value, int precision) {
}

public void writeTimestampLtz(TimestampLtz value, int precision) {
if (TimestampLtz.isCompact(precision)) {
if (precision == 0) {
// truncate to seconds to keep consistence with ArrowTimestampLtzWriter
writeLong(value.getEpochMillisecond() / 1000 * 1000);
} else if (TimestampLtz.isCompact(precision)) {
writeLong(value.getEpochMillisecond());
} else {
writeLong(value.getEpochMillisecond());
Expand Down Expand Up @@ -301,9 +306,20 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
break;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
fieldWriter = (writer, pos, value) -> writer.writeInt((int) value);
break;
case TIME_WITHOUT_TIME_ZONE:
final int timePrecision = getPrecision(fieldType);
fieldWriter =
(writer, pos, value) -> {
if (timePrecision == 0) {
// truncate to seconds to keep consistence with ArrowTimeWriter
writer.writeInt((int) value / 1000 * 1000);
} else {
writer.writeInt((int) value);
}
};
break;
case BIGINT:
fieldWriter = (writer, pos, value) -> writer.writeLong((long) value);
break;
Expand Down
Loading