From d72a6ac98949c5028483c6c0c96a1f030a3a4b77 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 18 Sep 2024 16:38:06 +0200 Subject: [PATCH] gRPC: Support S2 compression Support S2 and S2 in Snappy compatible mode gRPC compression. Signed-off-by: Arve Knudsen --- CHANGELOG.md | 9 ++ cmd/mimir/config-descriptor.json | 16 +-- cmd/mimir/help-all.txt.tmpl | 16 +-- .../configuration-parameters/index.md | 4 +- go.mod | 4 +- go.sum | 8 +- pkg/alertmanager/alertmanager_client.go | 2 + pkg/frontend/v2/frontend.go | 2 + pkg/ingester/client/client.go | 2 + pkg/querier/worker/worker.go | 3 + pkg/ruler/remotequerier.go | 2 + pkg/ruler/ruler.go | 2 + pkg/scheduler/scheduler.go | 2 + pkg/util/grpcencoding/s2/s2.go | 98 +++++++++++++++++++ .../grafana/dskit/grpcclient/grpcclient.go | 26 +++-- vendor/modules.txt | 4 +- 16 files changed, 168 insertions(+), 32 deletions(-) create mode 100644 pkg/util/grpcencoding/s2/s2.go diff --git a/CHANGELOG.md b/CHANGELOG.md index bd01ae35c20..e68ca32cbe4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,15 @@ * [FEATURE] Query frontend: added new query pruning middleware to enable pruning dead code (eg. expressions that cannot produce any results) and simplifying expressions (eg. expressions that can be evaluated immediately) in queries. #9086 * [FEATURE] Ruler: added experimental configuration, `-ruler.rule-evaluation-write-enabled`, to disable writing the result of rule evaluation to ingesters. This feature can be used for testing purposes. #9060 * [FEATURE] Ingester: added experimental configuration `ingester.ignore-ooo-exemplars`. When set to `true` out of order exemplars are no longer reported to the remote write client. #9151 +* [FEATURE] gRPC: Add flags for using respectively S2 or Snappy compatible S2 gRPC compression. #9322 + * `-alertmanager.alertmanager-client.grpc-compression` + * `-ingester.client.grpc-compression` + * `-querier.frontend-client.grpc-compression` + * `-querier.scheduler-client.grpc-compression` + * `-query-frontend.grpc-client-config.grpc-compression` + * `-query-scheduler.grpc-client-config.grpc-compression` + * `-ruler.client.grpc-compression` + * `-ruler.query-frontend.grpc-client-config.grpc-compression` * [ENHANCEMENT] Compactor: Add `cortex_compactor_compaction_job_duration_seconds` and `cortex_compactor_compaction_job_blocks` histogram metrics to track duration of individual compaction jobs and number of blocks per job. #8371 * [ENHANCEMENT] Rules: Added per namespace max rules per rule group limit. The maximum number of rules per rule groups for all namespaces continues to be configured by `-ruler.max-rules-per-rule-group`, but now, this can be superseded by the new `-ruler.max-rules-per-rule-group-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8378 * [ENHANCEMENT] Rules: Added per namespace max rule groups per tenant limit. The maximum number of rule groups per rule tenant for all namespaces continues to be configured by `-ruler.max-rule-groups-per-tenant`, but now, this can be superseded by the new `-ruler.max-rule-groups-per-tenant-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8425 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 9765237a1ba..2a3c7fa8ee8 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -2101,7 +2101,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "ingester.client.grpc-compression", @@ -4838,7 +4838,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "querier.frontend-client.grpc-compression", @@ -5100,7 +5100,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "querier.scheduler-client.grpc-compression", @@ -5491,7 +5491,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "query-frontend.grpc-client-config.grpc-compression", @@ -11455,7 +11455,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "ruler.client.grpc-compression", @@ -12420,7 +12420,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "ruler.query-frontend.grpc-client-config.grpc-compression", @@ -14655,7 +14655,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "alertmanager.alertmanager-client.grpc-compression", @@ -16232,7 +16232,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "query-scheduler.grpc-client-config.grpc-compression", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index a5755b29a85..7cf04c8cbd0 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -154,7 +154,7 @@ Usage of ./cmd/mimir/mimir: -alertmanager.alertmanager-client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -alertmanager.alertmanager-client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -alertmanager.alertmanager-client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -alertmanager.alertmanager-client.grpc-max-send-msg-size int @@ -1414,7 +1414,7 @@ Usage of ./cmd/mimir/mimir: -ingester.client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -ingester.client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -ingester.client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -ingester.client.grpc-max-send-msg-size int @@ -1868,7 +1868,7 @@ Usage of ./cmd/mimir/mimir: -querier.frontend-client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -querier.frontend-client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -querier.frontend-client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -querier.frontend-client.grpc-max-send-msg-size int @@ -1972,7 +1972,7 @@ Usage of ./cmd/mimir/mimir: -querier.scheduler-client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -querier.scheduler-client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -querier.scheduler-client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -querier.scheduler-client.grpc-max-send-msg-size int @@ -2050,7 +2050,7 @@ Usage of ./cmd/mimir/mimir: -query-frontend.grpc-client-config.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -query-frontend.grpc-client-config.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -query-frontend.grpc-client-config.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -query-frontend.grpc-client-config.grpc-max-send-msg-size int @@ -2256,7 +2256,7 @@ Usage of ./cmd/mimir/mimir: -query-scheduler.grpc-client-config.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -query-scheduler.grpc-client-config.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -query-scheduler.grpc-client-config.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -query-scheduler.grpc-client-config.grpc-max-send-msg-size int @@ -2628,7 +2628,7 @@ Usage of ./cmd/mimir/mimir: -ruler.client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -ruler.client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -ruler.client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -ruler.client.grpc-max-send-msg-size int @@ -2712,7 +2712,7 @@ Usage of ./cmd/mimir/mimir: -ruler.query-frontend.grpc-client-config.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -ruler.query-frontend.grpc-client-config.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -ruler.query-frontend.grpc-client-config.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -ruler.query-frontend.grpc-client-config.grpc-max-send-msg-size int diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 9420968be78..c2054fa8ade 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -2328,7 +2328,7 @@ alertmanager_client: [max_send_msg_size: | default = 104857600] # (advanced) Use compression when sending messages. Supported values are: - # 'gzip', 'snappy' and '' (disable compression) + # 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) # CLI flag: -alertmanager.alertmanager-client.grpc-compression [grpc_compression: | default = ""] @@ -2591,7 +2591,7 @@ The `grpc_client` block configures the gRPC client used to communicate between t [max_send_msg_size: | default = 104857600] # (advanced) Use compression when sending messages. Supported values are: -# 'gzip', 'snappy' and '' (disable compression) +# 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) # CLI flag: -.grpc-compression [grpc_compression: | default = ""] diff --git a/go.mod b/go.mod index 1f0490a47d3..10f72d4c55a 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20240905221822-931a021fb06b + github.com/grafana/dskit v0.0.0-20240919080237-9102f24e6e9e github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/json-iterator/go v1.1.12 @@ -199,7 +199,7 @@ require ( github.com/google/go-querystring v1.1.0 // indirect github.com/google/pprof v0.0.0-20240711041743-f6c9dda6c6da // indirect github.com/google/s2a-go v0.1.8 // indirect - github.com/googleapis/enterprise-certificate-proxy v0.3.3 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/gax-go/v2 v2.13.0 // indirect github.com/gosimple/slug v1.1.1 // indirect github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 // indirect diff --git a/go.sum b/go.sum index d2c066f7466..cad0a376c55 100644 --- a/go.sum +++ b/go.sum @@ -1223,8 +1223,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg= github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= -github.com/googleapis/enterprise-certificate-proxy v0.3.3 h1:QRje2j5GZimBzlbhGA2V2QlGNgL8G6e+wGo/+/2bWI0= -github.com/googleapis/enterprise-certificate-proxy v0.3.3/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= +github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gTgghdIA6Stxb52D5RnLI1SLyw= +github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= @@ -1255,8 +1255,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20240906191856-cdc634f213ea h1:AGmVRk+9ZmzuiLJl6hzQE1vBlVz9wbEb2+J52Gui2ys= github.com/grafana/alerting v0.0.0-20240906191856-cdc634f213ea/go.mod h1:GMLi6d09Xqo96fCVUjNk//rcjP5NKEdjOzfWIffD5r4= -github.com/grafana/dskit v0.0.0-20240905221822-931a021fb06b h1:x2HCzk29I0o5pRPfqWP/qwhXaPGlcz8pohq5kO1NZoE= -github.com/grafana/dskit v0.0.0-20240905221822-931a021fb06b/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20240919080237-9102f24e6e9e h1:yGly8b1fa1FruasB9cCmbomRUk/EbfC8GGAs+0x6FpI= +github.com/grafana/dskit v0.0.0-20240919080237-9102f24e6e9e/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce h1:WI1olbgS+sEl77qxEYbmt9TgRUz7iLqmjh8lYPpGlKQ= diff --git a/pkg/alertmanager/alertmanager_client.go b/pkg/alertmanager/alertmanager_client.go index dfb4b0d7ea5..d1976d2355e 100644 --- a/pkg/alertmanager/alertmanager_client.go +++ b/pkg/alertmanager/alertmanager_client.go @@ -20,6 +20,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/mimir/pkg/alertmanager/alertmanagerpb" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" ) // ClientsPool is the interface used to get the client from the pool for a specified address. @@ -45,6 +46,7 @@ type ClientConfig struct { // RegisterFlagsWithPrefix registers flags with prefix. func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix, f) f.DurationVar(&cfg.RemoteTimeout, prefix+".remote-timeout", 2*time.Second, "Timeout for downstream alertmanagers.") } diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 81ae91baf8a..da14ddfa40a 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -38,6 +38,7 @@ import ( "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" "github.com/grafana/mimir/pkg/util/globalerror" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/httpgrpcutil" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -76,6 +77,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.StringVar(&cfg.Addr, "query-frontend.instance-addr", "", "IP address to advertise to the querier (via scheduler) (default is auto-detected from network interfaces).") f.IntVar(&cfg.Port, "query-frontend.instance-port", 0, "Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).") + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-frontend.grpc-client-config", f) } diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 84b3ed00c5e..4717e13675b 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" querierapi "github.com/grafana/mimir/pkg/querier/api" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" ) // HealthAndIngesterClient is the union of IngesterClient and grpc_health_v1.HealthClient. @@ -70,6 +71,7 @@ type Config struct { // RegisterFlags registers configuration settings used by the ingester client config. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f) } diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index da801030ba1..75e605ee83b 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc" "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/math" ) @@ -48,7 +49,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to the query-frontend to identify requests from the same querier. Defaults to hostname.") f.BoolVar(&cfg.ResponseStreamingEnabled, "querier.response-streaming-enabled", false, "Enables streaming of responses from querier to query-frontend for response types that support it (currently only `active_series` responses do).") + cfg.QueryFrontendGRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.QueryFrontendGRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) + cfg.QuerySchedulerGRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.QuerySchedulerGRPCClientConfig.RegisterFlagsWithPrefix("querier.scheduler-client", f) } diff --git a/pkg/ruler/remotequerier.go b/pkg/ruler/remotequerier.go index e6329ce1352..89ee2947843 100644 --- a/pkg/ruler/remotequerier.go +++ b/pkg/ruler/remotequerier.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/codes" "github.com/grafana/mimir/pkg/querier/api" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/version" ) @@ -75,6 +76,7 @@ func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) { "GRPC listen address of the query-frontend(s). Must be a DNS address (prefixed with dns:///) "+ "to enable client side load balancing.") + c.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} c.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.query-frontend.grpc-client-config", f) f.StringVar(&c.QueryResultResponseFormat, "ruler.query-frontend.query-result-response-format", formatProtobuf, fmt.Sprintf("Format to use when retrieving query results from query-frontends. Supported values: %s", strings.Join(allFormats, ", "))) diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index c527fcf67e2..01b84467058 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -40,6 +40,7 @@ import ( "github.com/grafana/mimir/pkg/ruler/rulestore" "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/validation" @@ -165,6 +166,7 @@ func (cfg *Config) Validate(limits validation.Limits) error { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { + cfg.ClientTLSConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.ClientTLSConfig.RegisterFlagsWithPrefix("ruler.client", f) cfg.Ring.RegisterFlags(f, logger) cfg.Notifier.RegisterFlags(f) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 05692da39e5..82a588fa19a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -38,6 +38,7 @@ import ( "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" "github.com/grafana/mimir/pkg/scheduler/schedulerpb" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/httpgrpcutil" "github.com/grafana/mimir/pkg/util/validation" ) @@ -106,6 +107,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.BoolVar(&cfg.PrioritizeQueryComponents, "query-scheduler.prioritize-query-components", false, "When enabled, the query scheduler primarily prioritizes dequeuing fairly from queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, the query scheduler primarily prioritizes tenant fairness. You must enable the `query-scheduler.use-multi-algorithm-query-queue` setting to use this flag.") f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.") + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f) cfg.ServiceDiscovery.RegisterFlags(f, logger) } diff --git a/pkg/util/grpcencoding/s2/s2.go b/pkg/util/grpcencoding/s2/s2.go new file mode 100644 index 00000000000..bfb8da11781 --- /dev/null +++ b/pkg/util/grpcencoding/s2/s2.go @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/mostynb/go-grpc-compression/blob/f7e92b39057ca421a6485f650243a3e804036498/internal/s2/s2.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: Copyright 2022 Mostyn Bramley-Moore. + +// Package s2 is an experimental wrapper for using +// github.com/klauspost/compress/s2 stream compression with gRPC. +package s2 + +import ( + "errors" + "io" + "sync" + + "github.com/klauspost/compress/s2" + "google.golang.org/grpc/encoding" +) + +const ( + // Name is the name of the S2 compressor. + Name = "s2" + // SnappyCompatName is the name of the Snappy compatible S2 compressor. + SnappyCompatName = "s2-snappy" +) + +type compressor struct { + name string + poolCompressor sync.Pool + poolDecompressor sync.Pool +} + +type writer struct { + *s2.Writer + pool *sync.Pool +} + +type reader struct { + *s2.Reader + pool *sync.Pool +} + +func init() { + encoding.RegisterCompressor(newCompressor(false)) + encoding.RegisterCompressor(newCompressor(true)) +} + +func newCompressor(snappyCompat bool) *compressor { + opts := []s2.WriterOption{s2.WriterConcurrency(1)} + var name string + if snappyCompat { + opts = append(opts, s2.WriterSnappyCompat()) + name = SnappyCompatName + } else { + name = Name + } + c := &compressor{ + name: name, + } + c.poolCompressor.New = func() interface{} { + w := s2.NewWriter(io.Discard, opts...) + return &writer{Writer: w, pool: &c.poolCompressor} + } + return c +} + +func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { + s := c.poolCompressor.Get().(*writer) + s.Writer.Reset(w) + return s, nil +} + +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + s, inPool := c.poolDecompressor.Get().(*reader) + if !inPool { + newR := s2.NewReader(r) + return &reader{Reader: newR, pool: &c.poolDecompressor}, nil + } + s.Reset(r) + return s, nil +} + +func (c *compressor) Name() string { + return c.name +} + +func (s *writer) Close() error { + err := s.Writer.Close() + s.pool.Put(s) + return err +} + +func (s *reader) Read(p []byte) (n int, err error) { + n, err = s.Reader.Read(p) + if errors.Is(err, io.EOF) { + s.pool.Put(s) + } + return n, err +} diff --git a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go index 75189904715..a8f728c61e2 100644 --- a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go +++ b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go @@ -2,6 +2,8 @@ package grpcclient import ( "flag" + "slices" + "strings" "time" "github.com/pkg/errors" @@ -40,6 +42,9 @@ type Config struct { Middleware []grpc.UnaryClientInterceptor `yaml:"-"` StreamMiddleware []grpc.StreamClientInterceptor `yaml:"-"` + + // CustomCompressors allows configuring custom compressors. + CustomCompressors []string `yaml:"-"` } // RegisterFlags registers flags. @@ -55,9 +60,19 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.InitialStreamWindowSize = defaultInitialWindowSize cfg.InitialConnectionWindowSize = defaultInitialWindowSize + var supportedCompressors strings.Builder + supportedCompressors.WriteString("Use compression when sending messages. Supported values are: 'gzip', 'snappy'") + for _, cmp := range cfg.CustomCompressors { + supportedCompressors.WriteString(", ") + supportedCompressors.WriteString("'") + supportedCompressors.WriteString(cmp) + supportedCompressors.WriteString("'") + } + supportedCompressors.WriteString(" and '' (disable compression)") + f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 100<<20, "gRPC client max send message size (bytes).") - f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)") + f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", supportedCompressors.String()) f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.") f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.") f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit rate limits.") @@ -74,11 +89,10 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { } func (cfg *Config) Validate() error { - switch cfg.GRPCCompression { - case gzip.Name, snappy.Name, "": - // valid - default: - return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression) + supportedCompressors := []string{gzip.Name, snappy.Name, ""} + supportedCompressors = append(supportedCompressors, cfg.CustomCompressors...) + if !slices.Contains(supportedCompressors, cfg.GRPCCompression) { + return errors.Errorf("unsupported compression type: %q", cfg.GRPCCompression) } return nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index 66b344b76dc..f970ec47417 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -560,7 +560,7 @@ github.com/google/s2a-go/stream # github.com/google/uuid v1.6.0 ## explicit github.com/google/uuid -# github.com/googleapis/enterprise-certificate-proxy v0.3.3 +# github.com/googleapis/enterprise-certificate-proxy v0.3.4 ## explicit; go 1.19 github.com/googleapis/enterprise-certificate-proxy/client github.com/googleapis/enterprise-certificate-proxy/client/util @@ -614,7 +614,7 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20240905221822-931a021fb06b +# github.com/grafana/dskit v0.0.0-20240919080237-9102f24e6e9e ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast