We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent aae39eb commit c4c92e1Copy full SHA for c4c92e1
internal/client/driver/kafka3/kafka3.go
@@ -183,6 +183,10 @@ func (kr *KafkaRunner) initiateStreaming() error {
183
kr.logger.Debugf("kafka. a sql dumpEntry")
184
} else if dumpData.TableSchema == "" && dumpData.TableName == "" {
185
kr.logger.Debugf("kafka. skip apply sqlMode and SystemVariablesStatement")
186
+ if err := kr.natsConn.Publish(m.Reply, nil); err != nil {
187
+ kr.onError(TaskStateDead, err)
188
+ return
189
+ }
190
return
191
} else {
192
// TODO cache table
0 commit comments