-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Open
Labels
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
在高可用集群(3个Master节点:a, b, c)环境中,从 PostgreSQL 到 Apache Doris /PostgreSQL 的数据同步任务在当前的 Leader Master (a) 上运行。当手动终止 (kill
) Master a 的进程后,观察到:
1、Leader Master变成b
2. 任务进程仍然显示为“运行中”,此时对PG库做数据变更,但 Doris 中的数据停止更新。
3. 此时若使用命令手动 /hazelcast/rest/maps/stop-jobs
该任务,Doris 中的数据会立即更新到最新状态。
SeaTunnel Version
2.3.9
SeaTunnel Config
seatunnel:
engine:
classloader-cache-mode: true
history-job-expire-minutes: 1440
backup-count: 1
queue-type: blockingqueue
print-execution-info-interval: 60
print-job-metrics-info-interval: 60
slot-service:
dynamic-slot: true
checkpoint:
interval: 10000
timeout: 60000
storage:
type: hdfs
max-retained: 3
plugin-config:
namespace: /seatunnel/checkpoint_snapshot
storage.type: hdfs
fs.defaultFS: hdfs://ctyunns
#if you need hdfs-site config, you can config like this:
hdfs_site_path: "/hadoop-conf/hdfs-site.xml"
seatunnel.hadoop.dfs.nameservices: ctyunns
seatunnel.hadoop.dfs.ha.namenodes.ctyunns: nn1,nn2
seatunnel.hadoop.dfs.namenode.rpc-address.ctyunns.nn1: a1:54310
seatunnel.hadoop.dfs.namenode.rpc-address.ctyunns.nn2: a7:54310
seatunnel.hadoop.dfs.client.failover.proxy.provider.ctyunns: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
# if you used kerberos, you can config like this:
krb5Path: "/hadoop-conf/krb5.conf"
kerberosPrincipal: "hdfs/[email protected]"
kerberosKeytabFilePath: "/hadoop-conf/hdfs.keytab"
telemetry:
metric:
enabled: false
log:
scheduled-deletion-enable: true
http:
enable-http: true
port: 8080
enable-dynamic-port: false
hazelcast:
cluster-name: seatunnel
network:
rest-api:
enabled: true
endpoint-groups:
CLUSTER_WRITE:
enabled: true
DATA:
enabled: true
join:
tcp-ip:
enabled: true
member-list:
- a:5801
- b:5801
- c:5801
- a:5802
- b:5802
- c:5802
port:
auto-increment: false
port: 5801
properties:
hazelcast.invocation.max.retry.count: 20
hazelcast.tcp.join.port.try.count: 30
hazelcast.logging.type: log4j2
hazelcast.operation.generic.thread.count: 50
hazelcast.heartbeat.failuredetector.type: phi-accrual
hazelcast.heartbeat.interval.seconds: 2
hazelcast.max.no.heartbeat.seconds: 180
hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
map:
engine*:
map-store:
enabled: true
initial-mode: EAGER
factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
properties:
type: hdfs
namespace: /seatunnel/imap
clusterName: seatunnel
storage.type: hdfs
fs.defaultFS: hdfs://ctyunns
hdfs_site_path: "/hadoop-conf/hdfs-site.xml"
# 新增 Kerberos 认证配置
# if you need hdfs-site config, you can config like this:
seatunnel.hadoop.dfs.nameservices: ctyunns
seatunnel.hadoop.dfs.ha.namenodes.ctyunns: nn1,nn2
seatunnel.hadoop.dfs.namenode.rpc-address.ctyunns.nn1: a1:54310
seatunnel.hadoop.dfs.namenode.rpc-address.ctyunns.nn2: a7:54310
seatunnel.hadoop.dfs.client.failover.proxy.provider.ctyunns: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
# if you used kerberos, you can config like this:
krb5Path: "/hadoop-conf/krb5.conf"
kerberosPrincipal: "hdfs/[email protected]"
kerberosKeytabFilePath: "/hadoop-conf/hdfs.keytab"
Running Command
{
"env": {
"job.mode": "STREAMING",
"checkpoint.timeout": "500000",
"job.name": "SeaTunnel_Job",
"read_limit.rows_per_second": "10000",
"checkpoint.interval": "5000",
"parallelism": "5"
},
"source": [{
"base-url": "jdbc:postgresql://xxxx",
"startup.mode": "latest",
"table-names": [
"xxx"
],
"result_table_name": "102100000000052670_table_source",
"database-names": [
"xxxx"
],
"plugin_name": "Postgres-CDC",
"password": "xxxx",
"snapshot.split.size": 20000,
"snapshot.fetch.size": 5000,
"connection.pool.size": "20",
"connect.max-retries": "3",
"connect.timeout.ms": "30000",
"username": "xxx",
"slot.name": "1952315293163560963_1005134",
"debezium": {
"publication.autocreate.mode": "filtered",
"publication.name": "qeaadsrvos1"
}
}],
"transform": [],
"sink": [{
"fenodes": "xxxx:8035",
"query-port": "9030",
"username": "xxx",
"password": "xxxx",
"database": "xxxx",
"table": "${table_name}",
"source_table_name": "102100000000052670_table_sink",
"sink.enable-2pc": "false",
"doris.batch.size": 20000,
"sink.buffer-size": 20971520,
"sink.enable-delete": true,
"plugin_name": "Doris",
"schema_save_mode": "ERROR_WHEN_SCHEMA_NOT_EXIST",
"doris.config": {
"format": "json",
"read_json_by_line": "true",
"num_as_string": "true"
}
}]
}
Error Exception
目标端数据不变化,无报错信息
Zeta or Flink or Spark Version
No response
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct