Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Varankin <[email protected]>
  • Loading branch information
narqo committed Feb 17, 2025
1 parent d23ffc1 commit 4bfdff4
Showing 1 changed file with 126 additions and 18 deletions.
144 changes: 126 additions & 18 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2593,6 +2593,98 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
}
}

func TestDistributor_MetricsForLabelMatchers_adjustPushDownLimit(t *testing.T) {
const numIngesters = 10

now := model.Now()

fixtures := []struct {
lbls []mimirpb.LabelAdapter
value float64
timestamp int64
}{
{labelAdapters(labels.MetricName, "test_1", "status", "2xx"), 1, 100000},
{labelAdapters(labels.MetricName, "test_1", "status", "3xx"), 1, 110000},
{labelAdapters(labels.MetricName, "test_1", "status", "5xx"), 1, 120000},
}

tests := map[string]struct {
setupFunc func(*prepConfig)
hints *storage.SelectHints
expectedLimit int
}{
"should adjust push-down limit": {
hints: &storage.SelectHints{
Limit: 50,
},
expectedLimit: 17,
},
"should adjust push-down limit if shuffle shard size is set": {
setupFunc: func(testConfig *prepConfig) {
testConfig.shuffleShardSize = 6
},
hints: &storage.SelectHints{
Limit: 50,
},
expectedLimit: 25, // the limit is divided between two shards (ingest_shard_size=6; RF=3)
},
"should adjust push-down limit if ingest-storage is enabled": {
setupFunc: func(testConfig *prepConfig) {
testConfig.ingestStorageEnabled = true
testConfig.limits = prepareDefaultLimits()
testConfig.limits.IngestionPartitionsTenantShardSize = 3
},
hints: &storage.SelectHints{
Limit: 50,
},
expectedLimit: 17,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
t.Parallel()

testConfig := prepConfig{
numIngesters: numIngesters,
happyIngesters: numIngesters,
numDistributors: 1,
}
if testData.setupFunc != nil {
testData.setupFunc(&testConfig)
}

ds, ingesters, _, _ := prepare(t, testConfig)

// Ensure strong read consistency, required to have no flaky tests when ingest storage is enabled.
ctx := user.InjectOrgID(context.Background(), "test")
ctx = api.ContextWithReadConsistencyLevel(ctx, api.ReadConsistencyStrong)

for _, series := range fixtures {
req := mockWriteRequest(series.lbls, series.value, series.timestamp)
_, err := ds[0].Push(ctx, req)
require.NoError(t, err)
}

// In this test, we only assert the side effect of limit's push-down. The behavior of matchers and the returned metrics are already tested above.
matchers := []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
}
_, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.hints, matchers...)
require.NoError(t, err)

var called bool
assertMockIngestersCalledFunc(ingesters, "MetricsForLabelMatchers", func(args ...any) {
require.Len(t, args, 2)
req := args[1].(*client.MetricsForLabelMatchersRequest)
require.EqualValues(t, testData.expectedLimit, req.Limit)
called = true
})
require.True(t, called)
})
}
}

func TestDistributor_ActiveSeries(t *testing.T) {
const numIngesters = 5
const responseSizeLimitBytes = 1024
Expand Down Expand Up @@ -6134,13 +6226,14 @@ type mockIngester struct {
sync.Mutex
client.IngesterClient
grpc_health_v1.HealthClient
happy bool
stats client.UsersStatsResponse
timeseries map[uint32]*mimirpb.PreallocTimeseries
metadata map[uint32]map[mimirpb.MetricMetadata]struct{}
queryDelay time.Duration
pushDelay time.Duration
calls map[string]int
happy bool
stats client.UsersStatsResponse
timeseries map[uint32]*mimirpb.PreallocTimeseries
metadata map[uint32]map[mimirpb.MetricMetadata]struct{}
queryDelay time.Duration
pushDelay time.Duration
// calls is a map from a methodName to list the method's invocations; each invocation is a list of the call's arguments.
calls map[string][][]any
zone string
labelNamesStreamResponseDelay time.Duration
timeOut bool
Expand Down Expand Up @@ -6231,7 +6324,7 @@ func (i *mockIngester) Close() error {
}

func (i *mockIngester) Push(ctx context.Context, req *mimirpb.WriteRequest, _ ...grpc.CallOption) (*mimirpb.WriteResponse, error) {
i.trackCall("Push")
i.trackCall("Push", ctx, req)

time.Sleep(i.pushDelay)

Expand Down Expand Up @@ -6311,7 +6404,7 @@ func makeWireChunk(c chunk.EncodedChunk) client.Chunk {
}

func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest, _ ...grpc.CallOption) (client.Ingester_QueryStreamClient, error) {
i.trackCall("QueryStream")
i.trackCall("QueryStream", ctx, req)

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -6546,7 +6639,7 @@ func (i *mockIngester) QueryExemplars(ctx context.Context, req *client.ExemplarQ
}

func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest, _ ...grpc.CallOption) (*client.MetricsForLabelMatchersResponse, error) {
i.trackCall("MetricsForLabelMatchers")
i.trackCall("MetricsForLabelMatchers", ctx, req)

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -6585,7 +6678,7 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
}

func (i *mockIngester) LabelValues(ctx context.Context, req *client.LabelValuesRequest, _ ...grpc.CallOption) (*client.LabelValuesResponse, error) {
i.trackCall("LabelValues")
i.trackCall("LabelValues", ctx, req)

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -6639,7 +6732,7 @@ func (i *mockIngester) LabelValues(ctx context.Context, req *client.LabelValuesR
}

func (i *mockIngester) LabelNames(ctx context.Context, req *client.LabelNamesRequest, _ ...grpc.CallOption) (*client.LabelNamesResponse, error) {
i.trackCall("LabelNames")
i.trackCall("LabelNames", ctx, req)

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -6757,7 +6850,7 @@ func (s *labelNamesAndValuesMockStream) Recv() (*client.LabelNamesAndValuesRespo
}

func (i *mockIngester) LabelValuesCardinality(ctx context.Context, req *client.LabelValuesCardinalityRequest, _ ...grpc.CallOption) (client.Ingester_LabelValuesCardinalityClient, error) {
i.trackCall("LabelValuesCardinality")
i.trackCall("LabelValuesCardinality", ctx, req)

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -6831,7 +6924,7 @@ func (s *labelValuesCardinalityStream) Recv() (*client.LabelValuesCardinalityRes
}

func (i *mockIngester) ActiveSeries(ctx context.Context, req *client.ActiveSeriesRequest, _ ...grpc.CallOption) (client.Ingester_ActiveSeriesClient, error) {
i.trackCall("ActiveSeries")
i.trackCall("ActiveSeries", ctx, req)

if err := i.enforceReadConsistency(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -6897,22 +6990,31 @@ func (s *activeSeriesStream) CloseSend() error {
return nil
}

func (i *mockIngester) trackCall(name string) {
func (i *mockIngester) trackCall(name string, args ...any) {
i.Lock()
defer i.Unlock()

if i.calls == nil {
i.calls = map[string]int{}
i.calls = make(map[string][][]any)
}

i.calls[name]++
i.calls[name] = append(i.calls[name], args)
}

func (i *mockIngester) countCalls(name string) int {
i.Lock()
defer i.Unlock()

return i.calls[name]
return len(i.calls[name])
}

func (i *mockIngester) assertCalledFunc(name string, assertionFunc func(args ...any)) {
i.Lock()
defer i.Unlock()

for _, call := range i.calls[name] {
assertionFunc(call...)
}
}

func (i *mockIngester) enforceReadConsistency(ctx context.Context) error {
Expand Down Expand Up @@ -7272,6 +7374,12 @@ func countMockIngestersCalled(ingesters []*mockIngester, name string) int {
return count
}

func assertMockIngestersCalledFunc(ingesters []*mockIngester, name string, assertionFunc func(args ...any)) {
for _, i := range ingesters {
i.assertCalledFunc(name, assertionFunc)
}
}

// TestDistributor_MetricsWithRequestModifications tests that the distributor metrics are properly updated when
// requests get modified by the mechanisms that can modify them: relabel rules, drop labels, ha-dedupe, forwarding, limits.
func TestDistributor_MetricsWithRequestModifications(t *testing.T) {
Expand Down

0 comments on commit 4bfdff4

Please sign in to comment.