Skip to content

Commit

Permalink
NIFI-8932: Add capability to skip first N rows in CSVReader
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed Oct 29, 2023
1 parent 8e301cf commit 8f3af0c
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@
<exclude>src/test/resources/csv/multi-bank-account_RFC4180.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account_RFC4180.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account_skip_top_rows.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_escapechar.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_spec_delimiter.csv</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ abstract public class AbstractCSVRecordReader implements RecordReader {
protected final boolean ignoreHeader;
private final boolean trimDoubleQuote;

protected final int skipTopRows;

protected final Supplier<DateFormat> LAZY_DATE_FORMAT;
protected final Supplier<DateFormat> LAZY_TIME_FORMAT;
protected final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
Expand All @@ -44,12 +46,13 @@ abstract public class AbstractCSVRecordReader implements RecordReader {
protected final RecordSchema schema;

AbstractCSVRecordReader(final ComponentLog logger, final RecordSchema schema, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final boolean trimDoubleQuote) {
final String dateFormat, final String timeFormat, final String timestampFormat, final boolean trimDoubleQuote, final int skipTopRows) {
this.logger = logger;
this.schema = schema;
this.hasHeader = hasHeader;
this.ignoreHeader = ignoreHeader;
this.trimDoubleQuote = trimDoubleQuote;
this.skipTopRows = skipTopRows;

if (dateFormat == null || dateFormat.isEmpty()) {
this.dateFormat = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
"The first non-comment line of the CSV file is a header line that contains the names of the columns. The schema will be derived by using the "
+ "column names in the header and assuming that all columns are of type String.");

/**
 * Property controlling how many leading lines/records are discarded before CSV parsing begins.
 * The property is required, defaults to "0" (skip nothing), must be a non-negative integer, and
 * supports Expression Language at the ENVIRONMENT scope.
 */
public static final PropertyDescriptor SKIP_TOP_ROWS = new PropertyDescriptor.Builder()
.name("skip-top-rows")
.displayName("Skip Top Rows")
.description("The maximum number of lines/records (based on the specified record separator) to skip/ignore before processing. "
+ "Use this to skip over lines at the top of a CSV file that are not part of the dataset. If the value of this property is larger than the "
+ "total number of records in the FlowFile, no records will be returned. Note that CSV-related properties "
+ "such as 'Ignore Header Line' are applied after the rows are skipped.")
.required(true)
.defaultValue("0")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();

// CSV parsers
public static final AllowableValue APACHE_COMMONS_CSV = new AllowableValue("commons-csv", "Apache Commons CSV",
"The CSV parser implementation from the Apache Commons CSV library.");
Expand Down Expand Up @@ -102,6 +115,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
private volatile ConfigurationContext context;

private volatile String csvParser;
private volatile int skipTopRows;
private volatile String dateFormat;
private volatile String timeFormat;
private volatile String timestampFormat;
Expand All @@ -119,6 +133,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
properties.add(DateTimeUtils.DATE_FORMAT);
properties.add(DateTimeUtils.TIME_FORMAT);
properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
properties.add(SKIP_TOP_ROWS);
properties.add(CSVUtils.CSV_FORMAT);
properties.add(CSVUtils.VALUE_SEPARATOR);
properties.add(CSVUtils.RECORD_SEPARATOR);
Expand Down Expand Up @@ -176,13 +191,14 @@ public RecordReader createRecordReader(final Map<String, String> variables, fina
}

final boolean trimDoubleQuote = context.getProperty(TRIM_DOUBLE_QUOTE).asBoolean();
this.skipTopRows = context.getProperty(SKIP_TOP_ROWS).evaluateAttributeExpressions(variables).asInteger();

if (APACHE_COMMONS_CSV.getValue().equals(csvParser)) {
return new CSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote);
return new CSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote, skipTopRows);
} else if (JACKSON_CSV.getValue().equals(csvParser)) {
return new JacksonCSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote);
return new JacksonCSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote, skipTopRows);
} else if (FAST_CSV.getValue().equals(csvParser)) {
return new FastCSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote);
return new FastCSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet, trimDoubleQuote, skipTopRows);
} else {
throw new IOException("Parser not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,17 @@ public class CSVRecordReader extends AbstractCSVRecordReader {
private List<RecordField> recordFields;

public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote) throws IOException {
super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote);
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote, final int skipTopRows)
throws IOException {
super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote, skipTopRows);

final InputStream bomInputStream = BOMInputStream.builder().setInputStream(in).get();
final Reader reader = new InputStreamReader(bomInputStream, encoding);
final Reader inputStreamReader = new InputStreamReader(bomInputStream);

// Skip the number of rows at the "top" as specified
for (int i = 0; i < skipTopRows; i++) {
readNextRecord(inputStreamReader, csvFormat.getRecordSeparator());
}

CSVFormat.Builder withHeader;
if (hasHeader) {
Expand All @@ -66,12 +72,12 @@ public CSVRecordReader(final InputStream in, final ComponentLog logger, final Re
withHeader = csvFormat.builder().setHeader(schema.getFieldNames().toArray(new String[0]));
}

csvParser = new CSVParser(reader, withHeader.build());
csvParser = new CSVParser(inputStreamReader, withHeader.build());
}

public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException {
this(in, logger, schema, csvFormat, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, encoding, true);
this(in, logger, schema, csvFormat, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, encoding, true, 0);
}

@Override
Expand All @@ -93,15 +99,13 @@ public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFie
if (!dropUnknownFields) {
values.put("unknown_field_index_" + i, rawValue);
}

continue;
} else {
final RecordField recordField = recordFields.get(i);
rawFieldName = recordField.getFieldName();
dataType = recordField.getDataType();
}


final Object value;
if (coerceTypes) {
value = convert(rawValue, dataType, rawFieldName);
Expand Down Expand Up @@ -155,4 +159,59 @@ private List<RecordField> getRecordFields() {
public void close() throws IOException {
csvParser.close();
}

/**
 * Reads one raw record from the given reader, one character at a time, stopping as soon as the
 * record separator has been consumed. Reading a single character at a time guarantees that no
 * input beyond the separator is consumed, so the same reader can be handed to a parser afterwards.
 *
 * <p>Every character read is kept in the result, including characters that looked like the start
 * of a separator but turned out not to be one. The separator is detected by checking whether the
 * text read so far ends with it, which also handles separators whose prefix repeats
 * (for example {@code "aab"} in the input {@code "aaab"}).
 *
 * @param reader the Reader providing the input
 * @param recordSeparator the non-empty String marking the end of a record in the input
 * @return the text read up to and including the record separator, or, if the end of input is
 *         reached first, everything that was read (an empty string if nothing was available)
 * @throws IOException if an error occurs during reading, or if the record grows to the maximum
 *         supported length without the separator being found
 * @throws IllegalArgumentException if {@code recordSeparator} is null or empty
 */
protected String readNextRecord(Reader reader, String recordSeparator) throws IOException {
    if (recordSeparator == null || recordSeparator.isEmpty()) {
        throw new IllegalArgumentException("Record separator must not be null or empty");
    }

    // Defensive upper bound on a single record, kept slightly below Integer.MAX_VALUE because an
    // array-backed builder cannot be relied upon to grow to the full int range.
    final int maxRecordLength = Integer.MAX_VALUE - 8;
    final int separatorLength = recordSeparator.length();
    final char lastSeparatorChar = recordSeparator.charAt(separatorLength - 1);
    final StringBuilder recordBuilder = new StringBuilder();

    int code = reader.read();
    while (code != -1) {
        // Fail with a descriptive message instead of exhausting memory when the separator never appears
        if (recordBuilder.length() >= maxRecordLength) {
            throw new IOException("2GB input threshold reached, the record is either larger than 2GB or the separator "
                    + "is not found in the first 2GB of input. Ensure the Record Separator is correct for this FlowFile.");
        }

        final char nextChar = (char) code;
        recordBuilder.append(nextChar);

        // Only a character equal to the separator's last character can complete a match, so the
        // (slightly more expensive) suffix comparison is skipped for every other character.
        final int candidateStart = recordBuilder.length() - separatorLength;
        if (nextChar == lastSeparatorChar && candidateStart >= 0
                && recordBuilder.indexOf(recordSeparator, candidateStart) == candidateStart) {
            return recordBuilder.toString();
        }

        code = reader.read();
    }

    // The end of input was reached without finding the record separator; return what was read
    return recordBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ public class FastCSVRecordReader extends AbstractCSVRecordReader {
private final CSVFormat csvFormat;

public FastCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote) throws IOException {
super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote);
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote,
final int skipTopRows) throws IOException {
super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote, skipTopRows);
this.ignoreHeader = ignoreHeader;
this.trimDoubleQuote = trimDoubleQuote;
this.csvFormat = csvFormat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@
public class JacksonCSVRecordReader extends AbstractCSVRecordReader {
private final MappingIterator<String[]> recordStream;
private List<String> rawFieldNames = null;
private boolean allowDuplicateHeaderNames;
private final boolean allowDuplicateHeaderNames;

private volatile static CsvMapper mapper = new CsvMapper().enable(CsvParser.Feature.WRAP_AS_ARRAY);
private static final CsvMapper mapper = new CsvMapper().enable(CsvParser.Feature.WRAP_AS_ARRAY);

public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote) throws IOException {
super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote);
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote,
final int skipTopRows) throws IOException {
super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, trimDoubleQuote, skipTopRows);

final Reader reader = new InputStreamReader(BOMInputStream.builder().setInputStream(in).get(), encoding);

Expand Down Expand Up @@ -100,7 +101,7 @@ public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, f

public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException {
this(in, logger, schema, csvFormat, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, encoding, true);
this(in, logger, schema, csvFormat, hasHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, encoding, true, 0);
}

@Override
Expand Down
Loading

0 comments on commit 8f3af0c

Please sign in to comment.