diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSourceTask.java index fbc2308..f160869 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSourceTask.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSourceTask.java @@ -180,27 +180,27 @@ public static String humanReadableByteCount(long bytes, boolean si) { } private void recordProcessingTime() { - final long secondsElapsed = processingTime.elapsed(TimeUnit.SECONDS); - final long bytesPerSecond; + final long elapsedMs = processingTime.elapsed(TimeUnit.MILLISECONDS); + final long bytesPerMs; - if (0L == secondsElapsed || 0L == this.inputFile.length()) { - bytesPerSecond = 0L; + if (0L == elapsedMs || 0L == this.inputFile.length()) { + bytesPerMs = 0L; } else { - bytesPerSecond = this.inputFile.length() / secondsElapsed; + bytesPerMs = this.inputFile.length() / elapsedMs; } - if (bytesPerSecond > 0) { + if (bytesPerMs > 0) { log.info( - "Finished processing {} record(s) in {} second(s). Processing speed {} per second.", + "Finished processing {} record(s) in {} millisecond(s). Processing speed {} per millisecond.", this.recordCount, - secondsElapsed, - humanReadableByteCount(bytesPerSecond, false) + elapsedMs, + humanReadableByteCount(bytesPerMs, false) ); } else { log.info( "Finished processing {} record(s) in {} second(s).", this.recordCount, - secondsElapsed + elapsedMs ); } } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFile.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFile.java index 7808bf5..aeddb06 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFile.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFile.java @@ -223,7 +223,9 @@ private List getInputPathSubDirsToCleanup() { File lastSubDir = this.config.inputPath; for (String subDirName : this.inputPathSubDir.split(File.separator)) { lastSubDir = new File(lastSubDir, subDirName); - inputPathSubDirsToCleanup.add(lastSubDir); + if (lastSubDir.exists()) { + inputPathSubDirsToCleanup.add(lastSubDir); + } } Collections.reverse(inputPathSubDirsToCleanup); } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirLineDelimitedSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirLineDelimitedSourceTask.java index ac4340f..ceb72d6 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirLineDelimitedSourceTask.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirLineDelimitedSourceTask.java @@ -41,7 +41,6 @@ protected void configure(InputFile inputFile, Long lastOffset) throws IOExceptio @Override protected List process() throws IOException { - int recordCount = 0; List records = new ArrayList<>(this.config.batchSize); String line = null; while (recordCount < this.config.batchSize && null != (line = this.inputFile.lineNumberReader().readLine())) {