Skip to content

Commit

Permalink
NIFI-12024: Add CSV Writer property to CSVRecordSetWriter and a FastC…
Browse files Browse the repository at this point in the history
…SV writer implementation
  • Loading branch information
mattyb149 committed Sep 14, 2023
1 parent 7a79e8c commit 936b14f
Show file tree
Hide file tree
Showing 3 changed files with 646 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
Expand All @@ -43,9 +45,31 @@
+ "corresponding to the record fields.")
public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {

// CSV writer implementations
public static final AllowableValue APACHE_COMMONS_CSV = new AllowableValue("commons-csv", "Apache Commons CSV",
"The CSV writer implementation from the Apache Commons CSV library.");

public static final AllowableValue FAST_CSV = new AllowableValue("fast-csv", "FastCSV",
"The CSV writer implementation from the FastCSV library. NOTE: This writer only officially supports RFC-4180, so it recommended to "
+ "set the 'CSV Format' property to 'RFC 4180'. It does handle some non-compliant CSV data, for that case set the 'CSV Format' property to "
+ "'CUSTOM' and the other custom format properties (such as 'Trim Fields', 'Trim double quote', etc.) as appropriate. Be aware that this "
+ "may cause errors if FastCSV doesn't handle the property settings correctly (such as 'Ignore Header'), but otherwise may process the output as expected even "
+ "if the data is not fully RFC-4180 compliant.");
public static final PropertyDescriptor CSV_WRITER = new PropertyDescriptor.Builder()
.name("csv-writer")
.displayName("CSV Writer")
.description("Specifies which writer implementation to use to write CSV records. NOTE: Different writers may support different subsets of functionality "
+ "and may also exhibit different levels of performance.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(APACHE_COMMONS_CSV, FAST_CSV)
.defaultValue(APACHE_COMMONS_CSV.getValue())
.required(true)
.build();

private volatile ConfigurationContext context;

private volatile boolean includeHeader;
private volatile String csvWriter;
private volatile String charSet;

// it will be initialized only if there are no dynamic csv formatting properties
Expand All @@ -55,6 +79,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(CSVUtils.CSV_FORMAT);
properties.add(CSV_WRITER);
properties.add(CSVUtils.VALUE_SEPARATOR);
properties.add(CSVUtils.INCLUDE_HEADER_LINE);
properties.add(CSVUtils.QUOTE_CHAR);
Expand All @@ -81,6 +106,8 @@ public void storeStaticProperties(final ConfigurationContext context) {
} else {
this.csvFormat = null;
}

this.csvWriter = context.getProperty(CSV_WRITER).getValue();
}

@Override
Expand All @@ -92,7 +119,14 @@ public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchem
csvFormat = CSVUtils.createCSVFormat(context, variables);
}

return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
if (APACHE_COMMONS_CSV.getValue().equals(csvWriter)) {
return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
} else if (FAST_CSV.getValue().equals(csvWriter)) {
return new WriteFastCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
} else {
throw new IOException("Parser not supported");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.csv;

import de.siegmar.fastcsv.writer.CsvWriter;
import de.siegmar.fastcsv.writer.LineDelimiter;
import de.siegmar.fastcsv.writer.QuoteStrategy;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.QuoteMode;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RawRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.commons.csv.QuoteMode.MINIMAL;

/**
 * A {@link RecordSetWriter} that serializes NiFi Records as CSV using the FastCSV library
 * (de.siegmar.fastcsv), as an alternative to the Apache Commons CSV based WriteCSVResult.
 * Formatting options are supplied via a Commons CSV {@link CSVFormat} and translated, as
 * closely as FastCSV allows, onto the FastCSV writer builder in the constructor.
 *
 * Thread-safety: not thread-safe; intended for single-threaded use per instance, matching
 * the RecordSetWriter contract.
 */
public class WriteFastCSVResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
    private final RecordSchema recordSchema;
    private final SchemaAccessWriter schemaWriter;
    // Optional format strings applied when stringifying DATE/TIME/TIMESTAMP fields; null means default formatting
    private final String dateFormat;
    private final String timeFormat;
    private final String timestampFormat;

    CsvWriter csvWriter;

    // Kept as a field (rather than only passing to the builder) because flush() must be
    // invoked on the underlying writer; CsvWriter itself does not expose a flush method.
    final OutputStreamWriter streamWriter;

    // Reusable scratch buffer for one row of stringified field values (sized to the schema's field count)
    private final String[] fieldValues;
    private final boolean includeHeaderLine;
    // Tracks whether the header row has already been emitted, so it is written at most once
    private boolean headerWritten = false;
    // Lazily-computed union of schema field names and raw record field names (see getFieldNames)
    private String[] fieldNames;

    /**
     * Creates a FastCSV-backed writer.
     *
     * @param csvFormat Commons CSV format whose delimiter, quote character, quote mode, record
     *                  separator, and escape character are mapped onto FastCSV's builder
     * @param recordSchema schema whose fields drive column order for writeRecord
     * @param schemaWriter strategy used to embed/attach schema information to the output
     * @param out destination stream
     * @param dateFormat/timeFormat/timestampFormat optional format strings for temporal fields
     * @param includeHeaderLine whether to emit a header row of field names
     * @param charSet character set name used to encode the output
     * @throws IOException if the record separator is not LF/CR/CRLF, or if an escape character
     *                     other than double-quote is configured (FastCSV is RFC 4180-conformant)
     */
    public WriteFastCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out,
                              final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine, final String charSet) throws IOException {

        super(out);
        this.recordSchema = recordSchema;
        this.schemaWriter = schemaWriter;
        this.dateFormat = dateFormat;
        this.timeFormat = timeFormat;
        this.timestampFormat = timestampFormat;
        this.includeHeaderLine = includeHeaderLine;

        streamWriter = new OutputStreamWriter(out, charSet);
        CsvWriter.CsvWriterBuilder builder = CsvWriter.builder()
                .fieldSeparator(csvFormat.getDelimiter())
                .quoteCharacter(csvFormat.getQuoteCharacter());

        // Map Commons CSV quote modes to the nearest FastCSV QuoteStrategy. These are
        // approximations, not exact equivalents:
        //  - NON_NUMERIC has no FastCSV counterpart; NON_EMPTY (quote all non-empty fields) is the
        //    closest available behavior.
        //  - NONE (never quote) is also unsupported; it falls through to REQUIRED along with
        //    MINIMAL (quote only when necessary). Note the deliberate case fall-through below.
        QuoteMode quoteMode = (csvFormat.getQuoteMode() == null) ? MINIMAL : csvFormat.getQuoteMode();
        switch (quoteMode) {
            case ALL:
                builder.quoteStrategy(QuoteStrategy.ALWAYS);
                break;
            case NON_NUMERIC:
                builder.quoteStrategy(QuoteStrategy.NON_EMPTY);
                break;
            case MINIMAL:
            case NONE:
                builder.quoteStrategy(QuoteStrategy.REQUIRED);
        }

        // FastCSV only supports the three standard line delimiters; LineDelimiter.of throws
        // IllegalArgumentException for anything else, which we surface as an IOException
        try {
            LineDelimiter lineDelimiter = LineDelimiter.of(csvFormat.getRecordSeparator());
            builder.lineDelimiter(lineDelimiter);
        } catch (IllegalArgumentException iae) {
            throw new IOException("Line delimiter is not supported, must use LF, CR, or CRLF", iae);
        }

        // RFC 4180 escapes quotes by doubling them; FastCSV cannot honor any other escape character
        if (csvFormat.getEscapeCharacter() != null && csvFormat.getEscapeCharacter() != '\"') {
            throw new IOException("Escape character must be a double-quote character (\") per the FastCSV conformance to the RFC4180 specification");
        }

        csvWriter = builder.build(streamWriter);
        fieldValues = new String[recordSchema.getFieldCount()];
    }

    /**
     * Returns the format string to use when stringifying the given field: the configured
     * date/time/timestamp format for temporal types, otherwise the format carried by the
     * field's own DataType (which may be null).
     */
    private String getFormat(final RecordField field) {
        final DataType dataType = field.getDataType();
        switch (dataType.getFieldType()) {
            case DATE:
                return dateFormat;
            case TIME:
                return timeFormat;
            case TIMESTAMP:
                return timestampFormat;
        }

        return dataType.getFormat();
    }

    @Override
    protected void onBeginRecordSet() throws IOException {
        // Emit schema information (e.g. as attributes or embedded header, depending on strategy)
        // before any records are written
        schemaWriter.writeHeader(recordSchema, getOutputStream());
    }

    @Override
    protected Map<String, String> onFinishRecordSet() throws IOException {
        // If the header has not yet been written (but should be), write it out now.
        // This covers the empty record set case, where writeRecord was never called.
        includeHeaderIfNecessary(null, true);
        return schemaWriter.getAttributes(recordSchema);
    }

    @Override
    public void flush() throws IOException {
        // Flush the OutputStreamWriter directly; CsvWriter does not expose flush().
        // NOTE(review): assumes CsvWriter writes through to this writer without additional
        // internal buffering — confirm against the FastCSV version in use.
        streamWriter.flush();
    }

    @Override
    public void close() throws IOException {
        // Closing the CsvWriter closes the wrapped streamWriter (and thus the output stream)
        csvWriter.close();
    }

    /**
     * Returns the column names for raw-record writes: all schema field names first (preserving
     * schema order), followed by any extra fields present on the record but absent from the
     * schema. Cached after the first call, so the first record seen fixes the column set.
     */
    private String[] getFieldNames(final Record record) {
        if (fieldNames != null) {
            return fieldNames;
        }

        final Set<String> allFields = new LinkedHashSet<>();
        // The fields defined in the schema should be written first followed by extra ones.
        allFields.addAll(recordSchema.getFieldNames());
        allFields.addAll(record.getRawFieldNames());
        fieldNames = allFields.toArray(new String[0]);
        return fieldNames;
    }

    /**
     * Writes the header row once, if configured to do so and not already written.
     *
     * @param record the record whose raw field names supplement the schema when
     *               includeOnlySchemaFields is false; may be null when true
     * @param includeOnlySchemaFields if true, header contains only schema field names
     */
    private void includeHeaderIfNecessary(final Record record, final boolean includeOnlySchemaFields) throws IOException {
        if (headerWritten || !includeHeaderLine) {
            return;
        }

        final String[] fieldNames;
        if (includeOnlySchemaFields) {
            fieldNames = recordSchema.getFieldNames().toArray(new String[0]);
        } else {
            fieldNames = getFieldNames(record);
        }

        csvWriter.writeRow(fieldNames);
        headerWritten = true;
    }

    /**
     * Writes one record using only the fields defined in the schema, in schema order.
     * Values are stringified via the field-appropriate format (see getFormat).
     */
    @Override
    public Map<String, String> writeRecord(final Record record) throws IOException {
        // If we are not writing an active record set, then we need to ensure that we write the
        // schema information.
        if (!isActiveRecordSet()) {
            schemaWriter.writeHeader(recordSchema, getOutputStream());
        }

        includeHeaderIfNecessary(record, true);

        int i = 0;
        for (final RecordField recordField : recordSchema.getFields()) {
            String fieldValue = getFieldValue(record, recordField);
            fieldValues[i++] = fieldValue;
        }

        csvWriter.writeRow(fieldValues);
        return schemaWriter.getAttributes(recordSchema);
    }

    // Stringifies a single field value, applying the configured temporal format when applicable
    private String getFieldValue(final Record record, final RecordField recordField) {
        return record.getAsString(recordField, getFormat(recordField));
    }

    /**
     * Writes one record including fields NOT present in the schema (raw fields), using the
     * column set established by getFieldNames. Unlike writeRecord, this increments and returns
     * the record count in its WriteResult.
     */
    @Override
    public WriteResult writeRawRecord(final Record record) throws IOException {
        // If we are not writing an active record set, then we need to ensure that we write the
        // schema information.
        if (!isActiveRecordSet()) {
            schemaWriter.writeHeader(recordSchema, getOutputStream());
        }

        includeHeaderIfNecessary(record, false);

        final String[] fieldNames = getFieldNames(record);
        // Avoid creating a new Object[] for every Record if we can. But if the record has a different number of columns than does our
        // schema, we don't have a lot of options here, so we just create a new Object[] in that case.
        final String[] recordFieldValues = (fieldNames.length == this.fieldValues.length) ? this.fieldValues : new String[fieldNames.length];

        int i = 0;
        for (final String fieldName : fieldNames) {
            final Optional<RecordField> recordField = recordSchema.getField(fieldName);
            if (recordField.isPresent()) {
                // Field is in the schema: honor its type-specific format
                recordFieldValues[i++] = record.getAsString(fieldName, getFormat(recordField.get()));
            } else {
                // Extra (raw-only) field: default stringification
                recordFieldValues[i++] = record.getAsString(fieldName);
            }
        }

        csvWriter.writeRow(recordFieldValues);
        final Map<String, String> attributes = schemaWriter.getAttributes(recordSchema);
        return WriteResult.of(incrementRecordCount(), attributes);
    }

    @Override
    public String getMimeType() {
        return "text/csv";
    }
}
Loading

0 comments on commit 936b14f

Please sign in to comment.