diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java b/src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java index 98aacbf..fdc3e66 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java @@ -40,12 +40,12 @@ public class ScyllaDbSinkTask extends SinkTask { private static final Logger log = LoggerFactory.getLogger(ScyllaDbSinkTask.class); private ScyllaDbSinkConnectorConfig config; - private Map topicOffsets; + private Map topicOffsets = new HashMap<>(); ScyllaDbSession session; /** - * Starts the sink task. - * If scylladb.offset.storage.table.enable is set to true, + * Starts the sink task. + * If scylladb.offset.storage.table.enable is set to true, * the task will load offsets for each Kafka topic-partition from * ScyllaDB offset table into task context. */ @@ -64,20 +64,20 @@ public void start(Map settings) { } /* - * Returns a ScyllaDB session. + * Returns a ScyllaDB session. * Creates a session, if not already exists. - * In case the when session is not valid, + * In case the when session is not valid, * it closes the existing session and creates a new one. */ private ScyllaDbSession getValidSession() { - + ScyllaDbSessionFactory sessionFactory = new ScyllaDbSessionFactory(); if (session == null) { log.info("Creating ScyllaDb Session."); session = sessionFactory.newSession(this.config); - } - + } + if (!session.isValid()) { log.warn("ScyllaDb Session is invalid. Closing and creating new."); close(); @@ -89,7 +89,7 @@ private ScyllaDbSession getValidSession() { /** *
    *
  1. Validates the kafka records. - *
  2. Writes or deletes records from Kafka topic into ScyllaDB. + *
  3. Writes or deletes records from Kafka topic into ScyllaDB. *
  4. Requests to commit the records when the scyllaDB operations are successful. *
*/ @@ -146,9 +146,9 @@ public void put(Collection records) { } /** - * If scylladb.offset.storage.table.enable is set to true, - * updates offsets in ScyllaDB table. - * Else, assumes all the records in previous @put call were successfully + * If scylladb.offset.storage.table.enable is set to true, + * updates offsets in ScyllaDB table. + * Else, assumes all the records in previous @put call were successfully * written in to ScyllaDB and returns the same offsets. */ @Override @@ -212,7 +212,7 @@ private void handleErrors(SinkRecord record, Exception ex) { public void stop() { close(); } - + // Visible for testing void close() { if (null != this.session) { @@ -230,4 +230,4 @@ void close() { public String version() { return VersionUtil.getVersion(); } -} \ No newline at end of file +} diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSinkTaskHelper.java b/src/main/java/io/connect/scylladb/ScyllaDbSinkTaskHelper.java index 5eaf3a0..fbaa640 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSinkTaskHelper.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSinkTaskHelper.java @@ -96,8 +96,9 @@ public BoundStatement getBoundStatementForRecord(SinkRecord record) { } else { boundStatement.setConsistencyLevel(this.scyllaDbSinkConnectorConfig.consistencyLevel); // Timestamps in Kafka (record.timestamp()) are in millisecond precision, - // while Scylla expects a microsecond precision: 1 ms = 1000 us. - boundStatement.setDefaultTimestamp(record.timestamp() * 1000); + // while Scylla expects a microsecond precision: 1 ms = 1000 µs. + // + to avoid clashes of subsequent writes having the same record timestamp, the offset mod 1000 is added as [µs]. + boundStatement.setDefaultTimestamp(record.timestamp() * 1000 + record.kafkaOffset() % 1000); } return boundStatement; }