From d54d6b91f2a12f2022bab909c8fa51a79d0fa24b Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Tue, 18 Jun 2019 15:11:04 +0800 Subject: [PATCH] pump/: Accelerat transaction status queries throught GetMvccByEncodeKey (#632) * pump/: Query txn info by GetMvccByEncodeKey * server.go: Fix Fprintf call needs 1 arg but has 2 args * pump: fix log * address trivial comment * storage.go: Init helper only if provide tiStore --- go.mod | 1 + pump/server.go | 25 ++++++++ pump/storage/helper.go | 63 ++++++++++++++++++++ pump/storage/storage.go | 124 ++++++++++++++++++++++++++++++---------- 4 files changed, 182 insertions(+), 31 deletions(-) create mode 100644 pump/storage/helper.go diff --git a/go.mod b/go.mod index f870955a1..ff68bda7e 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/pierrec/lz4 v2.0.5+incompatible // indirect github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 github.com/pingcap/errors v0.11.1 + github.com/pingcap/kvproto v0.0.0-20181109035735-8e3f33ac4929 github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 github.com/pingcap/parser v0.0.0-20190411060258-7e41749fa69c github.com/pingcap/pd v2.1.3+incompatible diff --git a/pump/server.go b/pump/server.go index caaa212bc..e09bdfbe8 100644 --- a/pump/server.go +++ b/pump/server.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb-binlog/pkg/node" "github.com/pingcap/tidb-binlog/pkg/util" "github.com/pingcap/tidb-binlog/pump/storage" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" @@ -81,6 +82,7 @@ type Server struct { lastWriteBinlogUnixNano int64 pdCli pd.Client cfg *Config + tiStore kv.Storage writeBinlogCount int64 alivePullerCount int64 @@ -166,6 +168,7 @@ func NewServer(cfg *Config) (*Server, error) { ctx: ctx, cancel: cancel, metrics: metrics, + tiStore: tiStore, gcDuration: time.Duration(cfg.GC) * 24 * time.Hour, pdCli: pdCli, cfg: cfg, @@ -597,6 +600,23 @@ func (s *Server) BinlogByTS(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "\n\n PrewriteValue: \n") fmt.Fprint(w, prewriteValue.String()) } + + if len(binlog.PrewriteKey) > 0 { + tikvStorage := s.tiStore.(tikv.Storage) + helper := storage.Helper{ + Store: tikvStorage, + RegionCache: tikvStorage.GetRegionCache(), + } + + resp, err := helper.GetMvccByEncodedKey(binlog.PrewriteKey) + if err != nil { + fmt.Fprintf(w, "GetMvccByEncodedKey failed: %s", err.Error()) + return + } + + fmt.Fprint(w, "\n\n GetMvccByEncodedKey: \n") + fmt.Fprint(w, resp.String()) + } } // PumpStatus returns all pumps' status. @@ -782,6 +802,11 @@ func (s *Server) Close() { s.pdCli.Close() } log.Info("has closed pdCli") + + if s.tiStore != nil { + s.tiStore.Close() + } + log.Info("has closed tiStore") } func (s *Server) waitUntilCommitTSSaved(ctx context.Context, ts int64, checkInterval time.Duration) error { diff --git a/pump/storage/helper.go b/pump/storage/helper.go new file mode 100644 index 000000000..12cd4e5b0 --- /dev/null +++ b/pump/storage/helper.go @@ -0,0 +1,63 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copy from https://github.com/pingcap/tidb/blob/71def9c7263432c0dfa6a5960f6db824775177c9/store/helper/helper.go#L47 +// we can use it directly if we upgrade to the latest version of TiDB dependency. + +package storage + +import ( + "context" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/tikvrpc" + "go.uber.org/zap" +) + +// Helper is a middleware to get some information from tikv/pd. +type Helper struct { + Store tikv.Storage + RegionCache *tikv.RegionCache +} + +// GetMvccByEncodedKey get the MVCC value by the specific encoded key. +func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { + keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackoffer(context.Background(), 500), encodedKey) + if err != nil { + return nil, errors.Trace(err) + } + + tikvReq := &tikvrpc.Request{ + Type: tikvrpc.CmdMvccGetByKey, + MvccGetByKey: &kvrpcpb.MvccGetByKeyRequest{ + Key: encodedKey, + }, + } + kvResp, err := h.Store.SendReq(tikv.NewBackoffer(context.Background(), 500), tikvReq, keyLocation.Region, time.Minute) + if err != nil { + log.Info("get MVCC by encoded key failed", + zap.Binary("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Binary("startKey", keyLocation.StartKey), + zap.Binary("endKey", keyLocation.EndKey), + zap.Reflect("kvResp", kvResp), + zap.Error(err)) + return nil, errors.Trace(err) + } + return kvResp.MvccGetByKey, nil +} diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 1fab5a248..8e5d0c293 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -25,6 +25,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/log" pkgutil "github.com/pingcap/tidb-binlog/pkg/util" "github.com/pingcap/tidb/kv" @@ -85,6 +86,7 @@ type Append struct { metadata *leveldb.DB sorter *sorter tiStore kv.Storage + helper *Helper tiLockResolver *tikv.LockResolver latestTS int64 @@ -138,6 +140,7 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL } writeCh := make(chan *request, chanCapacity) + append = &Append{ dir: dir, vlog: vlog, @@ -179,6 +182,16 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL }) if tiStore != nil { + tikvStorage, ok := tiStore.(tikv.Storage) + if !ok { + return nil, errors.New("not tikv.Storage") + } + + append.helper = &Helper{ + Store: tikvStorage, + RegionCache: tikvStorage.GetRegionCache(), + } + sorter.setResolver(append.resolve) } @@ -344,22 +357,92 @@ func (a *Append) updateStatus() { } } +// Write a commit binlog myself if the status is committed, +// otherwise we can just ignore it, we will not get the commit binlog while iterating the kv by ts +func (a *Append) writeCBinlog(pbinlog *pb.Binlog, commitTS int64) error { + // write the commit binlog myself + cbinlog := new(pb.Binlog) + cbinlog.Tp = pb.BinlogType_Commit + cbinlog.StartTs = pbinlog.StartTs + cbinlog.CommitTs = commitTS + + req := a.writeBinlog(cbinlog) + if req.err != nil { + return errors.Annotate(req.err, "writeBinlog failed") + } + + // when writeBinlog return success, the pointer will be write to kv async, + // but we need to make sure it has been write to kv when we return true in the func, then we can get this commit binlog when + // we update maxCommitTS + // write the ts -> pointer to KV here to make sure it. + pointer, err := req.valuePointer.MarshalBinary() + if err != nil { + panic(err) + } + + err = a.metadata.Put(encodeTSKey(req.ts()), pointer, nil) + if err != nil { + return errors.Annotate(req.err, "put into metadata failed") + } + + return nil +} + func (a *Append) resolve(startTS int64) bool { latestTS := atomic.LoadInt64(&a.latestTS) if latestTS <= 0 { return false } - startSecond := oracle.ExtractPhysical(uint64(startTS)) / int64(time.Second/time.Millisecond) - maxSecond := oracle.ExtractPhysical(uint64(latestTS)) / int64(time.Second/time.Millisecond) - - if maxSecond-startSecond <= maxTxnTimeoutSecond { + pbinlog, err := a.readBinlogByTS(startTS) + if err != nil { + log.Error(errors.ErrorStack(err)) return false } - pbinlog, err := a.readBinlogByTS(startTS) + resp, err := a.helper.GetMvccByEncodedKey(pbinlog.PrewriteKey) if err != nil { - log.Error(errors.ErrorStack(err)) + log.Error("GetMvccByEncodedKey failed", zap.Error(err)) + } else if resp.RegionError != nil { + log.Error("GetMvccByEncodedKey failed", zap.Stringer("RegionError", resp.RegionError)) + } else if len(resp.Error) > 0 { + log.Error("GetMvccByEncodedKey failed", zap.String("Error", resp.Error)) + } else { + for _, w := range resp.Info.Writes { + if int64(w.StartTs) != startTS { + continue + } + + if w.Type != kvrpcpb.Op_Rollback { + // Sanity checks + if int64(w.CommitTs) <= startTS { + log.Error("op type not Rollback, but have unexpect commit ts", + zap.Int64("startTS", startTS), + zap.Uint64("commitTS", w.CommitTs)) + break + } + + err := a.writeCBinlog(pbinlog, int64(w.CommitTs)) + if err != nil { + log.Error("writeCBinlog failed", zap.Error(err)) + return false + } + } else { + // Will get the same value as start ts if it's rollback, set to 0 for log + w.CommitTs = 0 + } + + log.Info("known txn is committed or rollback from tikv", + zap.Int64("start ts", startTS), + zap.Uint64("commit ts", w.CommitTs)) + return true + } + } + + startSecond := oracle.ExtractPhysical(uint64(startTS)) / int64(time.Second/time.Millisecond) + maxSecond := oracle.ExtractPhysical(uint64(latestTS)) / int64(time.Second/time.Millisecond) + + if maxSecond-startSecond <= maxTxnTimeoutSecond { return false } @@ -377,35 +460,14 @@ func (a *Append) resolve(startTS int64) bool { // Write a commit binlog myself if the status is committed, // otherwise we can just ignore it, we will not get the commit binlog while iterator the kv by ts if status.IsCommitted() { - // write the commit binlog myself - cbinlog := new(pb.Binlog) - cbinlog.Tp = pb.BinlogType_Commit - cbinlog.StartTs = pbinlog.StartTs - cbinlog.CommitTs = int64(status.CommitTS()) - - req := a.writeBinlog(cbinlog) - if req.err != nil { - log.Error("writeBinlog failed", zap.Error(req.err)) - return false - } - - // when writeBinlog return success, the pointer will be write to kv async, - // but we need to make sure it has been write to kv when we return true in the func, then we can get this commit binlog when - // we update maxCommitTS - // write the ts -> pointer to KV here to make sure it. - pointer, err := req.valuePointer.MarshalBinary() - if err != nil { - panic(err) - } - - err = a.metadata.Put(encodeTSKey(req.ts()), pointer, nil) + err := a.writeCBinlog(pbinlog, int64(status.CommitTS())) if err != nil { - log.Error("put into metadata failed", zap.Error(req.err)) + log.Error("writeCBinlog failed", zap.Error(err)) return false } } - log.Info("known txn is committed from tikv", + log.Info("known txn is committed or rollback from tikv", zap.Int64("start ts", startTS), zap.Uint64("commit ts", status.CommitTS())) return true @@ -620,7 +682,7 @@ func (a *Append) writeBinlog(binlog *pb.Binlog) *request { } if duration > a.options.SlowWriteThreshold { - log.Warn("take a long time to write binlog", zap.Stringer("binlog type", binlog.Tp), zap.Int64("commit TS", binlog.CommitTs), zap.Int64("start TS", binlog.StartTs), zap.Int("length", len(binlog.PrewriteValue))) + log.Warn("take a long time to write binlog", zap.Stringer("binlog type", binlog.Tp), zap.Int64("commit TS", binlog.CommitTs), zap.Int64("start TS", binlog.StartTs), zap.Int("length", len(binlog.PrewriteValue)), zap.Float64("cost time", duration)) } }()