Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/goldfish/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// Storage defines the interface for storing and retrieving query samples and comparison results
type Storage interface {
// Write operations (used by querytee)
StoreQuerySample(ctx context.Context, sample *QuerySample) error
StoreQuerySample(ctx context.Context, sample *QuerySample, comparison *ComparisonResult) error
StoreComparisonResult(ctx context.Context, result *ComparisonResult) error

// Read operations (used by UI)
Expand Down
24 changes: 2 additions & 22 deletions pkg/goldfish/storage_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,8 @@ func NewMySQLStorage(config StorageConfig, logger log.Logger) (*MySQLStorage, er
}, nil
}

// computeComparisonStatus determines the comparison status based on response codes and hashes
func computeComparisonStatus(sample *QuerySample) ComparisonStatus {
// If either cell has a non-2xx status code, it's an error
if sample.CellAStatusCode < 200 || sample.CellAStatusCode >= 300 ||
sample.CellBStatusCode < 200 || sample.CellBStatusCode >= 300 {
return ComparisonStatusError
}

// If response hashes match, queries produced identical results
if sample.CellAResponseHash == sample.CellBResponseHash {
return ComparisonStatusMatch
}

// Otherwise, responses differ
return ComparisonStatusMismatch
}

// StoreQuerySample stores a sampled query with performance statistics
func (s *MySQLStorage) StoreQuerySample(ctx context.Context, sample *QuerySample) error {
func (s *MySQLStorage) StoreQuerySample(ctx context.Context, sample *QuerySample, comparison *ComparisonResult) error {
query := `
INSERT INTO sampled_queries (
correlation_id, tenant_id, user, is_logs_drilldown, query, query_type,
Expand Down Expand Up @@ -147,9 +130,6 @@ func (s *MySQLStorage) StoreQuerySample(ctx context.Context, sample *QuerySample
}
}

// Compute the comparison status before storing
comparisonStatus := computeComparisonStatus(sample)

_, err := s.db.ExecContext(ctx, query,
sample.CorrelationID,
sample.TenantID,
Expand Down Expand Up @@ -197,7 +177,7 @@ func (s *MySQLStorage) StoreQuerySample(ctx context.Context, sample *QuerySample
sample.CellAUsedNewEngine,
sample.CellBUsedNewEngine,
sample.SampledAt,
comparisonStatus,
comparison.ComparisonStatus,
)

return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/goldfish/storage_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func NewNoopStorage() *NoopStorage {
}

// StoreQuerySample is a no-op
func (n *NoopStorage) StoreQuerySample(_ context.Context, _ *QuerySample) error {
func (n *NoopStorage) StoreQuerySample(_ context.Context, _ *QuerySample, _ *ComparisonResult) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ui/goldfish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type mockStorage struct {
getQueryFunc func(ctx context.Context, correlationID string) (*goldfish.QuerySample, error)
}

func (m *mockStorage) StoreQuerySample(_ context.Context, _ *goldfish.QuerySample) error {
func (m *mockStorage) StoreQuerySample(_ context.Context, _ *goldfish.QuerySample, _ *goldfish.ComparisonResult) error {
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions tools/querytee/goldfish/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (m *Manager) SendToGoldfish(httpReq *http.Request, cellAResp, cellBResp *Ba
cellAData, err := CaptureResponse(&http.Response{
StatusCode: cellAResp.Status,
Body: io.NopCloser(bytes.NewReader(cellAResp.Body)),
}, cellAResp.Duration, cellAResp.TraceID, cellAResp.SpanID)
}, cellAResp.Duration, cellAResp.TraceID, cellAResp.SpanID, m.logger)
if err != nil {
level.Error(m.logger).Log("msg", "failed to capture cell A response", "err", err)
return
Expand All @@ -139,7 +139,7 @@ func (m *Manager) SendToGoldfish(httpReq *http.Request, cellAResp, cellBResp *Ba
cellBData, err := CaptureResponse(&http.Response{
StatusCode: cellBResp.Status,
Body: io.NopCloser(bytes.NewReader(cellBResp.Body)),
}, cellBResp.Duration, cellBResp.TraceID, cellBResp.SpanID)
}, cellBResp.Duration, cellBResp.TraceID, cellBResp.SpanID, m.logger)
if err != nil {
level.Error(m.logger).Log("msg", "failed to capture cell B response", "err", err)
return
Expand Down Expand Up @@ -232,7 +232,7 @@ func (m *Manager) processQueryPair(req *http.Request, cellAResp, cellBResp *Resp
sampleStored := false

if m.storage != nil {
if err := m.storage.StoreQuerySample(ctx, sample); err != nil {
if err := m.storage.StoreQuerySample(ctx, sample, &result); err != nil {
level.Error(m.logger).Log("msg", "failed to store query sample", "correlation_id", correlationID, "err", err)
m.metrics.storageOperations.WithLabelValues("store_sample", "error").Inc()
} else {
Expand Down Expand Up @@ -426,7 +426,7 @@ type ResponseData struct {
}

// CaptureResponse captures response data for comparison including trace ID and span ID
func CaptureResponse(resp *http.Response, duration time.Duration, traceID, spanID string) (*ResponseData, error) {
func CaptureResponse(resp *http.Response, duration time.Duration, traceID, spanID string, logger log.Logger) (*ResponseData, error) {
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
Expand All @@ -446,7 +446,7 @@ func CaptureResponse(resp *http.Response, duration time.Duration, traceID, spanI
stats, hash, size, usedNewEngine, err = extractor.ExtractResponseData(body, duration.Milliseconds())
if err != nil {
// Log error but don't fail the capture
level.Warn(log.NewNopLogger()).Log("msg", "failed to extract response statistics", "err", err)
level.Warn(logger).Log("msg", "failed to extract response statistics", "err", err)
}
} else {
size = int64(len(body))
Expand Down
89 changes: 87 additions & 2 deletions tools/querytee/goldfish/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package goldfish

import (
"context"
"errors"
"io"
"net/http"
"strings"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/go-kit/log"
"github.com/grafana/loki/v3/pkg/goldfish"
"github.com/grafana/loki/v3/pkg/storage/bucket"
"github.com/grafana/loki/v3/tools/querytee/comparator"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -23,7 +25,8 @@ type mockStorage struct {
closed bool
}

func (m *mockStorage) StoreQuerySample(_ context.Context, sample *goldfish.QuerySample) error {
func (m *mockStorage) StoreQuerySample(_ context.Context, sample *goldfish.QuerySample, comparison *goldfish.ComparisonResult) error {
sample.ComparisonStatus = comparison.ComparisonStatus
m.samples = append(m.samples, *sample)
return nil
}
Expand Down Expand Up @@ -213,7 +216,7 @@ func Test_CaptureResponse_withTraceID(t *testing.T) {
}

// Call CaptureResponse with traceID and empty spanID
data, err := CaptureResponse(resp, time.Duration(100)*time.Millisecond, tt.traceID, "")
data, err := CaptureResponse(resp, time.Duration(100)*time.Millisecond, tt.traceID, "", log.NewNopLogger())

require.NoError(t, err)
assert.Equal(t, tt.expected, data.TraceID)
Expand Down Expand Up @@ -619,3 +622,85 @@ func (m *mockResultStore) Close(context.Context) error {
m.closed = true
return nil
}

// mockResponseComparator implements ResponsesComparator for testing
type mockResponseComparator struct {
match bool
}

func (m *mockResponseComparator) Compare(expectedBody, actualBody []byte, timestamp time.Time) (*comparator.ComparisonSummary, error) {
if m.match {
return &comparator.ComparisonSummary{}, nil
}
return nil, errors.New("comparison failed")
}

func TestManager_StoreQuerySample_UsesComparatorResult(t *testing.T) {
tests := []struct {
name string
cellAHash string
cellBHash string
comparatorMatch bool
expectedStatus goldfish.ComparisonStatus
}{
{
name: "hash mismatch with tolerance match",
cellAHash: "hash1",
cellBHash: "hash2",
comparatorMatch: true,
expectedStatus: goldfish.ComparisonStatusMatch,
},
{
name: "hash mismatch without tolerance match",
cellAHash: "hash1",
cellBHash: "hash2",
comparatorMatch: false,
expectedStatus: goldfish.ComparisonStatusMismatch,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
storage := &mockStorage{}

mockComparator := &mockResponseComparator{
match: tt.comparatorMatch,
}

config := Config{
Enabled: true,
ResultsStorage: ResultsStorageConfig{
Mode: ResultsPersistenceModeAll,
},
PerformanceTolerance: 0.1,
}

manager, err := NewManager(config, mockComparator, storage, nil, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)

// Create responses with different hashes (both return 200)
cellAResp := &ResponseData{
StatusCode: 200,
Hash: tt.cellAHash,
Body: []byte(`{"status":"success","data":{"result":[]}}`),
Stats: goldfish.QueryStats{ExecTimeMs: 100},
}
cellBResp := &ResponseData{
StatusCode: 200,
Hash: tt.cellBHash,
Body: []byte(`{"status":"success","data":{"result":[]}}`),
Stats: goldfish.QueryStats{ExecTimeMs: 100},
}

req, _ := http.NewRequest("GET", "/loki/api/v1/query_range?query=test", nil)

// Process the query pair
manager.processQueryPair(req, cellAResp, cellBResp)

// Verify the stored sample has the correct comparison status from the comparator
require.Len(t, storage.samples, 1)
assert.Equal(t, tt.expectedStatus, storage.samples[0].ComparisonStatus,
"comparison status should match what the comparator returned")
})
}
}
2 changes: 1 addition & 1 deletion tools/querytee/proxy_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ type mockGoldfishStorage struct {
results []goldfish.ComparisonResult
}

func (m *mockGoldfishStorage) StoreQuerySample(_ context.Context, sample *goldfish.QuerySample) error {
func (m *mockGoldfishStorage) StoreQuerySample(_ context.Context, sample *goldfish.QuerySample, _ *goldfish.ComparisonResult) error {
m.samples = append(m.samples, *sample)
return nil
}
Expand Down
Loading