-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
grpcclient: Make keepalive time and timeout into config parameters #56
base: main
Are you sure you want to change the base?
Changes from all commits
4d0aca8
b03ae2f
582bf5b
d6a678e
ad31150
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -81,14 +81,19 @@ func (cfg *Config) CallOptions() []grpc.CallOption { | |
return opts | ||
} | ||
|
||
// DialOption returns the config as a grpc.DialOptions. | ||
func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) { | ||
var opts []grpc.DialOption | ||
tlsOpts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled) | ||
// DialOption returns the config as a slice of grpc.DialOptions. | ||
// | ||
// keepaliveTime is the number of seconds after which the client will ping the server in case of inactivity. | ||
// See `google.golang.org/grpc/keepalive.ClientParameters.Time` for reference. | ||
// | ||
// keepaliveTimeout is the number of seconds the client waits after pinging the server, and if no activity is | ||
// seen after that, the connection is closed. See `google.golang.org/grpc/keepalive.ClientParameters.Timeout` | ||
// for reference. | ||
func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor, keepaliveTime, keepaliveTimeout int64) ([]grpc.DialOption, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think keepaliveTime and keepaliveTimeout should be |
||
opts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled) | ||
if err != nil { | ||
return nil, err | ||
} | ||
opts = append(opts, tlsOpts...) | ||
|
||
if cfg.BackoffOnRatelimits { | ||
unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewBackoffRetry(cfg.BackoffConfig)}, unaryClientInterceptors...) | ||
|
@@ -120,13 +125,29 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep | |
|
||
return append( | ||
opts, | ||
grpc.WithDefaultCallOptions(cfg.CallOptions()...), | ||
grpc.WithChainUnaryInterceptor(unaryClientInterceptors...), | ||
grpc.WithChainStreamInterceptor(streamClientInterceptors...), | ||
grpc.WithKeepaliveParams(keepalive.ClientParameters{ | ||
Time: time.Second * 20, | ||
Timeout: time.Second * 10, | ||
withDefaultCallOptions(cfg.CallOptions()...), | ||
withUnaryInterceptor(middleware.ChainUnaryClient(unaryClientInterceptors...)), | ||
withStreamInterceptor(middleware.ChainStreamClient(streamClientInterceptors...)), | ||
withKeepaliveParams(keepalive.ClientParameters{ | ||
Time: time.Duration(keepaliveTime) * time.Second, | ||
Timeout: time.Duration(keepaliveTimeout) * time.Second, | ||
PermitWithoutStream: true, | ||
}), | ||
), nil | ||
} | ||
|
||
var withDefaultCallOptions = func(cos ...grpc.CallOption) grpc.DialOption { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of using global variables for these, can they be passed to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pstibrany I can try. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this feedback hasn't been addressed, am I wrong? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pracucci you're right! I was deferring addressing this until consensus on whether the PR should be closed or not. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pracucci @pstibrany the drawback I see with stubbing these dependencies via arguments is that it adds complexity. The tests will no longer call If you still prefer the injection via method arguments technique, I can change to that. Just want to clarify the problems I see with it. |
||
return grpc.WithDefaultCallOptions(cos...) | ||
} | ||
|
||
var withUnaryInterceptor = func(f grpc.UnaryClientInterceptor) grpc.DialOption { | ||
return grpc.WithUnaryInterceptor(f) | ||
} | ||
|
||
var withStreamInterceptor = func(f grpc.StreamClientInterceptor) grpc.DialOption { | ||
return grpc.WithStreamInterceptor(f) | ||
} | ||
|
||
var withKeepaliveParams = func(kp keepalive.ClientParameters) grpc.DialOption { | ||
return grpc.WithKeepaliveParams(kp) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
package grpcclient | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/google/go-cmp/cmp" | ||
middleware "github.com/grpc-ecosystem/go-grpc-middleware" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/keepalive" | ||
|
||
"github.com/grafana/dskit/crypto/tls" | ||
) | ||
|
||
type fakeDialOption struct { | ||
grpc.EmptyDialOption | ||
|
||
callOpts []grpc.CallOption | ||
isInsecure bool | ||
unaryClientInterceptor grpc.UnaryClientInterceptor | ||
streamClientInterceptor grpc.StreamClientInterceptor | ||
keepaliveParams keepalive.ClientParameters | ||
} | ||
|
||
func (o fakeDialOption) Equal(other fakeDialOption) bool { | ||
if len(o.callOpts) != len(other.callOpts) { | ||
return false | ||
} | ||
|
||
for i, arg := range o.callOpts { | ||
if maxRecv, ok := arg.(grpc.MaxRecvMsgSizeCallOption); ok { | ||
if maxRecv != other.callOpts[i].(grpc.MaxRecvMsgSizeCallOption) { | ||
return false | ||
} | ||
continue | ||
} | ||
if maxSend, ok := arg.(grpc.MaxSendMsgSizeCallOption); ok { | ||
if maxSend != other.callOpts[i].(grpc.MaxSendMsgSizeCallOption) { | ||
return false | ||
} | ||
continue | ||
} | ||
} | ||
|
||
hasUnaryInterceptor := o.unaryClientInterceptor != nil | ||
otherHasUnaryInterceptor := other.unaryClientInterceptor != nil | ||
hasStreamInterceptor := o.streamClientInterceptor != nil | ||
otherHasStreamInterceptor := other.streamClientInterceptor != nil | ||
|
||
return o.isInsecure == other.isInsecure && hasUnaryInterceptor == otherHasUnaryInterceptor && | ||
hasStreamInterceptor == otherHasStreamInterceptor && o.keepaliveParams == other.keepaliveParams | ||
} | ||
|
||
func TestConfig(t *testing.T) { | ||
origWithDefaultCallOptions := withDefaultCallOptions | ||
origWithUnaryInterceptor := withUnaryInterceptor | ||
origWithStreamInterceptor := withStreamInterceptor | ||
origWithKeepaliveParams := withKeepaliveParams | ||
origWithInsecure := tls.WithInsecure | ||
t.Cleanup(func() { | ||
withDefaultCallOptions = origWithDefaultCallOptions | ||
withUnaryInterceptor = origWithUnaryInterceptor | ||
withStreamInterceptor = origWithStreamInterceptor | ||
withKeepaliveParams = origWithKeepaliveParams | ||
tls.WithInsecure = origWithInsecure | ||
}) | ||
|
||
withDefaultCallOptions = func(cos ...grpc.CallOption) grpc.DialOption { | ||
t.Log("Received call options", "options", cos) | ||
return fakeDialOption{callOpts: cos} | ||
} | ||
withUnaryInterceptor = func(f grpc.UnaryClientInterceptor) grpc.DialOption { | ||
t.Log("Received unary client interceptor", f) | ||
return fakeDialOption{unaryClientInterceptor: f} | ||
} | ||
withStreamInterceptor = func(f grpc.StreamClientInterceptor) grpc.DialOption { | ||
t.Log("Received stream client interceptor", f) | ||
return fakeDialOption{streamClientInterceptor: f} | ||
} | ||
withKeepaliveParams = func(kp keepalive.ClientParameters) grpc.DialOption { | ||
t.Log("Received keepalive params", kp) | ||
return fakeDialOption{ | ||
keepaliveParams: kp, | ||
} | ||
} | ||
tls.WithInsecure = func() grpc.DialOption { | ||
return fakeDialOption{isInsecure: true} | ||
} | ||
|
||
cfg := Config{} | ||
const keepaliveTime = 10 | ||
const keepaliveTimeout = 20 | ||
expOpts := []grpc.DialOption{ | ||
fakeDialOption{isInsecure: true}, | ||
fakeDialOption{callOpts: []grpc.CallOption{ | ||
grpc.MaxCallRecvMsgSize(0), | ||
grpc.MaxCallSendMsgSize(0), | ||
}}, | ||
fakeDialOption{ | ||
unaryClientInterceptor: middleware.ChainUnaryClient(), | ||
}, | ||
fakeDialOption{ | ||
streamClientInterceptor: middleware.ChainStreamClient(), | ||
}, | ||
fakeDialOption{ | ||
keepaliveParams: keepalive.ClientParameters{ | ||
Time: keepaliveTime * time.Second, | ||
Timeout: keepaliveTimeout * time.Second, | ||
PermitWithoutStream: true, | ||
}, | ||
}, | ||
} | ||
|
||
opts, err := cfg.DialOption(nil, nil, keepaliveTime, keepaliveTimeout) | ||
require.NoError(t, err) | ||
|
||
assert.Empty(t, cmp.Diff(expOpts, opts)) | ||
pstibrany marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we wanna to expose it? Looks weird to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's technically necessary for grpcclient tests to be able to stub calling out to the
grpc
package. I'll look again into whether there's another way to do this (dependency injection?).