diff --git a/controller/src/main/java/io/pravega/controller/server/AuthResourceRepresentation.java b/controller/src/main/java/io/pravega/controller/server/AuthResourceRepresentation.java
new file mode 100644
index 00000000000..d96f3d6a0e4
--- /dev/null
+++ b/controller/src/main/java/io/pravega/controller/server/AuthResourceRepresentation.java
@@ -0,0 +1,110 @@
+/**
+ * Copyright (c) 2019 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.controller.server;
+
+import io.pravega.common.Exceptions;
+
+/**
+ * A utility class with methods for preparing string representations of auth-protected resources.
+ *
+ * Background:
+ *
+ * In general, authorization is about granting a subject access to perform a particular action
+ * on an object/resource.
+ *
+ * In Pravega,
+ *
+ * - A subject is represented as an instance of type {@link java.security.Principal}.
+ * - An action is represented as an element of enum type {@link io.pravega.auth.AuthHandler.Permissions}.
+ * - An object is represented by an instance of this class.
+ *
+ */
+public final class AuthResourceRepresentation {
+
+ /**
+ * Creates a resource representation for use in authorization of actions pertaining to the collection of scopes
+ * in the system.
+ *
+ * @return a string representing the collections of scopes in the system
+ */
+ public static String ofScopes() {
+ return "/";
+ }
+
+ /**
+ * Creates a resource representation for use in authorization of actions pertaining to the specified scope.
+ *
+ * @param scopeName the name of the scope
+ * @return a string representing the scope with the specified name
+ * @throws NullPointerException if {@code scopeName} is null
+ * @throws IllegalArgumentException if {@code scopeName} is empty
+ */
+ public static String ofScope(String scopeName) {
+ Exceptions.checkNotNullOrEmpty(scopeName, "scopeName");
+ return scopeName;
+ }
+
+ /**
+ * Creates a resource representation for use in authorization of actions pertaining to the collection of streams
+ * within the specified scope.
+ *
+ * @param scopeName the name of the scope
+ * @return a string representing the collection of streams under the scope with the specified name
+ * @throws NullPointerException if {@code scopeName} is null
+ * @throws IllegalArgumentException if {@code scopeName} is empty
+ */
+ public static String ofStreamsInScope(String scopeName) {
+ return Exceptions.checkNotNullOrEmpty(scopeName, "scopeName");
+ }
+
+ /**
+ * Creates a resource representation for use in authorization of actions pertaining to the specified stream within
+ * the specified scope.
+ *
+ * @param scopeName the name of the scope
+ * @param streamName the name of the stream
+ * @return a string representing the specified stream within the specified scope
+ * @throws NullPointerException if {@code scopeName} or {@code streamName} are null
+ * @throws IllegalArgumentException if {@code scopeName} or {@code streamName} are empty
+ */
+ public static String ofStreamInScope(String scopeName, String streamName) {
+ Exceptions.checkNotNullOrEmpty(streamName, "streamName");
+ return String.format("%s/%s", ofStreamsInScope(scopeName), streamName);
+ }
+
+ /**
+ * Creates a resource representation for use in authorization of actions pertaining to the collection of reader
+ * groups within the specified scope.
+ *
+ * @param scopeName the name of the scope
+ * @return a string representing the specified the collection of reader groups
+ * @throws NullPointerException if {@code scopeName} is null
+ * @throws IllegalArgumentException if {@code scopeName} is empty
+ */
+ public static String ofReaderGroupsInScope(String scopeName) {
+ Exceptions.checkNotNullOrEmpty(scopeName, "scopeName");
+ return scopeName;
+ }
+
+ /**
+ * Creates a resource representation for use in authorization of actions pertaining to the specified reader group
+ * within the specified scope.
+ *
+ * @param scopeName the name of the scope
+ * @param readerGroupName the name of the reader group
+ * @return a string representing the specified reader group
+ * @throws NullPointerException if {@code scopeName} or {@code streamName} are null
+ * @throws IllegalArgumentException if {@code scopeName} or {@code streamName} are empty
+ */
+ public static String ofReaderGroupInScope(String scopeName, String readerGroupName) {
+ Exceptions.checkNotNullOrEmpty(readerGroupName, "readerGroupName");
+ return String.format("%s/%s", ofReaderGroupsInScope(scopeName), readerGroupName);
+ }
+}
\ No newline at end of file
diff --git a/controller/src/main/java/io/pravega/controller/server/rest/resources/StreamMetadataResourceImpl.java b/controller/src/main/java/io/pravega/controller/server/rest/resources/StreamMetadataResourceImpl.java
index b272b3d3cf5..48ecbb9d3cc 100644
--- a/controller/src/main/java/io/pravega/controller/server/rest/resources/StreamMetadataResourceImpl.java
+++ b/controller/src/main/java/io/pravega/controller/server/rest/resources/StreamMetadataResourceImpl.java
@@ -9,11 +9,7 @@
*/
package io.pravega.controller.server.rest.resources;
-import com.google.common.base.Preconditions;
import io.pravega.auth.AuthException;
-import io.pravega.auth.AuthHandler;
-import io.pravega.auth.AuthenticationException;
-import io.pravega.auth.AuthorizationException;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.admin.impl.ReaderGroupManagerImpl;
import io.pravega.client.netty.impl.ConnectionFactory;
@@ -22,6 +18,7 @@
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.ClientFactoryImpl;
import io.pravega.common.LoggerHelpers;
+import io.pravega.controller.server.AuthResourceRepresentation;
import io.pravega.controller.server.ControllerService;
import io.pravega.controller.server.eventProcessor.LocalController;
import io.pravega.controller.server.rest.ModelHelper;
@@ -37,6 +34,7 @@
import io.pravega.controller.server.rest.generated.model.UpdateStreamRequest;
import io.pravega.controller.server.rest.v1.ApiV1;
import io.pravega.controller.server.rpc.auth.PravegaAuthManager;
+import io.pravega.controller.server.rpc.auth.RESTAuthHelper;
import io.pravega.controller.store.stream.ScaleMetadata;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.stream.api.grpc.v1.Controller.CreateScopeStatus;
@@ -73,14 +71,14 @@ public class StreamMetadataResourceImpl implements ApiV1.ScopesApi {
HttpHeaders headers;
private final ControllerService controllerService;
- private final PravegaAuthManager pravegaAuthManager;
+ private final RESTAuthHelper restAuthHelper;
private final LocalController localController;
private final ConnectionFactory connectionFactory;
public StreamMetadataResourceImpl(LocalController localController, ControllerService controllerService, PravegaAuthManager pravegaAuthManager, ConnectionFactory connectionFactory) {
this.localController = localController;
this.controllerService = controllerService;
- this.pravegaAuthManager = pravegaAuthManager;
+ this.restAuthHelper = new RESTAuthHelper(pravegaAuthManager);
this.connectionFactory = connectionFactory;
}
@@ -105,7 +103,8 @@ public void createScope(final CreateScopeRequest createScopeRequest, final Secur
}
try {
- authenticateAuthorize(createScopeRequest.getScopeName(), READ_UPDATE);
+ restAuthHelper.authenticateAuthorize(getAuthorizationHeader(),
+ AuthResourceRepresentation.ofScopes(), READ_UPDATE);
} catch (AuthException e) {
log.warn("Create scope for {} failed due to authentication failure {}.", createScopeRequest.getScopeName(), e);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
@@ -132,54 +131,15 @@ public void createScope(final CreateScopeRequest createScopeRequest, final Secur
.thenAccept(x -> LoggerHelpers.traceLeave(log, "createScope", traceId));
}
- private void authenticateAuthorize(String resourceName, AuthHandler.Permissions level) throws AuthException {
- if (pravegaAuthManager != null ) {
- List authParams = headers.getRequestHeader(HttpHeaders.AUTHORIZATION);
- if (authParams == null || authParams.isEmpty()) {
- throw new AuthenticationException("Auth failed for " + resourceName);
- }
-
- String credentials = authParams.get(0);
- Preconditions.checkNotNull(credentials, "Credentials not specified in the parameters.");
-
- if (!pravegaAuthManager.authenticateAndAuthorize(resourceName, credentials, level)) {
- throw new AuthorizationException("Auth failed for " + resourceName, Status.FORBIDDEN.getStatusCode());
- }
- }
- }
-
- private Principal authenticate() throws AuthException {
- if (pravegaAuthManager != null ) {
- List authParams = headers.getRequestHeader(HttpHeaders.AUTHORIZATION);
-
- if (authParams == null || authParams.isEmpty()) {
- throw new AuthenticationException("Auth failed.");
- }
-
- String credentials = authParams.get(0);
- Preconditions.checkNotNull(credentials, "Credentials not specified in the parameters.");
-
- return pravegaAuthManager.authenticate(credentials);
- }
- return null;
- }
-
- private void authorize(String resourceName, Principal principal, AuthHandler.Permissions level) throws AuthException {
- if (pravegaAuthManager != null ) {
- List authParams = headers.getRequestHeader(HttpHeaders.AUTHORIZATION);
-
- if (authParams == null || authParams.isEmpty()) {
- throw new AuthenticationException("Auth failed for " + resourceName);
- }
-
- String credentials = authParams.get(0);
- Preconditions.checkNotNull(credentials, "Credentials not specified in the parameters.");
-
- if (!pravegaAuthManager.authorize(resourceName, principal, credentials, level)) {
- throw new AuthorizationException("Auth failed for " + resourceName, Status.FORBIDDEN.getStatusCode());
- }
- }
+ /**
+ * This is a shortcut for {@code headers.getRequestHeader().get(HttpHeaders.AUTHORIZATION)}.
+ *
+ * @return a list of read-only values of the HTTP Authorization header
+ * @throws IllegalStateException if called outside the scope of the HTTP request
+ */
+ private List getAuthorizationHeader() {
+ return headers.getRequestHeader(HttpHeaders.AUTHORIZATION);
}
/**
@@ -206,7 +166,8 @@ public void createStream(final String scopeName, final CreateStreamRequest creat
}
try {
- authenticateAuthorize(scopeName + "/" + streamName, READ_UPDATE);
+ restAuthHelper.authenticateAuthorize(getAuthorizationHeader(),
+ AuthResourceRepresentation.ofStreamsInScope(scopeName), READ_UPDATE);
} catch (AuthException e) {
log.warn("Create stream for {} failed due to authentication failure.", streamName);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
@@ -256,7 +217,8 @@ public void deleteScope(final String scopeName, final SecurityContext securityCo
long traceId = LoggerHelpers.traceEnter(log, "deleteScope");
try {
- authenticateAuthorize(scopeName, READ_UPDATE);
+ restAuthHelper.authenticateAuthorize(getAuthorizationHeader(),
+ AuthResourceRepresentation.ofScopes(), READ_UPDATE);
} catch (AuthException e) {
log.warn("Delete scope for {} failed due to authentication failure.", scopeName);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
@@ -299,7 +261,10 @@ public void deleteStream(final String scopeName, final String streamName, final
long traceId = LoggerHelpers.traceEnter(log, "deleteStream");
try {
- authenticateAuthorize(scopeName + "/" + streamName, READ_UPDATE);
+ restAuthHelper.authenticateAuthorize(
+ getAuthorizationHeader(),
+ AuthResourceRepresentation.ofStreamInScope(scopeName, streamName),
+ READ_UPDATE);
} catch (AuthException e) {
log.warn("Delete stream for {} failed due to authentication failure.", streamName);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
@@ -334,7 +299,8 @@ public void getReaderGroup(final String scopeName, final String readerGroupName,
long traceId = LoggerHelpers.traceEnter(log, "getReaderGroup");
try {
- authenticateAuthorize(scopeName + "/" + readerGroupName, READ);
+ restAuthHelper.authenticateAuthorize(
+ getAuthorizationHeader(), AuthResourceRepresentation.ofReaderGroupInScope(scopeName, readerGroupName), READ);
} catch (AuthException e) {
log.warn("Get reader group for {} failed due to authentication failure.", scopeName + "/" + readerGroupName);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
@@ -383,7 +349,9 @@ public void getScope(final String scopeName, final SecurityContext securityConte
long traceId = LoggerHelpers.traceEnter(log, "getScope");
try {
- authenticateAuthorize(scopeName, READ);
+ restAuthHelper.authenticateAuthorize(
+ getAuthorizationHeader(),
+ AuthResourceRepresentation.ofScope(scopeName), READ);
} catch (AuthException e) {
log.warn("Get scope for {} failed due to authentication failure.", scopeName);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
@@ -421,7 +389,8 @@ public void getStream(final String scopeName, final String streamName, final Sec
long traceId = LoggerHelpers.traceEnter(log, "getStream");
try {
- authenticateAuthorize(scopeName + "/" + streamName, READ);
+ restAuthHelper.authenticateAuthorize(getAuthorizationHeader(),
+ AuthResourceRepresentation.ofStreamInScope(scopeName, streamName), READ);
} catch (AuthException e) {
log.warn("Get stream for {} failed due to authentication failure.", scopeName + "/" + streamName);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
@@ -452,7 +421,8 @@ public void listReaderGroups(final String scopeName, final SecurityContext secur
long traceId = LoggerHelpers.traceEnter(log, "listReaderGroups");
try {
- authenticateAuthorize(scopeName, READ);
+ restAuthHelper.authenticateAuthorize(getAuthorizationHeader(),
+ AuthResourceRepresentation.ofReaderGroupsInScope(scopeName), READ);
} catch (AuthException e) {
log.warn("Get reader groups for {} failed due to authentication failure.", scopeName);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
@@ -497,10 +467,12 @@ public void listReaderGroups(final String scopeName, final SecurityContext secur
public void listScopes(final SecurityContext securityContext, final AsyncResponse asyncResponse) {
long traceId = LoggerHelpers.traceEnter(log, "listScopes");
- Principal principal;
+ final Principal principal;
+ final List authHeader = getAuthorizationHeader();
+
try {
- principal = authenticate();
- authorize("/", principal, READ);
+ principal = restAuthHelper.authenticate(authHeader);
+ restAuthHelper.authorize(authHeader, AuthResourceRepresentation.ofScopes(), principal, READ);
} catch (AuthException e) {
log.warn("Get scopes failed due to authentication failure.", e);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
@@ -512,7 +484,18 @@ public void listScopes(final SecurityContext securityContext, final AsyncRespons
.thenApply(scopesList -> {
ScopesList scopes = new ScopesList();
scopesList.forEach(scope -> {
- scopes.addScopesItem(new ScopeProperty().scopeName(scope));
+ try {
+ if (restAuthHelper.isAuthorized(authHeader,
+ AuthResourceRepresentation.ofScope(scope),
+ principal, READ)) {
+ scopes.addScopesItem(new ScopeProperty().scopeName(scope));
+ }
+ } catch (AuthException e) {
+ log.warn(e.getMessage(), e);
+ // Ignore. This exception occurs under abnormal circumstances and not to determine
+ // whether the user is authorized. In case it does occur, we assume that the user
+ // is unauthorized.
+ }
});
return Response.status(Status.OK).entity(scopes).build(); })
.exceptionally(exception -> {
@@ -537,8 +520,13 @@ public void listStreams(final String scopeName, final String showInternalStreams
final SecurityContext securityContext, final AsyncResponse asyncResponse) {
long traceId = LoggerHelpers.traceEnter(log, "listStreams");
+ final Principal principal;
+ final List authHeader = getAuthorizationHeader();
+
try {
- authenticateAuthorize(scopeName, READ);
+ principal = restAuthHelper.authenticate(authHeader);
+ restAuthHelper.authorize(authHeader,
+ AuthResourceRepresentation.ofStreamsInScope(scopeName), principal, READ);
} catch (AuthException e) {
log.warn("List streams for {} failed due to authentication failure.", scopeName);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
@@ -550,10 +538,22 @@ public void listStreams(final String scopeName, final String showInternalStreams
.thenApply(streamsList -> {
StreamsList streams = new StreamsList();
streamsList.forEach((stream, config) -> {
- // If internal streams are requested select only the ones that have the special stream names
- // otherwise display the regular user created streams.
- if (!showOnlyInternalStreams ^ stream.startsWith(INTERNAL_NAME_PREFIX)) {
- streams.addStreamsItem(ModelHelper.encodeStreamResponse(scopeName, stream, config));
+
+ try {
+ if (restAuthHelper.isAuthorized(getAuthorizationHeader(),
+ AuthResourceRepresentation.ofStreamInScope(scopeName, stream),
+ principal, READ)) {
+ // If internal streams are requested select only the ones that have the special stream names
+ // otherwise display the regular user created streams.
+ if (!showOnlyInternalStreams ^ stream.startsWith(INTERNAL_NAME_PREFIX)) {
+ streams.addStreamsItem(ModelHelper.encodeStreamResponse(scopeName, stream, config));
+ }
+ }
+ } catch (AuthException e) {
+ log.warn(e.getMessage(), e);
+ // Ignore. This exception occurs under abnormal circumstances and not to determine
+ // whether the user is authorized. In case it does occur, we assume that the user
+ // is unauthorized.
}
});
log.info("Successfully fetched streams for scope: {}", scopeName);
@@ -587,7 +587,9 @@ public void updateStream(final String scopeName, final String streamName,
long traceId = LoggerHelpers.traceEnter(log, "updateStream");
try {
- authenticateAuthorize(scopeName + "/" + streamName, READ_UPDATE);
+ restAuthHelper.authenticateAuthorize(getAuthorizationHeader(),
+ AuthResourceRepresentation.ofStreamInScope(scopeName, streamName),
+ READ_UPDATE);
} catch (AuthException e) {
log.warn("Update stream for {} failed due to authentication failure.", scopeName + "/" + streamName);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
@@ -632,7 +634,9 @@ public void updateStreamState(final String scopeName, final String streamName,
long traceId = LoggerHelpers.traceEnter(log, "updateStreamState");
try {
- authenticateAuthorize(scopeName + "/" + streamName, READ_UPDATE);
+ restAuthHelper.authenticateAuthorize(
+ getAuthorizationHeader(),
+ AuthResourceRepresentation.ofStreamInScope(scopeName, streamName), READ_UPDATE);
} catch (AuthException e) {
log.warn("Update stream for {} failed due to authentication failure.", scopeName + "/" + streamName);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
@@ -683,7 +687,9 @@ public void getScalingEvents(final String scopeName, final String streamName, fi
long traceId = LoggerHelpers.traceEnter(log, "getScalingEvents");
try {
- authenticateAuthorize(scopeName + "/" + streamName, READ);
+ restAuthHelper.authenticateAuthorize(
+ getAuthorizationHeader(),
+ AuthResourceRepresentation.ofStreamInScope(scopeName, streamName), READ);
} catch (AuthException e) {
log.warn("Get scaling events for {} failed due to authentication failure.", scopeName + "/" + streamName);
asyncResponse.resume(Response.status(Status.fromStatusCode(e.getResponseCode())).build());
diff --git a/controller/src/main/java/io/pravega/controller/server/rpc/auth/AuthHelper.java b/controller/src/main/java/io/pravega/controller/server/rpc/auth/AuthHelper.java
index 4d63c1befd8..1416039d7f9 100644
--- a/controller/src/main/java/io/pravega/controller/server/rpc/auth/AuthHelper.java
+++ b/controller/src/main/java/io/pravega/controller/server/rpc/auth/AuthHelper.java
@@ -31,22 +31,29 @@ public static AuthHelper getDisabledAuthHelper() {
return new AuthHelper(false, "");
}
- public String checkAuthorization(String resource, AuthHandler.Permissions expectedLevel) {
+ public boolean isAuthorized(String resource, AuthHandler.Permissions permission) {
if (isAuthEnabled) {
PravegaInterceptor currentInterceptor = PravegaInterceptor.INTERCEPTOR_OBJECT.get();
AuthHandler.Permissions allowedLevel;
if (currentInterceptor == null) {
- //No interceptor, and authorization is enabled. Means no access is granted.
+ //No interceptor, and authorization is enabled. That means no access is granted.
allowedLevel = AuthHandler.Permissions.NONE;
} else {
allowedLevel = currentInterceptor.authorize(resource);
}
- if (allowedLevel.ordinal() < expectedLevel.ordinal()) {
- throw new RuntimeException(new AuthorizationException("Access not allowed"));
- }
+ return (allowedLevel.ordinal() < permission.ordinal()) ? false : true;
+ } else {
+ return true;
+ }
+ }
+
+ public String checkAuthorization(String resource, AuthHandler.Permissions expectedLevel) {
+ if (isAuthorized(resource, expectedLevel)) {
+ return "";
+ } else {
+ throw new RuntimeException(new AuthorizationException("Access not allowed"));
}
- return "";
}
public String checkAuthorizationAndCreateToken(String resource, AuthHandler.Permissions expectedLevel) {
diff --git a/controller/src/main/java/io/pravega/controller/server/rpc/auth/PasswordAuthHandler.java b/controller/src/main/java/io/pravega/controller/server/rpc/auth/PasswordAuthHandler.java
index 1ec2d78f73a..9548feccc74 100644
--- a/controller/src/main/java/io/pravega/controller/server/rpc/auth/PasswordAuthHandler.java
+++ b/controller/src/main/java/io/pravega/controller/server/rpc/auth/PasswordAuthHandler.java
@@ -111,7 +111,7 @@ private static String[] parseToken(String token) {
}
private Permissions authorizeForUser(PravegaACls pravegaACls, String resource) {
- Permissions retVal = Permissions.NONE;
+ Permissions result = Permissions.NONE;
/**
* `*` Means a wildcard.
@@ -119,14 +119,44 @@ private Permissions authorizeForUser(PravegaACls pravegaACls, String resource) {
* If it is a partial match, the target has to end with a `/`
*/
for (PravegaAcl acl : pravegaACls.acls) {
- if (acl.resource.equals(resource) ||
- (acl.resource.endsWith("/") && resource.startsWith(acl.resource))
- || (resource.startsWith(acl.resource + "/"))
- || ((acl.resource.equals("*")) && (acl.acl.ordinal() > retVal.ordinal()))) {
- retVal = acl.acl;
+
+ // Separating into different blocks, to make the code more understandable.
+ // It makes the code look a bit strange, but it is still simpler and easier to decipher than what it be
+ // if we combine the conditions.
+
+ if (acl.isResource(resource)) {
+ // Example: resource = "myscope", acl-resource = "myscope"
+ result = acl.permissions;
+ break;
+ }
+
+ if (acl.isResource("/*") && !resource.contains("/")) {
+ // Example: resource = "myscope", acl-resource ="/*"
+ result = acl.permissions;
+ break;
+ }
+
+ if (acl.resourceEndsWith("/") && acl.resourceStartsWith(resource)) {
+ result = acl.permissions;
+ break;
+ }
+
+ // Say, resource is myscope/mystream. ACL specifies permission for myscope/*.
+ // Auth should return the the ACL's permissions in that case.
+ if (resource.contains("/") && !resource.endsWith("/")) {
+ String[] values = resource.split("/");
+ if (acl.isResource(values[0] + "/*")) {
+ result = acl.permissions;
+ break;
+ }
+ }
+
+ if (acl.isResource("*") && acl.hasHigherPermissionsThan(result)) {
+ result = acl.permissions;
+ break;
}
}
- return retVal;
+ return result;
}
private List getAcls(String aclString) {
@@ -154,9 +184,23 @@ private class PravegaACls {
@Data
private class PravegaAcl {
- private final String resource;
- private final Permissions acl;
+ private final String resourceRepresentation;
+ private final Permissions permissions;
+
+ public boolean isResource(String resource) {
+ return resourceRepresentation.equals(resource);
+ }
+ public boolean resourceEndsWith(String resource) {
+ return resourceRepresentation.endsWith(resource);
+ }
+
+ public boolean resourceStartsWith(String resource) {
+ return resourceRepresentation.startsWith(resource);
+ }
+ public boolean hasHigherPermissionsThan(Permissions input) {
+ return this.permissions.ordinal() > input.ordinal();
+ }
}
}
diff --git a/controller/src/main/java/io/pravega/controller/server/rpc/auth/PravegaAuthManager.java b/controller/src/main/java/io/pravega/controller/server/rpc/auth/PravegaAuthManager.java
index ceb373be6f2..ec9ca565fce 100644
--- a/controller/src/main/java/io/pravega/controller/server/rpc/auth/PravegaAuthManager.java
+++ b/controller/src/main/java/io/pravega/controller/server/rpc/auth/PravegaAuthManager.java
@@ -9,6 +9,7 @@
*/
package io.pravega.controller.server.rpc.auth;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.ServerBuilder;
import io.pravega.auth.AuthException;
@@ -20,6 +21,7 @@
import java.util.Map;
import java.util.ServiceLoader;
import javax.annotation.concurrent.GuardedBy;
+
import lombok.extern.slf4j.Slf4j;
/**
@@ -31,6 +33,7 @@
@Slf4j
public class PravegaAuthManager {
private final GRPCServerConfig serverConfig;
+
@GuardedBy("this")
private final Map handlerMap;
@@ -127,6 +130,19 @@ public boolean authorize(String resource, Principal principal, String credential
return handler.authorize(resource, principal).ordinal() >= level.ordinal();
}
+
+ /**
+ * This method is not only visible for testing, but also intended to be used solely for testing. It allows tests
+ * to inject and register custom auth handlers. Also, this method is idempotent.
+ *
+ * @param authHandler the {@code AuthHandler} implementation to register
+ * @Throws NullPointerException {@code authHandler} is null
+ */
+ @VisibleForTesting
+ public synchronized void registerHandler(AuthHandler authHandler) {
+ this.handlerMap.put(authHandler.getHandlerName(), authHandler);
+ }
+
/**
* Loads the custom implementations of the AuthHandler interface dynamically. Registers the interceptors with grpc.
* Stores the implementation in a local map for routing the REST auth request.
diff --git a/controller/src/main/java/io/pravega/controller/server/rpc/auth/RESTAuthHelper.java b/controller/src/main/java/io/pravega/controller/server/rpc/auth/RESTAuthHelper.java
new file mode 100644
index 00000000000..f3226dc2457
--- /dev/null
+++ b/controller/src/main/java/io/pravega/controller/server/rpc/auth/RESTAuthHelper.java
@@ -0,0 +1,131 @@
+/**
+ * Copyright (c) 2019 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.controller.server.rpc.auth;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.pravega.auth.AuthException;
+import io.pravega.auth.AuthHandler;
+import io.pravega.auth.AuthenticationException;
+import io.pravega.auth.AuthorizationException;
+
+import javax.ws.rs.core.Response;
+import java.security.Principal;
+import java.util.List;
+
+/**
+ * Helper class for handling auth (authentication and authorization) for the REST API.
+ */
+public class RESTAuthHelper {
+
+ /**
+ * The delegate used for performing authentication and authorization.
+ */
+ private final PravegaAuthManager pravegaAuthManager;
+
+ public RESTAuthHelper(PravegaAuthManager pravegaAuthManager) {
+ this.pravegaAuthManager = pravegaAuthManager;
+ }
+
+ /**
+ * Determines whether the given {@code principal} has specified {@code permission} on the given {@code resource}.
+ *
+ * @param authHeader contents of an HTTP Authorization header
+ * @param resource representation of the resource being accessed
+ * @param principal the identity of the subject accessing the resource
+ * @param permission the permission
+ * @return {@code true} if either auth is disabled or authorization is granted, and {@code false}
+ * if auth is enabled and authorization is not granted
+ * @throws AuthException if either authentication or authorization fails
+ */
+ public boolean isAuthorized(List authHeader, String resource, Principal principal,
+ AuthHandler.Permissions permission)
+ throws AuthException {
+ if (isAuthEnabled()) {
+ return pravegaAuthManager.authorize(resource,
+ principal,
+ parseCredentials(authHeader),
+ permission);
+ } else {
+ // Since auth is disabled, every request is deemed to have been authorized.
+ return true;
+ }
+ }
+
+ /**
+ * Ensures that the given {@code principal} has specified {@code permission} on the given {@code resource}.
+ *
+ * @param authHeader contents of an HTTP Authorization header
+ * @param resource representation of the resource being accessed
+ * @param principal the identity of the subject accessing the resource
+ * @param permission the permission
+ * @throws AuthException if authentication or authorization fails
+ */
+ public void authorize(List authHeader, String resource, Principal principal,
+ AuthHandler.Permissions permission) throws AuthException {
+ if (!isAuthorized(authHeader, resource, principal, permission)) {
+ throw new AuthorizationException(
+ String.format("Failed to authorize for resource [%s]", resource),
+ Response.Status.FORBIDDEN.getStatusCode());
+ }
+ }
+
+ /**
+ * Authenticates the subject represented by the specified HTTP Authorization Header value.
+ *
+ * @param authHeader contents of an HTTP Authorization header
+ * @return a {@code principal} representing the identity of the subject if auth is enabled; otherwise {@code null}
+ * @throws AuthException if authentication fails
+ */
+ public Principal authenticate(List authHeader) throws AuthException {
+ if (isAuthEnabled()) {
+ String credentials = parseCredentials(authHeader);
+ return pravegaAuthManager.authenticate(credentials);
+ }
+ return null;
+ }
+
+ /**
+ * Ensures that the subject represented by the given {@code authHeader} is authenticated and that the subject is
+ * authorized for the specified {@code permission} on the given {@code resource}.
+ *
+ * @param authHeader contents of an HTTP Authorization header
+ * @param resource representation of the resource being accessed
+ * @param permission the permission
+ * @throws AuthException if authentication/authorization fails
+ */
+ public void authenticateAuthorize(List authHeader, String resource, AuthHandler.Permissions permission)
+ throws AuthException {
+ if (isAuthEnabled()) {
+ String credentials = parseCredentials(authHeader);
+ if (!pravegaAuthManager.authenticateAndAuthorize(resource, credentials, permission)) {
+ throw new AuthException(
+ String.format("Failed to authenticate or authorize for resource [%s]", resource),
+ Response.Status.FORBIDDEN.getStatusCode());
+ }
+ }
+ }
+
+ private String parseCredentials(List authHeader) throws AuthenticationException {
+ if (authHeader == null || authHeader.isEmpty()) {
+ throw new AuthenticationException("Missing authorization header.");
+ }
+
+ // Expecting a single value here. If there are multiple, we'll deal with just the first one.
+ String credentials = authHeader.get(0);
+ Preconditions.checkNotNull(credentials, "Missing credentials.");
+ return credentials;
+ }
+
+ @VisibleForTesting
+ boolean isAuthEnabled() {
+ return pravegaAuthManager != null;
+ }
+}
\ No newline at end of file
diff --git a/controller/src/main/java/io/pravega/controller/server/rpc/grpc/v1/ControllerServiceImpl.java b/controller/src/main/java/io/pravega/controller/server/rpc/grpc/v1/ControllerServiceImpl.java
index 939a17c6e49..57dec97a5e2 100644
--- a/controller/src/main/java/io/pravega/controller/server/rpc/grpc/v1/ControllerServiceImpl.java
+++ b/controller/src/main/java/io/pravega/controller/server/rpc/grpc/v1/ControllerServiceImpl.java
@@ -19,6 +19,7 @@
import io.pravega.common.tracing.RequestTag;
import io.pravega.common.tracing.RequestTracker;
import io.pravega.common.tracing.TagLogger;
+import io.pravega.controller.server.AuthResourceRepresentation;
import io.pravega.controller.server.ControllerService;
import io.pravega.controller.server.rpc.auth.AuthHelper;
import io.pravega.controller.stream.api.grpc.v1.Controller;
@@ -101,8 +102,8 @@ public void createStream(StreamConfig request, StreamObserver this.authHelper.checkAuthorizationAndCreateToken(scope + "/"
- + stream, AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorizationAndCreateToken(
+ AuthResourceRepresentation.ofStreamsInScope(scope), AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.createStream(scope, stream,
ModelHelper.encode(request),
System.currentTimeMillis()),
@@ -117,8 +118,8 @@ public void updateStream(StreamConfig request, StreamObserver this.authHelper.checkAuthorization(scope + "/"
- + stream, AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.updateStream(scope, stream,
ModelHelper.encode(request)),
responseObserver, requestTag);
@@ -131,8 +132,9 @@ public void truncateStream(Controller.StreamCut request, StreamObserver this.authHelper.checkAuthorization(request.getStreamInfo().getScope() + "/" +
- request.getStreamInfo().getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(request.getStreamInfo().getScope(), request.getStreamInfo().getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.truncateStream(request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(), ModelHelper.encode(request)), responseObserver, requestTag);
}
@@ -144,8 +146,9 @@ public void sealStream(StreamInfo request, StreamObserver re
log.info(requestTag.getRequestId(), "sealStream called for stream {}/{}.",
request.getScope(), request.getStream());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(request.getScope() + "/" +
- request.getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(request.getScope(), request.getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.sealStream(request.getScope(), request.getStream()), responseObserver, requestTag);
}
@@ -156,16 +159,18 @@ public void deleteStream(StreamInfo request, StreamObserver
log.info(requestTag.getRequestId(), "deleteStream called for stream {}/{}.",
request.getScope(), request.getStream());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(request.getScope() + "/" +
- request.getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(request.getScope(), request.getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.deleteStream(request.getScope(), request.getStream()), responseObserver, requestTag);
}
@Override
public void getCurrentSegments(StreamInfo request, StreamObserver responseObserver) {
log.info("getCurrentSegments called for stream {}/{}.", request.getScope(), request.getStream());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorizationAndCreateToken(request.getScope() + "/" +
- request.getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorizationAndCreateToken(
+ AuthResourceRepresentation.ofStreamInScope(request.getScope(), request.getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.getCurrentSegments(request.getScope(), request.getStream())
.thenApply(segmentRanges -> SegmentRanges.newBuilder()
.addAllSegmentRanges(segmentRanges)
@@ -178,8 +183,10 @@ public void getCurrentSegments(StreamInfo request, StreamObserver
public void getSegments(GetSegmentsRequest request, StreamObserver responseObserver) {
log.debug("getSegments called for stream " + request.getStreamInfo().getScope() + "/" +
request.getStreamInfo().getStream());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorizationAndCreateToken(request.getStreamInfo().getScope() + "/" +
- request.getStreamInfo().getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorizationAndCreateToken(
+ AuthResourceRepresentation.ofStreamInScope(request.getStreamInfo().getScope(),
+ request.getStreamInfo().getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.getSegmentsAtHead(request.getStreamInfo().getScope(),
request.getStreamInfo().getStream())
.thenApply(segments -> {
@@ -199,8 +206,9 @@ public void getSegments(GetSegmentsRequest request, StreamObserver responseObserver) {
log.info("getSegmentsImmediatelyFollowing called for segment {} ", segmentId);
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(segmentId.getStreamInfo().getScope() + "/" +
- segmentId.getStreamInfo().getStream(), AuthHandler.Permissions.READ),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(segmentId.getStreamInfo().getScope(),
+ segmentId.getStreamInfo().getStream()), AuthHandler.Permissions.READ),
delegationToken -> controllerService.getSegmentsImmediatelyFollowing(segmentId)
.thenApply(ModelHelper::createSuccessorResponse)
.thenApply(response -> {
@@ -215,7 +223,8 @@ public void getSegmentsBetween(Controller.StreamCutRange request, StreamObserver
log.info("getSegmentsBetweenStreamCuts called for stream {} for cuts from {} to {}", request.getStreamInfo(), request.getFromMap(), request.getToMap());
String scope = request.getStreamInfo().getScope();
String stream = request.getStreamInfo().getStream();
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(scope + "/" + stream, AuthHandler.Permissions.READ),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(scope, stream), AuthHandler.Permissions.READ),
delegationToken -> controllerService.getSegmentsBetweenStreamCuts(request)
.thenApply(segments -> ModelHelper.createStreamCutRangeResponse(scope, stream,
segments.stream().map(x -> ModelHelper.createSegmentId(scope, stream, x.segmentId()))
@@ -230,8 +239,9 @@ public void scale(ScaleRequest request, StreamObserver responseOb
log.info(requestTag.getRequestId(), "scale called for stream {}/{}.",
request.getStreamInfo().getScope(), request.getStreamInfo().getStream());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(request.getStreamInfo().getScope() + "/" +
- request.getStreamInfo().getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(request.getStreamInfo().getScope(), request.getStreamInfo().getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.scale(request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(),
request.getSealedSegmentsList(),
@@ -245,8 +255,9 @@ public void scale(ScaleRequest request, StreamObserver responseOb
public void checkScale(ScaleStatusRequest request, StreamObserver responseObserver) {
log.debug("check scale status called for stream {}/{}.", request.getStreamInfo().getScope(),
request.getStreamInfo().getStream());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(request.getStreamInfo().getScope() + "/" +
- request.getStreamInfo().getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(request.getStreamInfo().getScope(), request.getStreamInfo().getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.checkScale(request.getStreamInfo().getScope(), request.getStreamInfo().getStream(),
request.getEpoch()), responseObserver);
}
@@ -255,8 +266,9 @@ public void checkScale(ScaleStatusRequest request, StreamObserver responseObserver) {
log.info("getURI called for segment {}/{}/{}.", request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(), request.getSegmentId());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(request.getStreamInfo().getScope() + "/" +
- request.getStreamInfo().getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(request.getStreamInfo().getScope(), request.getStreamInfo().getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.getURI(request),
responseObserver);
}
@@ -266,8 +278,10 @@ public void isSegmentValid(SegmentId request,
StreamObserver responseObserver) {
log.info("isSegmentValid called for segment {}/{}/{}.", request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(), request.getSegmentId());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(request.getStreamInfo().getScope() + "/" +
- request.getStreamInfo().getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(request.getStreamInfo().getScope(),
+ request.getStreamInfo().getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.isSegmentValid(request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(),
request.getSegmentId())
@@ -279,8 +293,10 @@ public void isSegmentValid(SegmentId request,
public void isStreamCutValid(Controller.StreamCut request, StreamObserver responseObserver) {
log.info("isStreamCutValid called for stream {}/{} streamcut {}.", request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(), request.getCutMap());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorizationAndCreateToken(request.getStreamInfo().getScope() + "/" +
- request.getStreamInfo().getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorizationAndCreateToken(
+ AuthResourceRepresentation.ofStreamInScope(request.getStreamInfo().getScope(),
+ request.getStreamInfo().getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.isStreamCutValid(request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(),
request.getCutMap())
@@ -292,8 +308,9 @@ public void isStreamCutValid(Controller.StreamCut request, StreamObserver responseObserver) {
log.info("createTransaction called for stream {}/{}.", request.getStreamInfo().getScope(),
request.getStreamInfo().getStream());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorizationAndCreateToken(request.getStreamInfo().getScope() + "/" +
- request.getStreamInfo().getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorizationAndCreateToken(
+ AuthResourceRepresentation.ofStreamInScope(request.getStreamInfo().getScope(), request.getStreamInfo().getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.createTransaction(request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(),
request.getLease())
@@ -309,8 +326,11 @@ public void createTransaction(CreateTxnRequest request, StreamObserver responseObserver) {
log.info("commitTransaction called for stream {}/{}, txnId={}.", request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(), request.getTxnId());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(request.getStreamInfo().getScope() + "/" +
- request.getStreamInfo().getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(
+ request.getStreamInfo().getScope(),
+ request.getStreamInfo().getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.commitTransaction(request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(),
request.getTxnId()),
@@ -321,8 +341,9 @@ public void commitTransaction(TxnRequest request, StreamObserver resp
public void abortTransaction(TxnRequest request, StreamObserver responseObserver) {
log.info("abortTransaction called for stream {}/{}, txnId={}.", request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(), request.getTxnId());
- authenticateExecuteAndProcessResults( () -> this.authHelper.checkAuthorization(request.getStreamInfo().getScope() + "/" +
- request.getStreamInfo().getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults( () -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(request.getStreamInfo().getScope(), request.getStreamInfo().getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.abortTransaction(request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(),
request.getTxnId()),
@@ -333,8 +354,9 @@ public void abortTransaction(TxnRequest request, StreamObserver respo
public void pingTransaction(PingTxnRequest request, StreamObserver responseObserver) {
log.info("pingTransaction called for stream {}/{}, txnId={}", request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(), request.getTxnId());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(request.getStreamInfo().getScope() + "/" +
- request.getStreamInfo().getStream(), AuthHandler.Permissions.READ),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(request.getStreamInfo().getScope(), request.getStreamInfo().getStream()),
+ AuthHandler.Permissions.READ),
delegationToken -> controllerService.pingTransaction(request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(),
request.getTxnId(),
@@ -346,8 +368,9 @@ public void pingTransaction(PingTxnRequest request, StreamObserver responseObserver) {
log.info("checkTransactionState called for stream {}/{}, txnId={}.", request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(), request.getTxnId());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(request.getStreamInfo().getScope() + "/" +
- request.getStreamInfo().getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofStreamInScope(request.getStreamInfo().getScope(), request.getStreamInfo().getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.checkTransactionStatus(request.getStreamInfo().getScope(),
request.getStreamInfo().getStream(),
request.getTxnId()),
@@ -358,7 +381,8 @@ public void checkTransactionState(TxnRequest request, StreamObserver r
public void createScope(ScopeInfo request, StreamObserver responseObserver) {
RequestTag requestTag = requestTracker.initializeAndTrackRequestTag(requestIdGenerator.get(), "createScope", request.getScope());
log.info(requestTag.getRequestId(), "createScope called for scope {}.", request.getScope());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(request.getScope(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofScopes(), AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.createScope(request.getScope()),
responseObserver, requestTag);
}
@@ -368,24 +392,29 @@ public void listStreamsInScope(Controller.StreamsInScopeRequest request, StreamO
String scope = request.getScope().getScope();
RequestTag requestTag = requestTracker.initializeAndTrackRequestTag(requestIdGenerator.get(), "listStream", scope);
log.info(requestTag.getRequestId(), "listStream called for scope {}.", scope);
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(scope, AuthHandler.Permissions.READ),
- delegationToken -> controllerService.listStreams(scope, request.getContinuationToken().getToken(), listStreamsInScopeLimit)
- .thenApply(response -> {
- List streams = response.getKey().stream()
- .map(m -> StreamInfo.newBuilder().setScope(scope).setStream(m).build())
- .collect(Collectors.toList());
- return Controller.StreamsInScopeResponse.newBuilder().addAllStreams(
- streams).setContinuationToken(Controller.ContinuationToken
- .newBuilder().setToken(response.getValue()).build()).build();
- }),
- responseObserver, requestTag);
+ authenticateExecuteAndProcessResults(
+ () -> this.authHelper.checkAuthorization(AuthResourceRepresentation.ofScope(scope),
+ AuthHandler.Permissions.READ),
+ delegationToken -> controllerService.listStreams(
+ scope, request.getContinuationToken().getToken(), listStreamsInScopeLimit)
+ .thenApply(response -> {
+ List streams = response.getKey().stream()
+ .filter(m -> authHelper.isAuthorized(m, AuthHandler.Permissions.READ))
+ .map(m -> StreamInfo.newBuilder().setScope(scope).setStream(m).build())
+ .collect(Collectors.toList());
+ return Controller.StreamsInScopeResponse.newBuilder().addAllStreams(streams)
+ .setContinuationToken(Controller.ContinuationToken.newBuilder().setToken(
+ response.getValue()).build()).build();
+ }),
+ responseObserver, requestTag);
}
@Override
public void deleteScope(ScopeInfo request, StreamObserver responseObserver) {
RequestTag requestTag = requestTracker.initializeAndTrackRequestTag(requestIdGenerator.get(), "deleteScope", request.getScope());
log.info(requestTag.getRequestId(), "deleteScope called for scope {}.", request.getScope());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(request.getScope(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorization(
+ AuthResourceRepresentation.ofScopes(), AuthHandler.Permissions.READ_UPDATE),
delegationToken -> controllerService.deleteScope(request.getScope()),
responseObserver, requestTag);
}
@@ -393,8 +422,9 @@ public void deleteScope(ScopeInfo request, StreamObserver res
@Override
public void getDelegationToken(StreamInfo request, StreamObserver responseObserver) {
log.info("getDelegationToken called for stream {}/{}.", request.getScope(), request.getStream());
- authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorizationAndCreateToken(request.getScope() + "/" +
- request.getStream(), AuthHandler.Permissions.READ_UPDATE),
+ authenticateExecuteAndProcessResults(() -> this.authHelper.checkAuthorizationAndCreateToken(
+ AuthResourceRepresentation.ofStreamInScope(request.getScope(), request.getStream()),
+ AuthHandler.Permissions.READ_UPDATE),
delegationToken -> CompletableFuture.completedFuture(Controller.DelegationToken
.newBuilder()
.setDelegationToken(delegationToken)
diff --git a/controller/src/test/java/io/pravega/controller/mocks/FakeAuthHandler.java b/controller/src/test/java/io/pravega/controller/mocks/FakeAuthHandler.java
new file mode 100644
index 00000000000..689c9230214
--- /dev/null
+++ b/controller/src/test/java/io/pravega/controller/mocks/FakeAuthHandler.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright (c) 2019 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.controller.mocks;
+
+import io.pravega.auth.AuthHandler;
+import io.pravega.auth.ServerConfig;
+import io.pravega.controller.server.rpc.auth.UserPrincipal;
+
+import java.security.Principal;
+
+public class FakeAuthHandler implements AuthHandler {
+
+ public static final String UNPRIVILEGED_USER = "unPrivilegedUser";
+ public static final String PRIVILEGED_USER = "privilegedUser";
+
+ @Override
+ public String getHandlerName() {
+ return "Basic";
+ }
+
+ @Override
+ public Principal authenticate(String token) {
+ return new UserPrincipal(token);
+ }
+
+ @Override
+ public Permissions authorize(String resource, Principal principal) {
+ if (principal.getName().equals(PRIVILEGED_USER)) {
+ return Permissions.READ_UPDATE;
+ } else {
+ return Permissions.NONE;
+ }
+ }
+
+ @Override
+ public void initialize(ServerConfig serverConfig) {
+
+ }
+
+ public static String testAuthToken(String userName) {
+ return String.format("testHandler %s", userName);
+ }
+}
\ No newline at end of file
diff --git a/controller/src/test/java/io/pravega/controller/rest/v1/SecureStreamMetaDataTests.java b/controller/src/test/java/io/pravega/controller/rest/v1/SecureStreamMetaDataTests.java
index 66dd3a320d1..cc23c8d179c 100644
--- a/controller/src/test/java/io/pravega/controller/rest/v1/SecureStreamMetaDataTests.java
+++ b/controller/src/test/java/io/pravega/controller/rest/v1/SecureStreamMetaDataTests.java
@@ -33,10 +33,17 @@ public void setup() throws Exception {
try (FileWriter writer = new FileWriter(file.getAbsolutePath())) {
String passwd = passwordEncryptor.encryptPassword("1111_aaaa");
+
+ // Admin has READ_WRITE permission to everything
writer.write("admin:" + passwd + ":*,READ_UPDATE\n");
- writer.write("user1:" + passwd + ":/,READ;scope1,READ_UPDATE;scope2,READ_UPDATE;\n");
+
+ // User "user1" can:
+ // - list, create and delete scopes
+ // - Create and delete streams within scopes "scope1" and "scope2". Also if "user1" lists scopes,
+ // she'll see those scopes, but not "scope3".
+ writer.write("user1:" + passwd + ":/,READ_UPDATE;scope1,READ_UPDATE;scope1/*,READ_UPDATE;scope2,READ_UPDATE;scope2/*,READ_UPDATE;\n");
+
writer.write("user2:" + passwd + ":/,READ;scope3,READ_UPDATE;\n");
- writer.close();
}
this.authManager = new PravegaAuthManager(GRPCServerConfigImpl.builder()
diff --git a/controller/src/test/java/io/pravega/controller/rest/v1/StreamMetaDataAuthFocusedTests.java b/controller/src/test/java/io/pravega/controller/rest/v1/StreamMetaDataAuthFocusedTests.java
new file mode 100644
index 00000000000..ff37a23942e
--- /dev/null
+++ b/controller/src/test/java/io/pravega/controller/rest/v1/StreamMetaDataAuthFocusedTests.java
@@ -0,0 +1,657 @@
+/**
+ * Copyright (c) 2019 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.controller.rest.v1;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import io.grpc.ServerBuilder;
+import io.pravega.client.ClientConfig;
+import io.pravega.client.netty.impl.ConnectionFactoryImpl;
+import io.pravega.client.stream.RetentionPolicy;
+import io.pravega.client.stream.ScalingPolicy;
+import io.pravega.client.stream.StreamConfiguration;
+import io.pravega.controller.server.ControllerService;
+import io.pravega.controller.server.eventProcessor.LocalController;
+import io.pravega.controller.server.rest.RESTServer;
+import io.pravega.controller.server.rest.RESTServerConfig;
+import io.pravega.controller.server.rest.generated.model.CreateScopeRequest;
+import io.pravega.controller.server.rest.generated.model.CreateStreamRequest;
+import io.pravega.controller.server.rest.generated.model.RetentionConfig;
+import io.pravega.controller.server.rest.generated.model.ScalingConfig;
+import io.pravega.controller.server.rest.generated.model.ScopesList;
+import io.pravega.controller.server.rest.generated.model.StreamState;
+import io.pravega.controller.server.rest.generated.model.StreamsList;
+import io.pravega.controller.server.rest.impl.RESTServerConfigImpl;
+import io.pravega.controller.server.rpc.auth.PravegaAuthManager;
+import io.pravega.controller.server.rpc.auth.StrongPasswordProcessor;
+import io.pravega.controller.server.rpc.grpc.impl.GRPCServerConfigImpl;
+import io.pravega.controller.stream.api.grpc.v1.Controller;
+import io.pravega.test.common.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.security.NoSuchAlgorithmException;
+import java.security.spec.InvalidKeySpecException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * There are authorization-related tests elsewhere ({@link SecureStreamMetaDataTests} and
+ * {@link UserSecureStreamMetaDataTests}) too. Here, we have focused authorization tests which test the logic more
+ * comprehensively; Tests here also run much quicker, since we share REST server, auth handler and its configuration
+ * across all tests in this class.
+ *
+ * Note: Since the tests are intended to run using a shared REST server, it is important to ensure that the tests do not
+ * create resources with the same names, as doing so can make the tests indirectly dependent on each other and flaky.
+ */
+public class StreamMetaDataAuthFocusedTests {
+
+ private final static int HTTP_STATUS_OK = 200;
+ private final static int HTTP_STATUS_CREATED = 201;
+ private final static int HTTP_STATUS_NOCONTENT = 204;
+ private final static int HTTP_STATUS_UNAUTHORIZED = 401;
+ private final static int HTTP_STATUS_FORBIDDEN = 403;
+
+ private final static String USER_PRIVILEGED = "superUser";
+ private final static String USER_SCOPE_CREATOR = "scopeCreator";
+ private final static String USER_SCOPE_LISTER = "scopeLister";
+ private final static String USER_SCOPE_MANAGER = "scopeManager";
+ private final static String USER_STREAMS_IN_A_SCOPE_CREATOR = "streamsinScopeCreater";
+ private final static String USER_USER1 = "user1";
+ private final static String USER_WITH_NO_ROOT_ACCESS = "userWithNoAccessToRoot";
+ private final static String USER_UNAUTHORIZED = "unauthorizedUser";
+ private final static String USER_ACCESS_TO_SUBSET_OF_SCOPES = "userAccessToSubsetOfScopes";
+ private final static String USER_WITH_NO_AUTHORIZATIONS = "userWithNoAuthorizations";
+ private final static String USER_WITH_READ_UPDATE_ROOT = "userWithReadUpdatePermissionOnRoot";
+ private final static String USER_ACCESS_TO_SCOPES_BUT_NOSTREAMS = "userAuthOnScopeButNotOnStreams";
+ private final static String USER_ACCESS_TO_SCOPES_READ_ALLSTREAMS = "userAuthOnScopeAndReadOnAllStreams";
+ private final static String USER_ACCESS_TO_SCOPES_READUPDATE_ALLSTREAMS = "userAuthOnScopeAndWriteOnAllStreams";
+ private final static String USER_ACCESS_TO_SCOPE_WRITE_SPECIFIC_STREAM = "userAuthOnScopeAndWriteOnSpecificStream";
+ private final static String DEFAULT_PASSWORD = "1111_aaaa";
+
+ // Suppressing the checkstyle errors below, as we are using a class initializer (a method with @BeforeClass
+ // annotation) for efficiency, and we cannot make these members final.
+
+ @SuppressWarnings("checkstyle:StaticVariableName")
+ private static ControllerService mockControllerService;
+
+ @SuppressWarnings("checkstyle:StaticVariableName")
+ private static RESTServerConfig serverConfig;
+
+ @SuppressWarnings("checkstyle:StaticVariableName")
+ private static RESTServer restServer;
+
+ @SuppressWarnings("checkstyle:StaticVariableName")
+ private static Client client;
+
+ @SuppressWarnings("checkstyle:StaticVariableName")
+ private static File passwordHandlerInputFile;
+
+ // We want to ensure that the tests in this class are run one after another (in no particular sequence), as we
+ // are using a shared server (for execution efficiency). We use this in setup and teardown method initiazers
+ // for ensuring the desired behavior.
+ Lock sequential = new ReentrantLock();
+
+ //region Test class initializer and cleanup
+
+ @BeforeClass
+ public static void initializer() throws IOException, InvalidKeySpecException, NoSuchAlgorithmException {
+
+ passwordHandlerInputFile = File.createTempFile("AuthFocusedTests", ".txt");
+
+ StrongPasswordProcessor passwordEncryptor = StrongPasswordProcessor.builder().build();
+
+ try (FileWriter writer = new FileWriter(passwordHandlerInputFile.getAbsolutePath())) {
+ String encryptedPassword = passwordEncryptor.encryptPassword(DEFAULT_PASSWORD);
+
+ // This user can do anything in the system.
+ writer.write(credentialsAndAclAsString(USER_PRIVILEGED, encryptedPassword, "*,READ_UPDATE"));
+
+ // This user can list, get, update and delete all scopes
+ writer.write(credentialsAndAclAsString(USER_SCOPE_CREATOR, encryptedPassword, "/,READ_UPDATE"));
+
+ // This user can list scopes and upon listing will see all scopes (/*).
+ writer.write(credentialsAndAclAsString(USER_SCOPE_LISTER, encryptedPassword, "/,READ;/*,READ"));
+
+ // This user can list, read, update, delete all scopes. Upon listing scopes, this user will see all scopes.
+ writer.write(credentialsAndAclAsString(USER_SCOPE_MANAGER, encryptedPassword, "/,READ_UPDATE;/*,READ_UPDATE"));
+
+ // This user can create, update, delete all child objects of a scope (streams, reader groups, etc.)
+ writer.write(credentialsAndAclAsString(USER_STREAMS_IN_A_SCOPE_CREATOR, encryptedPassword, "sisc-scope,READ_UPDATE;"));
+
+ writer.write(credentialsAndAclAsString(USER_USER1, encryptedPassword, "/,READ_UPDATE;scope1,READ_UPDATE;scope2,READ_UPDATE;"));
+ writer.write(credentialsAndAclAsString(USER_WITH_NO_ROOT_ACCESS, encryptedPassword, "scope1,READ_UPDATE;scope2,READ_UPDATE;"));
+ writer.write(credentialsAndAclAsString(USER_UNAUTHORIZED, encryptedPassword, "/,READ_UPDATE;scope1,READ_UPDATE;scope2,READ_UPDATE;"));
+ writer.write(credentialsAndAclAsString(USER_ACCESS_TO_SUBSET_OF_SCOPES, encryptedPassword, "/,READ;scope3,READ_UPDATE;"));
+ writer.write(credentialsAndAclAsString(USER_WITH_NO_AUTHORIZATIONS, encryptedPassword, ";"));
+ writer.write(credentialsAndAclAsString(USER_WITH_READ_UPDATE_ROOT, encryptedPassword, "scopeToDelete,READ_UPDATE;"));
+ writer.write(credentialsAndAclAsString(USER_ACCESS_TO_SCOPES_BUT_NOSTREAMS, encryptedPassword, "myscope,READ_UPDATE;"));
+ writer.write(credentialsAndAclAsString(USER_ACCESS_TO_SCOPES_READ_ALLSTREAMS, encryptedPassword,
+ "myscope,READ_UPDATE;myscope/*,READ;"));
+ writer.write(credentialsAndAclAsString(USER_ACCESS_TO_SCOPES_READUPDATE_ALLSTREAMS, encryptedPassword,
+ "myscope,READ_UPDATE;myscope/*,READ_UPDATE;"));
+ writer.write(credentialsAndAclAsString(USER_ACCESS_TO_SCOPE_WRITE_SPECIFIC_STREAM, encryptedPassword,
+ "myscope,READ_UPDATE;myscope/stream1,READ_UPDATE;"));
+ }
+
+ PravegaAuthManager authManager = new PravegaAuthManager(GRPCServerConfigImpl.builder()
+ .authorizationEnabled(true)
+ .userPasswordFile(passwordHandlerInputFile.getAbsolutePath())
+ .port(1000)
+ .build());
+ ServerBuilder> server = ServerBuilder.forPort(TestUtils.getAvailableListenPort());
+ authManager.registerInterceptors(server);
+
+ mockControllerService = mock(ControllerService.class);
+ serverConfig = RESTServerConfigImpl.builder().host("localhost").port(TestUtils.getAvailableListenPort()).build();
+ LocalController controller = new LocalController(mockControllerService, false, "");
+ restServer = new RESTServer(controller, mockControllerService, authManager, serverConfig,
+ new ConnectionFactoryImpl(ClientConfig.builder()
+ .controllerURI(URI.create("tcp://localhost"))
+ .build()));
+ restServer.startAsync();
+ restServer.awaitRunning();
+ client = ClientBuilder.newClient();
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ if (restServer != null && restServer.isRunning()) {
+ restServer.stopAsync();
+ try {
+ restServer.awaitTerminated(2, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ // ignore
+ }
+ }
+ if (passwordHandlerInputFile != null) {
+ passwordHandlerInputFile.delete();
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ sequential.lock();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ sequential.unlock();
+ }
+
+ //endregion
+
+ //region Scope listing tests
+
+ @Test
+ public void testListScopesReturnsAllScopesForUserWithPermissionOnRootAndChildren() {
+ // Arrange
+ final String resourceURI = getURI() + "v1/scopes";
+ when(mockControllerService.listScopes()).thenReturn(CompletableFuture.completedFuture(
+ Arrays.asList("scopea", "scopeb", "scopec")));
+ Invocation requestInvocation = this.invocationBuilder(resourceURI, USER_SCOPE_LISTER, DEFAULT_PASSWORD)
+ .buildGet();
+
+ // Act
+ Response response = requestInvocation.invoke();
+ ScopesList scopes = response.readEntity(ScopesList.class);
+
+ // Assert
+ assertEquals(3, scopes.getScopes().size());
+
+ response.close();
+ }
+
+ @Test
+ public void testListScopesReturnsFilteredResults() throws ExecutionException, InterruptedException {
+ // Arrange
+ final String resourceURI = getURI() + "v1/scopes";
+ when(mockControllerService.listScopes()).thenReturn(CompletableFuture.completedFuture(
+ Arrays.asList("scope1", "scope2", "scope3")));
+ Invocation requestInvocation = this.invocationBuilder(resourceURI, USER_ACCESS_TO_SUBSET_OF_SCOPES, DEFAULT_PASSWORD)
+ .buildGet();
+
+ // Act
+ Response response = requestInvocation.invoke();
+ ScopesList scopes = response.readEntity(ScopesList.class);
+
+ // Assert
+ assertEquals(1, scopes.getScopes().size());
+ assertEquals("scope3", scopes.getScopes().get(0).getScopeName());
+
+ response.close();
+ }
+
+ @Test
+ public void testListScopesReturnsUnauthorizedStatusForInvalidUser() {
+ // Arrange
+ final String resourceURI = getURI() + "v1/scopes";
+ when(mockControllerService.listScopes()).thenReturn(CompletableFuture.completedFuture(
+ Arrays.asList("scope1", "scope2", "scope3")));
+ Invocation requestInvocation = this.invocationBuilder(resourceURI, "fictitiousUser", "whatever")
+ .buildGet();
+
+ // Act
+ Response response = requestInvocation.invoke();
+
+ // Assert
+ assertEquals(HTTP_STATUS_UNAUTHORIZED, response.getStatus());
+
+ response.close();
+ }
+
+ @Test
+ public void testListScopesIsForbiddenForValidButUnauthorizedUser() {
+ // Arrange
+ final String resourceURI = getURI() + "v1/scopes";
+ when(mockControllerService.listScopes()).thenReturn(CompletableFuture.completedFuture(
+ Arrays.asList("scope1", "scope2", "scope3")));
+ Invocation requestInvocation = this.invocationBuilder(resourceURI,
+ USER_WITH_NO_AUTHORIZATIONS, DEFAULT_PASSWORD)
+ .buildGet();
+
+ // Act
+ Response response = requestInvocation.invoke();
+
+ // Assert
+ assertEquals(HTTP_STATUS_FORBIDDEN, response.getStatus());
+
+ response.close();
+ }
+
+ //endregion
+
+ //region Scope creation tests
+
+ @Test
+ public void testPrivilegedUserCanCreateScope() {
+ Response response = createScope("newScope1", USER_PRIVILEGED, DEFAULT_PASSWORD);
+ assertEquals(HTTP_STATUS_CREATED, response.getStatus());
+ response.close();
+ }
+
+ @Test
+ public void testUserWithPermissionOnRootCanCreateScope() {
+ Response response = createScope("newScope", USER_SCOPE_CREATOR, DEFAULT_PASSWORD);
+ assertEquals(HTTP_STATUS_CREATED, response.getStatus());
+ response.close();
+ }
+
+ //endregion
+
+ //region Scope delete tests
+
+ @Test
+ public void testDeleteScopeSucceedsForAuthorizedUser() {
+ // Arrange
+ String scopeName = "scopeToDelete";
+
+ createScope(scopeName, "privilegedUser", DEFAULT_PASSWORD);
+
+ final String resourceUri = getURI() + "v1/scopes/" + scopeName;
+ when(mockControllerService.deleteScope(scopeName)).thenReturn(
+ CompletableFuture.completedFuture(
+ Controller.DeleteScopeStatus.newBuilder().setStatus(
+ Controller.DeleteScopeStatus.Status.SUCCESS).build()));
+
+ // Act
+ Response response = invocationBuilder(resourceUri, USER_SCOPE_MANAGER, DEFAULT_PASSWORD)
+ .buildDelete().invoke();
+
+ // Assert
+ assertEquals(HTTP_STATUS_NOCONTENT, response.getStatus());
+ response.close();
+ }
+
+ @Test
+ public void testDeleteScopeSucceedsForPrivilegedUser() {
+ String scopeName = "scopeForAdminToDelete";
+
+ // The special thing about this user is that the user is assigned a wildcard permission: "*,READ_UPDATE"
+ String username = USER_PRIVILEGED;
+ String password = DEFAULT_PASSWORD;
+
+ createScope(scopeName, username, password);
+ Response response = deleteScope(scopeName, username, password);
+ assertEquals(HTTP_STATUS_NOCONTENT, response.getStatus());
+ response.close();
+ }
+
+ @Test
+ public void testDeleteScopeIsForbiddenForUnauthorizedUser() {
+ String scopeName = "scope-ud";
+
+ Response createScopeResponse = createScope(scopeName, USER_PRIVILEGED, DEFAULT_PASSWORD);
+ createScopeResponse.close();
+
+ Response response = deleteScope(scopeName, USER_WITH_NO_ROOT_ACCESS, DEFAULT_PASSWORD);
+ assertEquals(HTTP_STATUS_FORBIDDEN, response.getStatus());
+ response.close();
+ }
+
+ //endregion
+
+ //region Stream creation tests
+ @Test
+ public void testCreateStreamsSucceedsForUserHavingWriteAccessToTheScope() {
+ String username = USER_STREAMS_IN_A_SCOPE_CREATOR;
+ String password = DEFAULT_PASSWORD;
+ String scopeName = "sisc-scope";
+ String streamName = "stream1";
+ String streamResourceURI = getURI() + "v1/scopes/" + scopeName + "/streams";
+
+ CompletableFuture createStreamStatus = CompletableFuture.
+ completedFuture(Controller.CreateStreamStatus.newBuilder().setStatus(
+ Controller.CreateStreamStatus.Status.SUCCESS).build());
+
+ final CreateStreamRequest createStreamRequest = new CreateStreamRequest();
+ createStreamRequest.setStreamName(streamName);
+
+ ScalingConfig scalingPolicy = new ScalingConfig();
+ scalingPolicy.setType(ScalingConfig.TypeEnum.FIXED_NUM_SEGMENTS);
+ scalingPolicy.setMinSegments(2);
+
+ RetentionConfig retentionPolicy = new RetentionConfig();
+ retentionPolicy.setType(RetentionConfig.TypeEnum.LIMITED_DAYS);
+ retentionPolicy.setValue(123L);
+
+ createStreamRequest.setScalingPolicy(scalingPolicy);
+ createStreamRequest.setRetentionPolicy(retentionPolicy);
+
+ when(mockControllerService.createStream(any(), any(), any(), anyLong())).thenReturn(createStreamStatus);
+ Response response = this.invocationBuilder(streamResourceURI, username, password)
+ .buildPost(Entity.json(createStreamRequest))
+ .invoke();
+ assertEquals(HTTP_STATUS_CREATED, response.getStatus());
+
+ response.close();
+ }
+
+ //endregion
+
+ //region Streams listing tests
+
+ @Test
+ public void testListStreamsReturnsEmptyListWhenUserHasNoStreamsAssigned() {
+ // Arrange
+ String resourceURI = getURI() + "v1/scopes/myscope/streams";
+
+ Map streamsList = ImmutableMap.of("stream1", this.aStreamConfig(),
+ "stream2", this.aStreamConfig());
+ when(mockControllerService.listStreamsInScope("myscope")).thenReturn(CompletableFuture.completedFuture(streamsList));
+
+ // Act
+ Response response = this.invocationBuilder(resourceURI,
+ USER_ACCESS_TO_SCOPES_BUT_NOSTREAMS, DEFAULT_PASSWORD).buildGet().invoke();
+ StreamsList listedStreams = response.readEntity(StreamsList.class);
+
+ // Assert
+ assertEquals(HTTP_STATUS_OK, response.getStatus());
+ assertEquals(null, listedStreams.getStreams());
+
+ response.close();
+ }
+
+ @Test
+ public void testListStreamsReturnsAllStreamsWhenUserHasWildcardOnScope() {
+ // Arrange
+ String resourceURI = getURI() + "v1/scopes/myscope/streams";
+
+ Map streamsList = ImmutableMap.of(
+ "stream1", this.aStreamConfig(),
+ "stream2", this.aStreamConfig(),
+ "stream3", this.aStreamConfig());
+ when(mockControllerService.listStreamsInScope("myscope")).thenReturn(CompletableFuture.completedFuture(streamsList));
+
+ // Act
+ Response response = this.invocationBuilder(resourceURI,
+ USER_ACCESS_TO_SCOPES_READ_ALLSTREAMS, DEFAULT_PASSWORD).buildGet().invoke();
+ StreamsList listedStreams = response.readEntity(StreamsList.class);
+
+ // Assert
+ assertEquals(HTTP_STATUS_OK, response.getStatus());
+ assertEquals(3, listedStreams.getStreams().size());
+
+ response.close();
+ }
+
+ @Test
+ public void testListStreamsReturnsAllWhenUserHasWildCardAccess() {
+ // Arrange
+ String resourceURI = getURI() + "v1/scopes/myscope/streams";
+
+ Map streamsList = ImmutableMap.of(
+ "stream1", this.aStreamConfig(),
+ "stream2", this.aStreamConfig(),
+ "stream3", this.aStreamConfig());
+ when(mockControllerService.listStreamsInScope("myscope")).thenReturn(CompletableFuture.completedFuture(streamsList));
+
+ // Act
+ Response response = this.invocationBuilder(resourceURI,
+ USER_PRIVILEGED, DEFAULT_PASSWORD).buildGet().invoke();
+ StreamsList listedStreams = response.readEntity(StreamsList.class);
+
+ // Assert
+ assertEquals(HTTP_STATUS_OK, response.getStatus());
+ assertEquals(3, listedStreams.getStreams().size());
+
+ response.close();
+ }
+
+ //endregion
+
+ //region Streams update tests
+ @Test
+ public void testUpdateStreamStateAuthorizedForPrivilegedUser() {
+ String resourceURI = getURI() + "v1/scopes/myscope/streams/stream1/state";
+
+ // Test to seal a stream.
+ when(mockControllerService.sealStream("myscope", "stream1")).thenReturn(CompletableFuture.completedFuture(
+ Controller.UpdateStreamStatus.newBuilder().setStatus(Controller.UpdateStreamStatus.Status.SUCCESS).build()));
+ StreamState streamState = new StreamState().streamState(StreamState.StreamStateEnum.SEALED);
+ Response response = this.invocationBuilder(resourceURI, USER_PRIVILEGED, DEFAULT_PASSWORD)
+ .buildPut(Entity.json(streamState)).invoke();
+
+ assertEquals("Update Stream State response code", HTTP_STATUS_OK, response.getStatus());
+ response.close();
+ }
+
+ @Test
+ public void testUpdateStreamStateAuthorizedForUserWithStreamWriteAccess() {
+ String resourceURI = getURI() + "v1/scopes/myscope/streams/stream1/state";
+
+ // Test to seal a stream.
+ when(mockControllerService.sealStream("myscope", "stream1")).thenReturn(CompletableFuture.completedFuture(
+ Controller.UpdateStreamStatus.newBuilder().setStatus(Controller.UpdateStreamStatus.Status.SUCCESS).build()));
+ StreamState streamState = new StreamState().streamState(StreamState.StreamStateEnum.SEALED);
+ Response response = this.invocationBuilder(resourceURI, USER_ACCESS_TO_SCOPE_WRITE_SPECIFIC_STREAM, DEFAULT_PASSWORD)
+ .buildPut(Entity.json(streamState)).invoke();
+
+ assertEquals("Update Stream State response code", HTTP_STATUS_OK, response.getStatus());
+ response.close();
+ }
+
+ @Test
+ public void testUpdateStreamStateAuthorizedWhenUserHasWildcardAccessOnScope() {
+
+ String resourceURI = getURI() + "v1/scopes/myscope/streams/stream1/state";
+
+ // Test to seal a stream.
+ when(mockControllerService.sealStream("myscope", "stream1")).thenReturn(CompletableFuture.completedFuture(
+ Controller.UpdateStreamStatus.newBuilder().setStatus(Controller.UpdateStreamStatus.Status.SUCCESS).build()));
+ StreamState streamState = new StreamState().streamState(StreamState.StreamStateEnum.SEALED);
+ Response response = this.invocationBuilder(resourceURI,
+ USER_ACCESS_TO_SCOPES_READUPDATE_ALLSTREAMS, DEFAULT_PASSWORD)
+ .buildPut(Entity.json(streamState)).invoke();
+
+ assertEquals("Update Stream State response code", HTTP_STATUS_OK, response.getStatus());
+ response.close();
+ }
+
+ @Test
+ public void testUpdateStreamStateAuthorizedWhenUserHasReadOnlyAccessOnScopeChildren() {
+
+ String resourceURI = getURI() + "v1/scopes/myscope/streams/stream1/state";
+
+ // Test to seal a stream.
+ when(mockControllerService.sealStream("myscope", "stream1")).thenReturn(CompletableFuture.completedFuture(
+ Controller.UpdateStreamStatus.newBuilder().setStatus(Controller.UpdateStreamStatus.Status.SUCCESS).build()));
+ StreamState streamState = new StreamState().streamState(StreamState.StreamStateEnum.SEALED);
+ Response response = this.invocationBuilder(resourceURI,
+ USER_ACCESS_TO_SCOPES_READ_ALLSTREAMS, DEFAULT_PASSWORD)
+ .buildPut(Entity.json(streamState)).invoke();
+
+ assertEquals("Update Stream State response code", HTTP_STATUS_FORBIDDEN, response.getStatus());
+ response.close();
+ }
+
+ //endregion
+
+ //region Combination tests
+ @Test
+ public void testUserWithReadWriteOnAllScopesCanCreateListAndDeleteScopes() {
+ List scopes = Arrays.asList("sm-scope1", "sm-scope2", "sm-scope3");
+ boolean isCreateSuccessful = createScopes(scopes, USER_SCOPE_MANAGER, DEFAULT_PASSWORD);
+ assertTrue(isCreateSuccessful);
+
+ ScopesList listedScopes = listScopes(scopes, USER_SCOPE_MANAGER, DEFAULT_PASSWORD);
+ assertNotNull(listedScopes.getScopes());
+ assertEquals(3, listedScopes.getScopes().size());
+
+ boolean isDeleteSuccessful = deleteScopes(scopes, USER_SCOPE_MANAGER, DEFAULT_PASSWORD);
+ assertTrue(isDeleteSuccessful);
+ }
+
+ //endregion
+
+ //region Private methods
+
+ private static String credentialsAndAclAsString(String username, String password, String acl) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(username)
+ && !Strings.isNullOrEmpty(password)
+ && acl != null
+ && !acl.startsWith(":"));
+
+ // This will return a string that looks like this:"::acl\n"
+ return String.format("%s:%s:%s%n", username, password, acl);
+ }
+
+ private boolean createScopes(List scopeNames, String username, String password) {
+ boolean result = true;
+ for (String scopeName : scopeNames) {
+ Response response = createScope(scopeName, username, password);
+ if (response.getStatus() != HTTP_STATUS_CREATED) {
+ result = false;
+ }
+ response.close();
+ }
+ return result;
+ }
+
+ private Response createScope(String scopeName, String username, String password) {
+
+ final String resourceURI = getURI() + "v1/scopes/";
+ final CreateScopeRequest createScopeRequest = new CreateScopeRequest().scopeName(scopeName);
+
+ // Test to create a new scope.
+ when(mockControllerService.createScope(scopeName)).thenReturn(CompletableFuture.completedFuture(
+ Controller.CreateScopeStatus.newBuilder().setStatus(
+ Controller.CreateScopeStatus.Status.SUCCESS).build()));
+ return invocationBuilder(resourceURI, username, password).buildPost(Entity.json(createScopeRequest)).invoke();
+ }
+
+ private boolean deleteScopes(List scopeNames, String username, String password) {
+ boolean result = true;
+ for (String scopeName : scopeNames) {
+ Response response = deleteScope(scopeName, username, password);
+ if (response.getStatus() != HTTP_STATUS_NOCONTENT) {
+ result = false;
+ }
+ response.close();
+ }
+ return result;
+ }
+
+ private Response deleteScope(String scopeName, String username, String password) {
+ final String resourceUri = getURI() + "v1/scopes/" + scopeName;
+
+ when(mockControllerService.deleteScope(scopeName)).thenReturn(
+ CompletableFuture.completedFuture(
+ Controller.DeleteScopeStatus.newBuilder().setStatus(
+ Controller.DeleteScopeStatus.Status.SUCCESS).build()));
+
+ return invocationBuilder(resourceUri, username, password)
+ .buildDelete().invoke();
+ }
+
+ private ScopesList listScopes(List scopeNames, String username, String password) {
+ final String resourceURI = getURI() + "v1/scopes";
+ when(mockControllerService.listScopes())
+ .thenReturn(CompletableFuture.completedFuture(scopeNames));
+ Invocation requestInvocation = this.invocationBuilder(resourceURI, username, password)
+ .buildGet();
+
+ Response response = requestInvocation.invoke();
+ ScopesList scopes = response.readEntity(ScopesList.class);
+ response.close();
+ return scopes;
+ }
+
+ private String getURI() {
+ return "http://localhost:" + serverConfig.getPort() + "/";
+ }
+
+ private Invocation.Builder invocationBuilder(String resourceUri, String username, String password) {
+ MultivaluedMap map = new MultivaluedHashMap<>();
+ map.addAll(HttpHeaders.AUTHORIZATION, TestUtils.basicAuthToken(username, password));
+ return client.target(resourceUri).request().headers(map);
+ }
+
+ private StreamConfiguration aStreamConfig() {
+ return StreamConfiguration.builder()
+ .scalingPolicy(ScalingPolicy.byEventRate(100, 2, 2))
+ .retentionPolicy(RetentionPolicy.byTime(Duration.ofMillis(123L)))
+ .build();
+ }
+
+ //endregion
+}
diff --git a/controller/src/test/java/io/pravega/controller/rest/v1/StreamMetaDataTests.java b/controller/src/test/java/io/pravega/controller/rest/v1/StreamMetaDataTests.java
index 0da1247388d..8c680e99310 100644
--- a/controller/src/test/java/io/pravega/controller/rest/v1/StreamMetaDataTests.java
+++ b/controller/src/test/java/io/pravega/controller/rest/v1/StreamMetaDataTests.java
@@ -526,11 +526,9 @@ public void testListScopes() throws ExecutionException, InterruptedException {
Response response = addAuthHeaders(client.target(resourceURI).request()).buildGet().invoke();
assertEquals("List Scopes response code", 200, response.getStatus());
assertTrue(response.bufferEntity());
- final ScopesList scopesList1 = response.readEntity(ScopesList.class);
- assertEquals("List count", scopesList1.getScopes().size(), 3);
- assertEquals("List element", scopesList1.getScopes().get(0).getScopeName(), "scope1");
- assertEquals("List element", scopesList1.getScopes().get(1).getScopeName(), "scope2");
- assertEquals("List element", scopesList1.getScopes().get(2).getScopeName(), "scope3");
+
+ verifyScopes(response.readEntity(ScopesList.class));
+
response.close();
// Test for list scopes failure.
@@ -542,6 +540,13 @@ public void testListScopes() throws ExecutionException, InterruptedException {
response.close();
}
+ protected void verifyScopes(ScopesList scopesList1) {
+ assertEquals(3, scopesList1.getScopes().size());
+ assertEquals("scope1", scopesList1.getScopes().get(0).getScopeName());
+ assertEquals("scope2", scopesList1.getScopes().get(1).getScopeName());
+ assertEquals("scope3", scopesList1.getScopes().get(2).getScopeName());
+ }
+
/**
* Test for listStreams REST API.
*
diff --git a/controller/src/test/java/io/pravega/controller/rest/v1/UserSecureStreamMetaDataTests.java b/controller/src/test/java/io/pravega/controller/rest/v1/UserSecureStreamMetaDataTests.java
index f66181f54f2..5a290da0dbb 100644
--- a/controller/src/test/java/io/pravega/controller/rest/v1/UserSecureStreamMetaDataTests.java
+++ b/controller/src/test/java/io/pravega/controller/rest/v1/UserSecureStreamMetaDataTests.java
@@ -9,6 +9,7 @@
*/
package io.pravega.controller.rest.v1;
+import io.pravega.controller.server.rest.generated.model.ScopesList;
import io.pravega.test.common.TestUtils;
import javax.ws.rs.client.Invocation;
@@ -16,6 +17,8 @@
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
+import static org.junit.Assert.assertEquals;
+
public class UserSecureStreamMetaDataTests extends SecureStreamMetaDataTests {
@Override
@@ -25,4 +28,10 @@ protected Invocation.Builder addAuthHeaders(Invocation.Builder request) {
return request.headers(map);
}
+ @Override
+ protected void verifyScopes(ScopesList scopesList1) {
+ assertEquals(2, scopesList1.getScopes().size());
+ assertEquals("scope1", scopesList1.getScopes().get(0).getScopeName());
+ assertEquals("scope2", scopesList1.getScopes().get(1).getScopeName());
+ }
}
diff --git a/controller/src/test/java/io/pravega/controller/server/AuthResourceRepresentationTest.java b/controller/src/test/java/io/pravega/controller/server/AuthResourceRepresentationTest.java
new file mode 100644
index 00000000000..cc5532ab306
--- /dev/null
+++ b/controller/src/test/java/io/pravega/controller/server/AuthResourceRepresentationTest.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright (c) 2019 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.controller.server;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for the AuthResourceRepresentation class.
+ */
+public class AuthResourceRepresentationTest {
+
+ @Test
+ public void testOfScopesReturnsValidResourceStr() {
+ assertEquals("/", AuthResourceRepresentation.ofScopes());
+ }
+
+ @Test
+ public void testOfAScopeReturnsValidResourceStrWhenInputIsLegal() {
+ assertEquals("testScopeName", AuthResourceRepresentation.ofScope("testScopeName"));
+ }
+
+ @Test
+ public void testOfStreamsInScopeReturnsValidResourceStrWhenInputIsLegal() {
+ assertEquals("testScopeName", AuthResourceRepresentation.ofStreamsInScope("testScopeName"));
+ }
+
+ @Test (expected = NullPointerException.class)
+ public void testOfStreamsInScopeThrowsExceptionWhenInputIsNull() {
+ AuthResourceRepresentation.ofStreamsInScope(null);
+ }
+
+ @Test (expected = IllegalArgumentException.class)
+ public void testOfStreamsInScopeThrowsExceptionWhenInputIsEmpty() {
+ AuthResourceRepresentation.ofStreamsInScope("");
+ }
+
+ @Test
+ public void testOfAStreamInScopeReturnsValidResourceStrWhenInputIsLegal() {
+ assertEquals("testScopeName/testStreamName",
+ AuthResourceRepresentation.ofStreamInScope("testScopeName", "testStreamName"));
+ }
+
+ @Test (expected = NullPointerException.class)
+ public void testOfAStreamInScopeThrowsExceptionWhenStreamNameIsNull() {
+ AuthResourceRepresentation.ofStreamInScope("testScopeName", null);
+ }
+
+ @Test (expected = IllegalArgumentException.class)
+ public void testOfAStreamInScopeThrowsExceptionWhenStreamNameIsEmpty() {
+ AuthResourceRepresentation.ofStreamInScope("testScopeName", "");
+ }
+
+ @Test
+ public void testOfReaderGroupsInScopeReturnsValidResourceStrWhenInputIsLegal() {
+ assertEquals("scopeName", AuthResourceRepresentation.ofReaderGroupsInScope("scopeName"));
+ }
+
+ @Test
+ public void testOfAReaderGroupInScopeReturnsValidResourceStrWhenInputIsLegal() {
+ assertEquals("scopeName/readerGroupName",
+ AuthResourceRepresentation.ofReaderGroupInScope("scopeName", "readerGroupName"));
+ }
+}
\ No newline at end of file
diff --git a/controller/src/test/java/io/pravega/controller/server/rpc/auth/PravegaAuthManagerTest.java b/controller/src/test/java/io/pravega/controller/server/rpc/auth/PravegaAuthManagerTest.java
index 85c43b558f6..ba5e2bcb1b4 100644
--- a/controller/src/test/java/io/pravega/controller/server/rpc/auth/PravegaAuthManagerTest.java
+++ b/controller/src/test/java/io/pravega/controller/server/rpc/auth/PravegaAuthManagerTest.java
@@ -62,9 +62,8 @@ public void setUp() throws Exception {
writer.write("dummy:password:\n");
writer.write("dummy1:password:readresource;;\n");
writer.write("dummy2:password:readresource;specificresouce,READ;totalaccess,READ_UPDATE\n");
- writer.write("dummy3:" + passwordEncryptor.encryptPassword("password") + ":readresource;specificresouce,READ;totalaccess,READ_UPDATE\n");
+ writer.write("dummy3:" + passwordEncryptor.encryptPassword("password") + ":readresource;specificresouce,READ;readresource/*,READ;totalaccess,READ_UPDATE\n");
writer.write("dummy4:" + passwordEncryptor.encryptPassword("password") + ":readresource;specificresouce,READ;*,READ_UPDATE\n");
- writer.close();
}
}
diff --git a/controller/src/test/java/io/pravega/controller/server/rpc/auth/RESTAuthHelperTest.java b/controller/src/test/java/io/pravega/controller/server/rpc/auth/RESTAuthHelperTest.java
new file mode 100644
index 00000000000..06c39026c89
--- /dev/null
+++ b/controller/src/test/java/io/pravega/controller/server/rpc/auth/RESTAuthHelperTest.java
@@ -0,0 +1,120 @@
+/**
+ * Copyright (c) 2019 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.controller.server.rpc.auth;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static io.pravega.auth.AuthHandler.Permissions.*;
+
+import io.pravega.auth.AuthException;
+import io.pravega.controller.mocks.FakeAuthHandler;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.List;
+
+/**
+ * Unit tests for the RESTAuthHelper class.
+ */
+public class RESTAuthHelperTest {
+
+ private RESTAuthHelper authHelper;
+
+ @Before
+ public void init() {
+ PravegaAuthManager authManager = new PravegaAuthManager(null);
+ authManager.registerHandler(new FakeAuthHandler());
+ authHelper = new RESTAuthHelper(authManager);
+ }
+
+ @Test
+ public void testAuthOkWhenAuthMgrIsNull() throws AuthException {
+ RESTAuthHelper authHelper = new RESTAuthHelper(null);
+ assertTrue(authHelper.isAuthorized(null, null, null, null));
+
+ authHelper.authenticate(null);
+ authHelper.authorize(null, null, null, null);
+ authHelper.authenticateAuthorize(null, null, null);
+ }
+
+ @Test
+ public void testAuthOkForPrivilegedUserWhenCredentialsAreValid() throws AuthException {
+ String username = FakeAuthHandler.PRIVILEGED_USER;
+ String password = "whatever";
+
+ assertTrue(authHelper.isAuthorized(createAuthHeader(username, password), "/",
+ new UserPrincipal(username), READ_UPDATE));
+ }
+
+ @Test
+ public void testAuthFailsForUnknownUser() throws AuthException {
+ String username = "unknown";
+ String password = "whatever";
+
+ boolean authorized = authHelper.isAuthorized(
+ createAuthHeader(username, password),
+ "/",
+ new UserPrincipal(username),
+ READ);
+ assertFalse(authorized);
+ }
+
+ @Test
+ public void testAuthOkForUnprivilegedUserForAssignedPermission() throws AuthException {
+ String username = FakeAuthHandler.UNPRIVILEGED_USER;
+ String password = "whatever";
+
+ boolean authorized = authHelper.isAuthorized(
+ createAuthHeader(username, password),
+ "/",
+ new UserPrincipal(username),
+ READ);
+ assertFalse(authorized);
+ }
+
+ @Test
+ public void testAuthFailsForUnprivilegedUserForUnassignedPermission() throws AuthException {
+ String username = FakeAuthHandler.UNPRIVILEGED_USER;
+ String password = "whatever";
+
+ boolean authorized = authHelper.isAuthorized(
+ createAuthHeader(username, password),
+ "/",
+ new UserPrincipal(username),
+ READ_UPDATE);
+ assertFalse(authorized);
+ }
+
+ @Test
+ public void testAuthIsEnabledWhenPravegaAuthManagerIsNonNull() {
+ RESTAuthHelper sut = new RESTAuthHelper(new PravegaAuthManager(null));
+ assertTrue(sut.isAuthEnabled());
+ }
+
+ @Test
+ public void testAuthIsDisabledWhenPravegaAuthManagerIsNull() {
+ RESTAuthHelper sut = new RESTAuthHelper(null);
+ assertFalse(sut.isAuthEnabled());
+ }
+
+ private List createAuthHeader(String username, String password) {
+ String value = String.format("%s:%s", username, password);
+ String encodedCredentials = null;
+ try {
+ encodedCredentials = Base64.getEncoder().encodeToString(value.getBytes("utf-8"));
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ return Arrays.asList("Basic " + encodedCredentials);
+ }
+}
diff --git a/standalone/src/test/java/io/pravega/local/AuthEnabledInProcPravegaClusterTest.java b/standalone/src/test/java/io/pravega/local/AuthEnabledInProcPravegaClusterTest.java
new file mode 100644
index 00000000000..0b91103fa4f
--- /dev/null
+++ b/standalone/src/test/java/io/pravega/local/AuthEnabledInProcPravegaClusterTest.java
@@ -0,0 +1,161 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.local;
+
+import io.pravega.client.ClientConfig;
+import io.pravega.client.ClientFactory;
+import io.pravega.client.admin.ReaderGroupManager;
+import io.pravega.client.admin.StreamManager;
+
+import io.pravega.client.stream.EventStreamWriter;
+import io.pravega.client.stream.ReinitializationRequiredException;
+import io.pravega.client.stream.ScalingPolicy;
+import io.pravega.client.stream.StreamConfiguration;
+import io.pravega.client.stream.impl.JavaSerializer;
+import io.pravega.client.stream.EventWriterConfig;
+import io.pravega.client.stream.ReaderGroupConfig;
+import io.pravega.client.stream.Stream;
+import io.pravega.client.stream.EventStreamReader;
+import io.pravega.client.stream.ReaderConfig;
+import io.pravega.client.stream.impl.DefaultCredentials;
+import io.pravega.test.common.AssertExtensions;
+import java.net.URI;
+import java.util.UUID;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests for auth enabled in-process standalone cluster.
+ */
+@Slf4j
+public class AuthEnabledInProcPravegaClusterTest extends InProcPravegaClusterTest {
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ this.authEnabled = true;
+ this.tlsEnabled = false;
+ super.setUp();
+ }
+
+ @Override
+ public void createTestStream() {
+ // This test from the base is unnecessary in this class as that and more is already being
+ // tested in another test method.
+ }
+
+ /**
+ * Compares reads and writes to verify that an in-process Pravega cluster runs properly with
+ * with valid auth (authentication and authorization) configuration.
+ *
+ * Note:
+ * Strictly speaking, this test is really an "integration test" and is a little time consuming. For now, its
+ * intended to also run as a unit test, but it could be moved to an integration test suite if and when necessary.
+ *
+ */
+ @Test(timeout = 50000)
+ public void testWriteAndReadEventsWhenConfigIsProper() throws ReinitializationRequiredException {
+ String scope = "org.example.auth";
+ String streamName = "stream1";
+ int numSegments = 1;
+
+ ClientConfig clientConfig = ClientConfig.builder()
+ .credentials(new DefaultCredentials("1111_aaaa", "admin"))
+ .controllerURI(URI.create(localPravega.getInProcPravegaCluster().getControllerURI()))
+ .build();
+ log.info("Done creating client config");
+
+ @Cleanup
+ StreamManager streamManager = StreamManager.create(clientConfig);
+ log.info("Created a stream manager");
+
+ streamManager.createScope(scope);
+ log.info("Created a scope [{}]", scope);
+
+ streamManager.createStream(scope, streamName, StreamConfiguration.builder()
+ .scalingPolicy(ScalingPolicy.fixed(numSegments))
+ .build());
+ log.info("Created a stream with name [{}]", streamName);
+
+ @Cleanup
+ ClientFactory clientFactory = ClientFactory.withScope(scope, clientConfig);
+
+ @Cleanup
+ EventStreamWriter writer = clientFactory.createEventWriter(streamName,
+ new JavaSerializer(),
+ EventWriterConfig.builder().build());
+ log.info("Got a writer");
+
+ String writeEvent1 = "This is event 1";
+ writer.writeEvent(writeEvent1);
+ log.info("Done writing event [{}]", writeEvent1);
+
+ String writeEvent2 = "This is event 2";
+ writer.writeEvent(writeEvent2);
+ log.info("Done writing event [{}]", writeEvent2);
+
+ // Now, read the events from the stream.
+
+ String readerGroup = UUID.randomUUID().toString().replace("-", "");
+ ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
+ .stream(Stream.of(scope, streamName))
+ .disableAutomaticCheckpoints()
+ .build();
+
+ @Cleanup
+ ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, clientConfig);
+ readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig);
+
+ @Cleanup
+ EventStreamReader reader = clientFactory.createReader(
+ "readerId", readerGroup,
+ new JavaSerializer(), ReaderConfig.builder().build());
+
+ // Keeping the read timeout large so that there is ample time for reading the event even in
+ // case of abnormal delays in test environments.
+ String readEvent1 = reader.readNextEvent(10000).getEvent();
+ log.info("Done reading event [{}]", readEvent1);
+
+ String readEvent2 = reader.readNextEvent(10000).getEvent();
+ log.info("Done reading event [{}]", readEvent2);
+
+ assertEquals(writeEvent1, readEvent1);
+ assertEquals(writeEvent2, readEvent2);
+ }
+
+ @Test
+ public void testCreateStreamFailsWhenCredentialsAreInvalid() {
+ Assert.assertNotNull("Pravega not initialized", localPravega);
+ String scope = "Scope";
+
+ ClientConfig clientConfig = ClientConfig.builder()
+ .credentials(new DefaultCredentials("", ""))
+ .controllerURI(URI.create(localPravega.getInProcPravegaCluster().getControllerURI()))
+ .build();
+
+ @Cleanup
+ StreamManager streamManager = StreamManager.create(clientConfig);
+
+ AssertExtensions.assertThrows(RuntimeException.class,
+ () -> streamManager.createScope(scope));
+ }
+
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+}
\ No newline at end of file
diff --git a/standalone/src/test/java/io/pravega/local/PartialSecurePravegaClusterTest.java b/standalone/src/test/java/io/pravega/local/PartialSecurePravegaClusterTest.java
deleted file mode 100644
index 214649cb3c6..00000000000
--- a/standalone/src/test/java/io/pravega/local/PartialSecurePravegaClusterTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.pravega.local;
-
-import io.pravega.client.ClientConfig;
-import io.pravega.client.admin.StreamManager;
-import io.pravega.client.stream.impl.DefaultCredentials;
-import io.pravega.test.common.AssertExtensions;
-import java.net.URI;
-import lombok.Cleanup;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Unit tests for secure standalone cluster.
- */
-public class PartialSecurePravegaClusterTest extends InProcPravegaClusterTest {
- @Before
- @Override
- public void setUp() throws Exception {
- this.authEnabled = true;
- this.tlsEnabled = false;
- super.setUp();
- }
-
- /**
- * Create the test stream.
- *
- * @throws Exception on any errors.
- */
- @Test
- public void failingCreateTestStream() throws Exception {
- Assert.assertNotNull("Pravega not initialized", localPravega);
- String scope = "Scope";
- String streamName = "Stream";
- int numSegments = 10;
-
- ClientConfig clientConfig = ClientConfig.builder()
- .credentials(new DefaultCredentials("", ""))
- .controllerURI(URI.create(localPravega.getInProcPravegaCluster().getControllerURI()))
- .build();
- @Cleanup
- StreamManager streamManager = StreamManager.create(clientConfig);
-
- AssertExtensions.assertThrows(RuntimeException.class,
- () -> streamManager.createScope(scope));
- }
-
- @After
- @Override
- public void tearDown() throws Exception {
- super.tearDown();
- }
-}
\ No newline at end of file