From 1bb9378b50aa891834b64cd39f55ae0e32a055bb Mon Sep 17 00:00:00 2001 From: Cong Zhao Date: Sun, 28 Apr 2024 10:37:37 +0800 Subject: [PATCH] [improve][test] Add policy authentication test for namespace API (#22593) --- .../broker/admin/NamespaceAuthZTest.java | 1248 +++++++++++++++-- 1 file changed, 1140 insertions(+), 108 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java index 5358295b78568..ec6a122f7df80 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -20,9 +20,11 @@ package org.apache.pulsar.broker.admin; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.deleteNamespaceWithRetry; +import static org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import com.google.common.collect.Sets; import io.jsonwebtoken.Jwts; import java.io.File; import java.util.ArrayList; @@ -32,6 +34,8 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; @@ -44,17 +48,33 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.EntryFilters; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; import org.apache.pulsar.packages.management.core.common.PackageMetadata; import org.apache.pulsar.security.MockedPulsarStandalone; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; @@ -72,7 +92,7 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { private AuthorizationService authorizationService; - private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); + private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); private static final String TENANT_ADMIN_TOKEN = Jwts.builder() .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); @@ -122,16 +142,46 @@ public void after() throws Exception { superUserAdmin.namespaces().createNamespace("public/default"); } - private void setAuthorizationOperationChecker(String role, NamespaceOperation operation) { + private AtomicBoolean setAuthorizationOperationChecker(String role, NamespaceOperation operation) { + AtomicBoolean execFlag = new AtomicBoolean(false); Mockito.doAnswer(invocationOnMock -> { String role_ = invocationOnMock.getArgument(2); if (role.equals(role_)) { NamespaceOperation operation_ = invocationOnMock.getArgument(1); Assert.assertEquals(operation_, operation); } + execFlag.set(true); return invocationOnMock.callRealMethod(); }).when(authorizationService).allowNamespaceOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + return execFlag; + } + + private void clearAuthorizationOperationChecker() { + Mockito.doAnswer(InvocationOnMock::callRealMethod).when(authorizationService) + .allowNamespaceOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.any()); + } + + private AtomicBoolean setAuthorizationPolicyOperationChecker(String role, Object policyName, Object operation) { + AtomicBoolean execFlag = new AtomicBoolean(false); + if (operation instanceof PolicyOperation) { + Mockito.doAnswer(invocationOnMock -> { + String role_ = invocationOnMock.getArgument(3); + if (role.equals(role_)) { + PolicyName policyName_ = invocationOnMock.getArgument(1); + PolicyOperation operation_ = invocationOnMock.getArgument(2); + assertEquals(operation_, operation); + assertEquals(policyName_, policyName); + } + execFlag.set(true); + return invocationOnMock.callRealMethod(); + }).when(authorizationService).allowNamespacePolicyOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.any(), Mockito.any()); + } else { + throw new IllegalArgumentException(""); + } + return execFlag; } @SneakyThrows @@ -140,13 +190,12 @@ public void testProperties() { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; final String topic = "persistent://public/default/" + random; - final String subject = UUID.randomUUID().toString(); + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -214,18 +263,17 @@ public void testProperties() { superUserAdmin.topics().delete(topic, true); } - @Test + @Test public void testTopics() throws Exception { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; final String topic = "persistent://" + namespace + "/" + random; - final String subject = UUID.randomUUID().toString(); + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -236,10 +284,10 @@ public void testTopics() throws Exception { // test tenant manager tenantManagerAdmin.namespaces().getTopics(namespace); + AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.GET_TOPICS); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().getTopics(namespace)); - - setAuthorizationOperationChecker(subject, NamespaceOperation.GET_TOPICS); + Assert.assertTrue(execFlag.get()); for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); @@ -260,13 +308,12 @@ public void testBookieAffinityGroup() throws Exception { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; final String topic = "persistent://" + namespace + "/" + random; - final String subject = UUID.randomUUID().toString(); + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -302,11 +349,11 @@ public void testBookieAffinityGroup() throws Exception { for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().setBookieAffinityGroup(namespace, bookieAffinityGroupData)); + () -> subAdmin.namespaces().setBookieAffinityGroup(namespace, bookieAffinityGroupData)); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().getBookieAffinityGroup(namespace)); + () -> subAdmin.namespaces().getBookieAffinityGroup(namespace)); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().deleteBookieAffinityGroup(namespace)); + () -> subAdmin.namespaces().deleteBookieAffinityGroup(namespace)); superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); } @@ -319,20 +366,19 @@ public void testGetBundles() throws Exception { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; final String topic = "persistent://" + namespace + "/" + random; - final String subject = UUID.randomUUID().toString(); + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); Producer producer = pulsarClient.newProducer(Schema.BYTES) - .topic(topic) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send("message".getBytes()); - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -343,10 +389,10 @@ public void testGetBundles() throws Exception { tenantManagerAdmin.namespaces().getBundles(namespace); // test nobody + AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.GET_BUNDLE); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().getBundles(namespace)); - - setAuthorizationOperationChecker(subject, NamespaceOperation.GET_BUNDLE); + Assert.assertTrue(execFlag.get()); for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); @@ -367,20 +413,19 @@ public void testUnloadBundles() throws Exception { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; final String topic = "persistent://" + namespace + "/" + random; - final String subject = UUID.randomUUID().toString(); + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); Producer producer = pulsarClient.newProducer(Schema.BYTES) - .topic(topic) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send("message".getBytes()); - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -401,7 +446,7 @@ public void testUnloadBundles() throws Exception { for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().unloadNamespaceBundle(namespace, defaultBundle)); + () -> subAdmin.namespaces().unloadNamespaceBundle(namespace, defaultBundle)); superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); } @@ -413,20 +458,19 @@ public void testSplitBundles() throws Exception { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; final String topic = "persistent://" + namespace + "/" + random; - final String subject = UUID.randomUUID().toString(); + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); Producer producer = pulsarClient.newProducer(Schema.BYTES) - .topic(topic) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send("message".getBytes()); - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -447,7 +491,7 @@ public void testSplitBundles() throws Exception { for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().splitNamespaceBundle(namespace, defaultBundle, false, null)); + () -> subAdmin.namespaces().splitNamespaceBundle(namespace, defaultBundle, false, null)); superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); } @@ -459,13 +503,12 @@ public void testDeleteBundles() throws Exception { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; final String topic = "persistent://" + namespace + "/" + random; - final String subject = UUID.randomUUID().toString(); + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -478,7 +521,8 @@ public void testDeleteBundles() throws Exception { producer.send("message".getBytes()); for (int i = 0; i < 3; i++) { - superUserAdmin.namespaces().splitNamespaceBundle(namespace, Policies.BundleType.LARGEST.toString(), false, null); + superUserAdmin.namespaces() + .splitNamespaceBundle(namespace, Policies.BundleType.LARGEST.toString(), false, null); } BundlesData bundles = superUserAdmin.namespaces().getBundles(namespace); @@ -490,7 +534,7 @@ public void testDeleteBundles() throws Exception { for (int i = 0; i < boundaries.size() - 1; i++) { String bundleRange = boundaries.get(i) + "_" + boundaries.get(i + 1); List allTopicsFromNamespaceBundle = getPulsarService().getBrokerService() - .getAllTopicsFromNamespaceBundle(namespace, namespace + "/" + bundleRange); + .getAllTopicsFromNamespaceBundle(namespace, namespace + "/" + bundleRange); System.out.println(StringUtils.join(allTopicsFromNamespaceBundle)); if (allTopicsFromNamespaceBundle.isEmpty()) { bundleRanges.add(bundleRange); @@ -504,15 +548,15 @@ public void testDeleteBundles() throws Exception { tenantManagerAdmin.namespaces().deleteNamespaceBundle(namespace, bundleRanges.get(1)); // test nobody + AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.DELETE_BUNDLE); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().deleteNamespaceBundle(namespace, bundleRanges.get(1))); - - setAuthorizationOperationChecker(subject, NamespaceOperation.DELETE_BUNDLE); + Assert.assertTrue(execFlag.get()); for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().deleteNamespaceBundle(namespace, bundleRanges.get(1))); + () -> subAdmin.namespaces().deleteNamespaceBundle(namespace, bundleRanges.get(1))); superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); } } @@ -522,7 +566,7 @@ public void testPermission() throws Exception { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; final String topic = "persistent://" + namespace + "/" + random; - final String subject = UUID.randomUUID().toString(); + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); @@ -530,13 +574,11 @@ public void testPermission() throws Exception { final String role = "sub"; final AuthAction testAction = AuthAction.consume; - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); - // test super admin superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, role, Set.of(testAction)); Map> permissions = superUserAdmin.namespaces().getPermissions(namespace); @@ -554,25 +596,33 @@ public void testPermission() throws Exception { Assert.assertTrue(permissions.isEmpty()); // test nobody + AtomicBoolean execFlag = + setAuthorizationOperationChecker(subject, NamespaceOperation.GRANT_PERMISSION); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().grantPermissionOnNamespace(namespace, role, Set.of(testAction))); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.GET_PERMISSION); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().getPermissions(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = + setAuthorizationOperationChecker(subject, NamespaceOperation.REVOKE_PERMISSION); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role)); + Assert.assertTrue(execFlag.get()); + clearAuthorizationOperationChecker(); for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); - setAuthorizationOperationChecker(subject, NamespaceOperation.GRANT_PERMISSION); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().grantPermissionOnNamespace(namespace, role, Set.of(testAction))); - setAuthorizationOperationChecker(subject, NamespaceOperation.GET_PERMISSION); + () -> subAdmin.namespaces().grantPermissionOnNamespace(namespace, role, Set.of(testAction))); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().getPermissions(namespace)); - setAuthorizationOperationChecker(subject, NamespaceOperation.REVOKE_PERMISSION); + () -> subAdmin.namespaces().getPermissions(namespace)); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role)); + () -> subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role)); superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); } @@ -584,13 +634,12 @@ public void testPermissionOnSubscription() throws Exception { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; final String topic = "persistent://" + namespace + "/" + random; - final String subject = UUID.randomUUID().toString(); + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -604,7 +653,8 @@ public void testPermissionOnSubscription() throws Exception { // test super admin superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription, Set.of(role)); - Map> permissionOnSubscription = superUserAdmin.namespaces().getPermissionOnSubscription(namespace); + Map> permissionOnSubscription = + superUserAdmin.namespaces().getPermissionOnSubscription(namespace); Assert.assertEquals(permissionOnSubscription.get(subscription), Set.of(role)); superUserAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription, role); permissionOnSubscription = superUserAdmin.namespaces().getPermissionOnSubscription(namespace); @@ -619,25 +669,29 @@ public void testPermissionOnSubscription() throws Exception { Assert.assertTrue(permissionOnSubscription.isEmpty()); // test nobody + AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.GRANT_PERMISSION); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription, Set.of(role))); + Assert.assertTrue(execFlag.get()); + execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.GET_PERMISSION); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().getPermissionOnSubscription(namespace)); + Assert.assertTrue(execFlag.get()); + execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.REVOKE_PERMISSION); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription, role)); + Assert.assertTrue(execFlag.get()); + clearAuthorizationOperationChecker(); for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); - setAuthorizationOperationChecker(subject, NamespaceOperation.GRANT_PERMISSION); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription, Set.of(role))); - setAuthorizationOperationChecker(subject, NamespaceOperation.GET_PERMISSION); + () -> subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription, Set.of(role))); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().getPermissionOnSubscription(namespace)); - setAuthorizationOperationChecker(subject, NamespaceOperation.REVOKE_PERMISSION); + () -> subAdmin.namespaces().getPermissionOnSubscription(namespace)); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription, role)); + () -> subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription, role)); superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); } @@ -649,12 +703,11 @@ public void testClearBacklog() throws Exception { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; final String topic = "persistent://" + namespace + "/" + random; - final String subject = UUID.randomUUID().toString(); + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -665,10 +718,10 @@ public void testClearBacklog() throws Exception { tenantManagerAdmin.namespaces().clearNamespaceBacklog(namespace); // test nobody + AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.CLEAR_BACKLOG); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().clearNamespaceBacklog(namespace)); - - setAuthorizationOperationChecker(subject, NamespaceOperation.CLEAR_BACKLOG); + Assert.assertTrue(execFlag.get()); for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); @@ -684,17 +737,16 @@ public void testClearBacklog() throws Exception { superUserAdmin.topics().delete(topic, true); } - @Test + @Test public void testClearNamespaceBundleBacklog() throws Exception { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; final String topic = "persistent://" + namespace + "/" + random; - final String subject = UUID.randomUUID().toString(); + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -706,17 +758,17 @@ public void testClearNamespaceBundleBacklog() throws Exception { final String defaultBundle = "0x00000000_0xffffffff"; - // test super admin + // test super admin superUserAdmin.namespaces().clearNamespaceBundleBacklog(namespace, defaultBundle); // test tenant manager tenantManagerAdmin.namespaces().clearNamespaceBundleBacklog(namespace, defaultBundle); // test nobody - Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.CLEAR_BACKLOG); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().clearNamespaceBundleBacklog(namespace, defaultBundle)); - - setAuthorizationOperationChecker(subject, NamespaceOperation.CLEAR_BACKLOG); + Assert.assertTrue(execFlag.get()); for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); @@ -737,12 +789,11 @@ public void testUnsubscribeNamespace() throws Exception { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; final String topic = "persistent://" + namespace + "/" + random; - final String subject = UUID.randomUUID().toString(); + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -756,17 +807,17 @@ public void testUnsubscribeNamespace() throws Exception { .subscriptionName("sub") .subscribe().close(); - // test super admin + // test super admin superUserAdmin.namespaces().unsubscribeNamespace(namespace, "sub"); // test tenant manager tenantManagerAdmin.namespaces().unsubscribeNamespace(namespace, "sub"); // test nobody - Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.UNSUBSCRIBE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().unsubscribeNamespace(namespace, "sub")); - - setAuthorizationOperationChecker(subject, NamespaceOperation.UNSUBSCRIBE); + Assert.assertTrue(execFlag.get()); for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); @@ -786,13 +837,12 @@ public void testUnsubscribeNamespace() throws Exception { public void testUnsubscribeNamespaceBundle() throws Exception { final String random = UUID.randomUUID().toString(); final String namespace = "public/default"; - final String topic = "persistent://" + namespace + "/" + random; - final String subject = UUID.randomUUID().toString(); + final String topic = "persistent://" + namespace + "/" + random ; + final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); superUserAdmin.topics().createNonPartitionedTopic(topic); - @Cleanup - final PulsarAdmin subAdmin = PulsarAdmin.builder() + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); @@ -808,17 +858,17 @@ public void testUnsubscribeNamespaceBundle() throws Exception { final String defaultBundle = "0x00000000_0xffffffff"; - // test super admin + // test super admin superUserAdmin.namespaces().unsubscribeNamespaceBundle(namespace, defaultBundle, "sub"); // test tenant manager tenantManagerAdmin.namespaces().unsubscribeNamespaceBundle(namespace, defaultBundle, "sub"); // test nobody - Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.UNSUBSCRIBE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.namespaces().unsubscribeNamespaceBundle(namespace, defaultBundle, "sub")); - - setAuthorizationOperationChecker(subject, NamespaceOperation.UNSUBSCRIBE); + Assert.assertTrue(execFlag.get()); for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); @@ -921,6 +971,7 @@ public void testPackageAPI() throws Exception { tenantManagerAdmin.packages().updateMetadata(packageName, updatedMetadata); // ---- test nobody --- + AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.PACKAGES); File file3 = File.createTempFile("package-api-test", ".package"); @@ -954,9 +1005,7 @@ public void testPackageAPI() throws Exception { updatedMetadata3.setProperties(Collections.singletonMap("key", "value")); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> subAdmin.packages().updateMetadata(packageName3, updatedMetadata3)); - - - setAuthorizationOperationChecker(subject, NamespaceOperation.PACKAGES); + Assert.assertTrue(execFlag.get()); for (AuthAction action : AuthAction.values()) { superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); @@ -1022,7 +1071,7 @@ public void testPackageAPI() throws Exception { @Test @SneakyThrows - public void testOffloadThresholdInSeconds() { + public void testDispatchRate() { final String namespace = "public/default"; final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() @@ -1031,16 +1080,27 @@ public void testOffloadThresholdInSeconds() { .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.READ); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().getOffloadThresholdInSeconds(namespace)); + () -> subAdmin.namespaces().getDispatchRate(namespace)); + Assert.assertTrue(execFlag.get()); + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); + DispatchRate dispatchRate = + DispatchRate.builder().dispatchThrottlingRateInByte(10).dispatchThrottlingRateInMsg(10).build(); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().setOffloadThresholdInSeconds(namespace, 10000)); + () -> subAdmin.namespaces().setDispatchRate(namespace, dispatchRate)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeDispatchRate(namespace)); + Assert.assertTrue(execFlag.get()); } @Test @SneakyThrows - public void testMaxSubscriptionsPerTopic() { + public void testSubscribeRate() { final String namespace = "public/default"; final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() @@ -1049,13 +1109,985 @@ public void testMaxSubscriptionsPerTopic() { .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(token)) .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.READ); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().getMaxSubscriptionsPerTopic(namespace)); + () -> subAdmin.namespaces().getSubscribeRate(namespace)); + Assert.assertTrue(execFlag.get()); + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().setMaxSubscriptionsPerTopic(namespace, 100)); + () -> subAdmin.namespaces().setSubscribeRate(namespace, new SubscribeRate())); + Assert.assertTrue(execFlag.get()); + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, - () -> subAdmin.namespaces().removeMaxSubscriptionsPerTopic(namespace)); + () -> subAdmin.namespaces().removeSubscribeRate(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testPublishRate() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getPublishRate(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setPublishRate(namespace, new PublishRate(10, 10))); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removePublishRate(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testSubscriptionDispatchRate() { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getSubscriptionDispatchRate(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); + DispatchRate dispatchRate = DispatchRate.builder().dispatchThrottlingRateInMsg(10).dispatchThrottlingRateInByte(10).build(); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeSubscriptionDispatchRate(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testCompactionThreshold() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getCompactionThreshold(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setCompactionThreshold(namespace, 100L * 1024L *1024L)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.COMPACTION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeCompactionThreshold(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testAutoTopicCreation() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getAutoTopicCreation(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE); + AutoTopicCreationOverride build = AutoTopicCreationOverride.builder().allowAutoTopicCreation(true).build(); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setAutoTopicCreation(namespace, build)); + Assert.assertTrue(execFlag.get()); + + execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeAutoTopicCreation(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testAutoSubscriptionCreation() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.AUTO_SUBSCRIPTION_CREATION, + PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getAutoSubscriptionCreation(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.AUTO_SUBSCRIPTION_CREATION, + PolicyOperation.WRITE); + AutoSubscriptionCreationOverride build = + AutoSubscriptionCreationOverride.builder().allowAutoSubscriptionCreation(true).build(); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setAutoSubscriptionCreation(namespace, build)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.AUTO_SUBSCRIPTION_CREATION, + PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeAutoSubscriptionCreation(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testMaxUnackedMessagesPerConsumer() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, + PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getMaxUnackedMessagesPerConsumer(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, + PolicyOperation.WRITE); + AutoSubscriptionCreationOverride build = + AutoSubscriptionCreationOverride.builder().allowAutoSubscriptionCreation(true).build(); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setMaxUnackedMessagesPerConsumer(namespace, 100)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, + PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeMaxUnackedMessagesPerConsumer(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testMaxUnackedMessagesPerSubscription() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, + PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getMaxUnackedMessagesPerSubscription(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, + PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setMaxUnackedMessagesPerSubscription(namespace, 100)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_UNACKED, + PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeMaxUnackedMessagesPerSubscription(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testNamespaceResourceGroup() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RESOURCEGROUP, + PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getNamespaceResourceGroup(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RESOURCEGROUP, + PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setNamespaceResourceGroup(namespace, "test-group")); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RESOURCEGROUP, + PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeNamespaceResourceGroup(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testDispatcherPauseOnAckStatePersistent() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, + PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getDispatcherPauseOnAckStatePersistent(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, + PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setDispatcherPauseOnAckStatePersistent(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, + PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeDispatcherPauseOnAckStatePersistent(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testBacklogQuota() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getBacklogQuotaMap(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, PolicyOperation.WRITE); + BacklogQuota backlogQuota = BacklogQuota.builder().limitTime(10).limitSize(10).build(); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setBacklogQuota(namespace, backlogQuota)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.BACKLOG, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeBacklogQuota(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testDeduplicationSnapshotInterval() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getDeduplicationSnapshotInterval(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setDeduplicationSnapshotInterval(namespace, 100)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeDeduplicationSnapshotInterval(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testMaxSubscriptionsPerTopic() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getMaxSubscriptionsPerTopic(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setMaxSubscriptionsPerTopic(namespace, 10)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeMaxSubscriptionsPerTopic(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testMaxProducersPerTopic() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getMaxProducersPerTopic(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setMaxProducersPerTopic(namespace, 10)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeMaxProducersPerTopic(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testMaxConsumersPerTopic() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getMaxConsumersPerTopic(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setMaxConsumersPerTopic(namespace, 10)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeMaxConsumersPerTopic(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testNamespaceReplicationClusters() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getNamespaceReplicationClusters(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"))); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testReplicatorDispatchRate() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION_RATE, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getReplicatorDispatchRate(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE); + DispatchRate build = + DispatchRate.builder().dispatchThrottlingRateInByte(10).dispatchThrottlingRateInMsg(10).build(); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setReplicatorDispatchRate(namespace, build)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeReplicatorDispatchRate(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testMaxConsumersPerSubscription() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getMaxConsumersPerSubscription(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setMaxConsumersPerSubscription(namespace, 10)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeMaxConsumersPerSubscription(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testOffloadThreshold() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getOffloadThreshold(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setOffloadThreshold(namespace, 10)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testOffloadPolicies() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getOffloadPolicies(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.WRITE); + OffloadPolicies offloadPolicies = OffloadPolicies.builder().managedLedgerOffloadThresholdInBytes(10L).build(); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setOffloadPolicies(namespace, offloadPolicies)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeOffloadPolicies(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testMaxTopicsPerNamespace() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_TOPICS, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getMaxTopicsPerNamespace(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_TOPICS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setMaxTopicsPerNamespace(namespace, 10)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.MAX_TOPICS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeMaxTopicsPerNamespace(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testDeduplicationStatus() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getDeduplicationStatus(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setDeduplicationStatus(namespace, true)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.DEDUPLICATION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeDeduplicationStatus(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testPersistence() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.PERSISTENCE, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getPersistence(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.PERSISTENCE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setPersistence(namespace, new PersistencePolicies(10, 10, 10, 10))); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.PERSISTENCE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removePersistence(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testNamespaceMessageTTL() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.TTL, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getNamespaceMessageTTL(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.TTL, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setNamespaceMessageTTL(namespace, 10)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.TTL, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeNamespaceMessageTTL(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testSubscriptionExpirationTime() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, + PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getSubscriptionExpirationTime(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setSubscriptionExpirationTime(namespace, 10)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeSubscriptionExpirationTime(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testDelayedDeliveryMessages() { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getDelayedDelivery(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testRetention() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RETENTION, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getRetention(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RETENTION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setRetention(namespace, new RetentionPolicies(10, 10))); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.RETENTION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeRetention(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testInactiveTopicPolicies() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getInactiveTopicPolicies(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE); + InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies( + InactiveTopicDeleteMode.delete_when_no_subscriptions, 10, false); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeInactiveTopicPolicies(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testNamespaceAntiAffinityGroup() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.ANTI_AFFINITY, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getNamespaceAntiAffinityGroup(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setNamespaceAntiAffinityGroup(namespace, "invalid-group")); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testOffloadDeleteLagMs() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getOffloadDeleteLagMs(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setOffloadDeleteLag(namespace, 100, TimeUnit.HOURS)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testOffloadThresholdInSeconds() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = + setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getOffloadThresholdInSeconds(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.OFFLOAD, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setOffloadThresholdInSeconds(namespace, 10000)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testNamespaceEntryFilters() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.ENTRY_FILTERS, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getNamespaceEntryFilters(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setNamespaceEntryFilters(namespace, new EntryFilters("filter1"))); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeNamespaceEntryFilters(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testEncryptionRequiredStatus() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.ENCRYPTION, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getEncryptionRequiredStatus(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.ENCRYPTION, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setEncryptionRequiredStatus(namespace, false)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testSubscriptionTypesEnabled() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SUBSCRIPTION_AUTH_MODE, + PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getSubscriptionTypesEnabled(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setSubscriptionTypesEnabled(namespace, Sets.newHashSet(SubscriptionType.Failover))); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().removeSubscriptionTypesEnabled(namespace)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testIsAllowAutoUpdateSchema() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getIsAllowAutoUpdateSchema(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setIsAllowAutoUpdateSchema(namespace, true)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testSchemaAutoUpdateCompatibilityStrategy() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace, AutoUpdateDisabled)); + Assert.assertTrue(execFlag.get()); + } + + @Test + @SneakyThrows + public void testSchemaValidationEnforced() { + final String namespace = "public/default"; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + AtomicBoolean execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getSchemaValidationEnforced(namespace)); + Assert.assertTrue(execFlag.get()); + + execFlag = setAuthorizationPolicyOperationChecker(subject, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setSchemaValidationEnforced(namespace, true)); + Assert.assertTrue(execFlag.get()); } }