Skip to content

Commit

Permalink
NIFI-11739: Handle missing but required fields
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed Sep 26, 2023
1 parent b90c064 commit 68fc5ee
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.iceberg;

import org.apache.nifi.components.DescribedValue;

/**
 * Selectable behaviors for the case where a column of the target table has no
 * corresponding field in the incoming data.
 *
 * <p>Each constant carries a human-readable display name and a description; the
 * value reported through {@link #getValue()} is the constant's own name.</p>
 */
public enum UnmatchedColumnBehavior implements DescribedValue {
    /** Unmatched columns are treated as not required and nothing is logged. */
    IGNORE_UNMATCHED_COLUMN("Ignore Unmatched Columns",
            "Any column in the database that does not have a field in the document will be assumed to not be required. No notification will be logged"),

    /** Unmatched columns are treated as not required and a warning is logged. */
    WARNING_UNMATCHED_COLUMN("Warn on Unmatched Columns",
            "Any column in the database that does not have a field in the document will be assumed to not be required. A warning will be logged"),

    /** Any unmatched column fails the flow and an error is logged. */
    FAIL_UNMATCHED_COLUMN("Fail on Unmatched Columns",
            "A flow will fail if any column in the database does not have a field in the document. An error will be logged");

    private final String displayName;
    private final String description;

    /**
     * @param displayName short label for this behavior
     * @param description longer explanation of what this behavior does
     */
    UnmatchedColumnBehavior(final String displayName, final String description) {
        this.displayName = displayName;
        this.description = description;
    }

    /**
     * @return the enum constant name, used as the value of this option
     */
    @Override
    public String getValue() {
        return name();
    }

    /**
     * @return the short label for this behavior
     */
    @Override
    public String getDisplayName() {
        return displayName;
    }

    /**
     * @return the longer explanation of this behavior
     */
    @Override
    public String getDescription() {
        return description;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.iceberg.UnmatchedColumnBehavior;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
Expand All @@ -35,8 +36,6 @@
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;

import static org.apache.nifi.processors.iceberg.PutIceberg.UnmatchedColumnBehavior;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -185,7 +184,14 @@ public DataType fieldPartner(DataType dataType, int fieldId, String name) {
}
}
// If the field is missing, use the expected type from the schema (converted to a DataType)
return GenericDataConverters.convertSchemaTypeToDataType(schema.findField(fieldId).type());
final Types.NestedField schemaField = schema.findField(fieldId);
final Type schemaFieldType = schemaField.type();
if(schemaField.isRequired()) {
// Iceberg requires a non-null value for required fields
throw new IllegalArgumentException("Iceberg requires a non-null value for required fields, field: "
+ schemaField.name() + ", type: " + schemaFieldType);
}
return GenericDataConverters.convertSchemaTypeToDataType(schemaFieldType);
}
final Optional<RecordField> recordField = recordType.getChildSchema().getField(mappedFieldName.get());
final RecordField field = recordField.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
Expand Down Expand Up @@ -366,39 +365,4 @@ void abort(DataFile[] dataFiles, Table table) {
.retry(3)
.run(file -> table.io().deleteFile(file.path().toString()));
}

/**
 * Selectable behaviors for the case where a column of the target table has no
 * corresponding field in the incoming data.
 *
 * <p>Each constant carries a human-readable display name and a description; the
 * value reported through {@link #getValue()} is the constant's own name.</p>
 */
public enum UnmatchedColumnBehavior implements DescribedValue {
    /** Unmatched columns are treated as not required and nothing is logged. */
    IGNORE_UNMATCHED_COLUMN("Ignore Unmatched Columns",
            "Any column in the database that does not have a field in the document will be assumed to not be required. No notification will be logged"),

    /** Unmatched columns are treated as not required and a warning is logged. */
    WARNING_UNMATCHED_COLUMN("Warn on Unmatched Columns",
            "Any column in the database that does not have a field in the document will be assumed to not be required. A warning will be logged"),

    /** Any unmatched column fails the flow and an error is logged. */
    FAIL_UNMATCHED_COLUMN("Fail on Unmatched Columns",
            "A flow will fail if any column in the database does not have a field in the document. An error will be logged");

    private final String displayName;
    private final String description;

    /**
     * @param displayName short label for this behavior
     * @param description longer explanation of what this behavior does
     */
    UnmatchedColumnBehavior(final String displayName, final String description) {
        this.displayName = displayName;
        this.description = description;
    }

    /**
     * @return the enum constant name, used as the value of this option
     */
    @Override
    public String getValue() {
        return name();
    }

    /**
     * @return the short label for this behavior
     */
    @Override
    public String getDisplayName() {
        return displayName;
    }

    /**
     * @return the longer explanation of this behavior
     */
    @Override
    public String getDescription() {
        return description;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import static org.apache.nifi.processors.iceberg.PutIceberg.UnmatchedColumnBehavior;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -148,6 +147,24 @@ public void tearDown() {
Types.NestedField.optional(14, "choice", Types.IntegerType.get())
);

// Iceberg schema of primitive-typed columns (ids 0-14) in which "integer" (1), "float" (2)
// and "long" (3) are declared required; every other column is optional.
// Passed to the IcebergRecordConverter constructor by testPrimitivesMissingRequiredFields.
// NOTE(review): "timestamp" is built with withZone() and "timestampTz" with withoutZone();
// the names look swapped relative to the zone handling - confirm this is intentional.
private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new Schema(
Types.NestedField.optional(0, "string", Types.StringType.get()),
Types.NestedField.required(1, "integer", Types.IntegerType.get()),
Types.NestedField.required(2, "float", Types.FloatType.get()),
Types.NestedField.required(3, "long", Types.LongType.get()),
Types.NestedField.optional(4, "double", Types.DoubleType.get()),
Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10, 2)),
Types.NestedField.optional(6, "boolean", Types.BooleanType.get()),
Types.NestedField.optional(7, "fixed", Types.FixedType.ofLength(5)),
Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
Types.NestedField.optional(9, "date", Types.DateType.get()),
Types.NestedField.optional(10, "time", Types.TimeType.get()),
Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()),
Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()),
Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
Types.NestedField.optional(14, "choice", Types.IntegerType.get())
);

private static final Schema COMPATIBLE_PRIMITIVES_SCHEMA = new Schema(
Types.NestedField.optional(0, "string", Types.StringType.get()),
Types.NestedField.optional(1, "integer", Types.IntegerType.get()),
Expand Down Expand Up @@ -225,7 +242,7 @@ private static RecordSchema getMapSchema() {
private static RecordSchema getPrimitivesSchema() {
List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("string", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("integer", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("integer", RecordFieldType.INT.getDataType(), true));
fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType()));
fields.add(new RecordField("long", RecordFieldType.LONG.getDataType()));
fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
Expand Down Expand Up @@ -589,6 +606,18 @@ public void testPrimitivesIgnoreMissingFields(FileFormat format) throws IOExcept
assertNull(resultRecord.get(14, Integer.class));
}

/**
 * Verifies that constructing an {@code IcebergRecordConverter} against
 * {@code PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS} throws an
 * {@link IllegalArgumentException} when given the record schema returned by
 * {@code getPrimitivesSchemaMissingFields()} (presumably one that omits the
 * required columns - confirm against that helper), even though unmatched
 * columns are configured to be ignored.
 *
 * @param format the file format under test (AVRO, ORC or PARQUET)
 */
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testPrimitivesMissingRequiredFields(FileFormat format) {
    final RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
    final MockComponentLogger mockComponentLogger = new MockComponentLogger();

    // The failure is asserted on the constructor call itself, so no test record needs to be built.
    assertThrows(IllegalArgumentException.class,
            () -> new IcebergRecordConverter(PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS, nifiSchema, format,
                    UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, mockComponentLogger));
}

@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
Expand Down

0 comments on commit 68fc5ee

Please sign in to comment.