Skip to content

Commit

Permalink
Correctly override close() and stop() as defined in SinkTask interface
Browse files Browse the repository at this point in the history
The implementation of close() will now remove TopicPartitions the task
no longer will have assigned. This will fix the problem where
the task logs that unassigned topic/partitions are reporting
commits at every commit.

stop() will now close the Scylla session, as intended in the API.

Fixes scylladb#79.
  • Loading branch information
forsberg committed Nov 15, 2022
1 parent 91e596a commit 91b64fd
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private ScyllaDbSession getValidSession() {

if (!session.isValid()) {
log.warn("ScyllaDb Session is invalid. Closing and creating new.");
close();
closeScyllaSession();
session = sessionFactory.newSession(this.config);
}
return session;
Expand Down Expand Up @@ -157,7 +157,7 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
) {
if (config.isOffsetEnabledInScyllaDb()) {
try {
log.debug("flush() - Flushing offsets to {}", this.config.offsetStorageTable);
log.debug("preCommit() - Flushing offsets to {}", this.config.offsetStorageTable);
List<ResultSetFuture> insertFutures = currentOffsets.entrySet().stream()
.map(e -> this.getValidSession().getInsertOffsetStatement(e.getKey(), e.getValue()))
.map(s -> getValidSession().executeStatementAsync(s))
Expand Down Expand Up @@ -205,16 +205,7 @@ private void handleErrors(SinkRecord record, Exception ex) {
new OffsetAndMetadata(record.kafkaOffset() + 1));
}

/**
* Closes the ScyllaDB session and proceeds to closing sink task.
*/
@Override
public void stop() {
close();
}

// Visible for testing
void close() {
private void closeScyllaSession() {
if (null != this.session) {
log.info("Closing getValidSession");
try {
Expand All @@ -226,6 +217,27 @@ void close() {
}
}

/**
* Closes the ScyllaDB session. In SinkTasks, this method is invoked only once outstanding calls to other
* methods have completed (e.g., {@link #put(Collection)} has returned) and a final {@link #flush(Map)} and offset
* commit has completed. Implementations of this method should only need to perform final cleanup operations, such
* as closing network connections to the sink system.
*/
@Override
public void stop() {
closeScyllaSession();

}

@Override
public void close(Collection<TopicPartition> partitions) {
// Remove any partitions we no longer will be processing,
// or we'll see lot's and lot's of warning log entries about unassigned offset commits
for (TopicPartition partition: partitions) {
topicOffsets.remove(partition);
}
}

@Override
public String version() {
return VersionUtil.getVersion();
Expand Down

0 comments on commit 91b64fd

Please sign in to comment.