Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/goldfish/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// Storage defines the interface for storing and retrieving query samples and comparison results
type Storage interface {
// Write operations (used by querytee)
StoreQuerySample(ctx context.Context, sample *QuerySample) error
StoreQuerySample(ctx context.Context, sample *QuerySample, comparison *ComparisonResult) error
StoreComparisonResult(ctx context.Context, result *ComparisonResult) error

// Read operations (used by UI)
Expand Down
24 changes: 2 additions & 22 deletions pkg/goldfish/storage_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,8 @@ func NewMySQLStorage(config StorageConfig, logger log.Logger) (*MySQLStorage, er
}, nil
}

// computeComparisonStatus determines the comparison status based on response codes and hashes
func computeComparisonStatus(sample *QuerySample) ComparisonStatus {
// If either cell has a non-2xx status code, it's an error
if sample.CellAStatusCode < 200 || sample.CellAStatusCode >= 300 ||
sample.CellBStatusCode < 200 || sample.CellBStatusCode >= 300 {
return ComparisonStatusError
}

// If response hashes match, queries produced identical results
if sample.CellAResponseHash == sample.CellBResponseHash {
return ComparisonStatusMatch
}

// Otherwise, responses differ
return ComparisonStatusMismatch
}

// StoreQuerySample stores a sampled query with performance statistics
func (s *MySQLStorage) StoreQuerySample(ctx context.Context, sample *QuerySample) error {
func (s *MySQLStorage) StoreQuerySample(ctx context.Context, sample *QuerySample, comparison *ComparisonResult) error {
query := `
INSERT INTO sampled_queries (
correlation_id, tenant_id, user, is_logs_drilldown, query, query_type,
Expand Down Expand Up @@ -147,9 +130,6 @@ func (s *MySQLStorage) StoreQuerySample(ctx context.Context, sample *QuerySample
}
}

// Compute the comparison status before storing
comparisonStatus := computeComparisonStatus(sample)

_, err := s.db.ExecContext(ctx, query,
sample.CorrelationID,
sample.TenantID,
Expand Down Expand Up @@ -197,7 +177,7 @@ func (s *MySQLStorage) StoreQuerySample(ctx context.Context, sample *QuerySample
sample.CellAUsedNewEngine,
sample.CellBUsedNewEngine,
sample.SampledAt,
comparisonStatus,
comparison.ComparisonStatus,
)

return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/goldfish/storage_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func NewNoopStorage() *NoopStorage {
}

// StoreQuerySample is a no-op
func (n *NoopStorage) StoreQuerySample(_ context.Context, _ *QuerySample) error {
func (n *NoopStorage) StoreQuerySample(_ context.Context, _ *QuerySample, _ *ComparisonResult) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ui/goldfish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type mockStorage struct {
getQueryFunc func(ctx context.Context, correlationID string) (*goldfish.QuerySample, error)
}

func (m *mockStorage) StoreQuerySample(_ context.Context, _ *goldfish.QuerySample) error {
func (m *mockStorage) StoreQuerySample(_ context.Context, _ *goldfish.QuerySample, _ *goldfish.ComparisonResult) error {
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions tools/querytee/goldfish/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (m *Manager) SendToGoldfish(httpReq *http.Request, cellAResp, cellBResp *Ba
cellAData, err := CaptureResponse(&http.Response{
StatusCode: cellAResp.Status,
Body: io.NopCloser(bytes.NewReader(cellAResp.Body)),
}, cellAResp.Duration, cellAResp.TraceID, cellAResp.SpanID)
}, cellAResp.Duration, cellAResp.TraceID, cellAResp.SpanID, m.logger)
if err != nil {
level.Error(m.logger).Log("msg", "failed to capture cell A response", "err", err)
return
Expand All @@ -139,7 +139,7 @@ func (m *Manager) SendToGoldfish(httpReq *http.Request, cellAResp, cellBResp *Ba
cellBData, err := CaptureResponse(&http.Response{
StatusCode: cellBResp.Status,
Body: io.NopCloser(bytes.NewReader(cellBResp.Body)),
}, cellBResp.Duration, cellBResp.TraceID, cellBResp.SpanID)
}, cellBResp.Duration, cellBResp.TraceID, cellBResp.SpanID, m.logger)
if err != nil {
level.Error(m.logger).Log("msg", "failed to capture cell B response", "err", err)
return
Expand Down Expand Up @@ -232,7 +232,7 @@ func (m *Manager) processQueryPair(req *http.Request, cellAResp, cellBResp *Resp
sampleStored := false

if m.storage != nil {
if err := m.storage.StoreQuerySample(ctx, sample); err != nil {
if err := m.storage.StoreQuerySample(ctx, sample, &result); err != nil {
level.Error(m.logger).Log("msg", "failed to store query sample", "correlation_id", correlationID, "err", err)
m.metrics.storageOperations.WithLabelValues("store_sample", "error").Inc()
} else {
Expand Down Expand Up @@ -426,7 +426,7 @@ type ResponseData struct {
}

// CaptureResponse captures response data for comparison including trace ID and span ID
func CaptureResponse(resp *http.Response, duration time.Duration, traceID, spanID string) (*ResponseData, error) {
func CaptureResponse(resp *http.Response, duration time.Duration, traceID, spanID string, logger log.Logger) (*ResponseData, error) {
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
Expand All @@ -446,7 +446,7 @@ func CaptureResponse(resp *http.Response, duration time.Duration, traceID, spanI
stats, hash, size, usedNewEngine, err = extractor.ExtractResponseData(body, duration.Milliseconds())
if err != nil {
// Log error but don't fail the capture
level.Warn(log.NewNopLogger()).Log("msg", "failed to extract response statistics", "err", err)
level.Warn(logger).Log("msg", "failed to extract response statistics", "err", err)
}
} else {
size = int64(len(body))
Expand Down
89 changes: 87 additions & 2 deletions tools/querytee/goldfish/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package goldfish

import (
"context"
"errors"
"io"
"net/http"
"strings"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/go-kit/log"
"github.com/grafana/loki/v3/pkg/goldfish"
"github.com/grafana/loki/v3/pkg/storage/bucket"
"github.com/grafana/loki/v3/tools/querytee/comparator"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -23,7 +25,8 @@ type mockStorage struct {
closed bool
}

func (m *mockStorage) StoreQuerySample(_ context.Context, sample *goldfish.QuerySample) error {
func (m *mockStorage) StoreQuerySample(_ context.Context, sample *goldfish.QuerySample, comparison *goldfish.ComparisonResult) error {
sample.ComparisonStatus = comparison.ComparisonStatus
m.samples = append(m.samples, *sample)
return nil
}
Expand Down Expand Up @@ -213,7 +216,7 @@ func Test_CaptureResponse_withTraceID(t *testing.T) {
}

// Call CaptureResponse with traceID and empty spanID
data, err := CaptureResponse(resp, time.Duration(100)*time.Millisecond, tt.traceID, "")
data, err := CaptureResponse(resp, time.Duration(100)*time.Millisecond, tt.traceID, "", log.NewNopLogger())

require.NoError(t, err)
assert.Equal(t, tt.expected, data.TraceID)
Expand Down Expand Up @@ -619,3 +622,85 @@ func (m *mockResultStore) Close(context.Context) error {
m.closed = true
return nil
}

// mockResponseComparator implements ResponsesComparator for testing
type mockResponseComparator struct {
match bool
}

func (m *mockResponseComparator) Compare(_, _ []byte, _ time.Time) (*comparator.ComparisonSummary, error) {
if m.match {
return &comparator.ComparisonSummary{}, nil
}
return nil, errors.New("comparison failed")
}

func TestManager_StoreQuerySample_UsesComparatorResult(t *testing.T) {
tests := []struct {
name string
cellAHash string
cellBHash string
comparatorMatch bool
expectedStatus goldfish.ComparisonStatus
}{
{
name: "hash mismatch with tolerance match",
cellAHash: "hash1",
cellBHash: "hash2",
comparatorMatch: true,
expectedStatus: goldfish.ComparisonStatusMatch,
},
{
name: "hash mismatch without tolerance match",
cellAHash: "hash1",
cellBHash: "hash2",
comparatorMatch: false,
expectedStatus: goldfish.ComparisonStatusMismatch,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
storage := &mockStorage{}

mockComparator := &mockResponseComparator{
match: tt.comparatorMatch,
}

config := Config{
Enabled: true,
ResultsStorage: ResultsStorageConfig{
Mode: ResultsPersistenceModeAll,
},
PerformanceTolerance: 0.1,
}

manager, err := NewManager(config, mockComparator, storage, nil, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)

// Create responses with different hashes (both return 200)
cellAResp := &ResponseData{
StatusCode: 200,
Hash: tt.cellAHash,
Body: []byte(`{"status":"success","data":{"result":[]}}`),
Stats: goldfish.QueryStats{ExecTimeMs: 100},
}
cellBResp := &ResponseData{
StatusCode: 200,
Hash: tt.cellBHash,
Body: []byte(`{"status":"success","data":{"result":[]}}`),
Stats: goldfish.QueryStats{ExecTimeMs: 100},
}

req, _ := http.NewRequest("GET", "/loki/api/v1/query_range?query=test", nil)

// Process the query pair
manager.processQueryPair(req, cellAResp, cellBResp)

// Verify the stored sample has the correct comparison status from the comparator
require.Len(t, storage.samples, 1)
assert.Equal(t, tt.expectedStatus, storage.samples[0].ComparisonStatus,
"comparison status should match what the comparator returned")
})
}
}
2 changes: 1 addition & 1 deletion tools/querytee/proxy_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ type mockGoldfishStorage struct {
results []goldfish.ComparisonResult
}

func (m *mockGoldfishStorage) StoreQuerySample(_ context.Context, sample *goldfish.QuerySample) error {
func (m *mockGoldfishStorage) StoreQuerySample(_ context.Context, sample *goldfish.QuerySample, _ *goldfish.ComparisonResult) error {
m.samples = append(m.samples, *sample)
return nil
}
Expand Down