feat(schema): Migrate hudi-flink to use HoodieSchema instead of avro Schema #14355
base: master
Conversation
this.gateway = new MockOperatorEventGateway();
this.conf = conf;
this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).getAvroSchema()).getLogicalType();
How much work would be involved in adding a schema converter to convert from HoodieSchema to the RowType?
Will look into this
writeClient.addColumn("salary", doubleType, null, "name", AFTER);
// Create nullable primitive types using HoodieSchema
HoodieSchema intType = HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.INT));
There is a helper method that allows you to just call HoodieSchema.createNullable(HoodieSchemaType.INT) for easy test setup
private static Schema getRecordAvroSchema(String schemaStr) {
  Schema recordSchema = new Schema.Parser().parse(schemaStr);
  return AvroSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema).getRowType().getLogicalType());
private static HoodieSchema getRecordAvroSchema(String schemaStr) {
Let's rename this to getRecordSchema?
@VisibleForTesting
public Schema getTableAvroSchema() {
public HoodieSchema getTableHoodieSchema() {
Can we just call this getTableSchema?
Schema avroSchema = schemaResolver.getTableAvroSchema();
return HoodieSchema.fromAvroSchema(avroSchema);
I think we'll have a few PRs all doing this same operation. What do you think about just pushing this down to the TableSchemaResolver so we don't have to modify as much code later?
@the-other-tim-brown Wondering if you can help me better understand the ask: is it just for schemaResolver to have a helper called getTableSchema that basically just delegates to getTableAvroSchema and then wraps the result in HoodieSchema.fromAvroSchema?
Based on offline discussion with @the-other-tim-brown, my understanding is that we will raise this as part of a separate PR.
@the-other-tim-brown let me know if this PR (#17456) aligns with what you said above.
Let's update this to call the new method in the table schema resolver.
private HoodieSchema inferSchemaFromDdl() {
  Schema avroSchema = AvroSchemaConverter.convertToSchema(this.tableRowType);
  HoodieSchema schema = HoodieSchema.fromAvroSchema(avroSchema);
  return HoodieSchemaUtils.addMetadataFields(schema,conf.get(FlinkOptions.CHANGELOG_ENABLED));
nitpick: add space after comma
FileSystem fs = HadoopFSUtils.getFs(sourceSchemaFile, HadoopConfigurations.getHadoopConf(new Configuration()));
try {
  this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(sourceSchemaFile)));
  this.sourceSchema = new HoodieSchema.Parser().parse(fs.open(new Path(sourceSchemaFile)));
I know you did not set this up this way but currently this code is failing to close the input stream provided to the parsers. Let's make sure we close these while we're updating this code.
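The fix here is the standard try-with-resources idiom, which closes the stream even when the parser throws. A minimal self-contained sketch; `ParseWithClose` and its `parse` helper are hypothetical stand-ins for the real schema parser, the point is only the resource-handling pattern:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ParseWithClose {
    // Hypothetical stand-in for a schema parser that consumes an InputStream;
    // the real code would call the HoodieSchema parser instead.
    static String parse(InputStream in) throws IOException {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8).trim();
    }

    public static void main(String[] args) throws IOException {
        byte[] schemaBytes = "{\"type\":\"string\"}".getBytes(StandardCharsets.UTF_8);
        String parsed;
        // try-with-resources guarantees the stream is closed even if parse() throws
        try (InputStream in = new ByteArrayInputStream(schemaBytes)) {
            parsed = parse(in);
        }
        System.out.println(parsed);
    }
}
```

The same shape applies to `fs.open(new Path(sourceSchemaFile))`: bind the opened stream in the try header so it is closed after parsing.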
// get full schema iterator.
final Schema tableSchema = AvroSchemaCache.intern(new Schema.Parser().parse(tableState.getAvroSchema()));
final HoodieSchema schema = HoodieSchemaCache.intern(
    new HoodieSchema.Parser().parse(tableState.getAvroSchema()));
Just an FYI, there is HoodieSchema.parse for a less verbose way of doing this
if (hoodieSchema instanceof HoodieSchema.Decimal) {
  return convertDecimal(hoodieSchema);
} else if (hoodieSchema instanceof HoodieSchema.Timestamp) {
  return convertTimestamp(hoodieSchema);
} else if (hoodieSchema instanceof HoodieSchema.Time) {
  return convertTime(hoodieSchema);
}
Instead of using instanceof, just use the switch on the type below.
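The suggestion is the common dispatch-on-enum pattern: one switch over the schema's type enum instead of a chain of instanceof checks. A self-contained illustration; `MiniType` and `convert` are hypothetical stand-ins, not the real HoodieSchema API:

```java
public class TypeSwitch {
    // Hypothetical miniature of a schema-type enum such as HoodieSchemaType
    enum MiniType { DECIMAL, TIMESTAMP, TIME, STRING }

    static String convert(MiniType type) {
        // One switch replaces the instanceof chain; each logical type gets one branch
        switch (type) {
            case DECIMAL:   return "DataTypes.DECIMAL(...)";
            case TIMESTAMP: return "DataTypes.TIMESTAMP(...)";
            case TIME:      return "DataTypes.TIME(...)";
            default:        return "DataTypes.STRING()";
        }
    }

    public static void main(String[] args) {
        System.out.println(convert(MiniType.TIME));
        System.out.println(convert(MiniType.STRING));
    }
}
```

Switching on the enum also lets the compiler warn when a new type is added but a branch is missing, which instanceof chains cannot do.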
private static HoodieSchema nullableSchema(HoodieSchema schema) {
  return schema.isNullable()
      ? schema
      : HoodieSchema.createNullable(schema);
}
HoodieSchema.createNullable will already check if the existing schema is nullable for you.
// Time
HoodieSchema timeSchema = HoodieSchemaConverter.convertToSchema(
    DataTypes.TIME(3).notNull().getLogicalType());
assertEquals(HoodieSchemaType.TIME, timeSchema.getType());
Check the time precision here as well and add a test for Time-Micros
Will make this change.
Just one thing to call out here in case someone from Hudi Flink reviews this as well: I noticed that the AvroSchemaConverter in the hudi-flink module says it does not support micros for Time: https://github.com/apache/hudi/blob/master/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java#L291
However, when checking the Avro spec, I think time is technically supported with microsecond precision (https://avro.apache.org/docs/1.8.0/spec.html#Time+%28microsecond+precision%29), so I will add this functionality in the HoodieSchemaConverter class.
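For reference, the Avro specification linked above defines both time logical types: time-millis annotates an int, while time-micros annotates a long, roughly:

```
{"type": "int",  "logicalType": "time-millis"}
{"type": "long", "logicalType": "time-micros"}
```

So a Flink TIME with precision above 3 would need the long-backed time-micros form when mapped through the converter.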
}

@Test
public void testArrayType() {
Let's test arrays and maps with nullable elements as well as non-null
DataTypes.FIELD("decimal_col", DataTypes.DECIMAL(10, 2).notNull())
).notNull().getLogicalType();

HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(flinkRowType);
Let's add a conversion for a decimal backed by a fixed size byte array
).notNull())
).notNull().getLogicalType();

HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(flinkRowType);
Could we start by generating a HoodieSchema directly for some of these tests?
@the-other-tim-brown addressed feedback, let me know if something does not look ideal or if the PR is close to landing.
// Read from file source
RowType rowType =
    (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
    (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).getAvroSchema())
Can the new converter be used here?
Schema avroSchema = schemaResolver.getTableAvroSchema();
return HoodieSchema.fromAvroSchema(avroSchema);
Let's update this to call the new method in the table schema resolver.
final Schema tableAvroSchema = getTableAvroSchema();
final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
final HoodieSchema tableSchema = getTableSchema();
//TODO make a convertor
Remove this TODO?
public HoodieSchema getSchema() {
  if (fileSchema == null) {
    fileSchema = AvroSchemaConverter.convertToSchema(getRowType().notNull().getLogicalType());
    //TODO to create a converter for HoodieSchema
Remove this TODO?
@Override
public ClosableIterator<HoodieRecord<RowData>> getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema) throws IOException {
  //TODO boundary to follow up in later pr
Can this TODO be removed now?
@Override
public ClosableIterator<String> getRecordKeyIterator() throws IOException {
  Schema schema = HoodieAvroUtils.getRecordKeySchema();
  //TODO add a util for this in HoodieSchemaUtils
Can we address this now? Should be a fairly small change
assert (requiredFields.size() == requiredPos.length);
Iterator<Integer> positionIterator = Arrays.stream(requiredPos).iterator();
requiredFields.forEach(f -> recordBuilder.set(f, getVal(record, positionIterator.next())));
requiredFields.forEach(f -> recordBuilder.set(f.name(), getVal(record, positionIterator.next())));
Can we use the position here instead of the name? Setting with the name requires an extra lookup.
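The cost difference is that a name-based setter has to resolve the field name to an index first, while a positional setter writes directly. A self-contained sketch; `MiniRecordBuilder` is a hypothetical stand-in for the real record builder, illustrating only why the extra lookup exists:

```java
import java.util.HashMap;
import java.util.Map;

public class PositionalSet {
    // Hypothetical miniature record builder: fields live in an array,
    // and a name->index map exists only to serve name-based access.
    static class MiniRecordBuilder {
        private final Object[] values;
        private final Map<String, Integer> nameToPos = new HashMap<>();

        MiniRecordBuilder(String... fieldNames) {
            values = new Object[fieldNames.length];
            for (int i = 0; i < fieldNames.length; i++) {
                nameToPos.put(fieldNames[i], i);
            }
        }

        void set(String name, Object value) {  // extra hash lookup on every call
            values[nameToPos.get(name)] = value;
        }

        void set(int pos, Object value) {      // direct array write
            values[pos] = value;
        }

        Object get(int pos) {
            return values[pos];
        }
    }

    public static void main(String[] args) {
        MiniRecordBuilder builder = new MiniRecordBuilder("uuid", "name", "age");
        builder.set(0, "id-1");   // positional: no name resolution needed
        builder.set("age", 42);   // by name: resolves "age" -> 2 first
        System.out.println(builder.get(0) + "," + builder.get(2));
    }
}
```

Since the loop above already iterates positions in `requiredPos`, setting by position avoids one map lookup per field per record.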
private int[] getRequiredPos(String tableSchema, Schema required) {
  Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new Schema.Parser().parse(tableSchema));
  Schema dataSchema = HoodieAvroUtils.removeMetadataFields(HoodieSchema.parse(tableSchema).getAvroSchema());
Can we make this work on HoodieSchema?
public static HoodieSchema getTableAvroSchema(HoodieTableMetaClient metaClient, boolean includeMetadataFields) throws Exception {
  TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
  return schemaUtil.getTableAvroSchema(includeMetadataFields);
  return HoodieSchema.fromAvroSchema(schemaUtil.getTableAvroSchema(includeMetadataFields));
let's call the new method here as well to directly return the HoodieSchema
final HoodieSchema tableSchema;
try {
  tableAvroSchema = schemaResolver.getTableAvroSchema();
  tableSchema = HoodieSchema.fromAvroSchema(schemaResolver.getTableAvroSchema());
Let's call the new method here as well?
@danny0405 Can you take a look at this PR, specifically the main area would be the
}

private static DataType convertTime(HoodieSchema schema) {
  if (!(schema instanceof HoodieSchema.Time)) {
We can check the HoodieSchemaType instead of using instanceof?
}

private static DataType convertTimestamp(HoodieSchema schema) {
  if (!(schema instanceof HoodieSchema.Timestamp)) {
We can check the HoodieSchemaType instead of using instanceof?
try {
  this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(sourceSchemaFile)));
try (InputStream stream = fs.open(new Path(sourceSchemaFile))) {
  this.sourceSchema = new HoodieSchema.Parser().parse(stream);
We have HoodieSchema.parse(...) for a less verbose way of doing this.
Describe the issue this Pull Request addresses
Issue: #14273
Migrates usages of the Avro Schema within the hudi-flink-datasource to use HoodieSchema instead.
Summary and Changelog
Migrates the Avro Schema within the hudi-flink-datasource to use HoodieSchema.
Impact
Medium, as we are changing APIs to now take in HoodieSchema in this code path; we have CI as a sanity check to ensure functionality is preserved.
Risk Level
Medium
Documentation Update
None
Contributor's checklist