Skip to content

Commit

Permalink
[fix][admin] Verify is policies read only before revoke permissions o…
Browse files Browse the repository at this point in the history
…n topic (#23730)
  • Loading branch information
dao-jun authored Dec 17, 2024
1 parent 9a7269a commit 069cc3d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse
protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
CompletableFuture<Void> validateAccessForTenantCf =
validateAdminAccessForTenantAsync(namespaceName.getTenant());
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());

var checkIfTopicExists = !pulsar().getConfiguration().isAllowAclChangesOnNonExistentTopics();
if (checkIfTopicExists) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,36 @@ public void testRevokePartitionedTopic() {
}
}

@Test
public void testRevokePartitionedTopicWithReadonlyPolicies() throws Exception {
final String partitionedTopicName = "testRevokePartitionedTopicWithReadonlyPolicies-topic";
final int numPartitions = 5;
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(
response, testTenant, testNamespace, partitionedTopicName, numPartitions, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role,
expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
doReturn(CompletableFuture.failedFuture(
new RestException(Response.Status.FORBIDDEN, "Broker is forbidden to do read-write operations"))
).when(persistentTopics).validatePoliciesReadOnlyAccessAsync();
persistentTopics.revokePermissionsOnTopic(response, testTenant, testNamespace, partitionedTopicName, role);
ArgumentCaptor<RestException> exceptionCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(exceptionCaptor.capture());
Assert.assertEquals(exceptionCaptor.getValue().getResponse().getStatus(),
Response.Status.FORBIDDEN.getStatusCode());
}

@Test
public void testTriggerCompactionTopic() {
final String partitionTopicName = "test-part";
Expand Down

0 comments on commit 069cc3d

Please sign in to comment.