diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 18cc449d15dcb..f925e3f39e46c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -186,10 +186,10 @@ public void cleanup() throws Exception { @AfterMethod(alwaysRun = true) public void cleanupAfterMethod() throws Exception{ // cleanup. - Set existsNsSetAferSetup = Stream.concat(testLocalNamespaces.stream(), testGlobalNamespaces.stream()) + Set existsNsSetAfterSetup = Stream.concat(testLocalNamespaces.stream(), testGlobalNamespaces.stream()) .map(Objects::toString).collect(Collectors.toSet()); - cleanupNamespaceByPredicate(this.testTenant, v -> !existsNsSetAferSetup.contains(v)); - cleanupNamespaceByPredicate(this.testOtherTenant, v -> !existsNsSetAferSetup.contains(v)); + cleanupNamespaceByPredicate(this.testTenant, v -> !existsNsSetAfterSetup.contains(v)); + cleanupNamespaceByPredicate(this.testOtherTenant, v -> !existsNsSetAfterSetup.contains(v)); } protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 258c0183114fd..feb06f464e6ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -195,109 +195,119 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @Test public void testGetSubscriptions() { String testLocalTopicName = "topic-not-found"; + String subscriptionName = "test"; + // 1) Confirm that the topic does not exist - AsyncResponse response = mock(AsyncResponse.class); - persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName, true); - ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(RestException.class); - verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); - Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), - Response.Status.NOT_FOUND.getStatusCode()); - Assert.assertEquals(errorCaptor.getValue().getMessage(), String.format("Topic %s not found", - "persistent://my-tenant/my-namespace/topic-not-found")); + assertTopicNotFound(testLocalTopicName); // 2) Confirm that the partitioned topic does not exist - response = mock(AsyncResponse.class); - persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", - true); - errorCaptor = ArgumentCaptor.forClass(RestException.class); - verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); - Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), - Response.Status.NOT_FOUND.getStatusCode()); - Assert.assertEquals(errorCaptor.getValue().getMessage(), - "Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has " - + "zero partitions"); + assertPartitionedTopicNotFound(testLocalTopicName); + + // 3) Confirm that the namespace does not exist + assertNamespaceNotFound(testLocalTopicName); + + // 4) Create the partitioned topic + createPartitionedTopic(testLocalTopicName, 3); + + // 5) Create a subscription + createSubscription(testLocalTopicName, subscriptionName); + + // 6) Confirm that the subscription exists + assertSubscriptionExists(testLocalTopicName + "-partition-0", subscriptionName); + + // 7) Delete the subscription + deleteSubscription(testLocalTopicName, subscriptionName); + + // 8) Confirm that the subscription does not exist + assertSubscriptionDoesNotExist(testLocalTopicName + "-partition-0"); + + // 9) Create a subscription for partitioned-topic + createSubscription(testLocalTopicName + "-partition-1", subscriptionName); + assertSubscriptionExists(testLocalTopicName + "-partition-1", subscriptionName); + assertSubscriptionDoesNotExist(testLocalTopicName + "-partition-0"); + assertSubscriptionExists(testLocalTopicName, subscriptionName); + + // 10) Delete the partitioned topic + deletePartitionedTopic(testLocalTopicName); + } + +// Helper methods + + private void assertTopicNotFound(String topicName) { + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.getSubscriptions(response, testTenant, testNamespace, topicName, true); + verifyErrorResponse(response, Response.Status.NOT_FOUND, String.format("Topic %s not found", + "persistent://my-tenant/my-namespace/" + topicName)); + } + + private void assertPartitionedTopicNotFound(String topicName) { + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.getSubscriptions(response, testTenant, testNamespace, topicName + "-partition-0", true); + verifyErrorResponse(response, Response.Status.NOT_FOUND, + "Partitioned Topic not found: persistent://my-tenant/my-namespace/" + topicName + "-partition-0 has zero partitions"); + } - // Confirm that the namespace does not exist + private void assertNamespaceNotFound(String topicName) { String notExistNamespace = "not-exist-namespace"; - response = mock(AsyncResponse.class); - persistentTopics.getSubscriptions(response, testTenant, notExistNamespace, testLocalTopicName, - true); - errorCaptor = ArgumentCaptor.forClass(RestException.class); - verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); - Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), - Response.Status.NOT_FOUND.getStatusCode()); - Assert.assertEquals(errorCaptor.getValue().getMessage(), "Namespace not found"); + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.getSubscriptions(response, testTenant, notExistNamespace, topicName, true); + verifyErrorResponse(response, Response.Status.NOT_FOUND, "Namespace not found"); + } - // 3) Create the partitioned topic - response = mock(AsyncResponse.class); - ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); - persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3, true); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); - Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + private void createPartitionedTopic(String topicName, int numPartitions) { + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, numPartitions, true); + verifyResponseStatus(response, Response.Status.NO_CONTENT); + } - // 4) Create a subscription - response = mock(AsyncResponse.class); - persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true, + private void createSubscription(String topicName, String subscriptionName) { + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.createSubscription(response, testTenant, testNamespace, topicName, subscriptionName, true, new ResetCursorData(MessageId.earliest), false); - responseCaptor = ArgumentCaptor.forClass(Response.class); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); - Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + verifyResponseStatus(response, Response.Status.NO_CONTENT); + } - // 5) Confirm that the subscription exists - response = mock(AsyncResponse.class); - persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", - true); - verify(response, timeout(5000).times(1)).resume(Set.of("test")); + private void assertSubscriptionExists(String topicName, String subscriptionName) { + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.getSubscriptions(response, testTenant, testNamespace, topicName, true); + verify(response, timeout(5000)).resume(Set.of(subscriptionName)); + } - // 6) Delete the subscription - response = mock(AsyncResponse.class); - persistentTopics.deleteSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", false, - true); - responseCaptor = ArgumentCaptor.forClass(Response.class); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); - Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + private void deleteSubscription(String topicName, String subscriptionName) { + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.deleteSubscription(response, testTenant, testNamespace, topicName, subscriptionName, false, true); + verifyResponseStatus(response, Response.Status.NO_CONTENT); + } - // 7) Confirm that the subscription does not exist - response = mock(AsyncResponse.class); - persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", - true); - verify(response, timeout(5000).times(1)).resume(Set.of()); + private void assertSubscriptionDoesNotExist(String topicName) { + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.getSubscriptions(response, testTenant, testNamespace, topicName, true); + verify(response, timeout(5000)).resume(Set.of()); + } - // 8) Create a sub of partitioned-topic - response = mock(AsyncResponse.class); - persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName + "-partition-1", - "test", true, - new ResetCursorData(MessageId.earliest), false); - responseCaptor = ArgumentCaptor.forClass(Response.class); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); - Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - // - response = mock(AsyncResponse.class); - persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-1", - true); - verify(response, timeout(5000).times(1)).resume(Set.of("test")); - // - response = mock(AsyncResponse.class); - persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", - true); - verify(response, timeout(5000).times(1)).resume(Set.of()); - // - response = mock(AsyncResponse.class); - persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName, true); - verify(response, timeout(5000).times(1)).resume(Set.of("test")); + private void deletePartitionedTopic(String topicName) { + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, topicName, true, true); + verifyResponseStatus(response, Response.Status.NO_CONTENT); + } - // 9) Delete the partitioned topic - response = mock(AsyncResponse.class); - persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, true); - responseCaptor = ArgumentCaptor.forClass(Response.class); - verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); - Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + private void verifyErrorResponse(AsyncResponse response, Response.Status expectedStatus, String expectedMessage) { + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000)).resume(errorCaptor.capture()); + Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), expectedStatus.getStatusCode()); + Assert.assertEquals(errorCaptor.getValue().getMessage(), expectedMessage); } + private void verifyResponseStatus(AsyncResponse response, Response.Status expectedStatus) { + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), expectedStatus.getStatusCode()); + } + + @Test public void testCreateSubscriptions() throws Exception {