Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.hive.formats.avro.model.AvroLogicalType.TimeMillisLogicalType;
import io.trino.hive.formats.avro.model.AvroLogicalType.TimestampMicrosLogicalType;
import io.trino.hive.formats.avro.model.AvroLogicalType.TimestampMillisLogicalType;
import io.trino.hive.formats.avro.model.AvroLogicalType.TimestampNanosLogicalType;
import io.trino.hive.formats.avro.model.AvroReadAction;
import io.trino.hive.formats.avro.model.AvroReadAction.LongIoFunction;
import io.trino.hive.formats.avro.model.BooleanRead;
Expand Down Expand Up @@ -149,7 +150,7 @@ else if (schema.getType() == Schema.Type.UNION) {
case BytesDecimalLogicalType bytesDecimalLogicalType -> getAvroLogicalTypeSpiType(bytesDecimalLogicalType);
case TimestampMillisLogicalType _ -> hiveSessionTimestamp;
// Other logical types ignored by hive/don't map to hive types
case FixedDecimalLogicalType _, TimeMicrosLogicalType _, TimeMillisLogicalType _, TimestampMicrosLogicalType _, StringUUIDLogicalType _ -> baseTypeFor(schema, this);
case FixedDecimalLogicalType _, TimeMicrosLogicalType _, TimeMillisLogicalType _, TimestampMicrosLogicalType _, TimestampNanosLogicalType _, StringUUIDLogicalType _ -> baseTypeFor(schema, this);
};
};
}
Expand Down Expand Up @@ -191,7 +192,7 @@ public BlockBuildingDecoder blockBuildingDecoderFor(AvroReadAction readAction)
// Hive only supports Bytes Decimal Type
case FixedDecimalLogicalType _ -> baseBlockBuildingDecoderWithUnionCoerceAndErrorDelays(readAction);
// Other logical types ignored by hive/don't map to hive types
case TimeMicrosLogicalType _, TimeMillisLogicalType _, TimestampMicrosLogicalType _, StringUUIDLogicalType _ -> baseBlockBuildingDecoderWithUnionCoerceAndErrorDelays(readAction);
case TimeMicrosLogicalType _, TimeMillisLogicalType _, TimestampMicrosLogicalType _, TimestampNanosLogicalType _, StringUUIDLogicalType _ -> baseBlockBuildingDecoderWithUnionCoerceAndErrorDelays(readAction);
};
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.hive.formats.avro.model.AvroLogicalType.TimeMillisLogicalType;
import io.trino.hive.formats.avro.model.AvroLogicalType.TimestampMicrosLogicalType;
import io.trino.hive.formats.avro.model.AvroLogicalType.TimestampMillisLogicalType;
import io.trino.hive.formats.avro.model.AvroLogicalType.TimestampNanosLogicalType;
import io.trino.hive.formats.avro.model.AvroReadAction;
import io.trino.hive.formats.avro.model.AvroReadAction.LongIoFunction;
import io.trino.hive.formats.avro.model.BytesRead;
Expand All @@ -33,6 +34,7 @@
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Int128;
import io.trino.spi.type.LongTimestamp;
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.Type;
import org.apache.avro.Schema;
Expand All @@ -54,8 +56,14 @@
import static io.trino.spi.type.TimeType.TIME_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_NANOS;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.UuidType.UUID;
import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid;
import static java.lang.Math.divideExact;
import static java.lang.Math.floorMod;
import static java.lang.Math.multiplyExact;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -136,6 +144,12 @@ static BlockBuildingDecoder getLogicalTypeBuildingFunction(AvroLogicalType logic
}
throw new IllegalStateException("Unreachable unfiltered logical type");
}
case TimestampNanosLogicalType _ -> {
if (readAction instanceof LongRead longRead) {
yield new TimestampNanosBlockBuildingDecoder(longRead);
}
throw new IllegalStateException("Unreachable unfiltered logical type");
}
case StringUUIDLogicalType _ -> {
if (readAction instanceof StringRead) {
yield new StringUUIDBlockBuildingDecoder();
Expand Down Expand Up @@ -271,6 +285,27 @@ public void decodeIntoBlock(Decoder decoder, BlockBuilder builder)
}
}

public static class TimestampNanosBlockBuildingDecoder
implements BlockBuildingDecoder
{
private final LongIoFunction<Decoder> longDecoder;

public TimestampNanosBlockBuildingDecoder(LongRead longRead)
{
longDecoder = requireNonNull(longRead, "longRead is null").getLongDecoder();
}

@Override
public void decodeIntoBlock(Decoder decoder, BlockBuilder builder)
throws IOException
{
long epochNanos = longDecoder.apply(decoder);
long epochMicros = divideExact(epochNanos, NANOSECONDS_PER_MICROSECOND);
int picosOfMicro = multiplyExact(floorMod(epochNanos, NANOSECONDS_PER_MICROSECOND), PICOSECONDS_PER_NANOSECOND);
TIMESTAMP_NANOS.writeObject(builder, new LongTimestamp(epochMicros, picosOfMicro));
}
}

public record StringUUIDBlockBuildingDecoder()
implements BlockBuildingDecoder
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.hive.formats.avro.model.AvroLogicalType.TimeMillisLogicalType;
import io.trino.hive.formats.avro.model.AvroLogicalType.TimestampMicrosLogicalType;
import io.trino.hive.formats.avro.model.AvroLogicalType.TimestampMillisLogicalType;
import io.trino.hive.formats.avro.model.AvroLogicalType.TimestampNanosLogicalType;
import io.trino.spi.block.Block;
import io.trino.spi.type.DateType;
import io.trino.spi.type.DecimalType;
Expand All @@ -50,19 +51,27 @@
import java.util.function.BiFunction;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static io.trino.hive.formats.avro.model.AvroLogicalType.DATE;
import static io.trino.hive.formats.avro.model.AvroLogicalType.DECIMAL;
import static io.trino.hive.formats.avro.model.AvroLogicalType.LOCAL_TIMESTAMP_MICROS;
import static io.trino.hive.formats.avro.model.AvroLogicalType.LOCAL_TIMESTAMP_MILLIS;
import static io.trino.hive.formats.avro.model.AvroLogicalType.LOCAL_TIMESTAMP_NANOS;
import static io.trino.hive.formats.avro.model.AvroLogicalType.TIMESTAMP_MICROS;
import static io.trino.hive.formats.avro.model.AvroLogicalType.TIMESTAMP_MILLIS;
import static io.trino.hive.formats.avro.model.AvroLogicalType.TIMESTAMP_NANOS;
import static io.trino.hive.formats.avro.model.AvroLogicalType.TIME_MICROS;
import static io.trino.hive.formats.avro.model.AvroLogicalType.TIME_MILLIS;
import static io.trino.hive.formats.avro.model.AvroLogicalType.UUID;
import static io.trino.hive.formats.avro.model.AvroLogicalType.fromAvroLogicalType;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.Timestamps.roundDiv;
import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid;
import static java.lang.Math.addExact;
import static java.lang.Math.divideExact;
import static java.lang.Math.multiplyExact;
import static java.util.Objects.requireNonNull;
import static org.apache.avro.LogicalTypes.fromSchemaIgnoreInvalid;

Expand All @@ -79,6 +88,7 @@ public class NativeLogicalTypesAvroTypeManager
public static final Schema TIME_MICROS_SCHEMA;
public static final Schema TIMESTAMP_MILLIS_SCHEMA;
public static final Schema TIMESTAMP_MICROS_SCHEMA;
public static final Schema TIMESTAMP_NANOS_SCHEMA;
public static final Schema UUID_SCHEMA;

static {
Expand All @@ -92,6 +102,8 @@ public class NativeLogicalTypesAvroTypeManager
LogicalTypes.timestampMillis().addToSchema(TIMESTAMP_MILLIS_SCHEMA);
TIMESTAMP_MICROS_SCHEMA = SchemaBuilder.builder().longType();
LogicalTypes.timestampMicros().addToSchema(TIMESTAMP_MICROS_SCHEMA);
TIMESTAMP_NANOS_SCHEMA = SchemaBuilder.builder().longType();
LogicalTypes.timestampNanos().addToSchema(TIMESTAMP_NANOS_SCHEMA);
UUID_SCHEMA = Schema.create(Schema.Type.STRING);
LogicalTypes.uuid().addToSchema(UUID_SCHEMA);
}
Expand All @@ -117,6 +129,7 @@ static Type getAvroLogicalTypeSpiType(AvroLogicalType avroLogicalType)
case TimeMicrosLogicalType __ -> TimeType.TIME_MICROS;
case TimestampMillisLogicalType __ -> TimestampType.TIMESTAMP_MILLIS;
case TimestampMicrosLogicalType __ -> TimestampType.TIMESTAMP_MICROS;
case TimestampNanosLogicalType __ -> TimestampType.TIMESTAMP_NANOS;
case StringUUIDLogicalType __ -> UuidType.UUID;
};
}
Expand All @@ -133,6 +146,7 @@ static Type getAvroLogicalTypeSpiType(LogicalType logicalType)
case TIME_MICROS -> TimeType.TIME_MICROS;
case TIMESTAMP_MILLIS -> TimestampType.TIMESTAMP_MILLIS;
case TIMESTAMP_MICROS -> TimestampType.TIMESTAMP_MICROS;
case TIMESTAMP_NANOS -> TimestampType.TIMESTAMP_NANOS;
case UUID -> UuidType.UUID;
default -> throw new IllegalStateException("Unreachable unfiltered logical type");
};
Expand Down Expand Up @@ -216,6 +230,17 @@ private static BiFunction<Block, Integer, Object> getAvroFunction(AvroLogicalTyp
};
}
}
case TimestampNanosLogicalType _ -> {
if (!(type instanceof TimestampType timestampType)) {
throw new AvroTypeException("Can't represent Avro logical type %s with Trino Type %s".formatted(logicalType, type));
}
checkState(!timestampType.isShort(), "Short timestamp type cannot represent nanosecond precision");
yield (block, position) ->
{
SqlTimestamp timestamp = ((SqlTimestamp) timestampType.getObjectValue(block, position));
return addExact(multiplyExact(timestamp.getEpochMicros(), NANOSECONDS_PER_MICROSECOND), divideExact(timestamp.getPicosOfMicros(), PICOSECONDS_PER_NANOSECOND));
};
}
case StringUUIDLogicalType _ -> {
if (!(type instanceof UuidType uuidType)) {
throw new AvroTypeException("Can't represent Avro logical type %s with Trino Type %s".formatted(logicalType, type));
Expand Down Expand Up @@ -253,10 +278,10 @@ public static ValidateLogicalTypeResult validateLogicalType(Schema schema)
}
LogicalType logicalType;
switch (typeName) {
case DATE, DECIMAL, TIME_MILLIS, TIME_MICROS, TIMESTAMP_MILLIS, TIMESTAMP_MICROS, UUID:
case DATE, DECIMAL, TIME_MILLIS, TIME_MICROS, TIMESTAMP_MILLIS, TIMESTAMP_MICROS, TIMESTAMP_NANOS, UUID:
logicalType = fromSchemaIgnoreInvalid(schema);
break;
case LOCAL_TIMESTAMP_MICROS, LOCAL_TIMESTAMP_MILLIS:
case LOCAL_TIMESTAMP_NANOS, LOCAL_TIMESTAMP_MICROS, LOCAL_TIMESTAMP_MILLIS:
log.debug("Logical type %s not currently supported by by Trino", typeName);
// fall through
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public sealed interface AvroLogicalType
AvroLogicalType.TimeMicrosLogicalType,
AvroLogicalType.TimestampMillisLogicalType,
AvroLogicalType.TimestampMicrosLogicalType,
AvroLogicalType.TimestampNanosLogicalType,
AvroLogicalType.StringUUIDLogicalType
{
// Copied from org.apache.avro.LogicalTypes
Expand All @@ -36,8 +37,10 @@ public sealed interface AvroLogicalType
String TIME_MICROS = "time-micros";
String TIMESTAMP_MILLIS = "timestamp-millis";
String TIMESTAMP_MICROS = "timestamp-micros";
String TIMESTAMP_NANOS = "timestamp-nanos";
String LOCAL_TIMESTAMP_MILLIS = "local-timestamp-millis";
String LOCAL_TIMESTAMP_MICROS = "local-timestamp-micros";
String LOCAL_TIMESTAMP_NANOS = "local-timestamp-nanos";
String UUID = "uuid";

static AvroLogicalType fromAvroLogicalType(LogicalType logicalType, Schema schema)
Expand Down Expand Up @@ -80,6 +83,11 @@ static AvroLogicalType fromAvroLogicalType(LogicalType logicalType, Schema schem
return new TimestampMicrosLogicalType();
}
}
case TIMESTAMP_NANOS -> {
if (schema.getType() == Schema.Type.LONG) {
return new TimestampNanosLogicalType();
}
}
case UUID -> {
if (schema.getType() == Schema.Type.STRING) {
return new StringUUIDLogicalType();
Expand Down Expand Up @@ -110,6 +118,9 @@ record TimestampMillisLogicalType()
record TimestampMicrosLogicalType()
implements AvroLogicalType {}

record TimestampNanosLogicalType()
implements AvroLogicalType {}

record StringUUIDLogicalType()
implements AvroLogicalType {}
}
Loading