Commit

Add ShowSQL test cases and debug information
duandaa committed Jun 21, 2024
1 parent 8093a7f commit 7e43b97
Showing 9 changed files with 54 additions and 28 deletions.
8 changes: 5 additions & 3 deletions server/querier/app/prometheus/service/converters.go
@@ -35,6 +35,7 @@ import (
"github.com/deepflowio/deepflow/server/querier/app/prometheus/model"
"github.com/deepflowio/deepflow/server/querier/common"
"github.com/deepflowio/deepflow/server/querier/config"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client"
chCommon "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/common"
tagdescription "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/tag"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/view"
@@ -444,12 +445,13 @@ func showTags(ctx context.Context, db string, table string, startTime int64, end
var data *common.Result
var err error
var tagsArray []string
debug := &client.Debug{}
if db == "" || db == chCommon.DB_NAME_PROMETHEUS {
data, err = tagdescription.GetTagDescriptions(chCommon.DB_NAME_PROMETHEUS, PROMETHEUS_TABLE, fmt.Sprintf(showTags, chCommon.DB_NAME_PROMETHEUS, PROMETHEUS_TABLE, startTime, endTime), "", orgID, false, ctx)
data, err = tagdescription.GetTagDescriptions(chCommon.DB_NAME_PROMETHEUS, PROMETHEUS_TABLE, fmt.Sprintf(showTags, chCommon.DB_NAME_PROMETHEUS, PROMETHEUS_TABLE, startTime, endTime), "", orgID, false, ctx, debug)
} else if db == chCommon.DB_NAME_EXT_METRICS {
data, err = tagdescription.GetTagDescriptions(chCommon.DB_NAME_EXT_METRICS, EXT_METRICS_TABLE, fmt.Sprintf(showTags, chCommon.DB_NAME_EXT_METRICS, EXT_METRICS_TABLE, startTime, endTime), "", orgID, false, ctx)
data, err = tagdescription.GetTagDescriptions(chCommon.DB_NAME_EXT_METRICS, EXT_METRICS_TABLE, fmt.Sprintf(showTags, chCommon.DB_NAME_EXT_METRICS, EXT_METRICS_TABLE, startTime, endTime), "", orgID, false, ctx, debug)
} else {
data, err = tagdescription.GetTagDescriptions(db, table, fmt.Sprintf(showTags, db, table, startTime, endTime), "", orgID, false, ctx)
data, err = tagdescription.GetTagDescriptions(db, table, fmt.Sprintf(showTags, db, table, startTime, endTime), "", orgID, false, ctx, debug)
}
if err != nil || data == nil {
return tagsArray, err
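The commit threads a `client.Debug` accumulator through the tag and table lookups, starting with the `showTags` helper above. Judging only from the members the diff touches (`IP`, `QueryUUID`, `Sql`, and `Get()`), a minimal sketch of that accumulator might look like the following; the real type in `server/querier/engine/clickhouse/client` may carry more state, and the payload shape of `Get()` is an assumption:

```go
package client

// Sketch of the Debug accumulator as used by this commit; only the members
// referenced in the diff are shown.
type Debug struct {
	IP        string // ClickHouse host the statements were sent to
	QueryUUID string // correlates the trace with a single querier request
	Sql       string // concatenation of every SQL statement issued
}

// Get returns the collected information as the debug payload that
// ExecuteQuery hands back to its caller (payload shape assumed here).
func (d *Debug) Get() map[string]interface{} {
	return map[string]interface{}{
		"ip":         d.IP,
		"query_uuid": d.QueryUUID,
		"sql":        d.Sql,
	}
}
```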
5 changes: 4 additions & 1 deletion server/querier/app/prometheus/service/label_values.go
@@ -23,6 +23,7 @@ import (

"github.com/deepflowio/deepflow/server/querier/app/prometheus/model"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client"
chCommon "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/common"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/metrics"
)
@@ -71,7 +72,9 @@ func getMetrics(ctx context.Context, args *model.PromMetaParams) (resp []string)
}
} else if db == chCommon.DB_NAME_PROMETHEUS {
// prometheus samples should get all metrics from `table`
samples := clickhouse.GetTables(db, "", args.OrgID, false, ctx)
// TODO: placeholder Debug, to be removed
debug := &client.Debug{}
samples := clickhouse.GetTables(db, "", args.OrgID, false, ctx, debug)
for _, v := range samples.Values {
tableName := v.([]interface{})[0].(string)
// append ${metrics_name}
4 changes: 3 additions & 1 deletion server/querier/app/prometheus/service/promql.go
@@ -43,6 +43,7 @@ import (
"github.com/deepflowio/deepflow/server/querier/app/prometheus/cache"
"github.com/deepflowio/deepflow/server/querier/app/prometheus/model"
"github.com/deepflowio/deepflow/server/querier/config"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client"
chCommon "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/common"
tagdescription "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/tag"
)
@@ -859,7 +860,8 @@ func (p *prometheusExecutor) getAllOrganizations() []string {
func (p *prometheusExecutor) loadExtraLabelsCache(orgID string) {
// DeepFlow sources share the same tag collections, so querying one table is enough to add all external tags
showTags := fmt.Sprintf("show tags from %s", NETWORK_TABLE)
data, err := tagdescription.GetTagDescriptions(chCommon.DB_NAME_FLOW_METRICS, NETWORK_TABLE, showTags, "", orgID, false, context.Background())
debug := &client.Debug{}
data, err := tagdescription.GetTagDescriptions(chCommon.DB_NAME_FLOW_METRICS, NETWORK_TABLE, showTags, "", orgID, false, context.Background(), debug)
if err != nil {
log.Errorf("load external tag error when start up prometheus executor: %s", err)
return
26 changes: 14 additions & 12 deletions server/querier/engine/clickhouse/clickhouse.go
@@ -118,19 +118,20 @@ func (e *CHEngine) ExecuteQuery(args *common.QuerierParams) (*common.Result, map
return slimitResult, slimitDebug, err
}
// Parse showSql
result, sqlList, isShow, err := e.ParseShowSql(sql, args)
debug := &client.Debug{
IP: config.Cfg.Clickhouse.Host,
QueryUUID: query_uuid,
}
// ParseShowSql takes the debug parameter so the SQL it generates can be inspected in tests
result, sqlList, isShow, err := e.ParseShowSql(sql, args, debug)
if isShow {
if err != nil {
return nil, nil, err
}
if len(sqlList) == 0 {
return result, nil, nil
return result, debug.Get(), nil
}
}
debug := &client.Debug{
IP: config.Cfg.Clickhouse.Host,
QueryUUID: query_uuid,
}

if len(sqlList) > 0 {
e.DB = "flow_tag"
@@ -302,7 +303,7 @@ func ShowTagTypeMetrics(tagDescriptions, result *common.Result, db, table string
}
}

func (e *CHEngine) ParseShowSql(sql string, args *common.QuerierParams) (*common.Result, []string, bool, error) {
func (e *CHEngine) ParseShowSql(sql string, args *common.QuerierParams, debug *client.Debug) (*common.Result, []string, bool, error) {
sqlSplit := strings.Fields(sql)
if strings.ToLower(sqlSplit[0]) != "show" {
return nil, []string{}, false, nil
@@ -346,14 +347,14 @@ func (e *CHEngine) ParseShowSql(sql string, args *common.QuerierParams) (*common
if len(sqlSplit) > 2 && strings.ToLower(sqlSplit[2]) == "functions" {
funcs, err := metrics.GetFunctionDescriptions()
return funcs, []string{}, true, err
} else {
} else { // debug is threaded into the tag lookups below
result, err := metrics.GetMetricsDescriptions(e.DB, table, where, args.QueryCacheTTL, args.ORGID, args.UseQueryCache, e.Context)
if err != nil {
return nil, []string{}, true, err
}

// tag metrics
tagDescriptions, err := tag.GetTagDescriptions(e.DB, table, sql, "", e.ORGID, true, e.Context)
tagDescriptions, err := tag.GetTagDescriptions(e.DB, table, sql, "", e.ORGID, true, e.Context, debug)
if err != nil {
log.Error("Failed to get tag type metrics")
return nil, []string{}, true, err
@@ -373,10 +374,10 @@ func (e *CHEngine) ParseShowSql(sql string, args *common.QuerierParams) (*common
}
return nil, []string{}, true, fmt.Errorf("parse show sql error, sql: '%s' not support", sql)
case "tags":
data, err := tagdescription.GetTagDescriptions(e.DB, table, sql, args.QueryCacheTTL, args.ORGID, args.UseQueryCache, e.Context)
data, err := tagdescription.GetTagDescriptions(e.DB, table, sql, args.QueryCacheTTL, args.ORGID, args.UseQueryCache, e.Context, debug)
return data, []string{}, true, err
case "tables":
return GetTables(e.DB, args.QueryCacheTTL, args.ORGID, args.UseQueryCache, e.Context), []string{}, true, nil
return GetTables(e.DB, args.QueryCacheTTL, args.ORGID, args.UseQueryCache, e.Context, debug), []string{}, true, nil
case "databases":
return GetDatabases(), []string{}, true, nil
case "tag-values":
@@ -505,9 +506,10 @@ func (e *CHEngine) ParseSlimitSql(sql string, args *common.QuerierParams) (strin
}
}
}
debug := &client.Debug{}

showTagsSql := "show tags from " + table
tags, _, _, err := e.ParseShowSql(showTagsSql, args)
tags, _, _, err := e.ParseShowSql(showTagsSql, args, debug)
if err != nil {
return "", nil, nil, err
} else if len(tags.Values) == 0 {
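Because the rendered diff interleaves removed and added lines, the net effect on `ExecuteQuery` is easier to read in isolation: the `Debug` value is now built before `ParseShowSql` runs, and show statements that produce no follow-up SQL return `debug.Get()` instead of `nil` as their debug payload. A condensed, illustrative excerpt (identifiers come from the hunks above; the surrounding code is elided):

```go
// Illustrative excerpt of the new ExecuteQuery flow for show-style SQL.
debug := &client.Debug{
	IP:        config.Cfg.Clickhouse.Host,
	QueryUUID: query_uuid,
}
result, sqlList, isShow, err := e.ParseShowSql(sql, args, debug)
if isShow {
	if err != nil {
		return nil, nil, err
	}
	if len(sqlList) == 0 {
		// previously returned (result, nil, nil); the SQL recorded inside
		// ParseShowSql is now surfaced through the debug payload
		return result, debug.Get(), nil
	}
}
```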
3 changes: 2 additions & 1 deletion server/querier/engine/clickhouse/clickhouse_test.go
@@ -600,7 +600,8 @@ func TestGetSql(t *testing.T) {
} else {
input := pcase.input
if strings.HasPrefix(pcase.input, "SHOW") {
_, sqlList, _, err1 := e.ParseShowSql(pcase.input, args)
debug := &client.Debug{}
_, sqlList, _, err1 := e.ParseShowSql(pcase.input, args, debug)
err = err1
input = sqlList[0]
}
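With the extra parameter, show-SQL test cases can construct their own `Debug` value, as the updated `TestGetSql` does. A hedged sketch of an additional ShowSQL case in the same spirit; the engine setup, input statement, and assertions are placeholders and not part of this commit:

```go
// Hypothetical ShowSQL test sketch: the engine setup, the show statement, and
// the expected output are placeholders; only ParseShowSql's new signature and
// client.Debug come from this commit.
func TestParseShowTagValuesSql(t *testing.T) {
	e := CHEngine{DB: "flow_log"} // placeholder setup, mirroring the package's other tests
	args := &common.QuerierParams{}
	debug := &client.Debug{}

	_, sqlList, isShow, err := e.ParseShowSql("show tag pod values from l7_flow_log", args, debug)
	if err != nil {
		t.Fatalf("ParseShowSql failed: %s", err)
	}
	if !isShow || len(sqlList) == 0 {
		t.Fatalf("expected a translated flow_tag query for the show statement")
	}
	// The translated query can be asserted against a golden string,
	// exactly like the non-show cases in TestGetSql.
	t.Logf("generated sql: %s", sqlList[0])
}
```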
7 changes: 5 additions & 2 deletions server/querier/engine/clickhouse/common/utils.go
@@ -202,7 +202,7 @@ func GetDatasourceInterval(db string, table string, name string, orgID string) (
return int(body["DATA"].([]interface{})[0].(map[string]interface{})["INTERVAL"].(float64)), nil
}

func GetExtTables(db, queryCacheTTL, orgID string, useQueryCache bool, ctx context.Context) (values []interface{}) {
func GetExtTables(db, queryCacheTTL, orgID string, useQueryCache bool, ctx context.Context, debug *client.Debug) (values []interface{}) {
chClient := client.Client{
Host: config.Cfg.Clickhouse.Host,
Port: config.Cfg.Clickhouse.Port,
@@ -218,6 +218,8 @@ func GetExtTables(db, queryCacheTTL, orgID string, useQueryCache bool, ctx conte
} else {
sql = "SHOW TABLES FROM " + db
}
// record the SQL for the debug payload
debug.Sql += sql
rst, err := chClient.DoQuery(&client.QueryParams{Sql: sql, UseQueryCache: useQueryCache, QueryCacheTTL: queryCacheTTL, ORGID: orgID})
if err != nil {
log.Error(err)
@@ -233,7 +235,7 @@ func GetExtTables(db, queryCacheTTL, orgID string, useQueryCache bool, ctx conte
return values
}

func GetPrometheusTables(db, queryCacheTTL, orgID string, useQueryCache bool, ctx context.Context) (values []interface{}) {
func GetPrometheusTables(db, queryCacheTTL, orgID string, useQueryCache bool, ctx context.Context, debug *client.Debug) (values []interface{}) {
chClient := client.Client{
Host: config.Cfg.Clickhouse.Host,
Port: config.Cfg.Clickhouse.Port,
@@ -249,6 +251,7 @@ func GetPrometheusTables(db, queryCacheTTL, orgID string, useQueryCache bool, ct
} else {
sql = "SHOW TABLES FROM " + db
}
debug.Sql += sql
rst, err := chClient.DoQuery(&client.QueryParams{Sql: sql, UseQueryCache: useQueryCache, QueryCacheTTL: queryCacheTTL, ORGID: orgID})
if err != nil {
log.Error(err)
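Both table helpers now append the statement to `debug.Sql` immediately before executing it. A hypothetical wrapper, not part of this commit, shows that record-then-query pattern in one place; the `DoQuery` return types follow the usage visible in this diff and the usual client/common imports of this file are assumed:

```go
// Hypothetical helper, not part of this commit: records the statement on the
// Debug accumulator and then executes it, so call sites do not need to repeat
// `debug.Sql += sql` before every DoQuery. Statements are concatenated as-is,
// matching the commit; return types are assumed from the surrounding code.
func doQueryWithDebug(chClient client.Client, debug *client.Debug, params *client.QueryParams) (*common.Result, error) {
	debug.Sql += params.Sql
	return chClient.DoQuery(params)
}
```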
8 changes: 6 additions & 2 deletions server/querier/engine/clickhouse/metrics/metrics.go
@@ -27,6 +27,7 @@ import (

"github.com/deepflowio/deepflow/server/querier/common"
"github.com/deepflowio/deepflow/server/querier/config"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client"
ckcommon "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/common"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/tag"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/trans_prometheus"
@@ -238,7 +239,8 @@ func GetMetrics(field, db, table, orgID string) (*Metrics, bool) {
}

// tag metrics
tagDescriptions, err := tag.GetTagDescriptions(db, table, "", "", orgID, true, context.Background())
debug := &client.Debug{}
tagDescriptions, err := tag.GetTagDescriptions(db, table, "", "", orgID, true, context.Background(), debug)
if err != nil {
log.Error("Failed to get tag type metrics")
return nil, false
@@ -429,7 +431,9 @@ func GetMetricsDescriptions(db, table, where, queryCacheTTL, orgID string, useQu
if db == "ext_metrics" {
tables = append(tables, table)
} else if slices.Contains([]string{ckcommon.DB_NAME_DEEPFLOW_ADMIN, ckcommon.DB_NAME_DEEPFLOW_TENANT}, db) {
for _, extTables := range ckcommon.GetExtTables(db, queryCacheTTL, orgID, useQueryCache, ctx) {
// TODO: placeholder Debug, to be removed
debug := &client.Debug{}
for _, extTables := range ckcommon.GetExtTables(db, queryCacheTTL, orgID, useQueryCache, ctx, debug) {
for i, extTable := range extTables.([]interface{}) {
if i == 0 {
tables = append(tables, extTable)
7 changes: 4 additions & 3 deletions server/querier/engine/clickhouse/table.go
@@ -22,6 +22,7 @@ import (
"golang.org/x/exp/slices"

"github.com/deepflowio/deepflow/server/querier/common"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client"
chCommon "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/common"
)

@@ -36,16 +37,16 @@ func GetDatabases() *common.Result {
}
}

func GetTables(db, queryCacheTTL, orgID string, useQueryCache bool, ctx context.Context) *common.Result {
func GetTables(db, queryCacheTTL, orgID string, useQueryCache bool, ctx context.Context, debug *client.Debug) *common.Result {
var values []interface{}
tables, ok := chCommon.DB_TABLE_MAP[db]
if !ok {
return nil
}
if slices.Contains([]string{chCommon.DB_NAME_DEEPFLOW_ADMIN, chCommon.DB_NAME_EXT_METRICS, chCommon.DB_NAME_DEEPFLOW_TENANT}, db) {
values = append(values, chCommon.GetExtTables(db, queryCacheTTL, orgID, useQueryCache, ctx)...)
values = append(values, chCommon.GetExtTables(db, queryCacheTTL, orgID, useQueryCache, ctx, debug)...)
} else if db == chCommon.DB_NAME_PROMETHEUS {
values = append(values, chCommon.GetPrometheusTables(db, queryCacheTTL, orgID, useQueryCache, ctx)...)
values = append(values, chCommon.GetPrometheusTables(db, queryCacheTTL, orgID, useQueryCache, ctx, debug)...)
} else {
for _, table := range tables {
datasource, err := chCommon.GetDatasources(db, table, orgID)
14 changes: 11 additions & 3 deletions server/querier/engine/clickhouse/tag/description.go
@@ -611,7 +611,7 @@ func LoadTagDescriptions(tagData map[string]interface{}) error {
return nil
}

func GetTagDescriptions(db, table, rawSql, queryCacheTTL, orgID string, useQueryCache bool, ctx context.Context) (response *common.Result, err error) {
func GetTagDescriptions(db, table, rawSql, queryCacheTTL, orgID string, useQueryCache bool, ctx context.Context, debug *client.Debug) (response *common.Result, err error) {
// strip the backticks from `1m`
table = strings.Trim(table, "`")
response = &common.Result{
@@ -650,6 +650,7 @@ func GetTagDescriptions(db, table, rawSql, queryCacheTTL, orgID string, useQuery
Context: ctx,
}
k8sLabelSql := "SELECT key FROM (SELECT key FROM flow_tag.pod_service_k8s_label_map UNION ALL SELECT key FROM flow_tag.pod_k8s_label_map) GROUP BY key"
debug.Sql += k8sLabelSql
k8sLabelRst, err := chClient.DoQuery(&client.QueryParams{Sql: k8sLabelSql, UseQueryCache: useQueryCache, QueryCacheTTL: queryCacheTTL, ORGID: orgID})
if err != nil {
return nil, err
@@ -671,8 +672,10 @@ func GetTagDescriptions(db, table, rawSql, queryCacheTTL, orgID string, useQuery
}

// query k8s_annotation
k8sAnnotationSql := "SELECT key FROM (SELECT key FROM flow_tag.pod_k8s_annotation_map UNION ALL SELECT key FROM flow_tag.pod_service_k8s_annotation_map) GROUP BY key"
debug.Sql += k8sAnnotationSql
k8sAnnotationRst, err := chClient.DoQuery(&client.QueryParams{
Sql: "SELECT key FROM (SELECT key FROM flow_tag.pod_k8s_annotation_map UNION ALL SELECT key FROM flow_tag.pod_service_k8s_annotation_map) GROUP BY key",
Sql: k8sAnnotationSql,
UseQueryCache: useQueryCache,
QueryCacheTTL: queryCacheTTL,
ORGID: orgID,
@@ -697,8 +700,10 @@ func GetTagDescriptions(db, table, rawSql, queryCacheTTL, orgID string, useQuery
}

// query k8s_env
podK8senvSql := "SELECT key FROM flow_tag.pod_k8s_env_map GROUP BY key"
debug.Sql += podK8senvSql
podK8senvRst, err := chClient.DoQuery(&client.QueryParams{
Sql: "SELECT key FROM flow_tag.pod_k8s_env_map GROUP BY key", UseQueryCache: useQueryCache, QueryCacheTTL: queryCacheTTL, ORGID: orgID})
Sql: podK8senvSql, UseQueryCache: useQueryCache, QueryCacheTTL: queryCacheTTL, ORGID: orgID})
if err != nil {
return nil, err
}
@@ -720,6 +725,7 @@ func GetTagDescriptions(db, table, rawSql, queryCacheTTL, orgID string, useQuery

// query cloud.tag
cloudTagSql := "SELECT key FROM (SELECT key FROM flow_tag.chost_cloud_tag_map UNION ALL SELECT key FROM flow_tag.pod_ns_cloud_tag_map) GROUP BY key"
debug.Sql += cloudTagSql
cloudTagRst, err := chClient.DoQuery(&client.QueryParams{Sql: cloudTagSql, UseQueryCache: useQueryCache, QueryCacheTTL: queryCacheTTL, ORGID: orgID})
if err != nil {
return nil, err
@@ -742,6 +748,7 @@ func GetTagDescriptions(db, table, rawSql, queryCacheTTL, orgID string, useQuery

// query os.app
osAPPTagSql := "SELECT key FROM flow_tag.os_app_tag_map GROUP BY key"
debug.Sql += osAPPTagSql
osAPPTagRst, err := chClient.DoQuery(&client.QueryParams{Sql: osAPPTagSql, UseQueryCache: useQueryCache, QueryCacheTTL: queryCacheTTL, ORGID: orgID})
if err != nil {
return nil, err
@@ -834,6 +841,7 @@ func GetTagDescriptions(db, table, rawSql, queryCacheTTL, orgID string, useQuery
externalSql = fmt.Sprintf("SELECT field_name AS tag_name, table FROM flow_tag.%s_custom_field WHERE table='%s' AND field_type='tag' GROUP BY tag_name, table ORDER BY tag_name ASC LIMIT %s", db, table, limit)
}
}
debug.Sql += externalSql
externalRst, err := externalChClient.DoQuery(&client.QueryParams{Sql: externalSql, UseQueryCache: useQueryCache, QueryCacheTTL: queryCacheTTL, ORGID: orgID})
if err != nil {
return nil, err
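Taken together, a single `GetTagDescriptions` call now leaves a trace of every flow_tag lookup it issues (k8s labels, annotations, env, cloud tags, os.app, and custom fields) on the accumulator. An illustrative caller inside this package; the database, table, and org values are placeholders, and the logging calls are illustrative rather than taken from the source:

```go
// Illustrative caller, not part of this commit: db/table/org values are
// placeholders. After the call returns, debug.Sql holds the concatenation of
// every flow_tag statement GetTagDescriptions executed.
func dumpTagQueryTrace() {
	debug := &client.Debug{}
	result, err := GetTagDescriptions(
		"flow_metrics", "network", "show tags from network",
		"",        // queryCacheTTL
		"default", // orgID
		false,     // useQueryCache
		context.Background(), debug,
	)
	if err != nil {
		log.Errorf("show tags failed: %s", err)
		return
	}
	log.Debugf("%d tags, executed sql: %s", len(result.Values), debug.Sql)
}
```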