Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/modules/observability/domain/task/entity/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,10 @@ func (s *OnlineExptTurnEvalResult) GetRunID() (int64, error) {
}

func (s *OnlineExptTurnEvalResult) GetUserID() string {
if s.BaseInfo == nil || s.BaseInfo.UpdatedBy == nil {
if s.BaseInfo == nil || s.BaseInfo.CreatedBy == nil {
return ""
}
return s.BaseInfo.UpdatedBy.UserID
return s.BaseInfo.CreatedBy.UserID
}

type EvaluatorRunError struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ type ListSpansReq struct {
func (h *TraceHubServiceImpl) buildSpanFilters(ctx context.Context, taskConfig *entity.ObservabilityTask) *loop_span.FilterFields {
// More complex filters can be built based on the task configuration
// Simplified here: return nil to indicate no additional filters

platformFilter, err := h.buildHelper.BuildPlatformRelatedFilter(ctx, taskConfig.SpanFilter.PlatformType)
if err != nil {
logs.CtxError(ctx, "build platform filter failed, task_id=%d, err=%v", taskConfig.ID, err)
Expand All @@ -222,6 +221,10 @@ func (h *TraceHubServiceImpl) buildSpanFilters(ctx context.Context, taskConfig *
// 不需要重试
return nil
}
if err = taskConfig.SpanFilter.Filters.Traverse(processSpecificFilter); err != nil {
logs.CtxError(ctx, "traverse filter fields failed, task_id=%d, err=%v", taskConfig.ID, err)
return nil
}
filters := h.combineFilters(builtinFilter, &taskConfig.SpanFilter.Filters)

return filters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ package tracehub

import (
"context"
"fmt"
"strconv"

"github.com/bytedance/sonic"
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span"
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
"github.com/coze-dev/coze-loop/backend/pkg/logs"
timeutil "github.com/coze-dev/coze-loop/backend/pkg/time"
)

func ToJSONString(ctx context.Context, obj interface{}) string {
Expand All @@ -27,3 +31,75 @@ func ToJSONString(ctx context.Context, obj interface{}) string {
func (h *TraceHubServiceImpl) getTenants(ctx context.Context, platform loop_span.PlatformType) ([]string, error) {
return h.tenantProvider.GetTenantsByPlatformType(ctx, platform)
}

// todo tyf TraceService里有相同实现,待合并
func processSpecificFilter(f *loop_span.FilterField) error {
if f == nil {
return nil
}
switch f.FieldName {
case loop_span.SpanFieldStatus:
if err := processStatusFilter(f); err != nil {
return err
}
case loop_span.SpanFieldDuration,
loop_span.SpanFieldLatencyFirstResp,
loop_span.SpanFieldStartTimeFirstResp,
loop_span.SpanFieldStartTimeFirstTokenResp,
loop_span.SpanFieldLatencyFirstTokenResp,
loop_span.SpanFieldReasoningDuration:
if err := processLatencyFilter(f); err != nil {
return err
}
}
return nil
}

func processStatusFilter(f *loop_span.FilterField) error {
if f.QueryType == nil || *f.QueryType != loop_span.QueryTypeEnumIn {
return fmt.Errorf("status filter should use in operator")
}
f.FieldName = loop_span.SpanFieldStatusCode
f.FieldType = loop_span.FieldTypeLong
checkSuccess, checkError := false, false
for _, val := range f.Values {
switch val {
case loop_span.SpanStatusSuccess:
checkSuccess = true
case loop_span.SpanStatusError:
checkError = true
default:
return fmt.Errorf("invalid status code field value")
}
}
if checkSuccess && checkError {
f.QueryType = ptr.Of(loop_span.QueryTypeEnumAlwaysTrue)
f.Values = nil
} else if checkSuccess {
f.Values = []string{"0"}
} else if checkError {
f.QueryType = ptr.Of(loop_span.QueryTypeEnumNotIn)
f.Values = []string{"0"}
} else {
return fmt.Errorf("invalid status code query")
}
return nil
}

// ms -> us
func processLatencyFilter(f *loop_span.FilterField) error {
if f.FieldType != loop_span.FieldTypeLong {
return fmt.Errorf("latency field type should be long ")
}
micros := make([]string, 0)
for _, val := range f.Values {
integer, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return fmt.Errorf("fail to parse long value %s, %v", val, err)
}
integer = timeutil.MillSec2MicroSec(integer)
micros = append(micros, strconv.FormatInt(integer, 10))
}
f.Values = micros
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
// Copyright (c) 2025 coze-dev Authors
// SPDX-License-Identifier: Apache-2.0

package tracehub

import (
"testing"

"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span"
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
"github.com/stretchr/testify/assert"
)

func TestProcessSpecificFilter_StatusFilter(t *testing.T) {
tests := []struct {
name string
filter *loop_span.FilterField
expectError bool
validate func(t *testing.T, result *loop_span.FilterField)
}{
{
name: "status filter with success and error - should convert to always true",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldStatus,
FieldType: loop_span.FieldTypeString,
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
Values: []string{loop_span.SpanStatusSuccess, loop_span.SpanStatusError},
},
expectError: false,
validate: func(t *testing.T, result *loop_span.FilterField) {
assert.Equal(t, loop_span.SpanFieldStatusCode, result.FieldName)
assert.Equal(t, loop_span.FieldTypeLong, result.FieldType)
assert.Equal(t, loop_span.QueryTypeEnumAlwaysTrue, *result.QueryType)
assert.Nil(t, result.Values)
},
},
{
name: "status filter with only success - should convert to value 0",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldStatus,
FieldType: loop_span.FieldTypeString,
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
Values: []string{loop_span.SpanStatusSuccess},
},
expectError: false,
validate: func(t *testing.T, result *loop_span.FilterField) {
assert.Equal(t, loop_span.SpanFieldStatusCode, result.FieldName)
assert.Equal(t, loop_span.FieldTypeLong, result.FieldType)
assert.Equal(t, loop_span.QueryTypeEnumIn, *result.QueryType)
assert.Equal(t, []string{"0"}, result.Values)
},
},
{
name: "status filter with only error - should convert to not in value 0",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldStatus,
FieldType: loop_span.FieldTypeString,
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
Values: []string{loop_span.SpanStatusError},
},
expectError: false,
validate: func(t *testing.T, result *loop_span.FilterField) {
assert.Equal(t, loop_span.SpanFieldStatusCode, result.FieldName)
assert.Equal(t, loop_span.FieldTypeLong, result.FieldType)
assert.Equal(t, loop_span.QueryTypeEnumNotIn, *result.QueryType)
assert.Equal(t, []string{"0"}, result.Values)
},
},
{
name: "status filter without in operator - should return error",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldStatus,
FieldType: loop_span.FieldTypeString,
QueryType: ptr.Of(loop_span.QueryTypeEnumEq),
Values: []string{loop_span.SpanStatusSuccess},
},
expectError: true,
},
{
name: "status filter with invalid status value - should return error",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldStatus,
FieldType: loop_span.FieldTypeString,
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
Values: []string{"invalid_status"},
},
expectError: true,
},
{
name: "status filter with empty values - should return error",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldStatus,
FieldType: loop_span.FieldTypeString,
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
Values: []string{},
},
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Make a copy to avoid modifying the original
filterCopy := *tt.filter
err := processSpecificFilter(&filterCopy)

if tt.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
if tt.validate != nil {
tt.validate(t, &filterCopy)
}
}
})
}
}

func TestProcessSpecificFilter_LatencyFilter(t *testing.T) {
tests := []struct {
name string
filter *loop_span.FilterField
expectError bool
validate func(t *testing.T, result *loop_span.FilterField)
}{
{
name: "duration filter - should convert ms to us",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldDuration,
FieldType: loop_span.FieldTypeLong,
QueryType: ptr.Of(loop_span.QueryTypeEnumGte),
Values: []string{"100", "200"},
},
expectError: false,
validate: func(t *testing.T, result *loop_span.FilterField) {
assert.Equal(t, loop_span.SpanFieldDuration, result.FieldName)
assert.Equal(t, loop_span.FieldTypeLong, result.FieldType)
assert.Equal(t, loop_span.QueryTypeEnumGte, *result.QueryType)
assert.Equal(t, []string{"100000", "200000"}, result.Values) // 100ms -> 100000us
},
},
{
name: "latency_first_resp filter - should convert ms to us",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldLatencyFirstResp,
FieldType: loop_span.FieldTypeLong,
QueryType: ptr.Of(loop_span.QueryTypeEnumLte),
Values: []string{"50"},
},
expectError: false,
validate: func(t *testing.T, result *loop_span.FilterField) {
assert.Equal(t, loop_span.SpanFieldLatencyFirstResp, result.FieldName)
assert.Equal(t, loop_span.FieldTypeLong, result.FieldType)
assert.Equal(t, loop_span.QueryTypeEnumLte, *result.QueryType)
assert.Equal(t, []string{"50000"}, result.Values) // 50ms -> 50000us
},
},
{
name: "start_time_first_resp filter - should convert ms to us",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldStartTimeFirstResp,
FieldType: loop_span.FieldTypeLong,
QueryType: ptr.Of(loop_span.QueryTypeEnumEq),
Values: []string{"1000"},
},
expectError: false,
validate: func(t *testing.T, result *loop_span.FilterField) {
assert.Equal(t, loop_span.SpanFieldStartTimeFirstResp, result.FieldName)
assert.Equal(t, []string{"1000000"}, result.Values) // 1000ms -> 1000000us
},
},
{
name: "start_time_first_token_resp filter - should convert ms to us",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldStartTimeFirstTokenResp,
FieldType: loop_span.FieldTypeLong,
QueryType: ptr.Of(loop_span.QueryTypeEnumGt),
Values: []string{"10"},
},
expectError: false,
validate: func(t *testing.T, result *loop_span.FilterField) {
assert.Equal(t, []string{"10000"}, result.Values) // 10ms -> 10000us
},
},
{
name: "latency_first_token_resp filter - should convert ms to us",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldLatencyFirstTokenResp,
FieldType: loop_span.FieldTypeLong,
QueryType: ptr.Of(loop_span.QueryTypeEnumLt),
Values: []string{"5"},
},
expectError: false,
validate: func(t *testing.T, result *loop_span.FilterField) {
assert.Equal(t, []string{"5000"}, result.Values) // 5ms -> 5000us
},
},
{
name: "reasoning_duration filter - should convert ms to us",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldReasoningDuration,
FieldType: loop_span.FieldTypeLong,
QueryType: ptr.Of(loop_span.QueryTypeEnumGte),
Values: []string{"30"},
},
expectError: false,
validate: func(t *testing.T, result *loop_span.FilterField) {
assert.Equal(t, []string{"30000"}, result.Values) // 30ms -> 30000us
},
},
{
name: "latency filter with wrong field type - should return error",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldDuration,
FieldType: loop_span.FieldTypeString,
QueryType: ptr.Of(loop_span.QueryTypeEnumGte),
Values: []string{"100"},
},
expectError: true,
},
{
name: "latency filter with invalid value - should return error",
filter: &loop_span.FilterField{
FieldName: loop_span.SpanFieldDuration,
FieldType: loop_span.FieldTypeLong,
QueryType: ptr.Of(loop_span.QueryTypeEnumGte),
Values: []string{"invalid"},
},
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Make a copy to avoid modifying the original
filterCopy := *tt.filter
err := processSpecificFilter(&filterCopy)

if tt.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
if tt.validate != nil {
tt.validate(t, &filterCopy)
}
}
})
}
}

func TestProcessSpecificFilter_UnknownField(t *testing.T) {
// Test with unknown field name - should not modify the filter
filter := &loop_span.FilterField{
FieldName: "unknown_field",
FieldType: loop_span.FieldTypeString,
QueryType: ptr.Of(loop_span.QueryTypeEnumEq),
Values: []string{"test"},
}

original := *filter
err := processSpecificFilter(filter)

assert.NoError(t, err)
assert.Equal(t, original.FieldName, filter.FieldName)
assert.Equal(t, original.FieldType, filter.FieldType)
assert.Equal(t, original.QueryType, filter.QueryType)
assert.Equal(t, original.Values, filter.Values)
}

func TestProcessSpecificFilter_NilFilter(t *testing.T) {
// Test with nil filter - should not panic
err := processSpecificFilter(nil)
assert.NoError(t, err)
}
Loading