Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controller: try use atomic replace the lock #8968

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 76 additions & 2 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -1463,7 +1464,10 @@ func (gc *groupCostController) onResponseImpl(
}
}
}

// no need to record the consumption, fast path.
if delta.RRU+delta.WRU == 0 {
return delta, nil
}
gc.mu.Lock()
// Record the consumption of the request
add(gc.mu.consumption, delta)
Expand Down Expand Up @@ -1504,7 +1508,10 @@ func (gc *groupCostController) onResponseWaitImpl(
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

// no need to record the consumption, fast path.
if delta.RRU+delta.WRU == 0 {
return delta, waitDuration, nil
}
gc.mu.Lock()
// Record the consumption of the request
add(gc.mu.consumption, delta)
Expand All @@ -1522,6 +1529,73 @@ func (gc *groupCostController) onResponseWaitImpl(
return delta, waitDuration, nil
}

func (gc *groupCostController) onResponseWaitAtomicImpl(
ctx context.Context, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, time.Duration, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
var waitDuration time.Duration
if !gc.burstable.Load() {
allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load()
d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt)
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
}
return nil, waitDuration, err
}
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}
// no need to record the consumption, fast path.
if delta.RRU+delta.WRU == 0 {
return delta, waitDuration, nil
}
// Record the consumption of the request
AtomicAddConsumption(gc.mu.consumption, delta)
// Record the consumption of the request by store
count := &rmpb.Consumption{}
*count = *delta
// As the penalty is only counted when the request is completed, so here needs to calculate the write cost which is added in `BeforeKVRequest`
for _, calc := range gc.calculators {
calc.BeforeKVRequest(count, req)
}
AtomicAddConsumption(gc.mu.storeCounter[req.StoreID()], count)
AtomicAddConsumption(gc.mu.globalCounter, count)
return delta, waitDuration, nil
}

// AtomicAdd applies an atomic addition on a field of Consumption.
func AtomicAdd(target *float64, delta float64) {
for {
old := atomic.LoadUint64((*uint64)(unsafe.Pointer(target)))
newValue := math.Float64bits(math.Float64frombits(old) + delta)
if atomic.CompareAndSwapUint64((*uint64)(unsafe.Pointer(target)), old, newValue) {
break
}
}
}

// AtomicAddConsumption performs atomic addition for all fields in Consumption.
func AtomicAddConsumption(target, delta *rmpb.Consumption) {
if target == nil || delta == nil {
return
}
AtomicAdd(&target.RRU, delta.RRU)
AtomicAdd(&target.WRU, delta.WRU)
AtomicAdd(&target.ReadBytes, delta.ReadBytes)
AtomicAdd(&target.WriteBytes, delta.WriteBytes)
AtomicAdd(&target.TotalCpuTimeMs, delta.TotalCpuTimeMs)
AtomicAdd(&target.SqlLayerCpuTimeMs, delta.SqlLayerCpuTimeMs)
AtomicAdd(&target.KvReadRpcCount, delta.KvReadRpcCount)
AtomicAdd(&target.KvWriteRpcCount, delta.KvWriteRpcCount)
}

// GetActiveResourceGroup is used to get active resource group.
// This is used for test only.
func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup {
Expand Down
Binary file added client/resource_group/controller/controller.test
Binary file not shown.
69 changes: 69 additions & 0 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,72 @@ func TestTryGetController(t *testing.T) {
re.NoError(err)
re.NotEmpty(consumption)
}

func BenchmarkRequestAndResponseConsumptionLockVer(b *testing.B) {
gc := createTestGroupCostController(require.New(b))
testCases := []struct {
req *TestRequestInfo
resp *TestResponseInfo
}{
{
req: &TestRequestInfo{
isWrite: false,
writeBytes: 0,
},
resp: &TestResponseInfo{
readBytes: 100,
kvCPU: 100 * time.Millisecond,
succeed: true,
},
},
}
// exclude the token bucket locks
gc.burstable.Store(true)

// Use b.RunParallel to simulate concurrent scenarios
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for idx, testCase := range testCases {
_, _, err := gc.onResponseWaitImpl(context.TODO(), testCase.req, testCase.resp)
if err != nil {
b.Fatalf("onResponseImpl failed: %v (%d)", err, idx)
}
}
}
})
}

func BenchmarkRequestAndResponseConsumptionAtomicVer(b *testing.B) {
gc := createTestGroupCostController(require.New(b))
testCases := []struct {
req *TestRequestInfo
resp *TestResponseInfo
}{
{
req: &TestRequestInfo{
isWrite: false,
writeBytes: 0,
},
resp: &TestResponseInfo{
readBytes: 100,
kvCPU: 100 * time.Millisecond,
succeed: true,
},
},
}

// exclude the token bucket locks
gc.burstable.Store(true)

// Use b.RunParallel to simulate concurrent scenarios
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for idx, testCase := range testCases {
_, _, err := gc.onResponseWaitAtomicImpl(context.TODO(), testCase.req, testCase.resp)
if err != nil {
b.Fatalf("onResponseImpl failed: %v (%d)", err, idx)
}
}
}
})
}
Loading