RPC and Listener event Guice Scopes for gRPC.
Copyright 2021 Piotr Morgwai Kotarbinski, Licensed under the Apache License, Version 2.0
latest release: 13.0
(javadoc)
Provides rpcScope
and listenerEventScope
Guice Scopes for both client and server apps.
Oversimplifying, in case of streaming requests on servers and streaming responses on clients, listenerEventScope
spans over the processing of a single message from the stream or over a single call to any registered handler (via setOnReadyHandler(...)
, setOnCancelHandler(...)
etc), while rpcScope
spans over a whole given RPC.
Oversimplifying again, in case of unary inbound, these 2 Scopes have roughly similar span, although most registered callbacks will have a separate listenerEventScope
.
See this DZone article for extended high-level explanation.
Technically:
-
A
ServerCall.Listener
creation inServerCallHandler.startCall(...)
, a call to any ofServerCall.Listener
's methods, a call to any ofClientCall.Listener
's methods, each run within a separate instance of ListenerEventContext.-
For servers this means that:
- all callbacks to request
StreamObserver
s returned by methods implementing RPC procedures - methods implementing RPC procedures themselves
- all invocations of callbacks registered via
ServerCallStreamObserver
s
have "separate
listenerEventScope
s", EXCEPT the first call toonReady()
handler in case of unary requests as it's invoked in the sameListener
event-handling method as the RPC method (see the source of gRPC UnaryServerCallListener.onHalfClose() for details). - all callbacks to request
-
For clients this means that:
- all callbacks to response
StreamObserver
s supplied as arguments to stub RPC methods - all invocations of callbacks registered via
ClientCallStreamObserver
have "separate
listenerEventScope
s". - all callbacks to response
-
-
ServerCallHandler.startCall(...)
and each call to any of the returnedServerCall.Listener
's methods run within the same instance of ServerRpcContext. This means that:- a single given call to a method implementing RPC procedure
- all callbacks to the request
StreamObserver
returned by this given call - all callbacks to handlers registered via this call's
ServerCallStreamObserver
all share "the same
rpcScope
". -
Each method call to a single given instance of
ClientCall.Listener
run within the same instance of ClientRpcContext. This means that:- all callbacks to the response
StreamObserver
supplied as an argument to this given call of the stub sRPC method - all callbacks to handlers registered via this call's
ClientCallStreamObserver
all share "the same
rpcScope
". - all callbacks to the response
Contains the above Scope
s, ContextTracker
s, some helper methods and gRPC interceptors that start the above contexts.
A ThreadPoolExecutor
that upon dispatching a task, automatically transfers the current RpcContext
and ListenerEventContext
to the worker thread.
Instances should usually be created using helper methods from the above GrpcModule
and configured for named instance injection in user modules.
Binds tasks and callbacks (Runnable
s, Consumer
s and BiConsumer
s) to contexts that were active at the time of binding. This can be used to transfer Context
s almost fully automatically when it's not possible to use GrpcContextTrackingExecutor
when switching threads (for example when providing callbacks as arguments to async functions). See a usage sample below.
- Create an instance of
GrpcModule
and pass it to other modules. - Other modules can use
GrpcModule.rpcScope
andGrpcModule.listenerEventScope
to scope their bindings:bind(MyComponent.class).to(MyComponentImpl.class).in(grpcModule.rpcScope);
- All gRPC service instances added to server must be intercepted with
GrpcModule.serverInterceptor
like the following:.addService(ServerInterceptors.intercept(myService, grpcModule.contextInterceptor /* more interceptors here... */))
- All client
Channel
s must be intercepted withGrpcModule.clientInterceptor
orGrpcModule.nestingClientInterceptor
like the following:ClientInterceptors.intercept(channel, grpcModule.clientInterceptor)
public class MyServer {
final Server grpcServer;
public MyServer(int port /* more params here... */) throws Exception {
final var grpcModule = new GrpcModule();
final Module myModule = (binder) -> {
binder.bind(MyComponent.class)
.to(MyComponentImpl.class)
.in(grpcModule.rpcScope);
// more bindings here
};
// more modules here
final var injector = Guice.createInjector(grpcModule, myModule /* more modules here... */);
final var myService = injector.getInstance(MyService.class);
// myService will get Provider<MyComponent> injected
// more services here...
grpcServer = ServerBuilder
.forPort(port)
.addService(ServerInterceptors.intercept(
myService, grpcModule.serverInterceptor /* more interceptors here... */))
// more services here...
.build();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {/* shutdown code here... */}));
grpcServer.start();
}
public static void main(String[] args) throws Exception {
new MyServer(6666 /* more params here... */).grpcServer.awaitTermination();
}
// more code here...
}
public class MyClient {
public static void main(String[] args) throws Exception {
final var entityManagerFactory = createEntityManagerFactory(args[0]);
final ManagedChannel managedChannel;
try {
managedChannel = ManagedChannelBuilder
.forTarget(args[1])
.usePlaintext()
.build();
} catch (Throwable t) {
entityManagerFactory.close();
throw t;
}
try {
final var grpcModule = new GrpcModule();
final var channel = ClientInterceptors.intercept(
managedChannel, grpcModule.nestingClientInterceptor);
final var stub = MyServiceGrpc.newStub(channel);
final Module myModule = (binder) -> {
binder.bind(EntityManagerFactory.class)
.toInstance(entityManagerFactory);
binder.bind(EntityManager.class)
.toProvider(entityManagerFactory::createEntityManager)
.in(grpcModule.listenerEventScope);
binder.bind(MyDao.class)
.to(MyJpaDao.class)
.in(Scopes.SINGLETON);
// more bindings here
};
// more modules here
final var injector = Guice.createInjector(
grpcModule, myModule /* more modules here... */);
final var myResponseObserver = injector.getInstance(MyResponseObserver.class);
// myResponseObserver will get MyDao injected
stub.myUnaryRequestStreamingResponseProcedure(args[2], myResponseObserver);
myResponseObserver.awaitCompletion(30, TimeUnit.MINUTES);
} finally {
entityManagerFactory.close();
managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
if ( !managedChannel.isTerminated()) {
System.err.println("channel has NOT shutdown cleanly");
managedChannel.shutdownNow();
}
}
}
// more code here...
}
class MyComponent {
@Inject ContextBinder ctxBinder;
void methodThatCallsSomeAsyncMethod(/* ... */) {
// other code here...
someAsyncMethod(arg1, /* ... */ argN, ctxBinder.bindToContext((callbackParam) -> {
// callback code here...
}));
}
}
Dependencies of this jar on guice and grpc are declared as optional, so that apps can use any versions of these deps with a compatible API.
See sample app