
Commit f29a0c3

Make the offset commit after every batch of inserts optional, defaulting to not committing.
For at-least-once semantics it is not necessary to commit after every call to put(), because Kafka Connect commits the Kafka offsets periodically on its own. Relying on the periodic commit results in fewer writes to the __consumer_offsets topic and therefore less load on the Kafka brokers. The default is false, i.e. offsets are not committed after every set of INSERT operations. This changes the behaviour of Kafka Connect ScyllaDB, but it significantly reduces the number of produce requests to the Kafka brokers, which is clearly visible in broker CPU consumption metrics in high-load production environments.
1 parent 91e596a commit f29a0c3
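
To make the behaviour change concrete, here is a minimal sketch of the pattern the diff below introduces, not the connector's actual put() implementation: the task only asks the worker for an immediate offset commit when offset.commit.after.every.insert is enabled, and otherwise relies on the worker's periodic commit (controlled by the worker-level offset.flush.interval.ms setting). The class name and the writeBatchToScylla helper are hypothetical placeholders.

// A minimal sketch, assuming a flag populated from offset.commit.after.every.insert.
// Statement building, error handling, and offset-table bookkeeping are elided;
// writeBatchToScylla() is a hypothetical placeholder for the real insert path.
import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class CommitPerBatchSketchTask extends SinkTask {

  private boolean requestCommitAfterEveryInsert; // set in start() from the parsed config

  @Override
  public void put(Collection<SinkRecord> records) {
    writeBatchToScylla(records);
    if (requestCommitAfterEveryInsert) {
      // Explicitly ask the worker to commit consumer offsets now; every such commit
      // ends up as a produce request to the __consumer_offsets topic.
      context.requestCommit();
    }
    // When the flag is false, the worker still commits offsets on its regular schedule
    // (worker setting offset.flush.interval.ms), which is enough for at-least-once delivery.
  }

  // Placeholder for the connector's actual write logic.
  protected abstract void writeBatchToScylla(Collection<SinkRecord> records);
}

To restore the previous behaviour of committing after every batch, set offset.commit.after.every.insert=true in the connector configuration, as the integration tests in this commit do.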

File tree

5 files changed, +82 -1 lines changed


config/scylladb-sink-quickstart.properties (+1)
@@ -57,6 +57,7 @@ scylladb.keyspace=<keyspace-name>
 #scylladb.execute.timeout.ms=30000
 #scylladb.ttl=null
 #scylladb.offset.storage.table.enable=true
+#offset.commit.after.every.insert=false

 ### Converter configs(AVRO):
 #key.converter=io.confluent.connect.avro.AvroConverter

src/main/java/io/connect/scylladb/ScyllaDbSinkConnectorConfig.java (+21)
@@ -63,6 +63,7 @@ public class ScyllaDbSinkConnectorConfig extends AbstractConfig {
   public final List<String> cipherSuites;
   public final File certFilePath;
   public final File privateKeyPath;
+  public final boolean requestCommitAfterEveryInsert;

   private static final Pattern TOPIC_KS_TABLE_SETTING_PATTERN =
       Pattern.compile("topic\\.([a-zA-Z0-9._-]+)\\.([^.]+|\"[\"]+\")\\.([^.]+|\"[\"]+\")\\.(mapping|consistencyLevel|ttlSeconds|deletesEnabled)$");
@@ -160,6 +161,8 @@ public ScyllaDbSinkConnectorConfig(Map<?, ?> originals) {
       TopicConfigs topicConfigs = new TopicConfigs(topicWiseConfig.getValue(), this);
       topicWiseConfigs.put(topicWiseConfig.getKey(), topicConfigs);
     }
+
+    this.requestCommitAfterEveryInsert = getBoolean(OFFSET_COMMIT_AFTER_EVERY_INSERT);
   }

   public static final String PORT_CONFIG = "scylladb.port";
@@ -303,6 +306,13 @@ public ScyllaDbSinkConnectorConfig(Map<?, ?> originals) {
           + "inserting records into ScyllDB and continues to process next set of records, "
           + "available in the kafka topics.";

+  public static final String OFFSET_COMMIT_AFTER_EVERY_INSERT = "offset.commit.after.every.insert";
+  public static final boolean OFFSET_COMMIT_AFTER_EVERY_INSERT_DEFAULT = false;
+  public static final String OFFSET_COMMIT_AFTER_EVERY_INSERT_DOC = "Configure code to request Kafka Offset Commit "
+          + "After every batch of insertion to Scylla. This will mean less repeated inserts after recovery "
+          + "but will increase load on Kafka cluster. If set to false, Kafka Connect will commit offsets "
+          + "periodically instead.";
+
   public static final String SCYLLADB_GROUP = "ScyllaDB";
   public static final String CONNECTION_GROUP = "Connection";
   public static final String SSL_GROUP = "SSL";
@@ -626,6 +636,17 @@ public static ConfigDef config() {
             ConfigDef.Width.NONE,
             BEHAVIOR_ON_ERROR_DISPLAY,
             new ListRecommender(Arrays.asList(toStringArray(BehaviorOnError.values())))
+        )
+        .define(
+            OFFSET_COMMIT_AFTER_EVERY_INSERT,
+            ConfigDef.Type.BOOLEAN,
+            OFFSET_COMMIT_AFTER_EVERY_INSERT_DEFAULT,
+            ConfigDef.Importance.MEDIUM,
+            OFFSET_COMMIT_AFTER_EVERY_INSERT_DOC,
+            WRITE_GROUP,
+            5,
+            ConfigDef.Width.SHORT,
+            "Request Kafka Offset Commit after every Scylla Write"
         );
   }
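
As a standalone illustration (not part of the commit) of how the ConfigDef entry above behaves, the sketch below defines an equivalent boolean option with Kafka's ConfigDef and parses a configuration map that omits it, so getBoolean() falls back to the declared default of false; supplying "true" enables the per-batch commit. The class name and the abbreviated documentation string are assumptions for the sketch.

// Minimal sketch of defining and reading the boolean option with Kafka's ConfigDef.
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class OffsetCommitOptionSketch {
  static final String OFFSET_COMMIT_AFTER_EVERY_INSERT = "offset.commit.after.every.insert";

  static final ConfigDef CONFIG_DEF = new ConfigDef()
      .define(
          OFFSET_COMMIT_AFTER_EVERY_INSERT,
          ConfigDef.Type.BOOLEAN,
          false, // same default as OFFSET_COMMIT_AFTER_EVERY_INSERT_DEFAULT
          ConfigDef.Importance.MEDIUM,
          "Request an offset commit after every batch of inserts to Scylla.");

  public static void main(String[] args) {
    // The option is absent from the supplied config, so the default (false) applies.
    AbstractConfig parsed = new AbstractConfig(CONFIG_DEF, Map.of());
    System.out.println(parsed.getBoolean(OFFSET_COMMIT_AFTER_EVERY_INSERT)); // prints: false

    // Explicitly enabling it restores the commit-per-batch behaviour.
    AbstractConfig enabled =
        new AbstractConfig(CONFIG_DEF, Map.of(OFFSET_COMMIT_AFTER_EVERY_INSERT, "true"));
    System.out.println(enabled.getBoolean(OFFSET_COMMIT_AFTER_EVERY_INSERT)); // prints: true
  }
}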

src/main/java/io/connect/scylladb/ScyllaDbSinkTask.java (+3 -1)
@@ -128,7 +128,9 @@ public void put(Collection<SinkRecord> records) {
         ResultSet resultSet =
             future.getUninterruptibly(this.config.statementTimeoutMs, TimeUnit.MILLISECONDS);
       }
-      context.requestCommit();
+      if(config.requestCommitAfterEveryInsert) {
+        context.requestCommit();
+      }
       // TODO : Log the records that fail in Queue/Kafka Topic.
     } catch (TransportException ex) {
       log.debug("put() - Setting clusterValid = false", ex);

src/test/java/io/connect/scylladb/integration/ScyllaDbSinkConnectorIT.java (+56)
@@ -69,6 +69,7 @@ static Map<String, String> settings() {
     result.put(ScyllaDbSinkConnectorConfig.CONTACT_POINTS_CONFIG, SCYLLA_DB_CONTACT_POINT);
     result.put(ScyllaDbSinkConnectorConfig.PORT_CONFIG, String.valueOf(SCYLLA_DB_PORT));
     result.put(ScyllaDbSinkConnectorConfig.KEYSPACE_REPLICATION_FACTOR_CONFIG, "1");
+    result.put(ScyllaDbSinkConnectorConfig.OFFSET_COMMIT_AFTER_EVERY_INSERT, "true");
     return result;
   }

@@ -181,6 +182,61 @@ public void insert() {
     verify(this.sinkTaskContext, times(1)).assignment();
   }

+  @Test
+  public void insertWithoutRequestCommit() {
+    final Map<String, String> settings = settings();
+    settings.put(ScyllaDbSinkConnectorConfig.OFFSET_COMMIT_AFTER_EVERY_INSERT, "false");
+    connector = new ScyllaDbSinkConnector();
+    connector.start(settings);
+    final String topic = "insertTestingWORC";
+    when(this.sinkTaskContext.assignment()).thenReturn(ImmutableSet.of(new TopicPartition(topic, 3)));
+    this.task.start(settings);
+    List<SinkRecord> finalRecordsList = new ArrayList<>();
+    for (int i = 0; i< 1000; i++) {
+      List<SinkRecord> records = ImmutableList.of(
+          write(
+              topic,
+              struct("key",
+                  "id", Schema.Type.INT64, true, 12345L+ i
+              ), struct("key",
+                  "id", Schema.Type.INT64, true, 12345L + i,
+                  "firstName", Schema.Type.STRING, true, "test",
+                  "lastName", Schema.Type.STRING, true, "user",
+                  "age", Schema.Type.INT64, true, 1234L + i
+              )
+          ),
+          write(topic,
+              null,
+              asMap(
+                  struct("key",
+                      "id", Schema.Type.INT64, true, 67890L + i
+                  )
+              ),
+              null,
+              asMap(
+                  struct("key",
+                      "id", Schema.Type.INT64, true, 67890L + i,
+                      "firstName", Schema.Type.STRING, true, "another",
+                      "lastName", Schema.Type.STRING, true, "user",
+                      "age", Schema.Type.INT64, true, 10L + i
+                  )
+              )
+          )
+      );
+      finalRecordsList.addAll(records);
+    }
+    this.validations = finalRecordsList.stream()
+        .map(RowValidator::of)
+        .collect(Collectors.toList());
+    this.task.put(finalRecordsList);
+    Boolean tableExists = IsOffsetStorageTableExists(SCYLLADB_OFFSET_TABLE);
+    assertEquals(true, tableExists);
+    verify(this.sinkTaskContext, times(0)).requestCommit();
+    verify(this.sinkTaskContext, times(1)).assignment();
+  }
+
+
   @Test
   @Disabled
   public void insertWithTopicMapping() {

src/test/java/io/connect/scylladb/integration/ScyllaNativeTypesIT.java (+1)
@@ -79,6 +79,7 @@ static Cluster.Builder clusterBuilder() {
       .put(ScyllaDbSinkConnectorConfig.CONTACT_POINTS_CONFIG, SCYLLA_DB_CONTACT_POINT)
       .put(ScyllaDbSinkConnectorConfig.PORT_CONFIG, String.valueOf(SCYLLA_DB_PORT))
       .put(ScyllaDbSinkConnectorConfig.KEYSPACE_REPLICATION_FACTOR_CONFIG, "1")
+      .put(ScyllaDbSinkConnectorConfig.OFFSET_COMMIT_AFTER_EVERY_INSERT, "true")
       .build());

   @BeforeAll
