diff --git a/CHANGELOG.md b/CHANGELOG.md index 9749f68f8..dafb558c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -94,6 +94,7 @@ * [FEATURE] Add methods `Increment`, `FlushAll`, `CompareAndSwap`, `Touch` to `cache.MemcachedClient` #477 * [FEATURE] Add `concurrency.ForEachJobMergeResults()` utility function. #486 * [FEATURE] Add `ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation()`. #495 +* [FEATURE] Add S2 GRPC compression. #582 * [ENHANCEMENT] Add ability to log all source hosts from http header instead of only the first one. #444 * [ENHANCEMENT] Add configuration to customize backoff for the gRPC clients. * [ENHANCEMENT] Use `SecretReader` interface to fetch secrets when configuring TLS. #274 diff --git a/go.mod b/go.mod index 771fc1e18..6b1d9a768 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/hashicorp/go-sockaddr v1.0.2 github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/hashicorp/memberlist v0.3.1 + github.com/klauspost/compress v1.17.9 github.com/miekg/dns v1.1.50 github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e github.com/opentracing-contrib/go-stdlib v1.0.0 @@ -39,7 +40,7 @@ require ( github.com/prometheus/common v0.44.0 github.com/prometheus/exporter-toolkit v0.10.1-0.20230714054209-2f4150c63f97 github.com/sercand/kuberesolver/v5 v5.1.1 - github.com/stretchr/testify v1.8.1 + github.com/stretchr/testify v1.9.0 github.com/uber/jaeger-client-go v2.28.0+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible go.etcd.io/etcd/api/v3 v3.5.0 @@ -77,7 +78,6 @@ require ( github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/serf v0.9.7 // indirect github.com/jpillora/backoff v1.0.0 // indirect - github.com/klauspost/compress v1.17.8 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect @@ -87,7 +87,7 @@ require ( github.com/onsi/gomega v1.24.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect - github.com/stretchr/objx v0.5.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect diff --git a/go.sum b/go.sum index 4a1cb572c..193ddcdf2 100644 --- a/go.sum +++ b/go.sum @@ -199,8 +199,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -318,18 +318,15 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/uber/jaeger-client-go v2.28.0+incompatible h1:G4QSBfvPKvg5ZM2j9MrJFdfI5iSljY/WnJqOGFao6HI= github.com/uber/jaeger-client-go v2.28.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= diff --git a/grpcclient/grpcclient.go b/grpcclient/grpcclient.go index 751899047..fab4b90fb 100644 --- a/grpcclient/grpcclient.go +++ b/grpcclient/grpcclient.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/crypto/tls" "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/grpcencoding/s2" "github.com/grafana/dskit/grpcencoding/snappy" ) @@ -57,7 +58,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 100<<20, "gRPC client max send message size (bytes).") - f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)") + f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2', and '' (disable compression)") f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.") f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.") f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit rate limits.") @@ -75,7 +76,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { func (cfg *Config) Validate() error { switch cfg.GRPCCompression { - case gzip.Name, snappy.Name, "": + case gzip.Name, snappy.Name, s2.Name, "": // valid default: return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression) diff --git a/grpcencoding/s2/s2.go b/grpcencoding/s2/s2.go new file mode 100644 index 000000000..a209d26a3 --- /dev/null +++ b/grpcencoding/s2/s2.go @@ -0,0 +1,89 @@ +// Copyright 2022 Mostyn Bramley-Moore. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package s2 is an experimental wrapper for using +// github.com/klauspost/compress/s2 stream compression with gRPC. +package s2 + +import ( + "io" + "sync" + + "github.com/klauspost/compress/s2" + "google.golang.org/grpc/encoding" +) + +const Name = "s2" + +type compressor struct { + poolCompressor sync.Pool + poolDecompressor sync.Pool +} + +type writer struct { + *s2.Writer + pool *sync.Pool +} + +type reader struct { + *s2.Reader + pool *sync.Pool +} + +func init() { + encoding.RegisterCompressor(newCompressor()) +} + +func newCompressor() *compressor { + c := &compressor{} + c.poolCompressor.New = func() interface{} { + w := s2.NewWriter(io.Discard, s2.WriterConcurrency(1)) + return &writer{Writer: w, pool: &c.poolCompressor} + } + return c +} + +func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { + s := c.poolCompressor.Get().(*writer) + s.Writer.Reset(w) + return s, nil +} + +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + s, inPool := c.poolDecompressor.Get().(*reader) + if !inPool { + newR := s2.NewReader(r) + return &reader{Reader: newR, pool: &c.poolDecompressor}, nil + } + s.Reset(r) + return s, nil +} + +func (c *compressor) Name() string { + return Name +} + +func (s *writer) Close() error { + err := s.Writer.Close() + s.pool.Put(s) + return err +} + +func (s *reader) Read(p []byte) (n int, err error) { + n, err = s.Reader.Read(p) + if err == io.EOF { + s.pool.Put(s) + } + return n, err +} diff --git a/grpcencoding/s2/s2_test.go b/grpcencoding/s2/s2_test.go new file mode 100644 index 000000000..a7a5684ed --- /dev/null +++ b/grpcencoding/s2/s2_test.go @@ -0,0 +1,94 @@ +package s2 + +import ( + "bytes" + "io" + "strings" + "testing" + + "google.golang.org/grpc/encoding" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestS2(t *testing.T) { + c := newCompressor() + assert.Equal(t, "s2", c.Name()) + + tests := []struct { + test string + input string + }{ + {"empty", ""}, + {"short", "hello world"}, + {"long", strings.Repeat("123456789", 1024)}, + } + for _, test := range tests { + t.Run(test.test, func(t *testing.T) { + var buf bytes.Buffer + // Compress + w, err := c.Compress(&buf) + require.NoError(t, err) + n, err := w.Write([]byte(test.input)) + require.NoError(t, err) + assert.Len(t, test.input, n) + err = w.Close() + require.NoError(t, err) + // Decompress + r, err := c.Decompress(&buf) + require.NoError(t, err) + out, err := io.ReadAll(r) + require.NoError(t, err) + assert.Equal(t, test.input, string(out)) + }) + } +} + +func BenchmarkS2SCompress(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + c := newCompressor() + b.ResetTimer() + for i := 0; i < b.N; i++ { + w, _ := c.Compress(io.Discard) + _, _ = w.Write(data) + _ = w.Close() + } +} + +func BenchmarkS2Decompress(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + c := newCompressor() + var buf bytes.Buffer + w, _ := c.Compress(&buf) + _, _ = w.Write(data) + reader := bytes.NewReader(buf.Bytes()) + b.ResetTimer() + for i := 0; i < b.N; i++ { + r, _ := c.Decompress(reader) + _, _ = io.ReadAll(r) + _, _ = reader.Seek(0, io.SeekStart) + } +} + +func BenchmarkS2GrpcCompressionPerf(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + grpcc := encoding.GetCompressor(Name) + + // Reset the timer to exclude setup time from the measurements + b.ResetTimer() + + for i := 0; i < b.N; i++ { + for j := 0; j < 10; j++ { + var buf bytes.Buffer + writer, _ := grpcc.Compress(&buf) + _, _ = writer.Write(data) + _ = writer.Close() + + compressedData := buf.Bytes() + reader, _ := grpcc.Decompress(bytes.NewReader(compressedData)) + var result bytes.Buffer + _, _ = result.ReadFrom(reader) + } + } +} diff --git a/grpcencoding/snappy/snappy_test.go b/grpcencoding/snappy/snappy_test.go index d288c95c2..32957f6ea 100644 --- a/grpcencoding/snappy/snappy_test.go +++ b/grpcencoding/snappy/snappy_test.go @@ -6,6 +6,8 @@ import ( "strings" "testing" + "google.golang.org/grpc/encoding" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -68,3 +70,25 @@ func BenchmarkSnappyDecompress(b *testing.B) { _, _ = reader.Seek(0, io.SeekStart) } } + +func BenchmarkSnappyGrpcCompressionPerf(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + grpcc := encoding.GetCompressor(Name) + + // Reset the timer to exclude setup time from the measurements + b.ResetTimer() + + for i := 0; i < b.N; i++ { + for j := 0; j < 10; j++ { + var buf bytes.Buffer + writer, _ := grpcc.Compress(&buf) + _, _ = writer.Write(data) + _ = writer.Close() + + compressedData := buf.Bytes() + reader, _ := grpcc.Decompress(bytes.NewReader(compressedData)) + var result bytes.Buffer + _, _ = result.ReadFrom(reader) + } + } +} diff --git a/ring/example/local/go.sum b/ring/example/local/go.sum index 9d0fe7587..f6310136f 100644 --- a/ring/example/local/go.sum +++ b/ring/example/local/go.sum @@ -257,15 +257,15 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/uber/jaeger-client-go v2.28.0+incompatible h1:G4QSBfvPKvg5ZM2j9MrJFdfI5iSljY/WnJqOGFao6HI= github.com/uber/jaeger-client-go v2.28.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=