File tree Expand file tree Collapse file tree 1 file changed +7
-1
lines changed
pulsar-broker/src/test/java/org/apache/pulsar/broker/service Expand file tree Collapse file tree 1 file changed +7
-1
lines changed Original file line number Diff line number Diff line change @@ -127,7 +127,7 @@ public void testOverwriteOldProducerAfterTopicClosed(
127
127
128
128
// Add old producer
129
129
topic .addProducer (oldProducer , new CompletableFuture <>()).join ();
130
-
130
+
131
131
CountDownLatch oldCnxCheckInvokedLatch = new CountDownLatch (1 );
132
132
CountDownLatch oldCnxCheckStartLatch = new CountDownLatch (1 );
133
133
doAnswer (invocation -> {
@@ -149,11 +149,17 @@ public void testOverwriteOldProducerAfterTopicClosed(
149
149
CompletableFuture <Optional <Long >> producerEpoch =
150
150
topic .addProducer (newProducer , new CompletableFuture <>());
151
151
152
+ // Wait until new producer entered `AbstractTopic#tryOverwriteOldProducer`
152
153
oldCnxCheckInvokedLatch .await ();
154
+
153
155
topic .close (true );
154
156
// Run pending tasks to remove old producer from topic.
155
157
((EmbeddedChannel ) oldCnx .ctx ().channel ()).runPendingTasks ();
158
+
159
+ // Unblock ServerCnx#checkConnectionLiveness to resume `AbstractTopic#tryOverwriteOldProducer`
156
160
oldCnxCheckStartLatch .countDown ();
161
+
162
+ // As topic is fenced, adding new producer should fail.
157
163
try {
158
164
producerEpoch .join ();
159
165
fail ("TopicFencedException expected" );
You can’t perform that action at this time.
0 commit comments