diff --git a/CHANGELOG.md b/CHANGELOG.md index 683d4c851..843589944 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ * [CHANGE] Cache: Switch Memcached backend to use `github.com/grafana/gomemcache` repository instead of `github.com/bradfitz/gomemcache`. #248 * [CHANGE] Multierror: Implement `Is(error) bool`. This allows to use `multierror.MultiError` with `errors.Is()`. `MultiError` will in turn call `errors.Is()` on every error value. #254 * [CHANGE] Cache: Remove the `context.Context` argument from the `Cache.Store` method and rename the method to `Cache.StoreAsync`. #273 +* [CHANGE] grpcclient: Make ping time and timeout into `Config.DialOption` arguments (with same defaults as corresponding `google.golang.org/grpc/keepalive.ClientParameters.Time*` parameters). #56 * [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276 * [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279 * [ENHANCEMENT] Add configuration to customize backoff for the gRPC clients. diff --git a/crypto/tls/tls.go b/crypto/tls/tls.go index 7ed818f39..e11bc5086 100644 --- a/crypto/tls/tls.go +++ b/crypto/tls/tls.go @@ -155,9 +155,16 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) { return config, nil } -// GetGRPCDialOptions creates GRPC DialOptions for TLS -func (cfg *ClientConfig) GetGRPCDialOptions(enabled bool) ([]grpc.DialOption, error) { - if !enabled { +// WithInsecure wraps grpc.WithInsecure. +// +// Stubbable for tests. +var WithInsecure = func() grpc.DialOption { + return grpc.WithInsecure() +} + +// GetGRPCDialOptions creates GRPC DialOptions for TLS. +func (cfg *ClientConfig) GetGRPCDialOptions(tlsEnabled bool) ([]grpc.DialOption, error) { + if !tlsEnabled { return []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil } diff --git a/go.mod b/go.mod index 9f2e7c861..c5e36f7d8 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/gogo/status v1.1.0 github.com/golang/snappy v0.0.4 + github.com/google/go-cmp v0.5.6 github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 github.com/hashicorp/consul/api v1.15.3 github.com/hashicorp/go-cleanhttp v0.5.2 diff --git a/go.sum b/go.sum index ac6568045..d0776b1aa 100644 --- a/go.sum +++ b/go.sum @@ -566,6 +566,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= diff --git a/grpcclient/grpcclient.go b/grpcclient/grpcclient.go index b9faf3d6e..76aa73e2e 100644 --- a/grpcclient/grpcclient.go +++ b/grpcclient/grpcclient.go @@ -81,14 +81,19 @@ func (cfg *Config) CallOptions() []grpc.CallOption { return opts } -// DialOption returns the config as a grpc.DialOptions. -func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) { - var opts []grpc.DialOption - tlsOpts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled) +// DialOption returns the config as a slice of grpc.DialOptions. +// +// keepaliveTime is the number of seconds after which the client will ping the server in case of inactivity. +// See `google.golang.org/grpc/keepalive.ClientParameters.Time` for reference. +// +// keepaliveTimeout is the number of seconds the client waits after pinging the server, and if no activity is +// seen after that, the connection is closed. See `google.golang.org/grpc/keepalive.ClientParameters.Timeout` +// for reference. +func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor, keepaliveTime, keepaliveTimeout int64) ([]grpc.DialOption, error) { + opts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled) if err != nil { return nil, err } - opts = append(opts, tlsOpts...) if cfg.BackoffOnRatelimits { unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewBackoffRetry(cfg.BackoffConfig)}, unaryClientInterceptors...) @@ -120,13 +125,29 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep return append( opts, - grpc.WithDefaultCallOptions(cfg.CallOptions()...), - grpc.WithChainUnaryInterceptor(unaryClientInterceptors...), - grpc.WithChainStreamInterceptor(streamClientInterceptors...), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: time.Second * 20, - Timeout: time.Second * 10, + withDefaultCallOptions(cfg.CallOptions()...), + withUnaryInterceptor(middleware.ChainUnaryClient(unaryClientInterceptors...)), + withStreamInterceptor(middleware.ChainStreamClient(streamClientInterceptors...)), + withKeepaliveParams(keepalive.ClientParameters{ + Time: time.Duration(keepaliveTime) * time.Second, + Timeout: time.Duration(keepaliveTimeout) * time.Second, PermitWithoutStream: true, }), ), nil } + +var withDefaultCallOptions = func(cos ...grpc.CallOption) grpc.DialOption { + return grpc.WithDefaultCallOptions(cos...) +} + +var withUnaryInterceptor = func(f grpc.UnaryClientInterceptor) grpc.DialOption { + return grpc.WithUnaryInterceptor(f) +} + +var withStreamInterceptor = func(f grpc.StreamClientInterceptor) grpc.DialOption { + return grpc.WithStreamInterceptor(f) +} + +var withKeepaliveParams = func(kp keepalive.ClientParameters) grpc.DialOption { + return grpc.WithKeepaliveParams(kp) +} diff --git a/grpcclient/grpcclient_test.go b/grpcclient/grpcclient_test.go new file mode 100644 index 000000000..0965e0587 --- /dev/null +++ b/grpcclient/grpcclient_test.go @@ -0,0 +1,120 @@ +package grpcclient + +import ( + "testing" + "time" + + "github.com/google/go-cmp/cmp" + middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" + + "github.com/grafana/dskit/crypto/tls" +) + +type fakeDialOption struct { + grpc.EmptyDialOption + + callOpts []grpc.CallOption + isInsecure bool + unaryClientInterceptor grpc.UnaryClientInterceptor + streamClientInterceptor grpc.StreamClientInterceptor + keepaliveParams keepalive.ClientParameters +} + +func (o fakeDialOption) Equal(other fakeDialOption) bool { + if len(o.callOpts) != len(other.callOpts) { + return false + } + + for i, arg := range o.callOpts { + if maxRecv, ok := arg.(grpc.MaxRecvMsgSizeCallOption); ok { + if maxRecv != other.callOpts[i].(grpc.MaxRecvMsgSizeCallOption) { + return false + } + continue + } + if maxSend, ok := arg.(grpc.MaxSendMsgSizeCallOption); ok { + if maxSend != other.callOpts[i].(grpc.MaxSendMsgSizeCallOption) { + return false + } + continue + } + } + + hasUnaryInterceptor := o.unaryClientInterceptor != nil + otherHasUnaryInterceptor := other.unaryClientInterceptor != nil + hasStreamInterceptor := o.streamClientInterceptor != nil + otherHasStreamInterceptor := other.streamClientInterceptor != nil + + return o.isInsecure == other.isInsecure && hasUnaryInterceptor == otherHasUnaryInterceptor && + hasStreamInterceptor == otherHasStreamInterceptor && o.keepaliveParams == other.keepaliveParams +} + +func TestConfig(t *testing.T) { + origWithDefaultCallOptions := withDefaultCallOptions + origWithUnaryInterceptor := withUnaryInterceptor + origWithStreamInterceptor := withStreamInterceptor + origWithKeepaliveParams := withKeepaliveParams + origWithInsecure := tls.WithInsecure + t.Cleanup(func() { + withDefaultCallOptions = origWithDefaultCallOptions + withUnaryInterceptor = origWithUnaryInterceptor + withStreamInterceptor = origWithStreamInterceptor + withKeepaliveParams = origWithKeepaliveParams + tls.WithInsecure = origWithInsecure + }) + + withDefaultCallOptions = func(cos ...grpc.CallOption) grpc.DialOption { + t.Log("Received call options", "options", cos) + return fakeDialOption{callOpts: cos} + } + withUnaryInterceptor = func(f grpc.UnaryClientInterceptor) grpc.DialOption { + t.Log("Received unary client interceptor", f) + return fakeDialOption{unaryClientInterceptor: f} + } + withStreamInterceptor = func(f grpc.StreamClientInterceptor) grpc.DialOption { + t.Log("Received stream client interceptor", f) + return fakeDialOption{streamClientInterceptor: f} + } + withKeepaliveParams = func(kp keepalive.ClientParameters) grpc.DialOption { + t.Log("Received keepalive params", kp) + return fakeDialOption{ + keepaliveParams: kp, + } + } + tls.WithInsecure = func() grpc.DialOption { + return fakeDialOption{isInsecure: true} + } + + cfg := Config{} + const keepaliveTime = 10 + const keepaliveTimeout = 20 + expOpts := []grpc.DialOption{ + fakeDialOption{isInsecure: true}, + fakeDialOption{callOpts: []grpc.CallOption{ + grpc.MaxCallRecvMsgSize(0), + grpc.MaxCallSendMsgSize(0), + }}, + fakeDialOption{ + unaryClientInterceptor: middleware.ChainUnaryClient(), + }, + fakeDialOption{ + streamClientInterceptor: middleware.ChainStreamClient(), + }, + fakeDialOption{ + keepaliveParams: keepalive.ClientParameters{ + Time: keepaliveTime * time.Second, + Timeout: keepaliveTimeout * time.Second, + PermitWithoutStream: true, + }, + }, + } + + opts, err := cfg.DialOption(nil, nil, keepaliveTime, keepaliveTimeout) + require.NoError(t, err) + + assert.Empty(t, cmp.Diff(expOpts, opts)) +}