From 91b64fd37bc24b73c7650237e96ebdd7b0ad30a7 Mon Sep 17 00:00:00 2001 From: Erik Forsberg Date: Mon, 14 Nov 2022 16:31:16 +0100 Subject: [PATCH] Correctly override close() and stop() as defined in SinkTask interface The implementation of close() will now remove TopicPartitions the task no longer will have assigned. This will fix the problem where the task logs that unassigned topic/partitions are reporting commits at every commit. stop() will now close the Scylla session, as intended in the API. Fixes #79. --- .../io/connect/scylladb/ScyllaDbSinkTask.java | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java b/src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java index adb8fa1..da8e81e 100644 --- a/src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java +++ b/src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java @@ -80,7 +80,7 @@ private ScyllaDbSession getValidSession() { if (!session.isValid()) { log.warn("ScyllaDb Session is invalid. Closing and creating new."); - close(); + closeScyllaSession(); session = sessionFactory.newSession(this.config); } return session; @@ -157,7 +157,7 @@ public Map preCommit( ) { if (config.isOffsetEnabledInScyllaDb()) { try { - log.debug("flush() - Flushing offsets to {}", this.config.offsetStorageTable); + log.debug("preCommit() - Flushing offsets to {}", this.config.offsetStorageTable); List insertFutures = currentOffsets.entrySet().stream() .map(e -> this.getValidSession().getInsertOffsetStatement(e.getKey(), e.getValue())) .map(s -> getValidSession().executeStatementAsync(s)) @@ -205,16 +205,7 @@ private void handleErrors(SinkRecord record, Exception ex) { new OffsetAndMetadata(record.kafkaOffset() + 1)); } - /** - * Closes the ScyllaDB session and proceeds to closing sink task. - */ - @Override - public void stop() { - close(); - } - - // Visible for testing - void close() { + private void closeScyllaSession() { if (null != this.session) { log.info("Closing getValidSession"); try { @@ -226,6 +217,27 @@ void close() { } } + /** + * Closes the ScyllaDB session. In SinkTasks, this method is invoked only once outstanding calls to other + * methods have completed (e.g., {@link #put(Collection)} has returned) and a final {@link #flush(Map)} and offset + * commit has completed. Implementations of this method should only need to perform final cleanup operations, such + * as closing network connections to the sink system. + */ + @Override + public void stop() { + closeScyllaSession(); + + } + + @Override + public void close(Collection partitions) { + // Remove any partitions we no longer will be processing, + // or we'll see lot's and lot's of warning log entries about unassigned offset commits + for (TopicPartition partition: partitions) { + topicOffsets.remove(partition); + } + } + @Override public String version() { return VersionUtil.getVersion();