From 069cc3db7c741a7b1fc64c79733ada847dca873e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=81=93=E5=90=9B?= Date: Tue, 17 Dec 2024 11:38:13 +0800 Subject: [PATCH] [fix][admin] Verify is policies read only before revoke permissions on topic (#23730) --- .../admin/impl/PersistentTopicsBase.java | 3 +- .../broker/admin/PersistentTopicsTest.java | 30 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9a306f6b4fff7..1300cd3449c27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -289,7 +289,8 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges CompletableFuture validateAccessForTenantCf = - validateAdminAccessForTenantAsync(namespaceName.getTenant()); + validateAdminAccessForTenantAsync(namespaceName.getTenant()) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()); var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics(); if (checkIfTopicExists) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index cca5049ed50eb..302948903442c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1030,6 +1030,36 @@ public void testRevokePartitionedTopic() { } } + @Test + public void testRevokePartitionedTopicWithReadonlyPolicies() throws Exception { + final String partitionedTopicName = "testRevokePartitionedTopicWithReadonlyPolicies-topic"; + final int numPartitions = 5; + AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.createPartitionedTopic( + response, testTenant, testNamespace, partitionedTopicName, numPartitions, true); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + String role = "role"; + Set expectActions = new HashSet<>(); + expectActions.add(AuthAction.produce); + response = mock(AsyncResponse.class); + responseCaptor = ArgumentCaptor.forClass(Response.class); + persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role, + expectActions); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + response = mock(AsyncResponse.class); + doReturn(CompletableFuture.failedFuture( + new RestException(Response.Status.FORBIDDEN, "Broker is forbidden to do read-write operations")) + ).when(persistentTopics).validatePoliciesReadOnlyAccessAsync(); + persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(RestException.class); + verify(response, timeout(5000).times(1)).resume(exceptionCaptor.capture()); + Assert.assertEquals(exceptionCaptor.getValue().getResponse().getStatus(), + Response.Status.FORBIDDEN.getStatusCode()); + } + @Test public void testTriggerCompactionTopic() { final String partitionTopicName = "test-part";