Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] fix permission hole for subscription. #49

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,10 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
}
} else {
if (isNotBlank(subscription)) {
// validate if role is authorized to access subscription. (skip validation if authorization
// list is empty)
// validate if role is authorized to access subscription.
Set<String> roles = policies.get().auth_policies
.getSubscriptionAuthentication().get(subscription);
if (roles != null && !roles.isEmpty() && !roles.contains(role)) {
if (roles == null || roles.isEmpty() || !roles.contains(role)) {
log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription);
return CompletableFuture.completedFuture(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ public void testGetPartitionedStatsAndInternalStats() {
@SneakyThrows
public void testCreateSubscriptionAndUpdateSubscriptionPropertiesAndAnalyzeSubscriptionBacklog() {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
Expand All @@ -322,7 +323,9 @@ public void testCreateSubscriptionAndUpdateSubscriptionPropertiesAndAnalyzeSubsc
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (action == AuthAction.consume) {
subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest);
String subscriptionName = "test-sub" + suffix.incrementAndGet();
superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName, Sets.newHashSet(subject));
subAdmin.topics().createSubscription(topic, subscriptionName, MessageId.earliest);
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest));
Expand Down Expand Up @@ -361,8 +364,10 @@ public void testCreateSubscriptionAndUpdateSubscriptionPropertiesAndAnalyzeSubsc
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (action == AuthAction.consume) {
subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties);
subAdmin.topics().getSubscriptionProperties(topic, "test-sub");
String subscriptionName = "test-sub";
superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName, Sets.newHashSet(subject));
subAdmin.topics().updateSubscriptionProperties(topic, subscriptionName, properties);
subAdmin.topics().getSubscriptionProperties(topic, subscriptionName);
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty());
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
Expand Down Expand Up @@ -706,6 +711,7 @@ public void testGetInternalStats(boolean partitioned) {
@SneakyThrows
public void testDeleteSubscription(boolean partitioned) {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
Expand Down Expand Up @@ -734,6 +740,8 @@ public void testDeleteSubscription(boolean partitioned) {
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (AuthAction.consume == action) {
String subscriptionName = "test-sub";
superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName, Sets.newHashSet(subject));
subAdmin.topics().deleteSubscription(topic, "test-sub");
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
Expand All @@ -748,6 +756,7 @@ public void testDeleteSubscription(boolean partitioned) {
@SneakyThrows
public void testSkipAllMessage(boolean partitioned) {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
Expand All @@ -773,6 +782,7 @@ public void testSkipAllMessage(boolean partitioned) {
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (AuthAction.consume == action) {
superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, subName, Sets.newHashSet(subject));
subAdmin.topics().skipAllMessages(topic,subName);
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
Expand All @@ -787,6 +797,7 @@ public void testSkipAllMessage(boolean partitioned) {
@SneakyThrows
public void testSkipMessage() {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
Expand All @@ -811,6 +822,7 @@ public void testSkipMessage() {
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (AuthAction.consume == action) {
superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, subName, Sets.newHashSet(subject));
subAdmin.topics().skipMessages(topic, subName, 1);
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
Expand Down Expand Up @@ -861,6 +873,7 @@ public void testExpireMessagesForAllSubscriptions(boolean partitioned) {
@SneakyThrows
public void testResetCursor(boolean partitioned) {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
Expand All @@ -885,6 +898,7 @@ public void testResetCursor(boolean partitioned) {
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (AuthAction.consume == action) {
superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, subName, Sets.newHashSet(subject));
subAdmin.topics().resetCursor(topic, subName, System.currentTimeMillis());
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
Expand All @@ -899,6 +913,7 @@ public void testResetCursor(boolean partitioned) {
@SneakyThrows
public void testResetCursorOnPosition() {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
Expand All @@ -923,6 +938,7 @@ public void testResetCursorOnPosition() {
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (AuthAction.consume == action) {
superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, subName, Sets.newHashSet(subject));
subAdmin.topics().resetCursor(topic, subName, MessageId.latest);
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
Expand Down Expand Up @@ -983,6 +999,7 @@ public void testGetMessageById() {
@SneakyThrows
public void testPeekNthMessage() {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
Expand Down Expand Up @@ -1015,6 +1032,7 @@ public void testPeekNthMessage() {
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (AuthAction.consume == action) {
superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, subName, Sets.newHashSet(subject));
subAdmin.topics().peekMessages(topic, subName, 1);
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
Expand Down Expand Up @@ -1075,6 +1093,7 @@ public void testExamineMessage() {
@SneakyThrows
public void testExpireMessage() {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
Expand Down Expand Up @@ -1112,6 +1131,7 @@ public void testExpireMessage() {
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (AuthAction.consume == action) {
superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, subName, Sets.newHashSet(subject));
subAdmin.topics().expireMessages(topic, subName, 1);
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
Expand All @@ -1127,6 +1147,7 @@ public void testExpireMessage() {
public void testExpireMessageByPosition() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;
final String namespace = "public/default";
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
Expand Down Expand Up @@ -1163,6 +1184,7 @@ public void testExpireMessageByPosition() {
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (AuthAction.consume == action) {
superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, subName, Sets.newHashSet(subject));
subAdmin.topics().expireMessages(topic, subName, MessageId.earliest, false);
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,19 @@ public void simple() throws Exception {
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null));
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null));
try {
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "sub1", Sets.newHashSet("role1"));
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "sub1"));
fail();
} catch (Exception ignored) {}
try {
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "sub2", Sets.newHashSet("role2"));
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "sub2"));
fail();
} catch (Exception ignored) {}

// though we use prefix subscription mode, we still require that the role own the permission.
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "role1-sub1", Sets.newHashSet("role1"));
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "role2-sub2", Sets.newHashSet("role2"));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "role1-sub1"));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "role2-sub2"));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "pulsar.super_user", null, "role3-sub1"));
Expand Down
Loading
Loading