diff --git a/backend/modules/observability/domain/task/entity/event.go b/backend/modules/observability/domain/task/entity/event.go index 6a31447ae..2fefb720c 100644 --- a/backend/modules/observability/domain/task/entity/event.go +++ b/backend/modules/observability/domain/task/entity/event.go @@ -272,10 +272,10 @@ func (s *OnlineExptTurnEvalResult) GetRunID() (int64, error) { } func (s *OnlineExptTurnEvalResult) GetUserID() string { - if s.BaseInfo == nil || s.BaseInfo.UpdatedBy == nil { + if s.BaseInfo == nil || s.BaseInfo.CreatedBy == nil { return "" } - return s.BaseInfo.UpdatedBy.UserID + return s.BaseInfo.CreatedBy.UserID } type EvaluatorRunError struct { diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go index 68faa7682..881e6a8b3 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go @@ -206,7 +206,6 @@ type ListSpansReq struct { func (h *TraceHubServiceImpl) buildSpanFilters(ctx context.Context, taskConfig *entity.ObservabilityTask) *loop_span.FilterFields { // More complex filters can be built based on the task configuration // Simplified here: return nil to indicate no additional filters - platformFilter, err := h.buildHelper.BuildPlatformRelatedFilter(ctx, taskConfig.SpanFilter.PlatformType) if err != nil { logs.CtxError(ctx, "build platform filter failed, task_id=%d, err=%v", taskConfig.ID, err) @@ -222,6 +221,10 @@ func (h *TraceHubServiceImpl) buildSpanFilters(ctx context.Context, taskConfig * // 不需要重试 return nil } + if err = taskConfig.SpanFilter.Filters.Traverse(processSpecificFilter); err != nil { + logs.CtxError(ctx, "traverse filter fields failed, task_id=%d, err=%v", taskConfig.ID, err) + return nil + } filters := h.combineFilters(builtinFilter, &taskConfig.SpanFilter.Filters) return filters diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/utils.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/utils.go index 9cb087bc7..81ad99e60 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/utils.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/utils.go @@ -5,10 +5,14 @@ package tracehub import ( "context" + "fmt" + "strconv" "github.com/bytedance/sonic" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span" + "github.com/coze-dev/coze-loop/backend/pkg/lang/ptr" "github.com/coze-dev/coze-loop/backend/pkg/logs" + timeutil "github.com/coze-dev/coze-loop/backend/pkg/time" ) func ToJSONString(ctx context.Context, obj interface{}) string { @@ -27,3 +31,75 @@ func ToJSONString(ctx context.Context, obj interface{}) string { func (h *TraceHubServiceImpl) getTenants(ctx context.Context, platform loop_span.PlatformType) ([]string, error) { return h.tenantProvider.GetTenantsByPlatformType(ctx, platform) } + +// todo tyf TraceService里有相同实现,待合并 +func processSpecificFilter(f *loop_span.FilterField) error { + if f == nil { + return nil + } + switch f.FieldName { + case loop_span.SpanFieldStatus: + if err := processStatusFilter(f); err != nil { + return err + } + case loop_span.SpanFieldDuration, + loop_span.SpanFieldLatencyFirstResp, + loop_span.SpanFieldStartTimeFirstResp, + loop_span.SpanFieldStartTimeFirstTokenResp, + loop_span.SpanFieldLatencyFirstTokenResp, + loop_span.SpanFieldReasoningDuration: + if err := processLatencyFilter(f); err != nil { + return err + } + } + return nil +} + +func processStatusFilter(f *loop_span.FilterField) error { + if f.QueryType == nil || *f.QueryType != loop_span.QueryTypeEnumIn { + return fmt.Errorf("status filter should use in operator") + } + f.FieldName = loop_span.SpanFieldStatusCode + f.FieldType = loop_span.FieldTypeLong + checkSuccess, checkError := false, false + for _, val := range f.Values { + switch val { + case loop_span.SpanStatusSuccess: + checkSuccess = true + case loop_span.SpanStatusError: + checkError = true + default: + return fmt.Errorf("invalid status code field value") + } + } + if checkSuccess && checkError { + f.QueryType = ptr.Of(loop_span.QueryTypeEnumAlwaysTrue) + f.Values = nil + } else if checkSuccess { + f.Values = []string{"0"} + } else if checkError { + f.QueryType = ptr.Of(loop_span.QueryTypeEnumNotIn) + f.Values = []string{"0"} + } else { + return fmt.Errorf("invalid status code query") + } + return nil +} + +// ms -> us +func processLatencyFilter(f *loop_span.FilterField) error { + if f.FieldType != loop_span.FieldTypeLong { + return fmt.Errorf("latency field type should be long ") + } + micros := make([]string, 0) + for _, val := range f.Values { + integer, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return fmt.Errorf("fail to parse long value %s, %v", val, err) + } + integer = timeutil.MillSec2MicroSec(integer) + micros = append(micros, strconv.FormatInt(integer, 10)) + } + f.Values = micros + return nil +} diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/utils_test.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/utils_test.go new file mode 100644 index 000000000..47d5e0fd5 --- /dev/null +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/utils_test.go @@ -0,0 +1,274 @@ +// Copyright (c) 2025 coze-dev Authors +// SPDX-License-Identifier: Apache-2.0 + +package tracehub + +import ( + "testing" + + "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span" + "github.com/coze-dev/coze-loop/backend/pkg/lang/ptr" + "github.com/stretchr/testify/assert" +) + +func TestProcessSpecificFilter_StatusFilter(t *testing.T) { + tests := []struct { + name string + filter *loop_span.FilterField + expectError bool + validate func(t *testing.T, result *loop_span.FilterField) + }{ + { + name: "status filter with success and error - should convert to always true", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldStatus, + FieldType: loop_span.FieldTypeString, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + Values: []string{loop_span.SpanStatusSuccess, loop_span.SpanStatusError}, + }, + expectError: false, + validate: func(t *testing.T, result *loop_span.FilterField) { + assert.Equal(t, loop_span.SpanFieldStatusCode, result.FieldName) + assert.Equal(t, loop_span.FieldTypeLong, result.FieldType) + assert.Equal(t, loop_span.QueryTypeEnumAlwaysTrue, *result.QueryType) + assert.Nil(t, result.Values) + }, + }, + { + name: "status filter with only success - should convert to value 0", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldStatus, + FieldType: loop_span.FieldTypeString, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + Values: []string{loop_span.SpanStatusSuccess}, + }, + expectError: false, + validate: func(t *testing.T, result *loop_span.FilterField) { + assert.Equal(t, loop_span.SpanFieldStatusCode, result.FieldName) + assert.Equal(t, loop_span.FieldTypeLong, result.FieldType) + assert.Equal(t, loop_span.QueryTypeEnumIn, *result.QueryType) + assert.Equal(t, []string{"0"}, result.Values) + }, + }, + { + name: "status filter with only error - should convert to not in value 0", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldStatus, + FieldType: loop_span.FieldTypeString, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + Values: []string{loop_span.SpanStatusError}, + }, + expectError: false, + validate: func(t *testing.T, result *loop_span.FilterField) { + assert.Equal(t, loop_span.SpanFieldStatusCode, result.FieldName) + assert.Equal(t, loop_span.FieldTypeLong, result.FieldType) + assert.Equal(t, loop_span.QueryTypeEnumNotIn, *result.QueryType) + assert.Equal(t, []string{"0"}, result.Values) + }, + }, + { + name: "status filter without in operator - should return error", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldStatus, + FieldType: loop_span.FieldTypeString, + QueryType: ptr.Of(loop_span.QueryTypeEnumEq), + Values: []string{loop_span.SpanStatusSuccess}, + }, + expectError: true, + }, + { + name: "status filter with invalid status value - should return error", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldStatus, + FieldType: loop_span.FieldTypeString, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + Values: []string{"invalid_status"}, + }, + expectError: true, + }, + { + name: "status filter with empty values - should return error", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldStatus, + FieldType: loop_span.FieldTypeString, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + Values: []string{}, + }, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Make a copy to avoid modifying the original + filterCopy := *tt.filter + err := processSpecificFilter(&filterCopy) + + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + if tt.validate != nil { + tt.validate(t, &filterCopy) + } + } + }) + } +} + +func TestProcessSpecificFilter_LatencyFilter(t *testing.T) { + tests := []struct { + name string + filter *loop_span.FilterField + expectError bool + validate func(t *testing.T, result *loop_span.FilterField) + }{ + { + name: "duration filter - should convert ms to us", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldDuration, + FieldType: loop_span.FieldTypeLong, + QueryType: ptr.Of(loop_span.QueryTypeEnumGte), + Values: []string{"100", "200"}, + }, + expectError: false, + validate: func(t *testing.T, result *loop_span.FilterField) { + assert.Equal(t, loop_span.SpanFieldDuration, result.FieldName) + assert.Equal(t, loop_span.FieldTypeLong, result.FieldType) + assert.Equal(t, loop_span.QueryTypeEnumGte, *result.QueryType) + assert.Equal(t, []string{"100000", "200000"}, result.Values) // 100ms -> 100000us + }, + }, + { + name: "latency_first_resp filter - should convert ms to us", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldLatencyFirstResp, + FieldType: loop_span.FieldTypeLong, + QueryType: ptr.Of(loop_span.QueryTypeEnumLte), + Values: []string{"50"}, + }, + expectError: false, + validate: func(t *testing.T, result *loop_span.FilterField) { + assert.Equal(t, loop_span.SpanFieldLatencyFirstResp, result.FieldName) + assert.Equal(t, loop_span.FieldTypeLong, result.FieldType) + assert.Equal(t, loop_span.QueryTypeEnumLte, *result.QueryType) + assert.Equal(t, []string{"50000"}, result.Values) // 50ms -> 50000us + }, + }, + { + name: "start_time_first_resp filter - should convert ms to us", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldStartTimeFirstResp, + FieldType: loop_span.FieldTypeLong, + QueryType: ptr.Of(loop_span.QueryTypeEnumEq), + Values: []string{"1000"}, + }, + expectError: false, + validate: func(t *testing.T, result *loop_span.FilterField) { + assert.Equal(t, loop_span.SpanFieldStartTimeFirstResp, result.FieldName) + assert.Equal(t, []string{"1000000"}, result.Values) // 1000ms -> 1000000us + }, + }, + { + name: "start_time_first_token_resp filter - should convert ms to us", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldStartTimeFirstTokenResp, + FieldType: loop_span.FieldTypeLong, + QueryType: ptr.Of(loop_span.QueryTypeEnumGt), + Values: []string{"10"}, + }, + expectError: false, + validate: func(t *testing.T, result *loop_span.FilterField) { + assert.Equal(t, []string{"10000"}, result.Values) // 10ms -> 10000us + }, + }, + { + name: "latency_first_token_resp filter - should convert ms to us", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldLatencyFirstTokenResp, + FieldType: loop_span.FieldTypeLong, + QueryType: ptr.Of(loop_span.QueryTypeEnumLt), + Values: []string{"5"}, + }, + expectError: false, + validate: func(t *testing.T, result *loop_span.FilterField) { + assert.Equal(t, []string{"5000"}, result.Values) // 5ms -> 5000us + }, + }, + { + name: "reasoning_duration filter - should convert ms to us", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldReasoningDuration, + FieldType: loop_span.FieldTypeLong, + QueryType: ptr.Of(loop_span.QueryTypeEnumGte), + Values: []string{"30"}, + }, + expectError: false, + validate: func(t *testing.T, result *loop_span.FilterField) { + assert.Equal(t, []string{"30000"}, result.Values) // 30ms -> 30000us + }, + }, + { + name: "latency filter with wrong field type - should return error", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldDuration, + FieldType: loop_span.FieldTypeString, + QueryType: ptr.Of(loop_span.QueryTypeEnumGte), + Values: []string{"100"}, + }, + expectError: true, + }, + { + name: "latency filter with invalid value - should return error", + filter: &loop_span.FilterField{ + FieldName: loop_span.SpanFieldDuration, + FieldType: loop_span.FieldTypeLong, + QueryType: ptr.Of(loop_span.QueryTypeEnumGte), + Values: []string{"invalid"}, + }, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Make a copy to avoid modifying the original + filterCopy := *tt.filter + err := processSpecificFilter(&filterCopy) + + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + if tt.validate != nil { + tt.validate(t, &filterCopy) + } + } + }) + } +} + +func TestProcessSpecificFilter_UnknownField(t *testing.T) { + // Test with unknown field name - should not modify the filter + filter := &loop_span.FilterField{ + FieldName: "unknown_field", + FieldType: loop_span.FieldTypeString, + QueryType: ptr.Of(loop_span.QueryTypeEnumEq), + Values: []string{"test"}, + } + + original := *filter + err := processSpecificFilter(filter) + + assert.NoError(t, err) + assert.Equal(t, original.FieldName, filter.FieldName) + assert.Equal(t, original.FieldType, filter.FieldType) + assert.Equal(t, original.QueryType, filter.QueryType) + assert.Equal(t, original.Values, filter.Values) +} + +func TestProcessSpecificFilter_NilFilter(t *testing.T) { + // Test with nil filter - should not panic + err := processSpecificFilter(nil) + assert.NoError(t, err) +}