Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connect fails when "scylladb.offset.storage.table.enable" is set to "false" #21

Closed
pushpavanthar opened this issue Jul 23, 2020 · 3 comments
Assignees

Comments

@pushpavanthar
Copy link

The Scylladb sink connector works fine when "scylladb.offset.storage.table.enable" is set to true (when consumer group offset is maintained in scylladb table). However, when this config is set to false (consumer group offset maintained in Kafka Topic), my connect task fails with the below exception.
I tried setting "timezone": "UTC", and "locale": "en" as a fix suggested by people facing similar exceptions from discussions in other sink-connector projects[1][2], but it still throws the same exception.

Below is my connect config for reference :

{
  "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
  "transforms.valuefield.skip.missing.or.null": "false",
  "scylladb.port": "9042",
  "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "tasks.max": "1",
  "topics": "poc_group_agg_clutser",
  "scylladb.contact.points": "scylladb.hostname.com",
  "transforms": "createKey",
  "behavior.on.error": "FAIL",
  "scylladb.password": "**********",
  "key.converter.schemas.enable": "false",
  "scylladb.username": "username",
  "scylladb.keyspace": "test",
  "transforms.createKey.fields": "GROUP_ID",
  "scylladb.security.enabled": "true",
  "scylladb.offset.storage.table.enable": "false",
  "value.converter.schemas.enable": "false",
  "timezone": "UTC",
  "locale": "en",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
 [2020-07-23 12:01:33,153] ERROR WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Exception occurred while extracting records from Kafka Sink Records.
	at io.connect.scylladb.ScyllaDbSinkTask.handleErrors(ScyllaDbSinkTask.java:249)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:152)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:149)
	... 11 more
[2020-07-23 12:01:33,153] WARN WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2020-07-23 12:01:33,153] ERROR WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.NullPointerException
	at io.connect.scylladb.ScyllaDbSinkTask.preCommit(ScyllaDbSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:378)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-07-23 12:01:33,153] ERROR WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Exception occurred while extracting records from Kafka Sink Records.
	at io.connect.scylladb.ScyllaDbSinkTask.handleErrors(ScyllaDbSinkTask.java:249)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:152)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	... 10 more
Caused by: java.lang.NullPointerException
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:149)
	... 11 more
[2020-07-23 12:01:33,153] ERROR WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2020-07-23 12:01:33,153] INFO Closing getValidSession (io.connect.scylladb.ScyllaDbSinkTask)
[2020-07-23 12:01:35,163] WARN WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2020-07-23 12:01:35,163] ERROR WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Commit of offsets threw an unexpected exception for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.NullPointerException
	at io.connect.scylladb.ScyllaDbSinkTask.preCommit(ScyllaDbSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:378)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:67)
	at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:666)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:288)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:703)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:849)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:822)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2211)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2178)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2128)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:164)
	at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
@hartmut-co-uk
Copy link

I run into a similar issue.

I didn't test (also not sure which code version the stacktrace applies to) but following might also fix this issue...

Part of #61:

@Bouncheck
Copy link
Collaborator

From the looks of it I believe this was fixed with #66 (even though PR itself was not about this issue).
Please reopen if the issue persists.

@Bouncheck Bouncheck self-assigned this Jul 27, 2024
@Bouncheck
Copy link
Collaborator

Closing as completed by #66 . I believe it was related to topicOffsets not being initialized correctly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants