feed downstream by relay log at startup if needed #883
Conversation
The status may be Running even if it's consistent with the log.
/run-all-tests
@july2993 Please resolve conflicts.
resolved
var _ = check.Suite(&relaySuite{})

type noOpLoader struct {
Embed the interface so that we don't have to repeat methods we don't need in this test?
All of them are needed except SaveMode, so I will keep this.
Instead, we should make pkg/Loader easy to test (not turn it into an interface only for testing): we can use the struct directly and add some method to make it silently accept input and return success directly.
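For reference, here is a minimal sketch of the embedding idea suggested above; the Loader interface and Txn type are assumed stand-ins, not the real pkg/loader API:

type Txn struct{}

// Loader is an assumed stand-in for the interface the test depends on.
type Loader interface {
	Run() error
	Input() chan<- *Txn
	Successes() <-chan *Txn
	SetSafeMode(bool)
	Close()
}

// noOpLoader embeds the interface, so the test only overrides the methods it
// actually exercises; calling anything left unimplemented panics on the nil
// embedded value, which surfaces unexpected usage instead of hiding it.
type noOpLoader struct {
	Loader
}

func (noOpLoader) Run() error { return nil }

As noted in the reply, if the test ends up exercising nearly every method anyway, embedding saves little, which is why the explicit stub is kept here.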
drainer/checkpoint/checkpoint.go
@@ -19,6 +19,14 @@ import (
	"go.uber.org/zap"
)

const (
	// StatusNormal means server quit normally, data <= ts is synced to downstream
	StatusNormal int = 0
Worth a better name, like StatusConsistent or StatusDrained.
In fact, I'm not sure what it means: does it mean the binlog is drained, or that a consistent replication state has been reached? Which one is right?
Good Change!
renamed to StatusConsistent
002d449
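A hedged sketch of how the constants might read after 002d449; only StatusConsistent comes from this thread, while the second name and both values are assumptions:

const (
	// StatusConsistent (previously StatusNormal) means the server quit
	// normally and all data with commit ts <= the saved ts has reached the
	// downstream, so the relay log can be skipped at startup.
	StatusConsistent int = 0
	// StatusRunning (assumed name) means the server is running or quit
	// abnormally, so the downstream may still be behind the saved ts.
	StatusRunning int = 1
)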
@@ -68,6 +74,16 @@ func newMysql(cfg *Config) (CheckPoint, error) {
		return nil, errors.Annotatef(err, "exec failed, sql: %s", sql)
	}

	if sp.clusterID == 0 {
		id, err := getClusterID(db, sp.schema, sp.table)
This logic is very weird.
In order to handle the situation where the upstream cluster is completely down, drainer needs a correct way to fetch the cluster ID, but the current implementation is not satisfying; for example, drainer can't allow storing multiple entries in the checkpoint table.
I filed an issue about it: #889
}{
	{"no row", nil, 0, true, ErrNoCheckpointItem},
	{"on row", []uint64{1}, 1, false, nil},
	{"multi row", []uint64{1, 2}, 0, true, nil},
Should the checkSpecifiedErr be nil?
Yes. true, nil means the result will be some error, but we don't check what kind of error it is.
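A hedged sketch of the behaviour this test table describes; the signature matches the call shown in the diff above, but the query, column name, and error texts are assumptions rather than the PR's exact code:

import (
	"database/sql"
	"fmt"

	"github.com/pingcap/errors"
)

// ErrNoCheckpointItem is returned when the checkpoint table holds no entry.
var ErrNoCheckpointItem = errors.New("no checkpoint item")

// getClusterID reads the cluster id back from the checkpoint table: no rows
// yields ErrNoCheckpointItem, exactly one row yields that id, and multiple
// rows are rejected as ambiguous (an unspecified error, matching the
// "multi row" case that only asserts some error occurred).
func getClusterID(db *sql.DB, schema, table string) (uint64, error) {
	rows, err := db.Query(fmt.Sprintf("SELECT clusterID FROM `%s`.`%s`", schema, table))
	if err != nil {
		return 0, errors.Trace(err)
	}
	defer rows.Close()

	var id uint64
	count := 0
	for rows.Next() {
		if err := rows.Scan(&id); err != nil {
			return 0, errors.Trace(err)
		}
		count++
	}
	if err := rows.Err(); err != nil {
		return 0, errors.Trace(err)
	}

	switch count {
	case 0:
		return 0, ErrNoCheckpointItem
	case 1:
		return id, nil
	default:
		return 0, errors.Errorf("%d entries in checkpoint table, cluster id is ambiguous", count)
	}
}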
@@ -139,9 +148,6 @@ func NewConfig() *Config {
	fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas")
	fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count")
	fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml")
	fs.StringVar(&cfg.SyncerCfg.RelayLogDir, "relay-log-dir", "", "path to relay log of syncer")
Can we keep them? They may improve ease of use, for example in K8s, because configuration files are always cumbersome.
reverted
drainer/relay.go
	defer cp.Close()

	if cp.Status() == checkpoint.StatusNormal {
Does it mean the relay log is only used to make the downstream reach a consistent state?
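A minimal sketch of the pattern this snippet suggests, with assumed names and a cut-down CheckPoint interface instead of the PR's real types: the relay log is replayed at startup only when the checkpoint does not already record a consistent shutdown.

// StatusConsistent is a stand-in for checkpoint.StatusNormal (renamed to
// StatusConsistent in this PR, per the earlier thread).
const StatusConsistent = 0

// CheckPoint is a cut-down stand-in for drainer/checkpoint.CheckPoint.
type CheckPoint interface {
	Status() int
	Close() error
}

// feedByRelayLogIfNeed replays the relay log only when the previous run did
// not stop in a consistent state; otherwise startup continues directly.
func feedByRelayLogIfNeed(cp CheckPoint, replay func() error) error {
	defer cp.Close()

	if cp.Status() == StatusConsistent {
		// Downstream already matches the saved commit ts: skip the replay.
		return nil
	}
	// Feed the relay log into the downstream until it catches up.
	return replay()
}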
	log.Info("finish feed by relay log")

	readerErr := <-r.Error()
How about putting the code at L136 ~ L147 before L59? There may be some benefits: no matter for what reason feedByRelayLog exits, it's good for releasing the objects, like calling r.Close and ld.Close(), in feedByRelayLogIfNeed.
feedByRelayLog will need to close ld once it has put all txns into it, and r.Error() is better placed here together with the corresponding r.Run() and r.Txns().
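A hedged illustration of the trade-off in this exchange, with cut-down stand-in types rather than the PR's code: closing the reader can safely be deferred in the caller, while the loader must only be closed after the last txn has been pushed, which is why that call stays inside feedByRelayLog.

// reader and loader are cut-down stand-ins for the relay reader and pkg/loader.
type reader interface {
	Txns() <-chan int64 // simplified: a stream of commit timestamps
	Close() error
}

type loader interface {
	Input() chan<- int64
	Close() // in this sketch, Close also closes the input channel
}

func feedByRelayLogIfNeed(r reader, ld loader) error {
	// Deferring r.Close here covers every exit path of the feed, which is
	// the benefit of releasing objects in the caller.
	defer r.Close()
	return feedByRelayLog(r, ld)
}

func feedByRelayLog(r reader, ld loader) error {
	for ts := range r.Txns() {
		ld.Input() <- ts
	}
	// Only now, after every txn has been put into the loader, is it safe to
	// close it; this is why ld.Close() belongs inside feedByRelayLog.
	ld.Close()
	return nil
}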
	go func() {
		ld.SetSafeMode(true)
		loaderErr = ld.Run()
		close(loaderQuit)
If ld.Run exits with some error and close(loaderQuit) has not been executed yet, may the logic at L120 cause us to miss the loaderErr?
120 case success, ok := <-successTxnC:
121     if !ok {
122         successTxnC = nil
123         log.Info("success closed")
124         continue
125     }
126     lastSuccessTS = success.Metadata.(int64)
This will continue the loop and then run into the loaderQuit case at the nested select.
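For readers following along, a cut-down sketch of why loaderErr is not missed; the real loop keeps the larger select with the successTxnC = nil trick and the nested select quoted above, so this version only shows the ordering argument, with assumed names:

// Txn is a cut-down stand-in for the loader's txn type.
type Txn struct{ Metadata interface{} }

func waitLoader(successTxnC <-chan *Txn, loaderQuit <-chan struct{}, loaderErr *error) (int64, error) {
	// Drain every success the loader reports; the loader closes this channel
	// when it is done, so the loop terminates after the last applied txn.
	var lastSuccessTS int64
	for success := range successTxnC {
		lastSuccessTS = success.Metadata.(int64)
	}
	// close(loaderQuit) runs only after loaderErr has been assigned in the
	// loader goroutine, so once this receive returns the error is visible
	// and cannot be missed, even if ld.Run exited before we got here.
	<-loaderQuit
	return lastSuccessTS, *loaderErr
}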
		break
	}

	select {
The logic in this select is a bit hard to follow and reason about. Would it be simpler if we just used two separate goroutines for reading and writing?
Actually, I tried a multi-goroutine style first, but finally gave up; I will keep the current code style.
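For context, a hedged sketch of the two-goroutine shape suggested above; the PR keeps the single-select style, and the names here are stand-ins. The sketch assumes the loader keeps draining its input until it is closed even after hitting an error; without that guarantee the forwarding goroutine needs extra cancellation plumbing, which may be part of why the split turned out not to be simpler.

import "sync"

// Txn is a cut-down stand-in for the loader's txn type.
type Txn struct{ Metadata interface{} }

// feedWithTwoGoroutines forwards relay-log txns to the loader in one
// goroutine and tracks successes in the current one, instead of multiplexing
// everything in a single select.
func feedWithTwoGoroutines(txns <-chan *Txn, input chan<- *Txn, successes <-chan *Txn) int64 {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Closing input tells the loader no more txns are coming, so it can
		// drain, report its remaining successes, and exit.
		defer close(input)
		for txn := range txns {
			input <- txn
		}
	}()

	// The loader closes successes when it finishes, so this loop ends after
	// the last applied txn has been recorded.
	var lastSuccessTS int64
	for success := range successes {
		lastSuccessTS = success.Metadata.(int64)
	}

	wg.Wait()
	return lastSuccessTS
}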
LGTM
LGTM
What problem does this PR solve?
Support recovering at startup by relay log.
Follows #847 and #849.
What is changed and how it works?
relay-read-buf-size
relay-log-size to max-file-size
drainer/relay.go
Check List
Tests
Side effects
Related changes
tidb-ansible repository