6868import io .grpc .inprocess .InProcessServerBuilder ;
6969import io .grpc .internal .BackoffPolicy ;
7070import io .grpc .internal .FakeClock ;
71+ import io .grpc .internal .GrpcUtil ;
72+ import io .grpc .internal .ObjectPool ;
7173import io .grpc .internal .PickSubchannelArgsImpl ;
74+ import io .grpc .internal .SharedResourcePool ;
7275import io .grpc .lookup .v1 .RouteLookupServiceGrpc ;
7376import io .grpc .rls .CachingRlsLbClient .CacheEntry ;
7477import io .grpc .rls .CachingRlsLbClient .CachedRouteLookupResponse ;
98101import java .util .Map ;
99102import java .util .Set ;
100103import java .util .concurrent .ExecutionException ;
104+ import java .util .concurrent .Executor ;
101105import java .util .concurrent .ScheduledExecutorService ;
102106import java .util .concurrent .ScheduledFuture ;
103107import java .util .concurrent .TimeUnit ;
104108import java .util .concurrent .TimeoutException ;
109+ import java .util .concurrent .atomic .AtomicBoolean ;
105110import javax .annotation .Nonnull ;
106111import org .junit .After ;
107112import org .junit .Before ;
@@ -341,28 +346,80 @@ public void get_throttledAndRecover() throws Exception {
341346 @ Test
342347 public void controlPlaneTransientToReady_backOffEntriesRemovedAndPickerUpdated () throws Exception {
343348 setUpRlsLbClient ();
349+ final ConnectivityState [] rlsChannelState = new ConnectivityState [1 ];
350+ Runnable channelStateListener = new Runnable () {
351+ @ Override
352+ public void run () {
353+ rlsChannelState [0 ] = fakeHelper .oobChannel .getState (false );
354+ fakeHelper .oobChannel .notifyWhenStateChanged (rlsChannelState [0 ], this );
355+ synchronized (this ) {
356+ notify ();
357+ }
358+ }
359+ };
360+ fakeHelper .oobChannel .notifyWhenStateChanged (fakeHelper .oobChannel .getState (false ),
361+ channelStateListener );
344362 RouteLookupRequest routeLookupRequest = RouteLookupRequest .create (ImmutableMap .of (
345363 "server" , "bigtable.googleapis.com" , "service-key" , "foo" , "method-key" , "bar" ));
346364 rlsServerImpl .setLookupTable (
347365 ImmutableMap .of (
348366 routeLookupRequest ,
349367 RouteLookupResponse .create (ImmutableList .of ("target" ), "header" )));
350368
369+ CachedRouteLookupResponse resp = getInSyncContext (routeLookupRequest );
370+ assertThat (resp .isPending ()).isTrue ();
371+ // server response
372+ fakeClock .forwardTime (SERVER_LATENCY_MILLIS , TimeUnit .MILLISECONDS );
373+ resp = getInSyncContext (routeLookupRequest );
374+ assertThat (resp .hasData ()).isTrue ();
375+
351376 fakeHelper .server .shutdown ();
377+ // Channel goes to IDLE state from the shutdown listener handling.
378+ try {
379+ if (!fakeHelper .server .awaitTermination (10 , TimeUnit .SECONDS )) {
380+ fakeHelper .server .shutdownNow (); // Forceful shutdown if graceful timeout expires
381+ }
382+ } catch (InterruptedException e ) {
383+ fakeHelper .server .shutdownNow ();
384+ }
385+ // Use a different key to cause a cache miss and trigger a RPC.
386+ RouteLookupRequest routeLookupRequest2 = RouteLookupRequest .create (ImmutableMap .of (
387+ "server" , "bigtable.googleapis.com" , "service-key" , "foo2" , "method-key" , "bar" ));
388+ // Rls channel will go to TRANSIENT_FAILURE (back-off) because the picker notices the
389+ // subchannel state IDLE and the server transport listener is null.
390+ resp = getInSyncContext (routeLookupRequest2 );
391+ assertThat (resp .isPending ()).isTrue ();
392+ assertThat (rlsChannelState [0 ]).isEqualTo (ConnectivityState .TRANSIENT_FAILURE );
393+ // Throttle the next rpc call.
352394 fakeThrottler .nextResult = true ;
353395 fakeBackoffProvider .nextPolicy = createBackoffPolicy (10 , TimeUnit .MILLISECONDS );
354396
355- CachedRouteLookupResponse resp = getInSyncContext (routeLookupRequest );
397+ // Cause another cache miss by using a new request key. This will create a back-off Rls
398+ // cache entry.
399+ RouteLookupRequest routeLookupRequest3 = RouteLookupRequest .create (ImmutableMap .of (
400+ "server" , "bigtable.googleapis.com" , "service-key" , "foo3" , "method-key" , "bar" ));
401+ resp = getInSyncContext (routeLookupRequest3 );
356402
357403 assertThat (resp .hasError ()).isTrue ();
358404
359- fakeHelper .server .start ();
360- Thread .sleep (10000 );
361- // Backoff entry evicted from cache.
362- verify (evictionListener )
363- .onEviction (eq (routeLookupRequest ), any (CacheEntry .class ), eq (EvictionType .EXPLICIT ));
364- // Assert that Rls LB policy picker was updated.
365- assertThat (fakeHelper .lastPicker .toString ()).isEqualTo ("RlsPicker{target=service1}" );
405+ fakeHelper .createServerAndRegister ("service1" );
406+ // Wait for Rls subchannel back-off expiry and its moving to READY
407+ synchronized (channelStateListener ) {
408+ channelStateListener .wait (5000 );
409+ }
410+ assertThat (rlsChannelState [0 ]).isEqualTo (ConnectivityState .READY );
411+ final ObjectPool <? extends Executor > defaultExecutorPool =
412+ SharedResourcePool .forResource (GrpcUtil .SHARED_CHANNEL_EXECUTOR );
413+ AtomicBoolean isSuccess = new AtomicBoolean (false );
414+ defaultExecutorPool .getObject ().execute (() -> {
415+ // Backoff entry evicted from cache.
416+ verify (evictionListener )
417+ .onEviction (eq (routeLookupRequest3 ), any (CacheEntry .class ), eq (EvictionType .EXPLICIT ));
418+ // Assert that Rls LB policy picker was updated.
419+ assertThat (fakeHelper .lastPicker .toString ()).isEqualTo ("RlsPicker{target=service1}" );
420+ isSuccess .set (true );
421+ });
422+ // No need to wait because direct executor is used.
366423 }
367424
368425 @ Test
@@ -912,16 +969,21 @@ private final class FakeHelper extends Helper {
912969 SubchannelPicker lastPicker ;
913970 Server server ;
914971 ManagedChannel oobChannel ;
972+
973+ void createServerAndRegister (String target ) throws IOException {
974+ server = InProcessServerBuilder .forName (target )
975+ .addService (rlsServerImpl )
976+ .directExecutor ()
977+ .build ()
978+ .start ();
979+ grpcCleanupRule .register (server );
980+ }
981+
915982 @ Override
916983 public ManagedChannelBuilder <?> createResolvingOobChannelBuilder (
917984 String target , ChannelCredentials creds ) {
918985 try {
919- server = InProcessServerBuilder .forName (target )
920- .addService (rlsServerImpl )
921- .directExecutor ()
922- .build ()
923- .start ();
924- grpcCleanupRule .register (server );
986+ createServerAndRegister (target );
925987 } catch (IOException e ) {
926988 throw new RuntimeException ("cannot create server: " + target , e );
927989 }
@@ -937,7 +999,8 @@ protected ManagedChannelBuilder<?> delegate() {
937999
9381000 @ Override
9391001 public ManagedChannel build () {
940- return grpcCleanupRule .register (super .build ());
1002+ oobChannel = super .build ();
1003+ return grpcCleanupRule .register (oobChannel );
9411004 }
9421005
9431006 @ Override
@@ -955,9 +1018,7 @@ public CleaningChannelBuilder overrideAuthority(String authority) {
9551018 }
9561019 }
9571020
958- CleaningChannelBuilder oobChannelBuilder = new CleaningChannelBuilder ();
959- oobChannel = oobChannelBuilder .build ();
960- return oobChannelBuilder ;
1021+ return new CleaningChannelBuilder ();
9611022 }
9621023
9631024 @ Override
0 commit comments