Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.schema;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

/**
* A global cache for HoodieSchema instances to ensure that there is only one
* variable instance of the same schema within an entire JVM lifetime.
*
* <p>This is a global cache which works for a JVM lifecycle.
* A collection of schema instances are maintained.
*
* <p>NOTE: The schema which is used frequently should be cached through this cache.
*/
public class HoodieSchemaCache {

// Ensure that there is only one variable instance of the same schema within an entire JVM lifetime
private static final LoadingCache<HoodieSchema, HoodieSchema> SCHEMA_CACHE =
Caffeine.newBuilder().weakValues().maximumSize(1024).build(k -> k);

/**
* Get schema variable from global cache. If not found, put it into the cache and then return it.
* @param schema schema to get
* @return if found, return the exist schema variable, otherwise return the param itself.
*/
public static HoodieSchema intern(HoodieSchema schema) {
return SCHEMA_CACHE.get(schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;

import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -61,20 +61,20 @@ public static class Config {
.withDocumentation("The schema of the target you are writing to");
}

private final Schema sourceSchema;
private final HoodieSchema sourceSchema;

private Schema targetSchema;
private HoodieSchema targetSchema;

@Deprecated
public FilebasedSchemaProvider(TypedProperties props) {
checkRequiredConfigProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE));
String sourceSchemaFile = getStringWithAltKeys(props, Config.SOURCE_SCHEMA_FILE);
FileSystem fs = HadoopFSUtils.getFs(sourceSchemaFile, HadoopConfigurations.getHadoopConf(new Configuration()));
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(sourceSchemaFile)));
this.sourceSchema = new HoodieSchema.Parser().parse(fs.open(new Path(sourceSchemaFile)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you did not set this up this way but currently this code is failing to close the input stream provided to the parsers. Let's make sure we close these while we're updating this code.

if (containsConfigProperty(props, Config.TARGET_SCHEMA_FILE)) {
this.targetSchema =
new Schema.Parser().parse(fs.open(new Path(getStringWithAltKeys(props, Config.TARGET_SCHEMA_FILE))));
new HoodieSchema.Parser().parse(fs.open(new Path(getStringWithAltKeys(props, Config.TARGET_SCHEMA_FILE))));
}
} catch (IOException ioe) {
throw new HoodieIOException("Error reading schema", ioe);
Expand All @@ -85,19 +85,19 @@ public FilebasedSchemaProvider(Configuration conf) {
final String sourceSchemaPath = conf.get(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH);
final FileSystem fs = HadoopFSUtils.getFs(sourceSchemaPath, HadoopConfigurations.getHadoopConf(conf));
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(sourceSchemaPath)));
this.sourceSchema = new HoodieSchema.Parser().parse(fs.open(new Path(sourceSchemaPath)));
} catch (IOException ioe) {
throw new HoodieIOException("Error reading schema", ioe);
}
}

@Override
public Schema getSourceSchema() {
public HoodieSchema getSourceSchema() {
return sourceSchema;
}

@Override
public Schema getTargetSchema() {
public HoodieSchema getTargetSchema() {
if (targetSchema != null) {
return targetSchema;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.hudi.schema;

import org.apache.avro.Schema;
import org.apache.hudi.common.schema.HoodieSchema;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on discussion for the Hudi-Utilities SchemaProviders, we should preserve the existing APIs for now to ensure user's have an easy upgrade path. See my suggestions on how to proceed on this other PR: #14382 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made changes based on the comments. Can reference that (if it's correct)


import java.io.Serializable;

Expand All @@ -29,9 +29,9 @@ public abstract class SchemaProvider implements Serializable {

private static final long serialVersionUID = 1L;

public abstract Schema getSourceSchema();
public abstract HoodieSchema getSourceSchema();

public Schema getTargetSchema() {
public HoodieSchema getTargetSchema() {
// by default, use source schema as target for hoodie table as well
return getSourceSchema();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.exception.HoodieIOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -110,12 +110,12 @@ public SchemaRegistryProvider(TypedProperties props) {
checkRequiredConfigProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL));
}

private Schema getSchema(String registryUrl) throws IOException {
return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
private HoodieSchema getSchema(String registryUrl) throws IOException {
return new HoodieSchema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
}

@Override
public Schema getSourceSchema() {
public HoodieSchema getSourceSchema() {
String registryUrl = getStringWithAltKeys(config, Config.SRC_SCHEMA_REGISTRY_URL);
try {
return getSchema(registryUrl);
Expand All @@ -125,7 +125,7 @@ public Schema getSourceSchema() {
}

@Override
public Schema getTargetSchema() {
public HoodieSchema getTargetSchema() {
String registryUrl = getStringWithAltKeys(config, Config.SRC_SCHEMA_REGISTRY_URL);
String targetRegistryUrl = getStringWithAltKeys(
config, Config.TARGET_SCHEMA_REGISTRY_URL, registryUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
Expand All @@ -44,7 +45,6 @@
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.RuntimeContextUtils;

import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
Expand Down Expand Up @@ -217,7 +217,8 @@ protected void loadRecords(String partitionPath) throws Exception {
Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedAndCompactionInstants().lastInstant();

if (latestCommitTime.isPresent()) {
Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
HoodieSchema schema = HoodieSchema.fromAvroSchema(
new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema());

List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
.getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().requestedTime())
Expand Down Expand Up @@ -250,7 +251,7 @@ protected void loadRecords(String partitionPath) throws Exception {
*
* @return A record key iterator for the file slice.
*/
private ClosableIterator<String> getRecordKeyIterator(FileSlice fileSlice, Schema tableSchema) throws IOException {
private ClosableIterator<String> getRecordKeyIterator(FileSlice fileSlice, HoodieSchema tableSchema) throws IOException {
FileSlice scanFileSlice = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
// filter out crushed base file
fileSlice.getBaseFile().map(f -> isValidFile(f.getPathInfo()) ? f : null).ifPresent(scanFileSlice::setBaseFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.hudi.adapter.MaskingOutputAdapter;
import org.apache.hudi.adapter.Utils;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.CloseableConcatenatingIterator;
Expand All @@ -32,6 +31,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
Expand Down Expand Up @@ -59,7 +59,6 @@
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.utils.RuntimeContextUtils;

import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
Expand Down Expand Up @@ -107,8 +106,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
private int taskID;
private transient HoodieWriteConfig writeConfig;
private transient HoodieFlinkTable<?> table;
private transient Schema schema;
private transient Schema readerSchema;
private transient HoodieSchema schema;
private transient HoodieSchema readerSchema;
private transient HoodieFlinkWriteClient writeClient;
private transient StreamRecordCollector<ClusteringCommitEvent> collector;
private transient BinaryRowDataSerializer binarySerializer;
Expand Down Expand Up @@ -170,11 +169,12 @@ public void open() throws Exception {
this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
this.table = writeClient.getHoodieTable();

this.schema = AvroSchemaConverter.convertToSchema(rowType);
//TODO make a converter class for HoodieSchemaConverter
this.schema = HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(rowType));
// Since there exists discrepancies between flink and spark dealing with nullability of primary key field,
// and there may be some files written by spark, force update schema as nullable to make sure clustering
// scan successfully without schema validating exception.
this.readerSchema = AvroSchemaUtils.asNullable(schema);
this.readerSchema = HoodieSchemaUtils.createNullableSchema(schema);

this.binarySerializer = new BinaryRowDataSerializer(rowType.getFieldCount());

Expand Down Expand Up @@ -312,8 +312,7 @@ private Iterator<RowData> readRecordsForGroupBaseFiles(List<ClusteringOperation>
HoodieRowDataParquetReader fileReader = (HoodieRowDataParquetReader) fileReaderFactory.getFileReader(
table.getConfig(), new StoragePath(clusteringOp.getDataFilePath()));

//TODO boundary to revisit in later pr to use HoodieSchema directly
return new CloseableMappingIterator<>(fileReader.getRecordIterator(HoodieSchema.fromAvroSchema(readerSchema)), HoodieRecord::getData);
return new CloseableMappingIterator<>(fileReader.getRecordIterator(readerSchema), HoodieRecord::getData);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
Expand All @@ -39,7 +40,6 @@
import org.apache.hudi.util.StreamerUtil;

import com.beust.jcommander.JCommander;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.client.deployment.application.ApplicationExecutionException;
Expand Down Expand Up @@ -319,8 +319,8 @@ private void cluster() throws Exception {
// Mark instant as clustering inflight
ClusteringUtils.transitionClusteringOrReplaceRequestedToInflight(instant, Option.empty(), table.getActiveTimeline());

final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
final HoodieSchema tableSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableSchema.getAvroSchema());
final RowType rowType = (RowType) rowDataType.getLogicalType();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static void main(String[] args) throws Exception {
Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
// Read from kafka source
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).getAvroSchema())
.getLogicalType();

long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
Expand Down
Loading
Loading