Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions go/vt/vttablet/tabletserver/querythrottler/query_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"

"vitess.io/vitess/go/vt/log"
Expand All @@ -37,10 +39,18 @@ import (
)

const (
queryThrottlerAppName = "QueryThrottler"
// defaultPriority is the default priority value when none is specified
defaultPriority = 100 // sqlparser.MaxPriorityValue
)

type Stats struct {
requestsTotal *stats.CountersWithMultiLabels
requestsThrottled *stats.CountersWithMultiLabels
totalLatency *servenv.MultiTimingsWrapper
evaluateLatency *servenv.MultiTimingsWrapper
}

type QueryThrottler struct {
ctx context.Context
throttleClient *throttle.Client
Expand All @@ -52,6 +62,8 @@ type QueryThrottler struct {
cfgLoader ConfigLoader
// strategy is the current throttling strategy handler.
strategy registry.ThrottlingStrategyHandler
env tabletenv.Env
stats Stats
}

// NewQueryThrottler creates a new query throttler.
Expand All @@ -65,6 +77,13 @@ func NewQueryThrottler(ctx context.Context, throttler *throttle.Throttler, cfgLo
cfg: Config{},
cfgLoader: cfgLoader,
strategy: &registry.NoOpStrategy{}, // default strategy until config is loaded
env: env,
stats: Stats{
requestsTotal: env.Exporter().NewCountersWithMultiLabels(queryThrottlerAppName+"Requests", "query throttler requests", []string{"Strategy", "Workload", "Priority"}),
requestsThrottled: env.Exporter().NewCountersWithMultiLabels(queryThrottlerAppName+"Throttled", "query throttler requests throttled", []string{"Strategy", "Workload", "Priority", "MetricName", "MetricValue", "DryRun"}),
totalLatency: env.Exporter().NewMultiTimings(queryThrottlerAppName+"TotalLatencyNs", "Total time each request takes in query throttling including evaluation, metric checks, and other overhead (nanoseconds)", []string{"Strategy", "Workload", "Priority"}),
evaluateLatency: env.Exporter().NewMultiTimings(queryThrottlerAppName+"EvaluateLatencyNs", "Time each request takes to make the throttling decision (nanoseconds)", []string{"Strategy", "Workload", "Priority"}),
},
}

// Start the initial strategy
Expand Down Expand Up @@ -97,27 +116,49 @@ func (qt *QueryThrottler) Shutdown() {
func (qt *QueryThrottler) Throttle(ctx context.Context, tabletType topodatapb.TabletType, parsedQuery *sqlparser.ParsedQuery, transactionID int64, options *querypb.ExecuteOptions) error {
// Lock-free read: for maximum performance in the hot path as cfg and strategy are updated rarely (default once per minute).
// They are word-sized and safe for atomic reads; stale data for one query is acceptable and avoids mutex contention in the hot path.
if !qt.cfg.Enabled {
tCfg := qt.cfg
tStrategy := qt.strategy

if !tCfg.Enabled {
return nil
}

// Capture start time for latency measurements only when throttling is enabled
startTime := time.Now()

// Extract query attributes once to avoid re computation in strategies
attrs := registry.QueryAttributes{
WorkloadName: extractWorkloadName(options),
Priority: extractPriority(options),
}
strategyName := tStrategy.GetStrategyName()
workload := attrs.WorkloadName
priorityStr := strconv.Itoa(attrs.Priority)

// Defer total latency recording to ensure it's always emitted regardless of return path.
defer func() {
qt.stats.totalLatency.Record([]string{strategyName, workload, priorityStr}, startTime)
}()

// Evaluate the throttling decision
decision := qt.strategy.Evaluate(ctx, tabletType, parsedQuery, transactionID, attrs)

// Record evaluate-window latency immediately after Evaluate returns
qt.stats.evaluateLatency.Record([]string{strategyName, workload, priorityStr}, startTime)

qt.stats.requestsTotal.Add([]string{strategyName, workload, priorityStr}, 1)

// If no throttling is needed, allow the query
if !decision.Throttle {
return nil
}

// Emit metric of query being throttled.
qt.stats.requestsThrottled.Add([]string{strategyName, workload, priorityStr, decision.MetricName, strconv.FormatFloat(decision.MetricValue, 'f', -1, 64), strconv.FormatBool(tCfg.DryRun)}, 1)

// If dry-run mode is enabled, log the decision but don't throttle
if qt.cfg.DryRun {
log.Warningf("[DRY-RUN] %s", decision.Message)
if tCfg.DryRun {
log.Warningf("[DRY-RUN] %s, metric name: %s, metric value: %f", decision.Message, decision.MetricName, decision.MetricValue)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -157,16 +158,18 @@ func TestQueryThrottler_Shutdown(t *testing.T) {
require.NotNil(t, strategy)
}

// TestIncomingQueryThrottler_DryRunMode tests that dry-run mode logs decisions but doesn't throttle queries.
func TestIncomingQueryThrottler_DryRunMode(t *testing.T) {
// TestQueryThrottler_DryRunMode tests that dry-run mode logs decisions but doesn't throttle queries.
func TestQueryThrottler_DryRunMode(t *testing.T) {
tests := []struct {
name string
enabled bool
dryRun bool
throttleDecision registry.ThrottleDecision
expectError bool
expectDryRunLog bool
expectedLogMsg string
name string
enabled bool
dryRun bool
throttleDecision registry.ThrottleDecision
expectError bool
expectDryRunLog bool
expectedLogMsg string
expectedTotalRequests int64
expectedThrottledRequests int64
}{
{
name: "Disabled throttler - no checks performed",
Expand Down Expand Up @@ -198,8 +201,9 @@ func TestIncomingQueryThrottler_DryRunMode(t *testing.T) {
Throttle: false,
Message: "Query allowed",
},
expectError: false,
expectDryRunLog: false,
expectError: false,
expectDryRunLog: false,
expectedTotalRequests: 1,
},
{
name: "Normal mode - query throttled",
Expand All @@ -213,8 +217,10 @@ func TestIncomingQueryThrottler_DryRunMode(t *testing.T) {
Threshold: 80.0,
ThrottlePercentage: 1.0,
},
expectError: true,
expectDryRunLog: false,
expectError: true,
expectDryRunLog: false,
expectedTotalRequests: 1,
expectedThrottledRequests: 1,
},
{
name: "Dry-run mode - query would be throttled but allowed",
Expand All @@ -228,9 +234,11 @@ func TestIncomingQueryThrottler_DryRunMode(t *testing.T) {
Threshold: 80.0,
ThrottlePercentage: 1.0,
},
expectError: false,
expectDryRunLog: true,
expectedLogMsg: "[DRY-RUN] Query throttled: metric=cpu value=95.0 threshold=80.0",
expectError: false,
expectDryRunLog: true,
expectedLogMsg: "[DRY-RUN] Query throttled: metric=cpu value=95.0 threshold=80.0, metric name: cpu, metric value: 95.000000",
expectedTotalRequests: 1,
expectedThrottledRequests: 1,
},
{
name: "Dry-run mode - query allowed normally",
Expand All @@ -240,8 +248,9 @@ func TestIncomingQueryThrottler_DryRunMode(t *testing.T) {
Throttle: false,
Message: "Query allowed",
},
expectError: false,
expectDryRunLog: false,
expectError: false,
expectDryRunLog: false,
expectedTotalRequests: 1,
},
}

Expand All @@ -252,6 +261,8 @@ func TestIncomingQueryThrottler_DryRunMode(t *testing.T) {
decision: tt.throttleDecision,
}

env := tabletenv.NewEnv(vtenv.NewTestEnv(), &tabletenv.TabletConfig{}, "TestThrottler")

// Create throttler with controlled config
iqt := &QueryThrottler{
ctx: context.Background(),
Expand All @@ -260,8 +271,18 @@ func TestIncomingQueryThrottler_DryRunMode(t *testing.T) {
DryRun: tt.dryRun,
},
strategy: mockStrategy,
env: env,
stats: Stats{
requestsTotal: env.Exporter().NewCountersWithMultiLabels(queryThrottlerAppName+"Requests", "TestThrottler requests", []string{"Strategy", "Workload", "Priority"}),
requestsThrottled: env.Exporter().NewCountersWithMultiLabels(queryThrottlerAppName+"Throttled", "TestThrottler throttled", []string{"Strategy", "Workload", "Priority", "MetricName", "MetricValue", "DryRun"}),
totalLatency: env.Exporter().NewMultiTimings(queryThrottlerAppName+"TotalLatencyMs", "Total latency of QueryThrottler.Throttle in milliseconds", []string{"Strategy", "Workload", "Priority"}),
evaluateLatency: env.Exporter().NewMultiTimings(queryThrottlerAppName+"EvaluateLatencyMs", "Latency from Throttle entry to completion of Evaluate in milliseconds", []string{"Strategy", "Workload", "Priority"}),
},
}

iqt.stats.requestsTotal.ResetAll()
iqt.stats.requestsThrottled.ResetAll()

// Capture log output
logCapture := &testLogCapture{}
originalLogWarningf := log.Warningf
Expand Down Expand Up @@ -299,6 +320,12 @@ func TestIncomingQueryThrottler_DryRunMode(t *testing.T) {
} else {
require.Empty(t, logCapture.logs, "Expected no log messages")
}

// Verify stats expectation
totalRequests := stats.CounterForDimension(iqt.stats.requestsTotal, "Strategy")
throttledRequests := stats.CounterForDimension(iqt.stats.requestsThrottled, "Strategy")
require.Equal(t, tt.expectedTotalRequests, totalRequests.Counts()["MockStrategy"], "Total requests should match expected")
require.Equal(t, tt.expectedThrottledRequests, throttledRequests.Counts()["MockStrategy"], "Throttled requests should match expected")
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,8 @@ func (s *NoOpStrategy) Start() {
func (s *NoOpStrategy) Stop() {
// No-op: NoOpStrategy has no resources to clean up
}

// GetStrategyName returns the name of the strategy.
func (s *NoOpStrategy) GetStrategyName() string {
return string(ThrottlingStrategyUnknown)
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,8 @@ func TestNoOpStrategy_Evaluate(t *testing.T) {
})
}
}

func TestNoOpStrategy_GetStrategyName(t *testing.T) {
strategy := &NoOpStrategy{}
require.Equal(t, string(ThrottlingStrategyUnknown), strategy.GetStrategyName())
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,7 @@ type ThrottlingStrategyHandler interface {
// This method should be called when the strategy is no longer needed.
// Implementations should clean up background processes, caches, or other resources.
Stop()

// GetStrategyName returns the name of the strategy.
GetStrategyName() string
}
Loading