Skip to content

Commit

Permalink
NPE thrown when retrieving the offset for a file that filed to open. F…
Browse files Browse the repository at this point in the history
…ixes #176 (#177)
  • Loading branch information
jcustenborder authored Mar 26, 2021
1 parent 9e5c130 commit 75a9b43
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
package com.github.jcustenborder.kafka.connect.spooldir;

import com.github.jcustenborder.kafka.connect.utils.VersionUtil;
import shaded.com.google.common.base.Preconditions;
import shaded.com.google.common.base.Stopwatch;
import shaded.com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.com.google.common.base.Preconditions;
import shaded.com.google.common.base.Stopwatch;
import shaded.com.google.common.collect.ImmutableMap;

import java.io.File;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -255,7 +255,14 @@ public List<SourceRecord> read() {
this.hasRecords = !records.isEmpty();
return records;
} catch (Exception ex) {
log.error("Exception encountered processing line {} of {}.", recordOffset(), this.inputFile, ex);
long recordOffset;
try {
recordOffset = recordOffset();
} catch (Exception e) {
log.error("Exception thrown while calling recordOffset()", e);
recordOffset = -1;
}
log.error("Exception encountered processing line {} of {}.", recordOffset, this.inputFile, ex);
try {
this.cleanUpPolicy.error();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ protected List<SourceRecord> process() throws IOException {

@Override
protected long recordOffset() {
return this.inputFile.lineNumberReader().getLineNumber();
long result = -1L;

if (null != this.inputFile && null != this.inputFile.lineNumberReader()) {
result = this.inputFile.lineNumberReader().getLineNumber();
}

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ public void version() {
assertNotNull(this.task.version(), "version should not be null.");
}

@Test
public void recordOffsetNPE() {
this.task = createTask();
long actual = this.task.recordOffset();
}

@AfterEach
public void after() {
if (null != this.task) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.github.jcustenborder.kafka.connect.spooldir;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class SpoolDirLineDelimitedSourceTaskTest extends AbstractSpoolDirSourceTaskTest<SpoolDirLineDelimitedSourceTask> {
private static final Logger log = LoggerFactory.getLogger(SpoolDirJsonSourceTaskTest.class);

@Override
protected SpoolDirLineDelimitedSourceTask createTask() {
return new SpoolDirLineDelimitedSourceTask();
}

@Override
protected Map<String, String> settings() {
Map<String, String> settings = super.settings();
settings.put(SpoolDirCsvSourceConnectorConfig.CSV_FIRST_ROW_AS_HEADER_CONF, "true");
settings.put(SpoolDirCsvSourceConnectorConfig.PARSER_TIMESTAMP_DATE_FORMATS_CONF, "yyyy-MM-dd'T'HH:mm:ss'Z'");
settings.put(SpoolDirCsvSourceConnectorConfig.CSV_NULL_FIELD_INDICATOR_CONF, "BOTH");
return settings;
}

}

0 comments on commit 75a9b43

Please sign in to comment.