Skip to content

[Bug] [connector-redis] redis cluster mode to write set data error #9865

@JeremyXin

Description

@JeremyXin

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

Writing the set data in use redis cluster pattern, perform error: JedisConnectionException: Failed to connect to any host resolved for DNS name.

After analysis, it was found that there is a bug in the batchWriteSet method of the RedisClusterClient class. The jedis object originally passed in the method has not been initialized, resulting in the Connection of the Jedis object not being initialized, which leads to the bug

SeaTunnel Version

2.3.11

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "BATCH"
}

source{
 Localfile {
    path = "xxx/seatunnel/config/st-redis-test.json"
    file_format_type = "json"
    schema {
      fields {
        id = int
        name = string
      }
    }
  }
}

sink {
 Redis {
  nodes = ["xxx:6371","xxx:6372","xxx:6371","xxx:6372","xxx:6371","xxx:6372"]
  mode = "cluster"
  auth = "xxx"
  key = "redis-custom-key-{name}"
  value_field = "id"
  support_custom_key = true
  data_type = set
  expire = 300
  }
}

Running Command

sh bin/seatunnel.sh --config config/v2.batch.redis.template -m local

Error Exception

Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: redis.clients.jedis.exceptions.JedisConnectionException: Failed to connect to any host resolved for DNS name.
	at redis.clients.jedis.DefaultJedisSocketFactory.connectToFirstSuccessfulHost(DefaultJedisSocketFactory.java:63)
	at redis.clients.jedis.DefaultJedisSocketFactory.createSocket(DefaultJedisSocketFactory.java:87)
	at redis.clients.jedis.Connection.connect(Connection.java:180)
	at redis.clients.jedis.Connection.sendCommand(Connection.java:152)
	at redis.clients.jedis.Connection.sendCommand(Connection.java:147)
	at redis.clients.jedis.Jedis.select(Jedis.java:800)
	at org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters.buildJedis(RedisParameters.java:228)
	at org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters.buildRedisClient(RedisParameters.java:149)
	at org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSinkWriter.<init>(RedisSinkWriter.java:69)
	at org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSink.createWriter(RedisSink.java:54)
	at org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSink.createWriter(RedisSink.java:33)
	at org.apache.seatunnel.api.sink.multitablesink.MultiTableSink.createWriter(MultiTableSink.java:82)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:343)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:401)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:398)
	at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$0(NotifyTaskRestoreOperation.java:107)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: java.net.ConnectException: Connection refused (Connection refused)
		at java.net.PlainSocketImpl.socketConnect(Native Method)
		at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
		at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
		at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
		at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
		at java.net.Socket.connect(Socket.java:589)
		at redis.clients.jedis.DefaultJedisSocketFactory.connectToFirstSuccessfulHost(DefaultJedisSocketFactory.java:73)
		... 30 more

	... 12 more

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

Image

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions