Bugfix for issue 20: avoid clashes of subsequent writes having the same record timestamp #61

Draft · wants to merge 2 commits into master
28 changes: 14 additions & 14 deletions src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java
@@ -40,12 +40,12 @@ public class ScyllaDbSinkTask extends SinkTask {
   private static final Logger log = LoggerFactory.getLogger(ScyllaDbSinkTask.class);
 
   private ScyllaDbSinkConnectorConfig config;
-  private Map<TopicPartition, OffsetAndMetadata> topicOffsets;
+  private Map<TopicPartition, OffsetAndMetadata> topicOffsets = new HashMap<>();
   ScyllaDbSession session;
 
   /**
-   * Starts the sink task.
-   * If <code>scylladb.offset.storage.table.enable</code> is set to true,
+   * Starts the sink task.
+   * If <code>scylladb.offset.storage.table.enable</code> is set to true,
    * the task will load offsets for each Kafka topic-partition from
    * ScyllaDB offset table into task context.
    */
@@ -64,20 +64,20 @@ public void start(Map<String, String> settings) {
   }
 
   /*
-   * Returns a ScyllaDB session.
+   * Returns a ScyllaDB session.
    * Creates a session, if not already exists.
-   * In case the when session is not valid,
+   * In case the when session is not valid,
    * it closes the existing session and creates a new one.
    */
   private ScyllaDbSession getValidSession() {
-
+
     ScyllaDbSessionFactory sessionFactory = new ScyllaDbSessionFactory();
-
+
     if (session == null) {
       log.info("Creating ScyllaDb Session.");
       session = sessionFactory.newSession(this.config);
-    }
+    }
 
     if (!session.isValid()) {
       log.warn("ScyllaDb Session is invalid. Closing and creating new.");
       close();
@@ -89,7 +89,7 @@ private ScyllaDbSession getValidSession() {
   /**
    * <ol>
    * <li>Validates the kafka records.
-   * <li>Writes or deletes records from Kafka topic into ScyllaDB.
+   * <li>Writes or deletes records from Kafka topic into ScyllaDB.
    * <li>Requests to commit the records when the scyllaDB operations are successful.
    * </ol>
    */
@@ -146,9 +146,9 @@ public void put(Collection<SinkRecord> records) {
   }
 
   /**
-   * If <code>scylladb.offset.storage.table.enable</code> is set to true,
-   * updates offsets in ScyllaDB table.
-   * Else, assumes all the records in previous @put call were successfully
+   * If <code>scylladb.offset.storage.table.enable</code> is set to true,
+   * updates offsets in ScyllaDB table.
+   * Else, assumes all the records in previous @put call were successfully
    * written in to ScyllaDB and returns the same offsets.
    */
   @Override
@@ -212,7 +212,7 @@ private void handleErrors(SinkRecord record, Exception ex) {
   public void stop() {
     close();
   }
-
+
   // Visible for testing
   void close() {
     if (null != this.session) {
@@ -230,4 +230,4 @@ void close() {
   public String version() {
     return VersionUtil.getVersion();
   }
-}
+}
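
Why the eager `new HashMap<>()` initialization matters: the offset-commit path can run before `put()` has populated the map, and a `null` map would surface as a `NullPointerException`, while an empty map simply reports that there is nothing to commit. A minimal sketch of that failure mode, under that assumption and with hypothetical names (`OffsetTrackingSketch`, `recordOffset`, and `currentOffsets` are not part of the connector):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical stand-in for the task's offset bookkeeping; not connector code.
class OffsetTrackingSketch {
  // Before the fix this field stayed null until put() ran at least once.
  private Map<TopicPartition, OffsetAndMetadata> topicOffsets = new HashMap<>();

  void recordOffset(TopicPartition tp, long offset) {
    topicOffsets.put(tp, new OffsetAndMetadata(offset + 1));
  }

  // If offsets are requested before any put(), a null map would throw here;
  // an empty map just yields an empty commit.
  Map<TopicPartition, OffsetAndMetadata> currentOffsets() {
    return new HashMap<>(topicOffsets);
  }
}
```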
5 changes: 3 additions & 2 deletions src/main/java/io/connect/scylladb/ScyllaDbSinkTaskHelper.java
@@ -96,8 +96,9 @@ public BoundStatement getBoundStatementForRecord(SinkRecord record) {
     } else {
       boundStatement.setConsistencyLevel(this.scyllaDbSinkConnectorConfig.consistencyLevel);
       // Timestamps in Kafka (record.timestamp()) are in millisecond precision,
-      // while Scylla expects a microsecond precision: 1 ms = 1000 us.
-      boundStatement.setDefaultTimestamp(record.timestamp() * 1000);
+      // while Scylla expects a microsecond precision: 1 ms = 1000 µs.
+      // + to avoid clashes of subsequent writes having the same record timestamp, the offset mod 1000 is added as [µs].
+      boundStatement.setDefaultTimestamp(record.timestamp() * 1000 + record.kafkaOffset() % 1000);
     }
     return boundStatement;
   }
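
A small worked example of the new timestamp arithmetic: two subsequent records that share a millisecond timestamp now map to distinct microsecond write timestamps. This is a standalone sketch of the computation, not connector code:

```java
public class WriteTimestampDemo {
  // Mirrors the bound statement's timestamp: milliseconds scaled to
  // microseconds, plus the Kafka offset modulo 1000 as a tiebreaker.
  static long writeTimestampMicros(long recordTimestampMs, long kafkaOffset) {
    return recordTimestampMs * 1000 + kafkaOffset % 1000;
  }

  public static void main(String[] args) {
    long ts = 1_700_000_000_000L; // both records carry this millisecond timestamp

    // Consecutive offsets yield distinct write timestamps, so the later
    // record wins in ScyllaDB instead of clashing with the earlier one.
    System.out.println(writeTimestampMicros(ts, 41)); // 1700000000000041
    System.out.println(writeTimestampMicros(ts, 42)); // 1700000000000042
  }
}
```

Note that the tiebreaker wraps every 1000 offsets: two records with the same millisecond timestamp whose offsets differ by an exact multiple of 1000 would still collide, which is presumably acceptable for the subsequent-write case the issue describes.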