26
26
import static org .testng .Assert .assertEquals ;
27
27
import static org .testng .Assert .assertNotNull ;
28
28
import static org .testng .Assert .fail ;
29
- import com .google .common .collect .ImmutableMap ;
30
29
import io .netty .channel .embedded .EmbeddedChannel ;
30
+ import java .util .Map ;
31
31
import java .util .Optional ;
32
32
import java .util .concurrent .CompletableFuture ;
33
33
import java .util .concurrent .CompletionException ;
@@ -77,7 +77,7 @@ private Supplier<Pair<AbstractTopic, AbstractSubscription>> createProvider(
77
77
}
78
78
Topic topic = spy (topicOpt .get ());
79
79
AbstractSubscription subscription = mock (subscriptionClass );
80
- doReturn (ImmutableMap .of ("subscription" , subscription ))
80
+ doReturn (Map .of ("subscription" , subscription ))
81
81
.when (topic ).getSubscriptions ();
82
82
return Pair .of ((AbstractTopic ) topic , subscription );
83
83
};
@@ -127,7 +127,7 @@ public void testOverwriteOldProducerAfterTopicClosed(
127
127
128
128
// Add old producer
129
129
topic .addProducer (oldProducer , new CompletableFuture <>()).join ();
130
-
130
+
131
131
CountDownLatch oldCnxCheckInvokedLatch = new CountDownLatch (1 );
132
132
CountDownLatch oldCnxCheckStartLatch = new CountDownLatch (1 );
133
133
doAnswer (invocation -> {
@@ -151,14 +151,14 @@ public void testOverwriteOldProducerAfterTopicClosed(
151
151
152
152
// Wait until new producer entered `AbstractTopic#tryOverwriteOldProducer`
153
153
oldCnxCheckInvokedLatch .await ();
154
-
154
+
155
155
topic .close (true );
156
156
// Run pending tasks to remove old producer from topic.
157
157
((EmbeddedChannel ) oldCnx .ctx ().channel ()).runPendingTasks ();
158
-
158
+
159
159
// Unblock ServerCnx#checkConnectionLiveness to resume `AbstractTopic#tryOverwriteOldProducer`
160
160
oldCnxCheckStartLatch .countDown ();
161
-
161
+
162
162
// As topic is fenced, adding new producer should fail.
163
163
try {
164
164
producerEpoch .join ();
0 commit comments