From 99502b716d021df54cfcfdc6f51055ea3e5d8ec0 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 18 Sep 2024 16:38:06 +0200 Subject: [PATCH] gRPC: Support S2 compression Support S2 and S2 in Snappy compatible mode gRPC compression. Signed-off-by: Arve Knudsen --- CHANGELOG.md | 9 ++ cmd/mimir/config-descriptor.json | 16 +-- cmd/mimir/help-all.txt.tmpl | 16 +-- .../configuration-parameters/index.md | 4 +- go.mod | 2 +- go.sum | 4 +- pkg/alertmanager/alertmanager_client.go | 2 + pkg/frontend/v2/frontend.go | 2 + pkg/ingester/client/client.go | 2 + pkg/querier/worker/worker.go | 3 + pkg/ruler/remotequerier.go | 2 + pkg/ruler/ruler.go | 2 + pkg/scheduler/scheduler.go | 2 + pkg/util/grpcencoding/s2/s2.go | 98 +++++++++++++++++++ vendor/modules.txt | 2 +- 15 files changed, 144 insertions(+), 22 deletions(-) create mode 100644 pkg/util/grpcencoding/s2/s2.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cbba1de2da..b5b5335ee4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,15 @@ * [FEATURE] Query frontend: added new query pruning middleware to enable pruning dead code (eg. expressions that cannot produce any results) and simplifying expressions (eg. expressions that can be evaluated immediately) in queries. #9086 * [FEATURE] Ruler: added experimental configuration, `-ruler.rule-evaluation-write-enabled`, to disable writing the result of rule evaluation to ingesters. This feature can be used for testing purposes. #9060 * [FEATURE] Ingester: added experimental configuration `ingester.ignore-ooo-exemplars`. When set to `true` out of order exemplars are no longer reported to the remote write client. #9151 +* [FEATURE] gRPC: Add flags for using respectively S2 or Snappy compatible S2 gRPC compression. #9322 + * `-alertmanager.alertmanager-client.grpc-compression` + * `-ingester.client.grpc-compression` + * `-querier.frontend-client.grpc-compression` + * `-querier.scheduler-client.grpc-compression` + * `-query-frontend.grpc-client-config.grpc-compression` + * `-query-scheduler.grpc-client-config.grpc-compression` + * `-ruler.client.grpc-compression` + * `-ruler.query-frontend.grpc-client-config.grpc-compression` * [ENHANCEMENT] Compactor: Add `cortex_compactor_compaction_job_duration_seconds` and `cortex_compactor_compaction_job_blocks` histogram metrics to track duration of individual compaction jobs and number of blocks per job. #8371 * [ENHANCEMENT] Rules: Added per namespace max rules per rule group limit. The maximum number of rules per rule groups for all namespaces continues to be configured by `-ruler.max-rules-per-rule-group`, but now, this can be superseded by the new `-ruler.max-rules-per-rule-group-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8378 * [ENHANCEMENT] Rules: Added per namespace max rule groups per tenant limit. The maximum number of rule groups per rule tenant for all namespaces continues to be configured by `-ruler.max-rule-groups-per-tenant`, but now, this can be superseded by the new `-ruler.max-rule-groups-per-tenant-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8425 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index a19108b5964..3ea55f59ffd 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -2101,7 +2101,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "ingester.client.grpc-compression", @@ -4838,7 +4838,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "querier.frontend-client.grpc-compression", @@ -5100,7 +5100,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "querier.scheduler-client.grpc-compression", @@ -5491,7 +5491,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "query-frontend.grpc-client-config.grpc-compression", @@ -11455,7 +11455,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "ruler.client.grpc-compression", @@ -12420,7 +12420,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "ruler.query-frontend.grpc-client-config.grpc-compression", @@ -14677,7 +14677,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "alertmanager.alertmanager-client.grpc-compression", @@ -16254,7 +16254,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "query-scheduler.grpc-client-config.grpc-compression", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index e6477b91821..73651f3fc36 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -154,7 +154,7 @@ Usage of ./cmd/mimir/mimir: -alertmanager.alertmanager-client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -alertmanager.alertmanager-client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -alertmanager.alertmanager-client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -alertmanager.alertmanager-client.grpc-max-send-msg-size int @@ -1414,7 +1414,7 @@ Usage of ./cmd/mimir/mimir: -ingester.client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -ingester.client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -ingester.client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -ingester.client.grpc-max-send-msg-size int @@ -1868,7 +1868,7 @@ Usage of ./cmd/mimir/mimir: -querier.frontend-client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -querier.frontend-client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -querier.frontend-client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -querier.frontend-client.grpc-max-send-msg-size int @@ -1972,7 +1972,7 @@ Usage of ./cmd/mimir/mimir: -querier.scheduler-client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -querier.scheduler-client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -querier.scheduler-client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -querier.scheduler-client.grpc-max-send-msg-size int @@ -2050,7 +2050,7 @@ Usage of ./cmd/mimir/mimir: -query-frontend.grpc-client-config.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -query-frontend.grpc-client-config.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -query-frontend.grpc-client-config.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -query-frontend.grpc-client-config.grpc-max-send-msg-size int @@ -2256,7 +2256,7 @@ Usage of ./cmd/mimir/mimir: -query-scheduler.grpc-client-config.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -query-scheduler.grpc-client-config.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -query-scheduler.grpc-client-config.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -query-scheduler.grpc-client-config.grpc-max-send-msg-size int @@ -2628,7 +2628,7 @@ Usage of ./cmd/mimir/mimir: -ruler.client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -ruler.client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -ruler.client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -ruler.client.grpc-max-send-msg-size int @@ -2716,7 +2716,7 @@ Usage of ./cmd/mimir/mimir: -ruler.query-frontend.grpc-client-config.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -ruler.query-frontend.grpc-client-config.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) -ruler.query-frontend.grpc-client-config.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -ruler.query-frontend.grpc-client-config.grpc-max-send-msg-size int diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 0a345b3ee47..344d669f013 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -2337,7 +2337,7 @@ alertmanager_client: [max_send_msg_size: | default = 104857600] # (advanced) Use compression when sending messages. Supported values are: - # 'gzip', 'snappy' and '' (disable compression) + # 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) # CLI flag: -alertmanager.alertmanager-client.grpc-compression [grpc_compression: | default = ""] @@ -2600,7 +2600,7 @@ The `grpc_client` block configures the gRPC client used to communicate between t [max_send_msg_size: | default = 104857600] # (advanced) Use compression when sending messages. Supported values are: -# 'gzip', 'snappy' and '' (disable compression) +# 'gzip', 'snappy', 's2', 's2-snappy' and '' (disable compression) # CLI flag: -.grpc-compression [grpc_compression: | default = ""] diff --git a/go.mod b/go.mod index 8b2ce61d9d9..cb97f8563dc 100644 --- a/go.mod +++ b/go.mod @@ -200,7 +200,7 @@ require ( github.com/google/go-querystring v1.1.0 // indirect github.com/google/pprof v0.0.0-20240711041743-f6c9dda6c6da // indirect github.com/google/s2a-go v0.1.8 // indirect - github.com/googleapis/enterprise-certificate-proxy v0.3.3 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/gax-go/v2 v2.13.0 // indirect github.com/gosimple/slug v1.1.1 // indirect github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 // indirect diff --git a/go.sum b/go.sum index 122299f9e8e..76c2dcd1ad0 100644 --- a/go.sum +++ b/go.sum @@ -1218,8 +1218,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg= github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= -github.com/googleapis/enterprise-certificate-proxy v0.3.3 h1:QRje2j5GZimBzlbhGA2V2QlGNgL8G6e+wGo/+/2bWI0= -github.com/googleapis/enterprise-certificate-proxy v0.3.3/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= +github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gTgghdIA6Stxb52D5RnLI1SLyw= +github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= diff --git a/pkg/alertmanager/alertmanager_client.go b/pkg/alertmanager/alertmanager_client.go index dfb4b0d7ea5..d1976d2355e 100644 --- a/pkg/alertmanager/alertmanager_client.go +++ b/pkg/alertmanager/alertmanager_client.go @@ -20,6 +20,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/mimir/pkg/alertmanager/alertmanagerpb" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" ) // ClientsPool is the interface used to get the client from the pool for a specified address. @@ -45,6 +46,7 @@ type ClientConfig struct { // RegisterFlagsWithPrefix registers flags with prefix. func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix, f) f.DurationVar(&cfg.RemoteTimeout, prefix+".remote-timeout", 2*time.Second, "Timeout for downstream alertmanagers.") } diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 81ae91baf8a..da14ddfa40a 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -38,6 +38,7 @@ import ( "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" "github.com/grafana/mimir/pkg/util/globalerror" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/httpgrpcutil" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -76,6 +77,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.StringVar(&cfg.Addr, "query-frontend.instance-addr", "", "IP address to advertise to the querier (via scheduler) (default is auto-detected from network interfaces).") f.IntVar(&cfg.Port, "query-frontend.instance-port", 0, "Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).") + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-frontend.grpc-client-config", f) } diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 84b3ed00c5e..4717e13675b 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" querierapi "github.com/grafana/mimir/pkg/querier/api" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" ) // HealthAndIngesterClient is the union of IngesterClient and grpc_health_v1.HealthClient. @@ -70,6 +71,7 @@ type Config struct { // RegisterFlags registers configuration settings used by the ingester client config. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f) } diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index da801030ba1..75e605ee83b 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc" "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/math" ) @@ -48,7 +49,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to the query-frontend to identify requests from the same querier. Defaults to hostname.") f.BoolVar(&cfg.ResponseStreamingEnabled, "querier.response-streaming-enabled", false, "Enables streaming of responses from querier to query-frontend for response types that support it (currently only `active_series` responses do).") + cfg.QueryFrontendGRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.QueryFrontendGRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) + cfg.QuerySchedulerGRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.QuerySchedulerGRPCClientConfig.RegisterFlagsWithPrefix("querier.scheduler-client", f) } diff --git a/pkg/ruler/remotequerier.go b/pkg/ruler/remotequerier.go index e6329ce1352..89ee2947843 100644 --- a/pkg/ruler/remotequerier.go +++ b/pkg/ruler/remotequerier.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/codes" "github.com/grafana/mimir/pkg/querier/api" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/version" ) @@ -75,6 +76,7 @@ func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) { "GRPC listen address of the query-frontend(s). Must be a DNS address (prefixed with dns:///) "+ "to enable client side load balancing.") + c.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} c.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.query-frontend.grpc-client-config", f) f.StringVar(&c.QueryResultResponseFormat, "ruler.query-frontend.query-result-response-format", formatProtobuf, fmt.Sprintf("Format to use when retrieving query results from query-frontends. Supported values: %s", strings.Join(allFormats, ", "))) diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index f68ac54e2d4..4cfa7f26096 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -40,6 +40,7 @@ import ( "github.com/grafana/mimir/pkg/ruler/rulestore" "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/validation" @@ -167,6 +168,7 @@ func (cfg *Config) Validate(limits validation.Limits) error { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { + cfg.ClientTLSConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.ClientTLSConfig.RegisterFlagsWithPrefix("ruler.client", f) cfg.Ring.RegisterFlags(f, logger) cfg.Notifier.RegisterFlags(f) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 05692da39e5..82a588fa19a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -38,6 +38,7 @@ import ( "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" "github.com/grafana/mimir/pkg/scheduler/schedulerpb" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/httpgrpcutil" "github.com/grafana/mimir/pkg/util/validation" ) @@ -106,6 +107,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.BoolVar(&cfg.PrioritizeQueryComponents, "query-scheduler.prioritize-query-components", false, "When enabled, the query scheduler primarily prioritizes dequeuing fairly from queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, the query scheduler primarily prioritizes tenant fairness. You must enable the `query-scheduler.use-multi-algorithm-query-queue` setting to use this flag.") f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.") + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name, s2.SnappyCompatName} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f) cfg.ServiceDiscovery.RegisterFlags(f, logger) } diff --git a/pkg/util/grpcencoding/s2/s2.go b/pkg/util/grpcencoding/s2/s2.go new file mode 100644 index 00000000000..bfb8da11781 --- /dev/null +++ b/pkg/util/grpcencoding/s2/s2.go @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/mostynb/go-grpc-compression/blob/f7e92b39057ca421a6485f650243a3e804036498/internal/s2/s2.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: Copyright 2022 Mostyn Bramley-Moore. + +// Package s2 is an experimental wrapper for using +// github.com/klauspost/compress/s2 stream compression with gRPC. +package s2 + +import ( + "errors" + "io" + "sync" + + "github.com/klauspost/compress/s2" + "google.golang.org/grpc/encoding" +) + +const ( + // Name is the name of the S2 compressor. + Name = "s2" + // SnappyCompatName is the name of the Snappy compatible S2 compressor. + SnappyCompatName = "s2-snappy" +) + +type compressor struct { + name string + poolCompressor sync.Pool + poolDecompressor sync.Pool +} + +type writer struct { + *s2.Writer + pool *sync.Pool +} + +type reader struct { + *s2.Reader + pool *sync.Pool +} + +func init() { + encoding.RegisterCompressor(newCompressor(false)) + encoding.RegisterCompressor(newCompressor(true)) +} + +func newCompressor(snappyCompat bool) *compressor { + opts := []s2.WriterOption{s2.WriterConcurrency(1)} + var name string + if snappyCompat { + opts = append(opts, s2.WriterSnappyCompat()) + name = SnappyCompatName + } else { + name = Name + } + c := &compressor{ + name: name, + } + c.poolCompressor.New = func() interface{} { + w := s2.NewWriter(io.Discard, opts...) + return &writer{Writer: w, pool: &c.poolCompressor} + } + return c +} + +func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { + s := c.poolCompressor.Get().(*writer) + s.Writer.Reset(w) + return s, nil +} + +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + s, inPool := c.poolDecompressor.Get().(*reader) + if !inPool { + newR := s2.NewReader(r) + return &reader{Reader: newR, pool: &c.poolDecompressor}, nil + } + s.Reset(r) + return s, nil +} + +func (c *compressor) Name() string { + return c.name +} + +func (s *writer) Close() error { + err := s.Writer.Close() + s.pool.Put(s) + return err +} + +func (s *reader) Read(p []byte) (n int, err error) { + n, err = s.Reader.Read(p) + if errors.Is(err, io.EOF) { + s.pool.Put(s) + } + return n, err +} diff --git a/vendor/modules.txt b/vendor/modules.txt index c061f8a263c..f226d35fe29 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -557,7 +557,7 @@ github.com/google/s2a-go/stream # github.com/google/uuid v1.6.0 ## explicit github.com/google/uuid -# github.com/googleapis/enterprise-certificate-proxy v0.3.3 +# github.com/googleapis/enterprise-certificate-proxy v0.3.4 ## explicit; go 1.19 github.com/googleapis/enterprise-certificate-proxy/client github.com/googleapis/enterprise-certificate-proxy/client/util