From d3adfc2e29791cf8b0c18424dd15325f2ce04f26 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 5 Dec 2024 22:49:46 +0800 Subject: [PATCH] pipelined dml throttle Signed-off-by: ekexium --- go.mod | 1 + go.sum | 2 ++ tikv/kv.go | 6 +++- txnkv/transaction/txn.go | 68 +++++++++++++++++++++++++++++++++------- 4 files changed, 64 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index d2ff6fc5e6..9b8d05afbc 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/tikv/client-go/v2 go 1.23 require ( + github.com/VividCortex/ewma v1.2.0 github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da github.com/docker/go-units v0.5.0 diff --git a/go.sum b/go.sum index 60814ed108..90a890ffcf 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= +github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/tikv/kv.go b/tikv/kv.go index d7d5871d62..5a8dbb673d 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -83,6 +83,7 @@ const ( defaultPipelinedFlushConcurrency = 128 defaultPipelinedResolveLockConcurrency = 8 + defaultPipelinedWriteSpeedRatio = 1.0 ) func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) { @@ -954,17 +955,20 @@ func WithDefaultPipelinedTxn() TxnOption { Enable: true, FlushConcurrency: defaultPipelinedFlushConcurrency, ResolveLockConcurrency: defaultPipelinedResolveLockConcurrency, + WriteSpeedRatio: defaultPipelinedWriteSpeedRatio, } } } // WithPipelinedTxn creates pipelined txn with specified parameters -func WithPipelinedTxn(flushConcurrency, resolveLockConcurrency int) TxnOption { +func WithPipelinedTxn(flushConcurrency, resolveLockConcurrency int, + writeSpeedRatio float64) TxnOption { return func(st *transaction.TxnOptions) { st.PipelinedTxn = transaction.PipelinedTxnOptions{ Enable: true, FlushConcurrency: flushConcurrency, ResolveLockConcurrency: resolveLockConcurrency, + WriteSpeedRatio: writeSpeedRatio, } } } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index fce299b273..23addcf4d9 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -49,6 +49,7 @@ import ( "sync/atomic" "time" + "github.com/VividCortex/ewma" "github.com/dgryski/go-farm" "github.com/docker/go-units" "github.com/opentracing/opentracing-go" @@ -74,6 +75,7 @@ import ( // MaxTxnTimeUse is the max time a Txn may use (in ms) from its begin to commit. // We use it to abort the transaction to guarantee GC worker will not influence it. const MaxTxnTimeUse = 24 * 60 * 60 * 1000 +const defaultEWMAAge = 10 type tempLockBufferEntry struct { HasReturnValue bool @@ -113,6 +115,8 @@ type PipelinedTxnOptions struct { Enable bool FlushConcurrency int ResolveLockConcurrency int + // (0,1], 1 = no sleep + WriteSpeedRatio float64 } // TxnOptions indicates the option when beginning a transaction. @@ -180,23 +184,27 @@ type KVTxn struct { pipelinedCancel context.CancelFunc pipelinedFlushConcurrency int pipelinedResolveLockConcurrency int + writeSpeedRatio float64 + // flushBatchDurationEWMA is read before each flush, and written after each flush => no race + flushBatchDurationEWMA ewma.MovingAverage } // NewTiKVTxn creates a new KVTxn. func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, options *TxnOptions) (*KVTxn, error) { cfg := config.GetGlobalConfig() newTiKVTxn := &KVTxn{ - snapshot: snapshot, - store: store, - startTS: startTS, - startTime: time.Now(), - valid: true, - vars: tikv.DefaultVars, - scope: options.TxnScope, - enableAsyncCommit: cfg.EnableAsyncCommit, - enable1PC: cfg.Enable1PC, - diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, - RequestSource: snapshot.RequestSource, + snapshot: snapshot, + store: store, + startTS: startTS, + startTime: time.Now(), + valid: true, + vars: tikv.DefaultVars, + scope: options.TxnScope, + enableAsyncCommit: cfg.EnableAsyncCommit, + enable1PC: cfg.Enable1PC, + diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, + RequestSource: snapshot.RequestSource, + flushBatchDurationEWMA: ewma.NewMovingAverage(defaultEWMAAge), } if !options.PipelinedTxn.Enable { newTiKVTxn.us = unionstore.NewUnionStore(unionstore.NewMemDB(), snapshot) @@ -204,6 +212,7 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, } newTiKVTxn.pipelinedFlushConcurrency = options.PipelinedTxn.FlushConcurrency newTiKVTxn.pipelinedResolveLockConcurrency = options.PipelinedTxn.ResolveLockConcurrency + newTiKVTxn.writeSpeedRatio = options.PipelinedTxn.WriteSpeedRatio if err := newTiKVTxn.InitPipelinedMemDB(); err != nil { return nil, err } @@ -637,7 +646,15 @@ func (txn *KVTxn) InitPipelinedMemDB() error { } mutations.Push(op, false, mustExist, mustNotExist, flags.HasNeedConstraintCheckInPrewrite(), it.Handle()) } - return txn.committer.pipelinedFlushMutations(bo, mutations, generation) + txn.throttle() + flushStart := time.Now() + err = txn.committer.pipelinedFlushMutations(bo, mutations, generation) + if txn.flushBatchDurationEWMA.Value() == 0 { + txn.flushBatchDurationEWMA.Set(float64(time.Since(flushStart).Milliseconds())) + } else { + txn.flushBatchDurationEWMA.Add(float64(time.Since(flushStart).Milliseconds())) + } + return err }) txn.committer.priority = txn.priority.ToPB() txn.committer.syncLog = txn.syncLog @@ -648,6 +665,33 @@ func (txn *KVTxn) InitPipelinedMemDB() error { return nil } +func (txn *KVTxn) throttle() { + if txn.writeSpeedRatio > 1 || txn.writeSpeedRatio <= 0 { + logutil.BgLogger().Error( + "[pipelined dml] invalid write speed ratio", + zap.Float64("writeSpeedRatio", txn.writeSpeedRatio), + zap.Uint64("session", txn.committer.sessionID), + zap.Uint64("startTS", txn.startTS), + ) + return + } + + expectedFlushMs := txn.flushBatchDurationEWMA.Value() + // T_sleep / (T_sleep + T_flush) = 1 - writeSpeedRatio + sleepMs := int((1.0 - txn.writeSpeedRatio) / txn.writeSpeedRatio * expectedFlushMs) + if sleepMs == 0 { + return + } + logutil.BgLogger().Info( + "[pipelined dml] throttle", + zap.Int("sleepMs", sleepMs), + zap.Float64("writeSpeedRatio", txn.writeSpeedRatio), + zap.Uint64("session", txn.committer.sessionID), + zap.Uint64("startTS", txn.startTS), + ) + time.Sleep(time.Duration(sleepMs) * time.Millisecond) +} + // IsCasualConsistency returns if the transaction allows linearizability // inconsistency. func (txn *KVTxn) IsCasualConsistency() bool {