diff --git a/README.md b/README.md index fb913d6cd..833917f46 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ Documentation for this connector can be found [here](http://docs.confluent.io/cu # Criteo fork changes - Disable all hive related test raising a `NoClassDefFound Could not initialize class org.apache.hadoop.hive.ql.exec.Utilities`. Related issue (https://github.com/criteo-forks/kafka-connect-hdfs/issues/1). To be fixed if we plan to use hive module (not the case currently). +- Apply unmerged PR https://github.com/confluentinc/kafka-connect-hdfs/pull/684 to solve the rotate Interval that doesn't work for low volume or irregular traffic - Force jackson-mapper-asl transitive dependency to 1.9.13 # Development diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java index 9bb8b9390..4f0dc5a97 100644 --- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java +++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java @@ -316,9 +316,9 @@ public boolean recover() { private void updateRotationTimers(SinkRecord currentRecord) { long now = time.milliseconds(); // Wallclock-based partitioners should be independent of the record argument. - lastRotate = isWallclockBased + lastRotate = isWallclockBased || currentRecord == null ? (Long) now - : currentRecord != null ? timestampExtractor.extract(currentRecord) : null; + : timestampExtractor.extract(currentRecord); if (log.isDebugEnabled() && rotateIntervalMs > 0) { log.debug( "Update last rotation timer. Next rotation for {} will be in {}ms", @@ -600,10 +600,10 @@ private void setState(State state) { } private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long now) { - Long currentTimestamp = null; - if (isWallclockBased) { + Long currentTimestamp; + if (isWallclockBased || currentRecord == null) { currentTimestamp = now; - } else if (currentRecord != null) { + } else { currentTimestamp = timestampExtractor.extract(currentRecord); lastRotate = lastRotate == null ? currentTimestamp : lastRotate; }