Skip to content

Commit

Permalink
pump/: Accelerat transaction status queries throught GetMvccByEncodeK…
Browse files Browse the repository at this point in the history
…ey (pingcap#632)

* pump/: Query txn info by GetMvccByEncodeKey

* server.go: Fix Fprintf call needs 1 arg but has 2 args

* pump: fix log

* address trivial comment

* storage.go: Init helper only if provide tiStore
  • Loading branch information
july2993 authored and suzaku committed Jun 18, 2019
1 parent 968b6ac commit d54d6b9
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 31 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errors v0.11.1
github.com/pingcap/kvproto v0.0.0-20181109035735-8e3f33ac4929
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190411060258-7e41749fa69c
github.com/pingcap/pd v2.1.3+incompatible
Expand Down
25 changes: 25 additions & 0 deletions pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb-binlog/pkg/node"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb-binlog/pump/storage"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -81,6 +82,7 @@ type Server struct {
lastWriteBinlogUnixNano int64
pdCli pd.Client
cfg *Config
tiStore kv.Storage

writeBinlogCount int64
alivePullerCount int64
Expand Down Expand Up @@ -166,6 +168,7 @@ func NewServer(cfg *Config) (*Server, error) {
ctx: ctx,
cancel: cancel,
metrics: metrics,
tiStore: tiStore,
gcDuration: time.Duration(cfg.GC) * 24 * time.Hour,
pdCli: pdCli,
cfg: cfg,
Expand Down Expand Up @@ -597,6 +600,23 @@ func (s *Server) BinlogByTS(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "\n\n PrewriteValue: \n")
fmt.Fprint(w, prewriteValue.String())
}

if len(binlog.PrewriteKey) > 0 {
tikvStorage := s.tiStore.(tikv.Storage)
helper := storage.Helper{
Store: tikvStorage,
RegionCache: tikvStorage.GetRegionCache(),
}

resp, err := helper.GetMvccByEncodedKey(binlog.PrewriteKey)
if err != nil {
fmt.Fprintf(w, "GetMvccByEncodedKey failed: %s", err.Error())
return
}

fmt.Fprint(w, "\n\n GetMvccByEncodedKey: \n")
fmt.Fprint(w, resp.String())
}
}

// PumpStatus returns all pumps' status.
Expand Down Expand Up @@ -782,6 +802,11 @@ func (s *Server) Close() {
s.pdCli.Close()
}
log.Info("has closed pdCli")

if s.tiStore != nil {
s.tiStore.Close()
}
log.Info("has closed tiStore")
}

func (s *Server) waitUntilCommitTSSaved(ctx context.Context, ts int64, checkInterval time.Duration) error {
Expand Down
63 changes: 63 additions & 0 deletions pump/storage/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Copy from https://github.com/pingcap/tidb/blob/71def9c7263432c0dfa6a5960f6db824775177c9/store/helper/helper.go#L47
// we can use it directly if we upgrade to the latest version of TiDB dependency.

package storage

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"go.uber.org/zap"
)

// Helper is a middleware to get some information from tikv/pd.
type Helper struct {
Store tikv.Storage
RegionCache *tikv.RegionCache
}

// GetMvccByEncodedKey get the MVCC value by the specific encoded key.
func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) {
keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackoffer(context.Background(), 500), encodedKey)
if err != nil {
return nil, errors.Trace(err)
}

tikvReq := &tikvrpc.Request{
Type: tikvrpc.CmdMvccGetByKey,
MvccGetByKey: &kvrpcpb.MvccGetByKeyRequest{
Key: encodedKey,
},
}
kvResp, err := h.Store.SendReq(tikv.NewBackoffer(context.Background(), 500), tikvReq, keyLocation.Region, time.Minute)
if err != nil {
log.Info("get MVCC by encoded key failed",
zap.Binary("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Binary("startKey", keyLocation.StartKey),
zap.Binary("endKey", keyLocation.EndKey),
zap.Reflect("kvResp", kvResp),
zap.Error(err))
return nil, errors.Trace(err)
}
return kvResp.MvccGetByKey, nil
}
124 changes: 93 additions & 31 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
pkgutil "github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -85,6 +86,7 @@ type Append struct {
metadata *leveldb.DB
sorter *sorter
tiStore kv.Storage
helper *Helper
tiLockResolver *tikv.LockResolver
latestTS int64

Expand Down Expand Up @@ -138,6 +140,7 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
}

writeCh := make(chan *request, chanCapacity)

append = &Append{
dir: dir,
vlog: vlog,
Expand Down Expand Up @@ -179,6 +182,16 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
})

if tiStore != nil {
tikvStorage, ok := tiStore.(tikv.Storage)
if !ok {
return nil, errors.New("not tikv.Storage")
}

append.helper = &Helper{
Store: tikvStorage,
RegionCache: tikvStorage.GetRegionCache(),
}

sorter.setResolver(append.resolve)
}

Expand Down Expand Up @@ -344,22 +357,92 @@ func (a *Append) updateStatus() {
}
}

// Write a commit binlog myself if the status is committed,
// otherwise we can just ignore it, we will not get the commit binlog while iterating the kv by ts
func (a *Append) writeCBinlog(pbinlog *pb.Binlog, commitTS int64) error {
// write the commit binlog myself
cbinlog := new(pb.Binlog)
cbinlog.Tp = pb.BinlogType_Commit
cbinlog.StartTs = pbinlog.StartTs
cbinlog.CommitTs = commitTS

req := a.writeBinlog(cbinlog)
if req.err != nil {
return errors.Annotate(req.err, "writeBinlog failed")
}

// when writeBinlog return success, the pointer will be write to kv async,
// but we need to make sure it has been write to kv when we return true in the func, then we can get this commit binlog when
// we update maxCommitTS
// write the ts -> pointer to KV here to make sure it.
pointer, err := req.valuePointer.MarshalBinary()
if err != nil {
panic(err)
}

err = a.metadata.Put(encodeTSKey(req.ts()), pointer, nil)
if err != nil {
return errors.Annotate(req.err, "put into metadata failed")
}

return nil
}

func (a *Append) resolve(startTS int64) bool {
latestTS := atomic.LoadInt64(&a.latestTS)
if latestTS <= 0 {
return false
}

startSecond := oracle.ExtractPhysical(uint64(startTS)) / int64(time.Second/time.Millisecond)
maxSecond := oracle.ExtractPhysical(uint64(latestTS)) / int64(time.Second/time.Millisecond)

if maxSecond-startSecond <= maxTxnTimeoutSecond {
pbinlog, err := a.readBinlogByTS(startTS)
if err != nil {
log.Error(errors.ErrorStack(err))
return false
}

pbinlog, err := a.readBinlogByTS(startTS)
resp, err := a.helper.GetMvccByEncodedKey(pbinlog.PrewriteKey)
if err != nil {
log.Error(errors.ErrorStack(err))
log.Error("GetMvccByEncodedKey failed", zap.Error(err))
} else if resp.RegionError != nil {
log.Error("GetMvccByEncodedKey failed", zap.Stringer("RegionError", resp.RegionError))
} else if len(resp.Error) > 0 {
log.Error("GetMvccByEncodedKey failed", zap.String("Error", resp.Error))
} else {
for _, w := range resp.Info.Writes {
if int64(w.StartTs) != startTS {
continue
}

if w.Type != kvrpcpb.Op_Rollback {
// Sanity checks
if int64(w.CommitTs) <= startTS {
log.Error("op type not Rollback, but have unexpect commit ts",
zap.Int64("startTS", startTS),
zap.Uint64("commitTS", w.CommitTs))
break
}

err := a.writeCBinlog(pbinlog, int64(w.CommitTs))
if err != nil {
log.Error("writeCBinlog failed", zap.Error(err))
return false
}
} else {
// Will get the same value as start ts if it's rollback, set to 0 for log
w.CommitTs = 0
}

log.Info("known txn is committed or rollback from tikv",
zap.Int64("start ts", startTS),
zap.Uint64("commit ts", w.CommitTs))
return true
}
}

startSecond := oracle.ExtractPhysical(uint64(startTS)) / int64(time.Second/time.Millisecond)
maxSecond := oracle.ExtractPhysical(uint64(latestTS)) / int64(time.Second/time.Millisecond)

if maxSecond-startSecond <= maxTxnTimeoutSecond {
return false
}

Expand All @@ -377,35 +460,14 @@ func (a *Append) resolve(startTS int64) bool {
// Write a commit binlog myself if the status is committed,
// otherwise we can just ignore it, we will not get the commit binlog while iterator the kv by ts
if status.IsCommitted() {
// write the commit binlog myself
cbinlog := new(pb.Binlog)
cbinlog.Tp = pb.BinlogType_Commit
cbinlog.StartTs = pbinlog.StartTs
cbinlog.CommitTs = int64(status.CommitTS())

req := a.writeBinlog(cbinlog)
if req.err != nil {
log.Error("writeBinlog failed", zap.Error(req.err))
return false
}

// when writeBinlog return success, the pointer will be write to kv async,
// but we need to make sure it has been write to kv when we return true in the func, then we can get this commit binlog when
// we update maxCommitTS
// write the ts -> pointer to KV here to make sure it.
pointer, err := req.valuePointer.MarshalBinary()
if err != nil {
panic(err)
}

err = a.metadata.Put(encodeTSKey(req.ts()), pointer, nil)
err := a.writeCBinlog(pbinlog, int64(status.CommitTS()))
if err != nil {
log.Error("put into metadata failed", zap.Error(req.err))
log.Error("writeCBinlog failed", zap.Error(err))
return false
}
}

log.Info("known txn is committed from tikv",
log.Info("known txn is committed or rollback from tikv",
zap.Int64("start ts", startTS),
zap.Uint64("commit ts", status.CommitTS()))
return true
Expand Down Expand Up @@ -620,7 +682,7 @@ func (a *Append) writeBinlog(binlog *pb.Binlog) *request {
}

if duration > a.options.SlowWriteThreshold {
log.Warn("take a long time to write binlog", zap.Stringer("binlog type", binlog.Tp), zap.Int64("commit TS", binlog.CommitTs), zap.Int64("start TS", binlog.StartTs), zap.Int("length", len(binlog.PrewriteValue)))
log.Warn("take a long time to write binlog", zap.Stringer("binlog type", binlog.Tp), zap.Int64("commit TS", binlog.CommitTs), zap.Int64("start TS", binlog.StartTs), zap.Int("length", len(binlog.PrewriteValue)), zap.Float64("cost time", duration))
}
}()

Expand Down

0 comments on commit d54d6b9

Please sign in to comment.