From afc924405829f23962e04a717571f33930c165d6 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Tue, 26 Sep 2023 10:33:35 +0800 Subject: [PATCH] [fix][broker] Fix inconsistent topic policy (#21231) --- .../ProxySaslAuthenticationTest.java | 13 +- .../authentication/SaslAuthenticateTest.java | 11 +- .../pulsar/broker/service/BrokerService.java | 295 +++++++++--------- .../SystemTopicBasedTopicPoliciesService.java | 100 ++++-- .../broker/service/TopicPoliciesService.java | 41 +++ .../broker/admin/PersistentTopicsTest.java | 1 + .../broker/admin/TopicPoliciesTest.java | 2 +- .../TopicPoliciesWithBrokerRestartTest.java | 104 ++++++ .../pulsar/broker/admin/TopicsAuthTest.java | 2 + .../pulsar/broker/auth/AuthLogsTest.java | 2 + .../broker/auth/MockAuthentication.java | 6 +- .../auth/MockedPulsarServiceBaseTest.java | 19 ++ .../service/BrokerBookieIsolationTest.java | 3 +- ...temTopicBasedTopicPoliciesServiceTest.java | 6 +- .../persistent/PersistentTopicTest.java | 3 +- ...enticationTlsHostnameVerificationTest.java | 3 + .../AuthorizationProducerConsumerTest.java | 5 + .../client/api/MutualAuthenticationTest.java | 2 +- ...okenAuthenticatedProducerConsumerTest.java | 3 + ...uth2AuthenticatedProducerConsumerTest.java | 14 +- ...reTlsProducerConsumerTestWithAuthTest.java | 22 ++ .../PatternTopicsConsumerImplAuthTest.java | 1 + .../standalone_no_client_auth.conf | 3 +- .../proxy/server/ProxyAuthenticationTest.java | 2 +- .../server/ProxyForwardAuthDataTest.java | 2 +- .../server/ProxyRolesEnforcementTest.java | 2 +- .../server/ProxyWithAuthorizationNegTest.java | 2 + .../pulsar/sql/presto/TestPulsarAuth.java | 4 + .../ExtensibleLoadManagerTest.java | 1 + .../integration/presto/TestPulsarSQLAuth.java | 1 + 30 files changed, 478 insertions(+), 197 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java index 261efe680f862..f0e45aa734afb 100644 --- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java +++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationSasl; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.proxy.server.ProxyConfiguration; import org.apache.pulsar.proxy.server.ProxyService; import org.slf4j.Logger; @@ -193,15 +194,17 @@ protected void setup() throws Exception { conf.setAuthenticationProviders(providers); conf.setClusterName("test"); conf.setSuperUserRoles(ImmutableSet.of("client/" + localHostname + "@" + kdc.getRealm())); - - super.init(); - - lookupUrl = new URI(pulsar.getBrokerServiceUrl()); - // set admin auth, to verify admin web resources Map clientSaslConfig = new HashMap<>(); clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient"); clientSaslConfig.put("serverType", "broker"); + conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName()); + conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory + .getMapper().getObjectMapper().writeValueAsString(clientSaslConfig)); + + super.init(); + + lookupUrl = new URI(pulsar.getBrokerServiceUrl()); log.info("set client jaas section name: PulsarClient"); admin = PulsarAdmin.builder() .serviceHttpUrl(brokerUrl.toString()) diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java index 5cace2221dea8..230c2ad787de4 100644 --- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java +++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java @@ -58,6 +58,7 @@ import org.apache.pulsar.client.impl.auth.AuthenticationSasl; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.sasl.SaslConstants; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; @@ -186,7 +187,12 @@ protected void setup() throws Exception { conf.setAuthenticationProviders(providers); conf.setClusterName("test"); conf.setSuperUserRoles(ImmutableSet.of("client" + "@" + kdc.getRealm())); - + Map clientSaslConfig = new HashMap<>(); + clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient"); + clientSaslConfig.put("serverType", "broker"); + conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName()); + conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory + .getMapper().getObjectMapper().writeValueAsString(clientSaslConfig)); super.init(); lookupUrl = new URI(pulsar.getWebServiceAddress()); @@ -197,9 +203,6 @@ protected void setup() throws Exception { .authentication(authSasl)); // set admin auth, to verify admin web resources - Map clientSaslConfig = new HashMap<>(); - clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient"); - clientSaslConfig.put("serverType", "broker"); log.info("set client jaas section name: PulsarClient"); admin = PulsarAdmin.builder() .serviceHttpUrl(brokerUrl.toString()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 391affef1da9b..388d9de84f833 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1760,165 +1760,172 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { }); } - public CompletableFuture getManagedLedgerConfig(TopicName topicName) { + public CompletableFuture getManagedLedgerConfig(@Nonnull TopicName topicName) { + requireNonNull(topicName); NamespaceName namespace = topicName.getNamespaceObject(); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources(); LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies(); - return nsr.getPoliciesAsync(namespace) - .thenCombine(lpr.getLocalPoliciesAsync(namespace), (policies, localPolicies) -> { - PersistencePolicies persistencePolicies = null; - RetentionPolicies retentionPolicies = null; - OffloadPoliciesImpl topicLevelOffloadPolicies = null; - - if (pulsar.getConfig().isTopicLevelPoliciesEnabled() - && !NamespaceService.isSystemServiceNamespace(namespace.toString())) { - final TopicPolicies topicPolicies = pulsar.getTopicPoliciesService() - .getTopicPoliciesIfExists(topicName); - if (topicPolicies != null) { - persistencePolicies = topicPolicies.getPersistence(); - retentionPolicies = topicPolicies.getRetentionPolicies(); - topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies(); - } - } - - if (persistencePolicies == null) { - persistencePolicies = policies.map(p -> p.persistence).orElseGet( - () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(), - serviceConfig.getManagedLedgerDefaultWriteQuorum(), - serviceConfig.getManagedLedgerDefaultAckQuorum(), - serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit())); - } + final CompletableFuture> topicPoliciesFuture; + if (pulsar.getConfig().isTopicLevelPoliciesEnabled() + && !NamespaceService.isSystemServiceNamespace(namespace.toString()) + && !SystemTopicNames.isTopicPoliciesSystemTopic(topicName.toString())) { + topicPoliciesFuture = pulsar.getTopicPoliciesService().getTopicPoliciesAsync(topicName); + } else { + topicPoliciesFuture = CompletableFuture.completedFuture(Optional.empty()); + } + return topicPoliciesFuture.thenCompose(topicPoliciesOptional -> { + final CompletableFuture> nsPolicies = nsr.getPoliciesAsync(namespace); + final CompletableFuture> lcPolicies = lpr.getLocalPoliciesAsync(namespace); + return nsPolicies.thenCombine(lcPolicies, (policies, localPolicies) -> { + PersistencePolicies persistencePolicies = null; + RetentionPolicies retentionPolicies = null; + OffloadPoliciesImpl topicLevelOffloadPolicies = null; + if (topicPoliciesOptional.isPresent()) { + final TopicPolicies topicPolicies = topicPoliciesOptional.get(); + persistencePolicies = topicPolicies.getPersistence(); + retentionPolicies = topicPolicies.getRetentionPolicies(); + topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies(); + } - if (retentionPolicies == null) { - retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( - () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), - serviceConfig.getDefaultRetentionSizeInMB()) - ); - } + if (persistencePolicies == null) { + persistencePolicies = policies.map(p -> p.persistence).orElseGet( + () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(), + serviceConfig.getManagedLedgerDefaultWriteQuorum(), + serviceConfig.getManagedLedgerDefaultAckQuorum(), + serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit())); + } - ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); - managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble()); - managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum()); - managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum()); + if (retentionPolicies == null) { + retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( + () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), + serviceConfig.getDefaultRetentionSizeInMB()) + ); + } - if (serviceConfig.isStrictBookieAffinityEnabled()) { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble()); + managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum()); + managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum()); + + if (serviceConfig.isStrictBookieAffinityEnabled()) { + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName( + IsolatedBookieEnsemblePlacementPolicy.class); + if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { + Map properties = new HashMap<>(); + properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, + localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary()); + properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, + localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary()); + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); + } else if (isSystemTopic(topicName)) { + Map properties = new HashMap<>(); + properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "*"); + properties.put(IsolatedBookieEnsemblePlacementPolicy + .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*"); + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); + } else { + Map properties = new HashMap<>(); + properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, ""); + properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, ""); + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); + } + } else { + if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName( IsolatedBookieEnsemblePlacementPolicy.class); - if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { - Map properties = new HashMap<>(); - properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, - localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary()); - properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, - localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary()); - managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); - } else if (isSystemTopic(topicName)) { - Map properties = new HashMap<>(); - properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "*"); - properties.put(IsolatedBookieEnsemblePlacementPolicy - .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*"); - managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); - } else { - Map properties = new HashMap<>(); - properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, ""); - properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, ""); - managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); - } - } else { - if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { - managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName( - IsolatedBookieEnsemblePlacementPolicy.class); - Map properties = new HashMap<>(); - properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, - localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary()); - properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, - localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary()); - managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); - } + Map properties = new HashMap<>(); + properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, + localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary()); + properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, + localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary()); + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); } + } - managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()); - managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType()); - managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword()); - - managedLedgerConfig - .setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist()); - managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled( - serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled()); - managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore( - serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore()); - managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger()); - managedLedgerConfig - .setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(), - TimeUnit.MINUTES); - managedLedgerConfig - .setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(), - TimeUnit.MINUTES); - managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes()); - - managedLedgerConfig.setMetadataOperationsTimeoutSeconds( - serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds()); - managedLedgerConfig - .setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds()); - managedLedgerConfig - .setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds()); - managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize()); - managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled( - serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled()); - managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum()); - managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum()); - managedLedgerConfig - .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger()); - - managedLedgerConfig - .setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds()); - managedLedgerConfig - .setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); - managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); - managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData()); - managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery()); - managedLedgerConfig.setInactiveLedgerRollOverTime( - serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS); - managedLedgerConfig.setCacheEvictionByMarkDeletedPosition( - serviceConfig.isCacheEvictionByMarkDeletedPosition()); - managedLedgerConfig.setMinimumBacklogCursorsForCaching( - serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching()); - managedLedgerConfig.setMinimumBacklogEntriesForCaching( - serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching()); - managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching( - serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching()); - - OffloadPoliciesImpl nsLevelOffloadPolicies = - (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); - OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration( - topicLevelOffloadPolicies, - OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), - getPulsar().getConfig().getProperties()); - if (NamespaceService.isSystemServiceNamespace(namespace.toString())) { - managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE); - } else { - if (topicLevelOffloadPolicies != null) { - try { - LedgerOffloader topicLevelLedgerOffLoader = - pulsar().createManagedLedgerOffloader(offloadPolicies); - managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader); - } catch (PulsarServerException e) { - throw new RuntimeException(e); - } - } else { - //If the topic level policy is null, use the namespace level - managedLedgerConfig - .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); + managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()); + managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType()); + managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword()); + + managedLedgerConfig + .setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist()); + managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled( + serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled()); + managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore( + serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore()); + managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger()); + managedLedgerConfig + .setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(), + TimeUnit.MINUTES); + managedLedgerConfig + .setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(), + TimeUnit.MINUTES); + managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes()); + + managedLedgerConfig.setMetadataOperationsTimeoutSeconds( + serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds()); + managedLedgerConfig + .setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds()); + managedLedgerConfig + .setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds()); + managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize()); + managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled( + serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled()); + managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum()); + managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum()); + managedLedgerConfig + .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger()); + + managedLedgerConfig + .setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds()); + managedLedgerConfig + .setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); + managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); + managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData()); + managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery()); + managedLedgerConfig.setInactiveLedgerRollOverTime( + serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS); + managedLedgerConfig.setCacheEvictionByMarkDeletedPosition( + serviceConfig.isCacheEvictionByMarkDeletedPosition()); + managedLedgerConfig.setMinimumBacklogCursorsForCaching( + serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching()); + managedLedgerConfig.setMinimumBacklogEntriesForCaching( + serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching()); + managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching( + serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching()); + + OffloadPoliciesImpl nsLevelOffloadPolicies = + (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); + OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration( + topicLevelOffloadPolicies, + OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), + getPulsar().getConfig().getProperties()); + if (NamespaceService.isSystemServiceNamespace(namespace.toString())) { + managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE); + } else { + if (topicLevelOffloadPolicies != null) { + try { + LedgerOffloader topicLevelLedgerOffLoader = + pulsar().createManagedLedgerOffloader(offloadPolicies); + managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader); + } catch (PulsarServerException e) { + throw new RuntimeException(e); } + } else { + //If the topic level policy is null, use the namespace level + managedLedgerConfig + .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); } + } - managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled( - serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled()); - managedLedgerConfig.setNewEntriesCheckDelayInMillis( - serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis()); - return managedLedgerConfig; - }); + managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled( + serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled()); + managedLedgerConfig.setNewEntriesCheckDelayInMillis( + serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis()); + return managedLedgerConfig; + }); + }); } private void addTopicToStatsMaps(TopicName topicName, Topic topic) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 09f8de818db0a..ed76d37ae2536 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -18,12 +18,14 @@ */ package org.apache.pulsar.broker.service; +import static java.util.Objects.requireNonNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -54,6 +56,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.FutureUtil; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,8 +81,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private final Map>> readerCaches = new ConcurrentHashMap<>(); - @VisibleForTesting - final Map policyCacheInitMap = new ConcurrentHashMap<>(); + + final Map> policyCacheInitMap = new ConcurrentHashMap<>(); @VisibleForTesting final Map>> listeners = new ConcurrentHashMap<>(); @@ -219,12 +222,12 @@ public TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException { if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) { NamespaceName namespace = topicName.getNamespaceObject(); - prepareInitPoliciesCache(namespace, new CompletableFuture<>()); + prepareInitPoliciesCacheAsync(namespace); } MutablePair result = new MutablePair<>(); policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, initialized) -> { - if (initialized == null || !initialized) { + if (initialized == null || !initialized.isDone()) { result.setLeft(new TopicPoliciesCacheNotInitException()); } else { TopicPolicies topicPolicies = @@ -242,6 +245,34 @@ public TopicPolicies getTopicPolicies(TopicName topicName, } } + @NotNull + @Override + public CompletableFuture> getTopicPoliciesAsync(@NotNull TopicName topicName, + boolean isGlobal) { + requireNonNull(topicName); + final CompletableFuture preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject()); + return preparedFuture.thenApply(__ -> { + final TopicPolicies candidatePolicies = isGlobal + ? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())) + : policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); + return Optional.ofNullable(candidatePolicies); + }); + } + + @NotNull + @Override + public CompletableFuture> getTopicPoliciesAsync(@NotNull TopicName topicName) { + requireNonNull(topicName); + final CompletableFuture preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject()); + return preparedFuture.thenApply(__ -> { + final TopicPolicies localPolicies = policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); + if (localPolicies != null) { + return Optional.of(localPolicies); + } + return Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))); + }); + } + @Override public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) { return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); @@ -265,39 +296,48 @@ public CompletableFuture getTopicPoliciesBypassCacheAsync(TopicNa @Override public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { - CompletableFuture result = new CompletableFuture<>(); NamespaceName namespace = namespaceBundle.getNamespaceObject(); if (NamespaceService.isHeartbeatNamespace(namespace)) { - result.complete(null); - return result; + return CompletableFuture.completedFuture(null); } synchronized (this) { if (readerCaches.get(namespace) != null) { ownedBundlesCountPerNamespace.get(namespace).incrementAndGet(); - result.complete(null); + return CompletableFuture.completedFuture(null); } else { - prepareInitPoliciesCache(namespace, result); + return prepareInitPoliciesCacheAsync(namespace); } } - return result; } - private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture result) { - if (policyCacheInitMap.putIfAbsent(namespace, false) == null) { - CompletableFuture> readerCompletableFuture = + private @Nonnull CompletableFuture prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) { + requireNonNull(namespace); + return policyCacheInitMap.computeIfAbsent(namespace, (k) -> { + final CompletableFuture> readerCompletableFuture = createSystemTopicClientWithRetry(namespace); readerCaches.put(namespace, readerCompletableFuture); ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); - readerCompletableFuture.thenAccept(reader -> { - initPolicesCache(reader, result); - result.thenRun(() -> readMorePolicies(reader)); - }).exceptionally(ex -> { - log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); - cleanCacheAndCloseReader(namespace, false); - result.completeExceptionally(ex); + final CompletableFuture initFuture = readerCompletableFuture + .thenCompose(reader -> { + final CompletableFuture stageFuture = new CompletableFuture<>(); + initPolicesCache(reader, stageFuture); + return stageFuture + // Read policies in background + .thenAccept(__ -> readMorePoliciesAsync(reader)); + }); + initFuture.exceptionally(ex -> { + try { + log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); + cleanCacheAndCloseReader(namespace, false); + } catch (Throwable cleanupEx) { + // Adding this catch to avoid break callback chain + log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx); + } return null; }); - } + // let caller know we've got an exception. + return initFuture; + }); } protected CompletableFuture> createSystemTopicClientWithRetry( @@ -381,8 +421,7 @@ private void initPolicesCache(SystemTopicClient.Reader reader, Comp if (log.isDebugEnabled()) { log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName()); } - policyCacheInitMap.computeIfPresent( - reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true); + // replay policy message policiesCache.forEach(((topicName, topicPolicies) -> { if (listeners.get(topicName) != null) { @@ -395,6 +434,7 @@ private void initPolicesCache(SystemTopicClient.Reader reader, Comp } } })); + future.complete(null); } }); @@ -420,7 +460,13 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean }); } - private void readMorePolicies(SystemTopicClient.Reader reader) { + /** + * This is an async method for the background reader to continue syncing new messages. + * + * Note: You should not do any blocking call here. because it will affect + * #{@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method to block loading topic. + */ + private void readMorePoliciesAsync(SystemTopicClient.Reader reader) { reader.readNextAsync() .thenAccept(msg -> { refreshTopicPoliciesCache(msg); @@ -428,7 +474,7 @@ private void readMorePolicies(SystemTopicClient.Reader reader) { }) .whenComplete((__, ex) -> { if (ex == null) { - readMorePolicies(reader); + readMorePoliciesAsync(reader); } else { Throwable cause = FutureUtil.unwrapCompletionException(ex); if (cause instanceof PulsarClientException.AlreadyClosedException) { @@ -437,7 +483,7 @@ private void readMorePolicies(SystemTopicClient.Reader reader) { reader.getSystemTopic().getTopicName().getNamespaceObject(), false); } else { log.warn("Read more topic polices exception, read again.", ex); - readMorePolicies(reader); + readMorePoliciesAsync(reader); } } }); @@ -605,7 +651,7 @@ boolean checkReaderIsCached(NamespaceName namespaceName) { } @VisibleForTesting - public Boolean getPoliciesCacheInit(NamespaceName namespaceName) { + public CompletableFuture getPoliciesCacheInit(NamespaceName namespaceName) { return policyCacheInitMap.get(namespaceName); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index c4bcc0c39353c..aa3a6aaeff29f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.BackoffBuilder; @@ -31,6 +32,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.FutureUtil; +import org.jetbrains.annotations.NotNull; /** * Topic policies service. @@ -109,6 +111,32 @@ default CompletableFuture> getTopicPoliciesAsyncWithRetr return response; } + /** + * Asynchronously retrieves topic policies. + * This triggers the Pulsar broker's internal client to load policies from the + * system topic `persistent://tenant/namespace/__change_event`. + * + * @param topicName The name of the topic. + * @param isGlobal Indicates if the policies are global. + * @return A CompletableFuture containing an Optional of TopicPolicies. + * @throws NullPointerException If the topicName is null. + */ + @Nonnull + CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal); + + /** + * Asynchronously retrieves topic policies. + * This triggers the Pulsar broker's internal client to load policies from the + * system topic `persistent://tenant/namespace/__change_event`. + * + * NOTE: If local policies are not available, it will fallback to using topic global policies. + * @param topicName The name of the topic. + * @return A CompletableFuture containing an Optional of TopicPolicies. + * @throws NullPointerException If the topicName is null. + */ + @Nonnull + CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName); + /** * Get policies for a topic without cache async. * @param topicName topic name @@ -162,6 +190,19 @@ public TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) return null; } + @NotNull + @Override + public CompletableFuture> getTopicPoliciesAsync(@NotNull TopicName topicName, + boolean isGlobal) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @NotNull + @Override + public CompletableFuture> getTopicPoliciesAsync(@NotNull TopicName topicName) { + return CompletableFuture.completedFuture(Optional.empty()); + } + @Override public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) { return null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index a4f6bd4650f7c..25ad6cab94272 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -127,6 +127,7 @@ public void initPersistentTopics() throws Exception { @Override @BeforeMethod protected void setup() throws Exception { + conf.setTopicLevelPoliciesEnabled(false); super.internalSetup(); persistentTopics = spy(PersistentTopics.class); persistentTopics.setServletContext(new MockServletContext()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 87471f4972f8d..faf141a5d1cf4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -180,7 +180,7 @@ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws Excep assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic)); //make sure namespace policy reader is fully started. Awaitility.await().untilAsserted(()-> { - assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject())); + assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()).isDone()); }); //load the topic. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java new file mode 100644 index 0000000000000..672fc2c95f890 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesWithBrokerRestartTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Test(groups = "broker-admin") +public class TopicPoliciesWithBrokerRestartTest extends MockedPulsarServiceBaseTest { + + @Override + @BeforeClass(alwaysRun = true) + protected void setup() throws Exception { + super.internalSetup(); + setupDefaultTenantAndNamespace(); + } + + @Override + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + + @Test + public void testRetentionWithBrokerRestart() throws Exception { + final int messages = 1_000; + final int topicNum = 500; + // (1) Init topic + admin.namespaces().createNamespace("public/retention"); + final String topicName = "persistent://public/retention/retention_with_broker_restart"; + admin.topics().createNonPartitionedTopic(topicName); + for (int i = 0; i < topicNum; i++) { + final String shadowTopicNames = topicName + "_" + i; + admin.topics().createNonPartitionedTopic(shadowTopicNames); + } + // (2) Set retention + final RetentionPolicies retentionPolicies = new RetentionPolicies(20, 20); + for (int i = 0; i < topicNum; i++) { + final String shadowTopicNames = topicName + "_" + i; + admin.topicPolicies().setRetention(shadowTopicNames, retentionPolicies); + } + admin.topicPolicies().setRetention(topicName, retentionPolicies); + // (3) Send messages + @Cleanup + final Producer publisher = pulsarClient.newProducer() + .topic(topicName) + .create(); + for (int i = 0; i < messages; i++) { + publisher.send((i + "").getBytes(StandardCharsets.UTF_8)); + } + // (4) Check configuration + Awaitility.await().untilAsserted(() -> { + final PersistentTopic persistentTopic1 = (PersistentTopic) + pulsar.getBrokerService().getTopic(topicName, true).join().get(); + final ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger(); + Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), 20); + Assert.assertEquals(managedLedger1.getConfig().getRetentionTimeMillis(), + TimeUnit.MINUTES.toMillis(20)); + }); + // (5) Restart broker + restartBroker(); + // (6) Check configuration again + for (int i = 0; i < topicNum; i++) { + final String shadowTopicNames = topicName + "_" + i; + admin.lookups().lookupTopic(shadowTopicNames); + final PersistentTopic persistentTopicTmp = (PersistentTopic) + pulsar.getBrokerService().getTopic(shadowTopicNames, true).join().get(); + final ManagedLedgerImpl managedLedgerTemp = (ManagedLedgerImpl) persistentTopicTmp.getManagedLedger(); + Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionSizeInMB(), 20); + Assert.assertEquals(managedLedgerTemp.getConfig().getRetentionTimeMillis(), + TimeUnit.MINUTES.toMillis(20)); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java index efd8b66d754ac..234af7afa8d09 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java @@ -84,6 +84,8 @@ protected void setup() throws Exception { Set providers = new HashSet<>(); providers.add(AuthenticationProviderToken.class.getName()); conf.setAuthenticationProviders(providers); + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN); super.internalSetup(); PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java index 6ffcecbeb9f8b..942a42fa7aaa1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthLogsTest.java @@ -60,6 +60,8 @@ public void setup() throws Exception { conf.setAuthorizationEnabled(true); conf.setAuthorizationAllowWildcardsMatching(true); conf.setSuperUserRoles(Sets.newHashSet("super")); + conf.setBrokerClientAuthenticationPlugin(MockAuthentication.class.getName()); + conf.setBrokerClientAuthenticationParameters("user:pass.pass"); internalSetup(); try (PulsarAdmin admin = PulsarAdmin.builder() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java index 0b1726617f71f..25ac59796b02c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java @@ -29,7 +29,10 @@ public class MockAuthentication implements Authentication { private static final Logger log = LoggerFactory.getLogger(MockAuthentication.class); - private final String user; + private String user; + + public MockAuthentication() { + } public MockAuthentication(String user) { this.user = user; @@ -67,6 +70,7 @@ public String getCommandData() { @Override public void configure(Map authParams) { + this.user = authParams.get("user"); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 5803165c1ac6a..2c8ef373f0950 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -42,6 +42,7 @@ import lombok.Data; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -53,6 +54,8 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -235,6 +238,22 @@ protected void doInitConf() throws Exception { protected final void init() throws Exception { doInitConf(); + // trying to config the broker internal client + if (conf.getWebServicePortTls().isPresent() + && conf.getAuthenticationProviders().contains(AuthenticationProviderTls.class.getName()) + && !conf.isTlsEnabledWithKeyStore()) { + // enabled TLS + if (conf.getBrokerClientAuthenticationPlugin() == null + || conf.getBrokerClientAuthenticationPlugin().equals(AuthenticationDisabled.class.getName())) { + conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); + conf.setBrokerClientAuthenticationParameters("tlsCertFile:" + BROKER_CERT_FILE_PATH + + ",tlsKeyFile:" + BROKER_KEY_FILE_PATH); + conf.setBrokerClientTlsEnabled(true); + conf.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH); + conf.setBrokerClientCertificateFilePath(BROKER_CERT_FILE_PATH); + conf.setBrokerClientKeyFilePath(BROKER_KEY_FILE_PATH); + } + } startBroker(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java index 951892f4ebfbc..5252407892eea 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java @@ -304,6 +304,7 @@ public void testSetRackInfoAndAffinityGroupDuringProduce() throws Exception { bookies[3].getBookieId()); ServiceConfiguration config = new ServiceConfiguration(); + config.setTopicLevelPoliciesEnabled(false); config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); config.setClusterName(cluster); config.setWebServicePort(Optional.of(0)); @@ -612,9 +613,9 @@ public void testBookieIsolationWithSecondaryGroup() throws Exception { config.setBrokerShutdownTimeoutMs(0L); config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); config.setBrokerServicePort(Optional.of(0)); + config.setTopicLevelPoliciesEnabled(false); config.setAdvertisedAddress("localhost"); config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups); - config.setManagedLedgerDefaultEnsembleSize(2); config.setManagedLedgerDefaultWriteQuorum(2); config.setManagedLedgerDefaultAckQuorum(2); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 31b5bcb23cd98..5b70ff996756e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -141,7 +141,7 @@ public void testGetPolicy() throws ExecutionException, InterruptedException, Top // Wait for all topic policies updated. Awaitility.await().untilAsserted(() -> Assert.assertTrue(systemTopicBasedTopicPoliciesService - .getPoliciesCacheInit(TOPIC1.getNamespaceObject()))); + .getPoliciesCacheInit(TOPIC1.getNamespaceObject()).isDone())); // Assert broker is cache all topic policies Awaitility.await().untilAsserted(() -> @@ -304,8 +304,8 @@ private void prepareData() throws PulsarAdminException { @Test public void testGetPolicyTimeout() throws Exception { SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); - Awaitility.await().untilAsserted(() -> assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()))); - service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false); + Awaitility.await().untilAsserted(() -> assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()).isDone())); + service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), new CompletableFuture<>()); long start = System.currentTimeMillis(); Backoff backoff = new BackoffBuilder() .setInitialTime(500, TimeUnit.MILLISECONDS) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index e29d015c45db7..9995b6a28a903 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -45,6 +45,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -620,7 +621,7 @@ public void testCheckPersistencePolicies() throws Exception { doReturn(policiesService).when(pulsar).getTopicPoliciesService(); TopicPolicies policies = new TopicPolicies(); policies.setRetentionPolicies(retentionPolicies); - doReturn(policies).when(policiesService).getTopicPoliciesIfExists(TopicName.get(topic)); + doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(policiesService).getTopicPoliciesAsync(TopicName.get(topic)); persistentTopic.onUpdate(policies); verify(persistentTopic, times(1)).checkPersistencePolicies(); Awaitility.await().untilAsserted(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java index e3bd321d76332..042c9b328d58b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.tls.PublicSuffixMatcher; import org.apache.pulsar.common.tls.TlsHostnameVerifier; +import org.assertj.core.util.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -141,6 +142,7 @@ public void testTlsSyncProducerAndConsumerWithInvalidBrokerHost(boolean hostname // setup broker cert which has CN = "pulsar" different than broker's hostname="localhost" conf.setBrokerServicePortTls(Optional.of(0)); conf.setWebServicePortTls(Optional.of(0)); + conf.setAuthenticationProviders(Sets.newTreeSet(AuthenticationProviderTls.class.getName())); conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH); conf.setTlsCertificateFilePath(TLS_MIM_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_MIM_SERVER_KEY_FILE_PATH); @@ -182,6 +184,7 @@ public void testTlsSyncProducerAndConsumerCorrectBrokerHost() throws Exception { // setup broker cert which has CN = "localhost" conf.setBrokerServicePortTls(Optional.of(0)); conf.setWebServicePortTls(Optional.of(0)); + conf.setAuthenticationProviders(Sets.newTreeSet(AuthenticationProviderTls.class.getName())); conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH); conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH); conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index bdbe8efc8e6ef..9a36e0683b422 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -119,6 +119,7 @@ protected void cleanup() throws Exception { public void testProducerAndConsumerAuthorization() throws Exception { log.info("-- Starting {} test --", methodName); cleanup(); + conf.setTopicLevelPoliciesEnabled(false); conf.setAuthorizationProvider(TestAuthorizationProvider.class.getName()); setup(); @@ -179,6 +180,7 @@ public void testProducerAndConsumerAuthorization() throws Exception { public void testSubscriberPermission() throws Exception { log.info("-- Starting {} test --", methodName); cleanup(); + conf.setTopicLevelPoliciesEnabled(false); conf.setEnablePackagesManagement(true); conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName()); conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName()); @@ -369,6 +371,7 @@ public void testSubscriberPermission() throws Exception { public void testClearBacklogPermission() throws Exception { log.info("-- Starting {} test --", methodName); cleanup(); + conf.setTopicLevelPoliciesEnabled(false); conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName()); setup(); @@ -610,6 +613,7 @@ public void testUpdateTopicPropertiesAuthorization() throws Exception { public void testSubscriptionPrefixAuthorization() throws Exception { log.info("-- Starting {} test --", methodName); cleanup(); + conf.setTopicLevelPoliciesEnabled(false); conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName()); setup(); @@ -749,6 +753,7 @@ public void testAuthData() throws Exception { public void testPermissionForProducerCreateInitialSubscription() throws Exception { log.info("-- Starting {} test --", methodName); cleanup(); + conf.setTopicLevelPoliciesEnabled(false); conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName()); setup(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java index 2fc8aebf64a4a..81d65b192049b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java @@ -195,7 +195,7 @@ protected void setup() throws Exception { Set superUserRoles = new HashSet<>(); superUserRoles.add("admin"); conf.setSuperUserRoles(superUserRoles); - + conf.setTopicLevelPoliciesEnabled(false); conf.setAuthorizationEnabled(true); conf.setAuthenticationEnabled(true); Set providersClassNames = Sets.newHashSet(MutualAuthenticationProvider.class.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java index 87f12e6acdcb2..4d5e7deaf7d99 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java @@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.slf4j.Logger; @@ -92,6 +93,8 @@ protected void setup() throws Exception { Set providers = new HashSet<>(); providers.add(AuthenticationProviderToken.class.getName()); conf.setAuthenticationProviders(providers); + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN); conf.setClusterName("test"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java index fdf41c4a6ada1..ba43ee6d6a2dd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java @@ -27,7 +27,9 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -39,6 +41,7 @@ import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,11 +90,12 @@ protected void setup() throws Exception { conf.setAuthenticationProviders(providers); conf.setBrokerClientAuthenticationPlugin(AuthenticationOAuth2.class.getName()); - conf.setBrokerClientAuthenticationParameters("{\n" - + " \"privateKey\": \"" + CREDENTIALS_FILE + "\",\n" - + " \"issuerUrl\": \"" + server.getIssuer() + "\",\n" - + " \"audience\": \"" + audience + "\",\n" - + "}\n"); + final Map oauth2Param = new HashMap<>(); + oauth2Param.put("privateKey", CREDENTIALS_FILE); + oauth2Param.put("issuerUrl", server.getIssuer()); + oauth2Param.put("audience", audience); + conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory + .getMapper().getObjectMapper().writeValueAsString(oauth2Param)); conf.setClusterName("test"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java index 8e508b6cf2068..77405e142013a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeyStoreTlsProducerConsumerTestWithAuthTest.java @@ -32,6 +32,7 @@ import io.jsonwebtoken.SignatureAlgorithm; import lombok.Cleanup; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; @@ -49,6 +50,7 @@ import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -83,6 +85,7 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @SneakyThrows protected void internalSetUpForBroker() { conf.setBrokerServicePortTls(Optional.of(0)); conf.setWebServicePortTls(Optional.of(0)); @@ -114,6 +117,25 @@ protected void internalSetUpForBroker() { conf.setAuthenticationProviders(providers); conf.setNumExecutorThreadPoolSize(5); + Set tlsProtocols = Sets.newConcurrentHashSet(); + tlsProtocols.add("TLSv1.3"); + tlsProtocols.add("TLSv1.2"); + conf.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName()); + Map authParams = new HashMap<>(); + authParams.put(AuthenticationKeyStoreTls.KEYSTORE_TYPE, KEYSTORE_TYPE); + authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PATH, CLIENT_KEYSTORE_FILE_PATH); + authParams.put(AuthenticationKeyStoreTls.KEYSTORE_PW, CLIENT_KEYSTORE_PW); + conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory.getMapper() + .getObjectMapper().writeValueAsString(authParams)); + conf.setBrokerClientTlsEnabled(true); + conf.setBrokerClientTlsEnabledWithKeyStore(true); + conf.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH); + conf.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW); + conf.setBrokerClientTlsKeyStore(CLIENT_KEYSTORE_FILE_PATH); + conf.setBrokerClientTlsKeyStoreType(KEYSTORE_TYPE); + conf.setBrokerClientTlsKeyStorePassword(CLIENT_KEYSTORE_PW); + conf.setBrokerClientTlsProtocols(tlsProtocols); + } protected void internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java index 76936334eb0ba..b9139dabdf021 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplAuthTest.java @@ -85,6 +85,7 @@ public void setup() throws Exception { // set isTcpLookup = true, to use BinaryProtoLookupService to get topics for a pattern. isTcpLookup = true; + conf.setTopicLevelPoliciesEnabled(false); conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); diff --git a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf index d9411e655ad5b..4e2fd40298354 100644 --- a/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf +++ b/pulsar-broker/src/test/resources/configurations/standalone_no_client_auth.conf @@ -29,4 +29,5 @@ authenticationEnabled=true authenticationProviders=org.apache.pulsar.MockTokenAuthenticationProvider brokerClientAuthenticationPlugin= brokerClientAuthenticationParameters= -loadBalancerOverrideBrokerNicSpeedGbps=2 \ No newline at end of file +loadBalancerOverrideBrokerNicSpeedGbps=2 +topicLevelPoliciesEnabled=false \ No newline at end of file diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 8229d929ee5e3..9c8e5197adf1a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -168,7 +168,7 @@ protected void setup() throws Exception { conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); // Expires after an hour conf.setBrokerClientAuthenticationParameters( - "entityType:broker,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000)); + "entityType:admin,expiryTime:" + (System.currentTimeMillis() + 3600 * 1000)); Set superUserRoles = new HashSet<>(); superUserRoles.add("admin"); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java index 99af3b1cf6abe..b7cfb87474707 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java @@ -53,7 +53,7 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); - conf.setBrokerClientAuthenticationParameters("authParam:broker"); + conf.setBrokerClientAuthenticationParameters("authParam:admin"); conf.setAuthenticateOriginalAuthData(true); Set superUserRoles = new HashSet(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java index 2c8c382b6a5ef..3259cfd95c741 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java @@ -144,7 +144,7 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); - conf.setBrokerClientAuthenticationParameters("authParam:broker"); + conf.setBrokerClientAuthenticationParameters("authParam:admin"); Set superUserRoles = new HashSet<>(); superUserRoles.add("admin"); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java index e8bb128c8c190..2d97a4b06a856 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java @@ -78,6 +78,8 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase { protected void setup() throws Exception { // enable tls and auth&auth at broker + conf.setTopicLevelPoliciesEnabled(false); + conf.setAuthenticationEnabled(true); conf.setAuthorizationEnabled(true); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java index 9119ffed4e28f..7b550b7270f37 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -63,6 +64,9 @@ public void setup() throws Exception { conf.setProperties(properties); conf.setSuperUserRoles(Sets.newHashSet(SUPER_USER_ROLE)); conf.setClusterName("c1"); + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters("token:" + AuthTokenUtils + .createToken(secretKey, SUPER_USER_ROLE, Optional.empty())); internalSetup(); admin.clusters().createCluster("c1", ClusterData.builder().build()); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index 9f6c5e58a2111..035d88c7be0e0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -91,6 +91,7 @@ public void setup() throws Exception { "org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder"); brokerEnvs.put("forceDeleteNamespaceAllowed", "true"); brokerEnvs.put("loadBalancerDebugModeEnabled", "true"); + brokerEnvs.put("topicLevelPoliciesEnabled", "false"); brokerEnvs.put("PULSAR_MEM", "-Xmx512M"); spec.brokerEnvs(brokerEnvs); pulsarCluster = PulsarCluster.forSpec(spec); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java index 0a9bb5e19592a..87db46f2bb625 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java @@ -68,6 +68,7 @@ protected void beforeStartCluster() { envMap.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey)); envMap.put("superUserRoles", "admin"); envMap.put("brokerDeleteInactiveTopicsEnabled", "false"); + envMap.put("topicLevelPoliciesEnabled", "false"); for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) { brokerContainer.withEnv(envMap);