NIFI-12023: Add FastCSV parser to CSVReader
mattyb149 committed Sep 13, 2023
1 parent 24736f6 commit 2ab0b78
Showing 9 changed files with 733 additions and 2 deletions.
@@ -151,6 +151,8 @@ public class CSVUtils {
"* Apache Commons CSV - duplicate headers will result in column data \"shifting\" right with new fields " +
"created for \"unknown_field_index_X\" where \"X\" is the CSV column index number\n" +
"* Jackson CSV - duplicate headers will be de-duplicated with the field value being that of the right-most " +
"duplicate CSV column\n" +
"* FastCSV - duplicate headers will be de-duplicated with the field value being that of the left-most " +
"duplicate CSV column")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
@@ -75,6 +75,11 @@
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
<dependency>
<groupId>de.siegmar</groupId>
<artifactId>fastcsv</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
@@ -183,8 +188,11 @@

<exclude>src/test/resources/csv/extra-white-space.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_RFC4180.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account_RFC4180.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_escapechar.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_spec_delimiter.csv</exclude>
<exclude>src/test/resources/csv/prov-events.csv</exclude>
<exclude>src/test/resources/grok/error-with-stack-trace.log</exclude>
@@ -150,7 +150,7 @@ protected final Object convertSimpleIfPossible(final String value, final DataTyp
return value;
}

private String trim(String value) {
protected String trim(String value) {
return (value.length() > 1) && value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
}
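For reference (hypothetical inputs, not from this commit): with the widened visibility, subclasses such as the new FastCSVRecordReader can call trim(...) directly. It strips exactly one pair of wrapping double quotes: trim("\"abc\"") returns abc, trim("\"abc") is returned unchanged because the value is not quote-wrapped on both sides, and a lone "\"" is returned unchanged because the length must exceed 1.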

@@ -68,14 +68,20 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
public static final AllowableValue JACKSON_CSV = new AllowableValue("jackson-csv", "Jackson CSV",
"The CSV parser implementation from the Jackson Dataformats library.");

public static final AllowableValue FAST_CSV = new AllowableValue("fast-csv", "FastCSV",
"The CSV parser implementation from the FastCSV library. NOTE: This parser only officially supports RFC-4180, so it recommended to "
+ "set the 'CSV Format' property to 'RFC 4180'. It does handle some non-compliant CSV data, for that case set the 'CSV Format' property to "
+ "'CUSTOM' and the other custom format properties (such as 'Trim Fields', 'Trim double quote', etc.) as appropriate. Be aware that this "
+ "may cause errors if FastCSV doesn't handle the property settings correctly (such as 'Ignore Header'), but otherwise may process the input as expected even "
+ "if the data is not fully RFC-4180 compliant.");

public static final PropertyDescriptor CSV_PARSER = new PropertyDescriptor.Builder()
.name("csv-reader-csv-parser")
.displayName("CSV Parser")
.description("Specifies which parser to use to read CSV records. NOTE: Different parsers may support different subsets of functionality "
+ "and may also exhibit different levels of performance.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(APACHE_COMMONS_CSV, JACKSON_CSV)
.allowableValues(APACHE_COMMONS_CSV, JACKSON_CSV, FAST_CSV)
.defaultValue(APACHE_COMMONS_CSV.getValue())
.required(true)
.build();
@@ -175,6 +181,8 @@ public RecordReader createRecordReader(final Map<String, String> variables, fina
return new CSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote);
} else if (JACKSON_CSV.getValue().equals(csvParser)) {
return new JacksonCSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote);
} else if (FAST_CSV.getValue().equals(csvParser)) {
return new FastCSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote);
} else {
throw new IOException("Parser not supported");
}
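To make the description's recommendation concrete (property names taken from this commit; a sketch, not UI documentation): a CSVReader controller service using the new parser would set 'CSV Parser' to 'FastCSV' (internally, csv-reader-csv-parser=fast-csv) and 'CSV Format' to 'RFC 4180'.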
@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.csv;

import de.siegmar.fastcsv.reader.CommentStrategy;
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRow;
import org.apache.commons.csv.CSVFormat;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;

public class FastCSVRecordReader extends AbstractCSVRecordReader {
private final CsvReader csvReader;
private final Iterator<CsvRow> csvRowIterator;

private List<RecordField> recordFields;

private Map<String, Integer> headerMap;

private final boolean ignoreHeader;
private final boolean trimDoubleQuote;
private final CSVFormat csvFormat;

public FastCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote) throws IOException {
super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote);
this.ignoreHeader = ignoreHeader;
this.trimDoubleQuote = trimDoubleQuote;
this.csvFormat = csvFormat;

// Map the configured Commons CSV format settings onto FastCSV's reader builder
CsvReader.CsvReaderBuilder builder = CsvReader.builder()
.fieldSeparator(csvFormat.getDelimiter())
.quoteCharacter(csvFormat.getQuoteCharacter())
.commentStrategy(CommentStrategy.SKIP)
.skipEmptyRows(csvFormat.getIgnoreEmptyLines())
.errorOnDifferentFieldCount(!csvFormat.getAllowMissingColumnNames());

if (csvFormat.getCommentMarker() != null) {
builder.commentCharacter(csvFormat.getCommentMarker());
}

if (hasHeader && !ignoreHeader) {
// Field names will be read from the header line on the first call to getRecordFields()
headerMap = null;
} else {
// No usable header line: pre-populate the header map from the configured schema
headerMap = new HashMap<>();
for (int i = 0; i < schema.getFieldCount(); i++) {
headerMap.put(schema.getField(i).getFieldName(), i);
}
}

csvReader = builder.build(new InputStreamReader(in, encoding));
csvRowIterator = csvReader.iterator();
}

@Override
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {

try {
final RecordSchema schema = getSchema();

final List<RecordField> recordFields = getRecordFields();
final int numFieldNames = recordFields.size();
if (!csvRowIterator.hasNext()) {
return null;
}
final CsvRow csvRecord = csvRowIterator.next();
final Map<String, Object> values = new LinkedHashMap<>(recordFields.size() * 2);
for (int i = 0; i < csvRecord.getFieldCount(); i++) {
String rawValue = csvRecord.getField(i);
if (csvFormat.getTrim()) {
rawValue = rawValue.trim();
}
if (trimDoubleQuote) {
rawValue = trim(rawValue);
}

final String rawFieldName;
final DataType dataType;
if (i >= numFieldNames) {
if (!dropUnknownFields) {
values.put("unknown_field_index_" + i, rawValue);
}
continue;
} else {
final RecordField recordField = recordFields.get(i);
rawFieldName = recordField.getFieldName();
dataType = recordField.getDataType();
}

final Object value;
if (coerceTypes) {
value = convert(rawValue, dataType, rawFieldName);
} else {
// The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
// dictate a field type. As a result, we will use the schema that we have to attempt to convert
// the value into the desired type if it's a simple type.
value = convertSimpleIfPossible(rawValue, dataType, rawFieldName);
}

// putIfAbsent keeps the first value seen, so for duplicate header names the left-most column wins
values.putIfAbsent(rawFieldName, value);
}

return new MapRecord(schema, values, coerceTypes, dropUnknownFields);
} catch (Exception e) {
throw new MalformedRecordException("Error while getting next record", e);
}
}


private List<RecordField> getRecordFields() {
if (this.recordFields != null) {
return this.recordFields;
}

if (ignoreHeader) {
logger.debug("With 'Ignore Header' set to true, FastCSV still reads the header and keeps track "
+ "of the number of fields in the header. This will cause an error if the provided schema does not "
+ "have the same number of fields, as this is not conformant to RFC-4180");
}

// The field names come from the first row, so consume it here as the header
if (!csvRowIterator.hasNext()) {
return Collections.emptyList();
}
CsvRow headerRow = csvRowIterator.next();
headerMap = new HashMap<>();
for (int i = 0; i < headerRow.getFieldCount(); i++) {
String rawValue = headerRow.getField(i);
if (csvFormat.getTrim()) {
rawValue = rawValue.trim();
}
if (this.trimDoubleQuote) {
rawValue = trim(rawValue);
}
headerMap.put(rawValue, i);
}


// Use a SortedMap keyed by index of the field so that we can get a List of field names in the correct order
final SortedMap<Integer, String> sortedMap = new TreeMap<>();
for (final Map.Entry<String, Integer> entry : headerMap.entrySet()) {
sortedMap.put(entry.getValue(), entry.getKey());
}

final List<RecordField> fields = new ArrayList<>();
final List<String> rawFieldNames = new ArrayList<>(sortedMap.values());
for (final String rawFieldName : rawFieldNames) {
final Optional<RecordField> option = schema.getField(rawFieldName);
if (option.isPresent()) {
fields.add(option.get());
} else {
fields.add(new RecordField(rawFieldName, RecordFieldType.STRING.getDataType()));
}
}

this.recordFields = fields;
return fields;
}

@Override
public void close() throws IOException {
csvReader.close();
}
}
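A hypothetical driver for the new reader (not part of this commit; in practice NiFi constructs it through CSVReader#createRecordReader), assuming a ComponentLog and RecordSchema are supplied by the caller:

// Sketch only: exercises the constructor and nextRecord() loop shown above
static void readAll(final InputStream in, final ComponentLog logger, final RecordSchema schema) throws Exception {
    try (final FastCSVRecordReader reader = new FastCSVRecordReader(
            in, logger, schema, CSVFormat.RFC4180,
            true,              // hasHeader: the first line names the fields
            false,             // ignoreHeader: keep the header's field names
            null, null, null,  // date/time/timestamp formats (defaults)
            "UTF-8",           // encoding
            true)) {           // trimDoubleQuote
        Record record;
        while ((record = reader.nextRecord()) != null) {
            logger.info("Read record: " + record);
        }
    }
}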