From 51b3f8f616e9129348311df66d9acc5e56270bf7 Mon Sep 17 00:00:00 2001 From: zzzk1 Date: Thu, 13 Feb 2025 21:21:31 +0800 Subject: [PATCH] [feat][storage]: add write path for ClickHouse based on Jaeger V2 Signed-off-by: zzzk1 --- .github/workflows/ci-e2e-all.yml | 2 + .github/workflows/ci-e2e-clickhouse.yml | 49 ++++ .mockery.yaml | 5 + docker-compose/clickhouse/docker-compose.yml | 11 + docker-compose/clickhouse/init.sql | 48 ++++ go.mod | 16 +- go.sum | 42 +++- .../storage/integration/clickhouse_test.go | 47 ++++ internal/storage/v2/clickhouse/factory.go | 47 ++++ .../storage/v2/clickhouse/factory_test.go | 45 ++++ .../storage/v2/clickhouse/package_test.go | 14 ++ internal/storage/v2/clickhouse/writer.go | 72 ++++++ internal/storage/v2/clickhouse/writer_test.go | 134 +++++++++++ pkg/clickhouse/client.go | 24 ++ pkg/clickhouse/config/config.go | 93 ++++++++ pkg/clickhouse/config/config_test.go | 40 ++++ pkg/clickhouse/empty_test.go | 14 ++ pkg/clickhouse/internal/traces.go | 213 ++++++++++++++++++ pkg/clickhouse/internal/traces_test.go | 71 ++++++ pkg/clickhouse/mocks/Client.go | 106 +++++++++ pkg/clickhouse/wrapper/.nocover | 1 + pkg/clickhouse/wrapper/empty_test.go | 14 ++ pkg/clickhouse/wrapper/wrapper.go | 58 +++++ scripts/e2e/clickhouse.sh | 51 +++++ 24 files changed, 1214 insertions(+), 3 deletions(-) create mode 100644 .github/workflows/ci-e2e-clickhouse.yml create mode 100644 docker-compose/clickhouse/docker-compose.yml create mode 100644 docker-compose/clickhouse/init.sql create mode 100644 internal/storage/integration/clickhouse_test.go create mode 100644 internal/storage/v2/clickhouse/factory.go create mode 100644 internal/storage/v2/clickhouse/factory_test.go create mode 100644 internal/storage/v2/clickhouse/package_test.go create mode 100644 internal/storage/v2/clickhouse/writer.go create mode 100644 internal/storage/v2/clickhouse/writer_test.go create mode 100644 pkg/clickhouse/client.go create mode 100644 pkg/clickhouse/config/config.go create mode 100644 pkg/clickhouse/config/config_test.go create mode 100644 pkg/clickhouse/empty_test.go create mode 100644 pkg/clickhouse/internal/traces.go create mode 100644 pkg/clickhouse/internal/traces_test.go create mode 100644 pkg/clickhouse/mocks/Client.go create mode 100644 pkg/clickhouse/wrapper/.nocover create mode 100644 pkg/clickhouse/wrapper/empty_test.go create mode 100644 pkg/clickhouse/wrapper/wrapper.go create mode 100644 scripts/e2e/clickhouse.sh diff --git a/.github/workflows/ci-e2e-all.yml b/.github/workflows/ci-e2e-all.yml index b3cb3e0ad2e..d1c2e574e3d 100644 --- a/.github/workflows/ci-e2e-all.yml +++ b/.github/workflows/ci-e2e-all.yml @@ -37,3 +37,5 @@ jobs: opensearch: uses: ./.github/workflows/ci-e2e-opensearch.yml + clickhouse: + uses: ./.github/workflows/ci-e2e-clickhouse.yml \ No newline at end of file diff --git a/.github/workflows/ci-e2e-clickhouse.yml b/.github/workflows/ci-e2e-clickhouse.yml new file mode 100644 index 00000000000..1fd20e951ac --- /dev/null +++ b/.github/workflows/ci-e2e-clickhouse.yml @@ -0,0 +1,49 @@ +name: CIT ClickHouse + +on: + workflow_call: + +concurrency: + group: cit-kafka-${{ github.workflow }}-${{ (github.event.pull_request && github.event.pull_request.number) || github.ref || github.run_id }} + cancel-in-progress: true + +# See https://github.com/ossf/scorecard/blob/main/docs/checks.md#token-permissions +permissions: # added using https://github.com/step-security/secure-workflows + contents: read + +jobs: + clickhouse: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + jaeger-version: [v2] + clickhouse-version: ["25.x"] + name: clickhouse ${{matrix.clickhouse-version }} ${{ matrix.jaeger-version }} + steps: + - name: Harden Runner + uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1 + with: + egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs + + - uses: actions/checkout@d632683dd7b4114ad314bca15554477dd762a938 # v4.2.0 + + - uses: actions/setup-go@f111f3307d8850f501ac008e886eec1fd1932a34 # v5.3.0 + with: + go-version: 1.24.x + + - name: Run clickhouse integration tests + id: test-execution + run: bash scripts/e2e/clickhouse.sh + + - uses: ./.github/actions/verify-metrics-snapshot + if: matrix.jaeger-version == 'v2' + with: + snapshot: metrics_snapshot_clickhouse + artifact_key: metrics_snapshot_clickhouse_${{ matrix.jaeger-version }} + + - name: Upload coverage to codecov + uses: ./.github/actions/upload-codecov + with: + files: cover.out + flags: clickhouse-${{ matrix.clickhouse-version }}-${{ matrix.jaeger-version }} diff --git a/.mockery.yaml b/.mockery.yaml index 38434d0f32f..94c55d1b8d8 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -42,6 +42,11 @@ packages: github.com/jaegertracing/jaeger/pkg/es/client: config: all: true + github.com/jaegertracing/jaeger/pkg/clickhouse: + config: + all: true + interfaces: + Client: github.com/jaegertracing/jaeger/pkg/kafka/consumer: interfaces: Consumer: diff --git a/docker-compose/clickhouse/docker-compose.yml b/docker-compose/clickhouse/docker-compose.yml new file mode 100644 index 00000000000..be2199cd15b --- /dev/null +++ b/docker-compose/clickhouse/docker-compose.yml @@ -0,0 +1,11 @@ +services: + clickhouse: + container_name: clickhouse + image: bitnami/clickhouse:25.1.3 + environment: + CLICKHOUSE_USER: "default" + CLICKHOUSE_PASSWORD: "default" + ports: + - 9000:9000 + volumes: + - ./init.sql:/docker-entrypoint-initdb.d/init.sql \ No newline at end of file diff --git a/docker-compose/clickhouse/init.sql b/docker-compose/clickhouse/init.sql new file mode 100644 index 00000000000..48174216cb5 --- /dev/null +++ b/docker-compose/clickhouse/init.sql @@ -0,0 +1,48 @@ +CREATE DATABASE IF NOT EXISTS jaeger; + +CREATE TABLE IF NOT EXISTS jaeger.otel_traces ( + Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)), + TraceId String CODEC(ZSTD(1)), + SpanId String CODEC(ZSTD(1)), + ParentSpanId String CODEC(ZSTD(1)), + TraceState String CODEC(ZSTD(1)), + SpanName LowCardinality(String) CODEC(ZSTD(1)), + SpanKind LowCardinality(String) CODEC(ZSTD(1)), + ServiceName LowCardinality(String) CODEC(ZSTD(1)), + ResourceAttributes Nested +( + keys LowCardinality(String), + values String + ) CODEC (ZSTD(1)), + ScopeName String CODEC(ZSTD(1)), + ScopeVersion String CODEC(ZSTD(1)), + SpanAttributes Nested +( + keys LowCardinality(String), + values String + ) CODEC (ZSTD(1)), + Duration UInt64 CODEC(ZSTD(1)), + StatusCode LowCardinality(String) CODEC(ZSTD(1)), + StatusMessage String CODEC(ZSTD(1)), + Events Nested ( + Timestamp DateTime64(9), + Name LowCardinality(String), + Attributes Map(LowCardinality(String), String) + ) CODEC(ZSTD(1)), + Links Nested ( + TraceId String, + SpanId String, + TraceState String, + Attributes Map(LowCardinality(String), String) + ) CODEC(ZSTD(1)), + INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_res_attr_key ResourceAttributes.keys TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_res_attr_value ResourceAttributes.values TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_span_attr_key SpanAttributes.keys TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_span_attr_value SpanAttributes.values TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_duration Duration TYPE minmax GRANULARITY 1 + ) ENGINE MergeTree() + + PARTITION BY toDate(Timestamp) + ORDER BY (ServiceName, SpanName, toUnixTimestamp(Timestamp), SpanAttributes.keys, SpanAttributes.values, Duration, TraceId) + SETTINGS index_granularity=8192, ttl_only_drop_parts = 1; \ No newline at end of file diff --git a/go.mod b/go.mod index 2827fa996a4..fdee3e7b449 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.23.7 toolchain go1.24.0 require ( + github.com/ClickHouse/ch-go v0.64.1 + github.com/ClickHouse/clickhouse-go/v2 v2.31.0 github.com/HdrHistogram/hdrhistogram-go v1.1.2 github.com/Shopify/sarama v1.37.2 github.com/apache/thrift v0.21.0 @@ -105,10 +107,22 @@ require ( ) require ( + github.com/andybalholm/brotli v1.1.1 // indirect + github.com/dmarkham/enumer v1.5.10 // indirect + github.com/go-faster/city v1.0.1 // indirect + github.com/go-faster/errors v0.7.1 // indirect github.com/gogo/googleapis v1.4.1 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/core/xidutils v0.119.0 // indirect + github.com/pascaldekloe/name v1.0.1 // indirect + github.com/paulmach/orb v0.11.1 // indirect + github.com/segmentio/asm v1.2.0 // indirect + github.com/shopspring/decimal v1.4.0 // indirect go.opentelemetry.io/collector/extension/xextension v0.119.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.10.0 // indirect + golang.org/x/mod v0.19.0 // indirect + golang.org/x/sync v0.11.0 // indirect + golang.org/x/tools v0.23.0 // indirect ) require ( @@ -264,7 +278,7 @@ require ( go.opentelemetry.io/collector/processor/xprocessor v0.119.0 // indirect go.opentelemetry.io/collector/receiver/receivertest v0.119.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.119.0 // indirect - go.opentelemetry.io/collector/semconv v0.119.0 // indirect + go.opentelemetry.io/collector/semconv v0.119.0 go.opentelemetry.io/collector/service v0.119.0 // indirect go.opentelemetry.io/contrib/bridges/otelzap v0.9.0 // indirect go.opentelemetry.io/contrib/config v0.14.0 // indirect diff --git a/go.sum b/go.sum index dfe23c95978..5c74d3b7ddf 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,10 @@ github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mx github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/ClickHouse/ch-go v0.64.1 h1:FWpP+QU4KchgzpEekuv8YoI/fUc4H2r6Bwc5WwrzvcI= +github.com/ClickHouse/ch-go v0.64.1/go.mod h1:RBUynvczWwVzhS6Up9lPKlH1mrk4UAmle6uzCiW4Pkc= +github.com/ClickHouse/clickhouse-go/v2 v2.31.0 h1:9MNHRDYXjFTJizGEJM1DfYAqdra/ohprPoZ+LPiuHXQ= +github.com/ClickHouse/clickhouse-go/v2 v2.31.0/go.mod h1:V1aZaG0ctMbd8KVi+D4loXi97duWYtHiQHMCgipKJcI= github.com/Code-Hex/go-generics-cache v1.5.1 h1:6vhZGc5M7Y/YD8cIUcY8kcuQLB4cHR7U+0KMqAA0KcU= github.com/Code-Hex/go-generics-cache v1.5.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= @@ -43,6 +47,8 @@ github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk github.com/alecthomas/repr v0.2.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 h1:t3eaIm0rUkzbrIewtiFmMK5RXHej2XnoXNhxVsAYUfg= github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30/go.mod h1:fvzegU4vN3H1qMT+8wDmzjAcDONcgo2/SZ/TyfdUOFs= +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/antchfx/xmlquery v1.4.3 h1:f6jhxCzANrWfa93O+NmRWvieVyLs+R2Szfpy+YrZaww= github.com/antchfx/xmlquery v1.4.3/go.mod h1:AEPEEPYE9GnA2mj5Ur2L5Q5/2PycJ0N9Fusrx9b12fc= github.com/antchfx/xpath v1.3.3 h1:tmuPQa1Uye0Ym1Zn65vxPgfltWb/Lxu2jeqIGteJSRs= @@ -120,8 +126,10 @@ github.com/digitalocean/godo v1.118.0 h1:lkzGFQmACrVCp7UqH1sAi4JK/PWwlc5aaxubgor github.com/digitalocean/godo v1.118.0/go.mod h1:Vk0vpCot2HOAJwc5WE8wljZGtJ3ZtWIc8MQ8rF38sdo= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= -github.com/docker/docker v27.3.1+incompatible h1:KttF0XoteNTicmUtBO0L2tP+J7FGRFTjaEF4k6WdhfI= -github.com/docker/docker v27.3.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/dmarkham/enumer v1.5.10 h1:ygL0L6quiTiH1jpp68DyvsWaea6MaZLZrTTkIS++R0M= +github.com/dmarkham/enumer v1.5.10/go.mod h1:e4VILe2b1nYK3JKJpRmNdl5xbDQvELc6tQ8b+GsGk6E= +github.com/docker/docker v27.4.1+incompatible h1:ZJvcY7gfwHn1JF48PfbyXg7Jyt9ZCWDW+GGXOIxEwp4= +github.com/docker/docker v27.4.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= @@ -172,6 +180,10 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= +github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= +github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= +github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= @@ -227,8 +239,10 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -241,8 +255,10 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -329,6 +345,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/ionos-cloud/sdk-go/v6 v6.1.11 h1:J/uRN4UWO3wCyGOeDdMKv8LWRzKu6UIkLEaes38Kzh8= github.com/ionos-cloud/sdk-go/v6 v6.1.11/go.mod h1:EzEgRIDxBELvfoa/uBN0kOQaqovLjUWEB7iW4/Q+t4k= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jaegertracing/jaeger-idl v0.5.0 h1:zFXR5NL3Utu7MhPg8ZorxtCBjHrL3ReM1VoB65FOFGE= github.com/jaegertracing/jaeger-idl v0.5.0/go.mod h1:ON90zFo9eoyXrt9F/KN8YeF3zxcnujaisMweFY/rg5k= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= @@ -360,6 +378,7 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= @@ -415,6 +434,7 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mostynb/go-grpc-compression v1.2.3 h1:42/BKWMy0KEJGSdWvzqIyOZ95YcR9mLPqKctH7Uo//I= github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1Vjs47Km/Y2FE6ouQ7Lg= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -501,6 +521,11 @@ github.com/openzipkin/zipkin-go v0.4.3 h1:9EGwpqkgnwdEIJ+Od7QVSEIH+ocmm5nPat0G7s github.com/openzipkin/zipkin-go v0.4.3/go.mod h1:M9wCJZFWCo2RiY+o1eBCEMe0Dp2S5LDHcMZmk3RmK7c= github.com/ovh/go-ovh v1.6.0 h1:ixLOwxQdzYDx296sXcgS35TOPEahJkpjMGtzPadCjQI= github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c= +github.com/pascaldekloe/name v1.0.1 h1:9lnXOHeqeHHnWLbKfH6X98+4+ETVqFqxN09UXSjcMb0= +github.com/pascaldekloe/name v1.0.1/go.mod h1:Z//MfYJnH4jVpQ9wkclwu2I2MkHmXTlT9wR5UZScttM= +github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU= +github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= +github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= @@ -553,8 +578,12 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.29 h1:BkTk4gynLjguayxrYxZoMZjBnAOh7ntQvUkOFmkMqPU= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.29/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/shirou/gopsutil/v4 v4.24.12 h1:qvePBOk20e0IKA1QXrIIU+jmk+zEiYVVx06WjBRlZo4= github.com/shirou/gopsutil/v4 v4.24.12/go.mod h1:DCtMPAad2XceTeIAbGyVfycbYQNBGk2P8cvDi7/VN9o= +github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= +github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= @@ -590,6 +619,7 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= @@ -611,11 +641,15 @@ github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3k github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -823,6 +857,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= @@ -888,6 +923,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= @@ -1006,6 +1042,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/storage/integration/clickhouse_test.go b/internal/storage/integration/clickhouse_test.go new file mode 100644 index 00000000000..a6da571b07b --- /dev/null +++ b/internal/storage/integration/clickhouse_test.go @@ -0,0 +1,47 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + + "github.com/jaegertracing/jaeger/internal/storage/v2/clickhouse" + "github.com/jaegertracing/jaeger/pkg/clickhouse/config" + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +type ClickhouseIntegrationTestSuite struct { + StorageIntegration +} + +func (s *ClickhouseIntegrationTestSuite) initialize(t *testing.T) { + logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) + + cfg := config.DefaultConfiguration() + f, err := clickhouse.NewFactoryWithConfig(&cfg, logger) + require.NoError(t, err) + + t.Cleanup(func() { + assert.NoError(t, f.Close()) + }) + + traceWriter, err := f.CreateTraceWriter() + require.NoError(t, err) + s.TraceWriter = traceWriter + s.CleanUp = func(_ *testing.T) {} +} + +func TestClickHouseStorage(t *testing.T) { + SkipUnlessEnv(t, "clickhouse") + t.Cleanup(func() { + testutils.VerifyGoLeaksOnce(t) + }) + s := &ClickhouseIntegrationTestSuite{} + s.initialize(t) +} diff --git a/internal/storage/v2/clickhouse/factory.go b/internal/storage/v2/clickhouse/factory.go new file mode 100644 index 00000000000..6fe546f1b49 --- /dev/null +++ b/internal/storage/v2/clickhouse/factory.go @@ -0,0 +1,47 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package clickhouse + +import ( + "errors" + + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/internal/storage/v2/api/tracestore" + "github.com/jaegertracing/jaeger/pkg/clickhouse" + "github.com/jaegertracing/jaeger/pkg/clickhouse/config" +) + +type Factory struct { + config *config.Configuration + logger *zap.Logger + + client clickhouse.Client +} + +func NewFactoryWithConfig(configuration *config.Configuration, logger *zap.Logger) (*Factory, error) { + client, err := configuration.NewClient(logger) + if err != nil { + return nil, err + } + + f := &Factory{ + config: configuration, + logger: logger, + client: client, + } + return f, nil +} + +func (f Factory) CreateTraceWriter() (tracestore.Writer, error) { + return NewTraceWriter(f.client, f.logger, "otel_traces") +} + +func (f Factory) Close() error { + var errs []error + if f.client != nil { + errs = append(errs, f.client.Close()) + } + return errors.Join(errs...) +} diff --git a/internal/storage/v2/clickhouse/factory_test.go b/internal/storage/v2/clickhouse/factory_test.go new file mode 100644 index 00000000000..1b28805f136 --- /dev/null +++ b/internal/storage/v2/clickhouse/factory_test.go @@ -0,0 +1,45 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package clickhouse + +import ( + "testing" + + "github.com/ClickHouse/ch-go/cht" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/clickhouse/config" +) + +func TestClickHouseFactoryWithConfig(t *testing.T) { + cfg := config.Configuration{ + ClientConfig: config.ClientConfig{ + Address: "127.0.0.1:9000", + Database: "jaeger", + Username: "default", + Password: "default", + }, + ConnectionPoolConfig: config.ConnectionPoolConfig{}, + } + // provider a tcp server for testing. + cht.New(t, + cht.WithLog(zap.NewNop()), + ) + _, err := NewFactoryWithConfig(&cfg, zap.NewNop()) + require.NoError(t, err) +} + +func TestCreateTraceWriter(t *testing.T) { + cfg := config.DefaultConfiguration() + // provider a tcp server for testing. + cht.New(t, + cht.WithLog(zap.NewNop()), + ) + + f, err := NewFactoryWithConfig(&cfg, zap.NewNop()) + require.NoError(t, err) + _, err = f.CreateTraceWriter() + require.NoError(t, err) +} diff --git a/internal/storage/v2/clickhouse/package_test.go b/internal/storage/v2/clickhouse/package_test.go new file mode 100644 index 00000000000..2f9e7d4f304 --- /dev/null +++ b/internal/storage/v2/clickhouse/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2023 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package clickhouse + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/internal/storage/v2/clickhouse/writer.go b/internal/storage/v2/clickhouse/writer.go new file mode 100644 index 00000000000..7a1843c4a5f --- /dev/null +++ b/internal/storage/v2/clickhouse/writer.go @@ -0,0 +1,72 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package clickhouse + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/clickhouse" +) + +var insertTrace = `INSERT INTO %s ( + Timestamp, + TraceId, + SpanId, + ParentSpanId, + TraceState, + SpanName, + SpanKind, + ServiceName, + ResourceAttributes.keys, + ResourceAttributes.values, + ScopeName, + ScopeVersion, + SpanAttributes.keys, + SpanAttributes.values, + Duration, + StatusCode, + StatusMessage, + Events.Timestamp, + Events.Name, + Events.Attributes, + Links.TraceId, + Links.SpanId, + Links.TraceState, + Links.Attributes + ) VALUES` + +type TraceWriter struct { + Client clickhouse.Client + table string + logger *zap.Logger +} + +func NewTraceWriter(client clickhouse.Client, logger *zap.Logger, table string) (*TraceWriter, error) { + return &TraceWriter{Client: client, logger: logger, table: table}, nil +} + +func (t *TraceWriter) WriteTraces(ctx context.Context, td ptrace.Traces) error { + err := t.writeTraces(ctx, td) + if err != nil { + return err + } + return nil +} + +func (t *TraceWriter) writeTraces(ctx context.Context, td ptrace.Traces) error { + // TODO SQL injection? + param := clickhouse.QueryParam{ + Body: fmt.Sprintf(insertTrace, t.table), + Input: td, + } + err := t.Client.Do(ctx, param) + if err != nil { + return err + } + return nil +} diff --git a/internal/storage/v2/clickhouse/writer_test.go b/internal/storage/v2/clickhouse/writer_test.go new file mode 100644 index 00000000000..2df83196dd6 --- /dev/null +++ b/internal/storage/v2/clickhouse/writer_test.go @@ -0,0 +1,134 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package clickhouse + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + conventions "go.opentelemetry.io/collector/semconv/v1.27.0" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/clickhouse/mocks" + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +type traceWriterTest struct { + client *mocks.Client + logger *zap.Logger + logBuffer *testutils.Buffer + writer *TraceWriter +} + +func withTraceWriter(t *testing.T, fn func(w *traceWriterTest)) { + client := &mocks.Client{} + client.On("Do").Return(nil) + logger, logBuffer := testutils.NewLogger() + writer, err := NewTraceWriter(client, logger, "otel_traces") + require.NoError(t, err) + w := &traceWriterTest{ + client: client, + logger: logger, + logBuffer: logBuffer, + writer: writer, + } + fn(w) +} + +func TestNewTraceWriter(t *testing.T) { + t.Run("test trace writer creation", func(t *testing.T) { + withTraceWriter(t, func(w *traceWriterTest) { + assert.NotNil(t, w.writer) + }) + }) +} + +func TestTraceWriter_WriteTrace(t *testing.T) { + testCases := []struct { + caption string + expectedError string + expectedLogs []string + }{ + { + caption: "traces insertion successfully", + expectedError: "", + }, + { + caption: "traces insertion failed", + expectedError: "table not exists", + }, + } + traces := simpleTraces(10) + for _, tc := range testCases { + testCases := tc + t.Run(tc.caption, func(t *testing.T) { + withTraceWriter(t, func(w *traceWriterTest) { + if testCases.expectedError == "" { + w.client.On("Do", mock.Anything, mock.Anything).Return(nil) + } else { + w.client.On("Do", mock.Anything, mock.Anything).Return(errors.New(testCases.expectedError)) + } + err := w.writer.WriteTraces(context.Background(), traces) + + if testCases.expectedError == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, testCases.expectedError) + } + for _, expectedLog := range testCases.expectedLogs { + assert.Contains(t, w.logBuffer.String(), expectedLog) + } + if len(testCases.expectedLogs) == 0 { + assert.Equal(t, "", w.logBuffer.String()) + } + }) + }) + } +} + +func simpleTraces(count int) ptrace.Traces { + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + rs.SetSchemaUrl("https://opentelemetry.io/schemas/1.4.0") + rs.Resource().SetDroppedAttributesCount(10) + rs.Resource().Attributes().PutStr("service.name", "test-service") + ss := rs.ScopeSpans().AppendEmpty() + ss.Scope().SetName("io.opentelemetry.contrib.clickhouse") + ss.Scope().SetVersion("1.0.0") + ss.SetSchemaUrl("https://opentelemetry.io/schemas/1.7.0") + ss.Scope().SetDroppedAttributesCount(20) + ss.Scope().Attributes().PutStr("lib", "clickhouse") + timestamp := time.Unix(1703498029, 0) + for i := 0; i < count; i++ { + s := ss.Spans().AppendEmpty() + s.SetTraceID([16]byte{1, 2, 3, byte(i)}) + s.SetSpanID([8]byte{1, 2, 3, byte(i)}) + s.TraceState().FromRaw("trace state") + s.SetParentSpanID([8]byte{1, 2, 4, byte(i)}) + s.SetName("call db") + s.SetKind(ptrace.SpanKindInternal) + s.SetStartTimestamp(pcommon.NewTimestampFromTime(timestamp)) + s.SetEndTimestamp(pcommon.NewTimestampFromTime(timestamp.Add(time.Minute))) + s.Attributes().PutStr(conventions.AttributeServiceName, "v") + s.Status().SetMessage("error") + s.Status().SetCode(ptrace.StatusCodeError) + event := s.Events().AppendEmpty() + event.SetName("event1") + event.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + event.Attributes().PutStr("level", "info") + link := s.Links().AppendEmpty() + link.SetTraceID([16]byte{1, 2, 5, byte(i)}) + link.SetSpanID([8]byte{1, 2, 5, byte(i)}) + link.TraceState().FromRaw("error") + link.Attributes().PutStr("k", "v") + } + return traces +} diff --git a/pkg/clickhouse/client.go b/pkg/clickhouse/client.go new file mode 100644 index 00000000000..718e2cc8b56 --- /dev/null +++ b/pkg/clickhouse/client.go @@ -0,0 +1,24 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package clickhouse + +import ( + "context" + + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// Client is an abstraction collection of ch-go and clickhouse-go. +type Client interface { + Do(ctx context.Context, query QueryParam) error + Close() error +} + +// QueryParam Parameters for executing write operations +// Body: SQL Insert Statement +// Input: Batch Insert Data. +type QueryParam struct { + Body string + Input ptrace.Traces +} diff --git a/pkg/clickhouse/config/config.go b/pkg/clickhouse/config/config.go new file mode 100644 index 00000000000..153cb97173d --- /dev/null +++ b/pkg/clickhouse/config/config.go @@ -0,0 +1,93 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package config + +import ( + "context" + "runtime" + "time" + + "github.com/ClickHouse/ch-go" + "github.com/ClickHouse/ch-go/chpool" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/clickhouse" + "github.com/jaegertracing/jaeger/pkg/clickhouse/wrapper" +) + +const ( + DefaultMaxConnLifetime = time.Hour + DefaultMaxConnIdleTime = time.Minute * 30 + DefaultHealthCheckPeriod = time.Minute +) + +// Configuration describes the configuration properties needed to connect to Clickhouse server. +type Configuration struct { + ClientConfig ClientConfig `mapstructure:"client"` + ConnectionPoolConfig ConnectionPoolConfig `mapstructure:"pool"` +} + +// ClientConfig Use clickhouse to establish a connection to the server. +type ClientConfig struct { + Address string `mapstructure:"address"` + Database string `mapstructure:"database"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` +} + +// ConnectionPoolConfig Use ch-go to establish a connection to the server. +type ConnectionPoolConfig struct { + MaxConnLifetime time.Duration `mapstructure:"max_connection_lifetime"` + MaxConnIdleTime time.Duration `mapstructure:"max_connection_idle_time"` + MinConns int32 `mapstructure:"min_connections"` + MaxConns int32 `mapstructure:"max_connections"` + HealthCheckPeriod time.Duration `mapstructure:"health_check_period"` +} + +func DefaultConfiguration() Configuration { + return Configuration{ + ClientConfig: ClientConfig{ + Address: "127.0.0.1:9000", + Database: "jaeger", + Username: "default", + Password: "default", + }, + ConnectionPoolConfig: ConnectionPoolConfig{ + MaxConnLifetime: DefaultMaxConnLifetime, + MaxConnIdleTime: DefaultMaxConnIdleTime, + //nolint: gosec // G115 + MaxConns: int32(runtime.NumCPU() * 2), + //nolint: gosec // G115 + MinConns: int32(runtime.NumCPU()), + HealthCheckPeriod: DefaultHealthCheckPeriod, + }, + } +} + +func (c *Configuration) NewClient(logger *zap.Logger) (clickhouse.Client, error) { + pool, err := c.newPool(logger) + if err != nil { + return nil, err + } + return wrapper.WrapCHClient(nil, pool), nil +} + +func (c *Configuration) newPool(log *zap.Logger) (*chpool.Pool, error) { + option := chpool.Options{ + ClientOptions: ch.Options{ + Logger: log, + Address: c.ClientConfig.Address, + Database: c.ClientConfig.Database, + User: c.ClientConfig.Username, + Password: c.ClientConfig.Password, + }, + MaxConnLifetime: c.ConnectionPoolConfig.MaxConnLifetime, + MaxConnIdleTime: c.ConnectionPoolConfig.MaxConnIdleTime, + MaxConns: c.ConnectionPoolConfig.MaxConns, + MinConns: c.ConnectionPoolConfig.MinConns, + HealthCheckPeriod: c.ConnectionPoolConfig.HealthCheckPeriod, + } + chPool, err := chpool.Dial(context.Background(), option) + return chPool, err +} diff --git a/pkg/clickhouse/config/config_test.go b/pkg/clickhouse/config/config_test.go new file mode 100644 index 00000000000..3019748f9b0 --- /dev/null +++ b/pkg/clickhouse/config/config_test.go @@ -0,0 +1,40 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package config + +import ( + "testing" + + "github.com/ClickHouse/ch-go/cht" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestDefaultConfig(t *testing.T) { + cfg := DefaultConfiguration() + assert.NotEmpty(t, cfg) + assert.NotEmpty(t, cfg.ClientConfig) + assert.NotEmpty(t, cfg.ConnectionPoolConfig) +} + +func TestNewClientWithDefaults(t *testing.T) { + cfg := DefaultConfiguration() + logger := zap.NewNop() + + cht.New(t, + cht.WithLog(logger), + ) + + client, err := cfg.NewClient(logger) + require.NoError(t, err) + assert.NotEmpty(t, client) + defer client.Close() +} + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/pkg/clickhouse/empty_test.go b/pkg/clickhouse/empty_test.go new file mode 100644 index 00000000000..55051fbe854 --- /dev/null +++ b/pkg/clickhouse/empty_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2018 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package clickhouse + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/pkg/clickhouse/internal/traces.go b/pkg/clickhouse/internal/traces.go new file mode 100644 index 00000000000..a2f2168508c --- /dev/null +++ b/pkg/clickhouse/internal/traces.go @@ -0,0 +1,213 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package internal + +import ( + "encoding/hex" + "time" + + "github.com/ClickHouse/ch-go/proto" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + conventions "go.opentelemetry.io/collector/semconv/v1.27.0" +) + +// Input converts the OTEL Traces model into the ClickHouse table format for batch writing. +func Input(td ptrace.Traces) proto.Input { + var ( + timestamp = new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano) + traceId proto.ColStr + spanId proto.ColStr + parentSpanId proto.ColStr + traceState proto.ColStr + spanName proto.ColStr + spanKind proto.ColStr + serviceName proto.ColStr + resourceAttributesKeys = new(proto.ColStr).LowCardinality().Array() + resourceAttributesValues = new(proto.ColStr).LowCardinality().Array() + scopeName proto.ColStr + scopeVersion proto.ColStr + spanAttributesKeys = new(proto.ColStr).LowCardinality().Array() + spanAttributesValues = new(proto.ColStr).LowCardinality().Array() + duration proto.ColUInt64 + statusCode proto.ColStr + statusMessage proto.ColStr + eventsTimestamp = new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano).Array() + eventsName = new(proto.ColStr).Array() + eventsAttributes = proto.NewArray(proto.NewMap(new(proto.ColStr).LowCardinality(), new(proto.ColStr))) + linksTraceId = new(proto.ColStr).Array() + linksSpanId = new(proto.ColStr).Array() + linksTraceState = new(proto.ColStr).Array() + linksAttributes = proto.NewArray(proto.NewMap(new(proto.ColStr).LowCardinality(), new(proto.ColStr))) + ) + + for i := 0; i < td.ResourceSpans().Len(); i++ { + spans := td.ResourceSpans().At(i) + res := spans.Resource() + servName := getServiceName(res.Attributes()) + + resourceAttrs := attributesToMap(res.Attributes()) + resourceKeys := make([]string, len(resourceAttrs)) + resourceValues := make([]string, len(resourceAttrs)) + idx := 0 + for key, val := range resourceAttrs { + resourceKeys[idx] = key + resourceValues[idx] = val + idx++ + } + + for j := 0; j < spans.ScopeSpans().Len(); j++ { + scope := spans.ScopeSpans().At(j).Scope() + scName := scope.Name() + scVersion := scope.Version() + + rs := spans.ScopeSpans().At(j).Spans() + for k := 0; k < rs.Len(); k++ { + r := rs.At(k) + + spanAttr := attributesToMap(r.Attributes()) + spanKeys := make([]string, len(spanAttr)) + spanValues := make([]string, len(spanAttr)) + idx = 0 + for key, val := range spanAttr { + spanKeys[idx] = key + spanValues[idx] = val + idx++ + } + + status := r.Status() + eventTimes, eventNames, eventAttrs := convertEvents(r.Events()) + link := convertLinks(r.Links()) + + linksTraceIDs := link.traceIDs + linksSpanIDs := link.spanIDs + linksTraceStates := link.traceStates + linksAttrs := link.attrs + + timestamp.Append(r.StartTimestamp().AsTime()) + traceId.Append(traceIDToHexOrEmptyString(r.TraceID())) + spanId.Append(spanIDToHexOrEmptyString(r.SpanID())) + parentSpanId.Append(spanIDToHexOrEmptyString(r.ParentSpanID())) + traceState.Append(r.TraceState().AsRaw()) + spanName.Append(r.Name()) + spanKind.Append(r.Kind().String()) + serviceName.Append(servName) + + resourceAttributesKeys.Append(resourceKeys) + resourceAttributesValues.Append(resourceValues) + + scopeName.Append(scName) + scopeVersion.Append(scVersion) + + spanAttributesKeys.Append(spanKeys) + spanAttributesValues.Append(spanValues) + //nolint: gosec // G115 + duration.Append(uint64(r.EndTimestamp().AsTime().Sub(r.StartTimestamp().AsTime()).Nanoseconds())) + statusCode.Append(status.Code().String()) + statusMessage.Append(status.Message()) + + eventsTimestamp.Append(eventTimes) + eventsName.Append(eventNames) + eventsAttributes.Append(eventAttrs) + + linksTraceId.Append(linksTraceIDs) + linksSpanId.Append(linksSpanIDs) + linksTraceState.Append(linksTraceStates) + linksAttributes.Append(linksAttrs) + } + } + } + + return proto.Input{ + {Name: "Timestamp", Data: timestamp}, + {Name: "traceId", Data: traceId}, + {Name: "SpanId", Data: spanId}, + {Name: "ParentSpanId", Data: parentSpanId}, + {Name: "TraceState", Data: traceState}, + {Name: "SpanName", Data: spanName}, + {Name: "SpanKind", Data: spanKind}, + {Name: "ServiceName", Data: serviceName}, + {Name: "ResourceAttributes.keys", Data: resourceAttributesKeys}, + {Name: "ResourceAttributes.values", Data: resourceAttributesValues}, + {Name: "ScopeName", Data: scopeName}, + {Name: "ScopeVersion", Data: scopeVersion}, + {Name: "SpanAttributes.keys", Data: spanAttributesKeys}, + {Name: "SpanAttributes.values", Data: spanAttributesValues}, + {Name: "Duration", Data: duration}, + {Name: "StatusCode", Data: statusCode}, + {Name: "StatusMessage", Data: statusMessage}, + {Name: "Events.Timestamp", Data: eventsTimestamp}, + {Name: "Events.Name", Data: eventsName}, + {Name: "Events.Attributes", Data: eventsAttributes}, + {Name: "Links.traceId", Data: linksTraceId}, + {Name: "Links.SpanId", Data: linksSpanId}, + {Name: "Links.TraceState", Data: linksTraceState}, + {Name: "Links.Attributes", Data: linksAttributes}, + } +} + +func attributesToMap(attributes pcommon.Map) map[string]string { + result := map[string]string{} + attributes.Range(func(k string, v pcommon.Value) bool { + result[k] = v.AsString() + return true + }) + return result +} + +func getServiceName(resAttr pcommon.Map) string { + var serviceName string + if v, ok := resAttr.Get(conventions.AttributeServiceName); ok { + serviceName = v.AsString() + } + + return serviceName +} + +func convertEvents(events ptrace.SpanEventSlice) (times []time.Time, names []string, attrs []map[string]string) { + for i := 0; i < events.Len(); i++ { + event := events.At(i) + times = append(times, event.Timestamp().AsTime()) + names = append(names, event.Name()) + attrs = append(attrs, attributesToMap(event.Attributes())) + } + return times, names, attrs +} + +func convertLinks(links ptrace.SpanLinkSlice) link { + traceIDs := make([]string, 0, links.Len()) + spanIDs := make([]string, 0, links.Len()) + states := make([]string, 0, links.Len()) + attrs := make([]map[string]string, 0, links.Len()) + + for i := 0; i < links.Len(); i++ { + link := links.At(i) + traceIDs = append(traceIDs, traceIDToHexOrEmptyString(link.TraceID())) + spanIDs = append(spanIDs, spanIDToHexOrEmptyString(link.SpanID())) + states = append(states, link.TraceState().AsRaw()) + attrs = append(attrs, attributesToMap(link.Attributes())) + } + return link{traceIDs, spanIDs, states, attrs} +} + +type link struct { + traceIDs []string + spanIDs []string + traceStates []string + attrs []map[string]string +} + +func traceIDToHexOrEmptyString(id pcommon.TraceID) string { + if id.IsEmpty() { + return "" + } + return hex.EncodeToString(id[:]) +} + +func spanIDToHexOrEmptyString(id pcommon.SpanID) string { + if id.IsEmpty() { + return "" + } + return hex.EncodeToString(id[:]) +} diff --git a/pkg/clickhouse/internal/traces_test.go b/pkg/clickhouse/internal/traces_test.go new file mode 100644 index 00000000000..b65bd30086a --- /dev/null +++ b/pkg/clickhouse/internal/traces_test.go @@ -0,0 +1,71 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package internal + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + conventions "go.opentelemetry.io/collector/semconv/v1.27.0" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +var expected = "INSERT INTO \"otel_traces\" (\"Timestamp\",\"traceId\",\"SpanId\",\"ParentSpanId\",\"TraceState\"," + + "\"SpanName\",\"SpanKind\",\"ServiceName\",\"ResourceAttributes.keys\",\"ResourceAttributes.values\",\"ScopeName\"," + + "\"ScopeVersion\",\"SpanAttributes.keys\",\"SpanAttributes.values\",\"Duration\",\"StatusCode\",\"StatusMessage\"," + + "\"Events.Timestamp\",\"Events.Name\",\"Events.Attributes\",\"Links.traceId\",\"Links.SpanId\",\"Links.TraceState\"," + + "\"Links.Attributes\") VALUES" + +func TestInput(t *testing.T) { + td := simpleTraces(2) + actual := Input(td) + assert.Equal(t, expected, actual.Into("otel_traces")) +} + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} + +func simpleTraces(count int) ptrace.Traces { + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + rs.SetSchemaUrl("https://opentelemetry.io/schemas/1.4.0") + rs.Resource().SetDroppedAttributesCount(10) + rs.Resource().Attributes().PutStr("service.name", "test-service") + ss := rs.ScopeSpans().AppendEmpty() + ss.Scope().SetName("io.opentelemetry.contrib.clickhouse") + ss.Scope().SetVersion("1.0.0") + ss.SetSchemaUrl("https://opentelemetry.io/schemas/1.7.0") + ss.Scope().SetDroppedAttributesCount(20) + ss.Scope().Attributes().PutStr("lib", "clickhouse") + timestamp := time.Unix(1703498029, 0) + for i := 0; i < count; i++ { + s := ss.Spans().AppendEmpty() + s.SetTraceID([16]byte{1, 2, 3, byte(i)}) + s.SetSpanID([8]byte{1, 2, 3, byte(i)}) + s.TraceState().FromRaw("trace state") + s.SetParentSpanID([8]byte{1, 2, 4, byte(i)}) + s.SetName("call db") + s.SetKind(ptrace.SpanKindInternal) + s.SetStartTimestamp(pcommon.NewTimestampFromTime(timestamp)) + s.SetEndTimestamp(pcommon.NewTimestampFromTime(timestamp.Add(time.Minute))) + s.Attributes().PutStr(conventions.AttributeServiceName, "v") + s.Status().SetMessage("error") + s.Status().SetCode(ptrace.StatusCodeError) + event := s.Events().AppendEmpty() + event.SetName("event1") + event.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + event.Attributes().PutStr("level", "info") + link := s.Links().AppendEmpty() + link.SetTraceID([16]byte{1, 2, 5, byte(i)}) + link.SetSpanID([8]byte{1, 2, 5, byte(i)}) + link.TraceState().FromRaw("error") + link.Attributes().PutStr("k", "v") + } + return traces +} diff --git a/pkg/clickhouse/mocks/Client.go b/pkg/clickhouse/mocks/Client.go new file mode 100644 index 00000000000..134b5241036 --- /dev/null +++ b/pkg/clickhouse/mocks/Client.go @@ -0,0 +1,106 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + context "context" + + clickhouse "github.com/jaegertracing/jaeger/pkg/clickhouse" + + driver "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + + mock "github.com/stretchr/testify/mock" +) + +// Client is an autogenerated mock type for the Client type +type Client struct { + mock.Mock +} + +// Close provides a mock function with no fields +func (_m *Client) Close() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Do provides a mock function with given fields: ctx, query +func (_m *Client) Do(ctx context.Context, query clickhouse.QueryParam) error { + ret := _m.Called(ctx, query) + + if len(ret) == 0 { + panic("no return value specified for Do") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, clickhouse.QueryParam) error); ok { + r0 = rf(ctx, query) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Query provides a mock function with given fields: ctx, query, args +func (_m *Client) Query(ctx context.Context, query string, args ...any) (driver.Rows, error) { + var _ca []interface{} + _ca = append(_ca, ctx, query) + _ca = append(_ca, args...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for Query") + } + + var r0 driver.Rows + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, ...any) (driver.Rows, error)); ok { + return rf(ctx, query, args...) + } + if rf, ok := ret.Get(0).(func(context.Context, string, ...any) driver.Rows); ok { + r0 = rf(ctx, query, args...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(driver.Rows) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, ...any) error); ok { + r1 = rf(ctx, query, args...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewClient(t interface { + mock.TestingT + Cleanup(func()) +}) *Client { + mock := &Client{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/clickhouse/wrapper/.nocover b/pkg/clickhouse/wrapper/.nocover new file mode 100644 index 00000000000..7f408e145db --- /dev/null +++ b/pkg/clickhouse/wrapper/.nocover @@ -0,0 +1 @@ +requires connection to ClickHouse \ No newline at end of file diff --git a/pkg/clickhouse/wrapper/empty_test.go b/pkg/clickhouse/wrapper/empty_test.go new file mode 100644 index 00000000000..48aa74c3ccc --- /dev/null +++ b/pkg/clickhouse/wrapper/empty_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2023 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package wrapper + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/pkg/clickhouse/wrapper/wrapper.go b/pkg/clickhouse/wrapper/wrapper.go new file mode 100644 index 00000000000..c92fe20693c --- /dev/null +++ b/pkg/clickhouse/wrapper/wrapper.go @@ -0,0 +1,58 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package wrapper + +import ( + "context" + + "github.com/ClickHouse/ch-go" + "github.com/ClickHouse/ch-go/chpool" + v2Client "github.com/ClickHouse/clickhouse-go/v2" + + "github.com/jaegertracing/jaeger/pkg/clickhouse" + trace "github.com/jaegertracing/jaeger/pkg/clickhouse/internal" +) + +// ClientWrapper is a wrapper around clickhouse-go used by read path, ch-go used by write path. +type ClientWrapper struct { + Conn v2Client.Conn + Pool *chpool.Pool +} + +func WrapCHClient(conn v2Client.Conn, pool *chpool.Pool) ClientWrapper { + return ClientWrapper{ + Conn: conn, + Pool: pool, + } +} + +// Do calls this function to internal pool. +func (c ClientWrapper) Do(ctx context.Context, query clickhouse.QueryParam) error { + return c.WrapDo(ctx, tracesQuery(query)) +} + +// Close closes connection or pool. +func (c ClientWrapper) Close() error { + if c.Pool != nil { + c.Pool.Close() + } + if c.Conn != nil { + err := c.Conn.Close() + if err != nil { + return err + } + } + return nil +} + +func (c ClientWrapper) WrapDo(ctx context.Context, query ch.Query) error { + return c.Pool.Do(ctx, query) +} + +func tracesQuery(query clickhouse.QueryParam) ch.Query { + return ch.Query{ + Body: query.Body, + Input: trace.Input(query.Input), + } +} diff --git a/scripts/e2e/clickhouse.sh b/scripts/e2e/clickhouse.sh new file mode 100644 index 00000000000..99ca3092f20 --- /dev/null +++ b/scripts/e2e/clickhouse.sh @@ -0,0 +1,51 @@ +#!/bin/bash + +# Copyright (c) 2025 The Jaeger Authors. +# SPDX-License-Identifier: Apache-2.0 + +export CLICKHOUSE_USERNAME="default" +export CLICKHOUSE_PASSWORD="default" +compose_file="docker-compose/clickhouse/docker-compose.yml" + +setup_clickhouse() { + docker compose -f "$compose_file" up -d +} + +wait_for_clickhouse() { + echo "Waiting for ClickHouse to start..." + while ! docker exec clickhouse clickhouse-client --host=127.0.0.1 --port=9000 \ + --user="$CLICKHOUSE_USERNAME" \ + --password="$CLICKHOUSE_PASSWORD" \ + --query="SELECT 1" > /dev/null 2>&1; do + echo "Waiting for ClickHouse to be ready..." + sleep 2 + done + echo "ClickHouse is ready!" +} + +apply_schema() { + docker exec -i clickhouse clickhouse-client --host=127.0.0.1 --port=9000 \ + --user="$CLICKHOUSE_USERNAME" \ + --password="$CLICKHOUSE_PASSWORD" \ + --queries-file="docker-entrypoint-initdb.d/init.sql" +} + +dump_logs() { + echo "::group::🚧 🚧 🚧 clickhouse logs" + docker compose -f "${compose_file}" logs + echo "::endgroup::" +} + +run_integration_test() { + STORAGE=clickhouse make storage-integration-test +} + +main() { + dump_logs + setup_clickhouse + wait_for_clickhouse + apply_schema + run_integration_test +} + +main \ No newline at end of file