Skip to content

Commit

Permalink
Merge pull request #186 from ing-bank/feature/auth-interface
Browse files Browse the repository at this point in the history
Introducing authorization interface
  • Loading branch information
kr7ysztof authored Nov 1, 2023
2 parents a94a6c5 + a412591 commit 8ad56ba
Show file tree
Hide file tree
Showing 18 changed files with 319 additions and 255 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/feature.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ jobs:
- uses: actions/checkout@v2
with:
fetch-depth: '0'
- name: Setup Scala
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: "17"
- name: Build and publish docker image
run: |
# Login to docker
Expand Down
46 changes: 14 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,35 +176,6 @@ debug civetweb = 20

2. Restart rgw process (either docker stop <ceph/daemon rgw> or whole ceph/demo)

# Lineage to Atlas

Currently it is possible to create lineage based on incoming request to proxy server. It is however disabled by
default (preview feature). To enable lineage shipment to Atlas, following setting has to be added to application.conf:

```
rokku {
atlas {
enabled = true
}
}
```

As alternative environment value `ROKKU_ATLAS_ENABLED` should be set to true.

Lineage is done according to model

![alt text](./docs/img/atlas_model.jpg)

To check lineage that has been created, login to Atlas web UI console, [default url](http://localhost:21000) with
admin user and password

## Classifications and metadata

You can set classifications and metadata to objects in lineage by setting http headers:

* **rokku-metadata** - key value pair in format _key1=val1,key2=val2_ - the matadata is presented in lineage entity as "awsTags" properties.
* **rokku-classifications** - comma separated classifications names (the classifications must exist)

# Events Notification

Rokku can send event notification to message queue based on user requests, in [AWS format](https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html).
Expand Down Expand Up @@ -338,15 +309,26 @@ ROKKU_KERBEROS_KEYTAB: "keytab_full_path"
ROKKU_KERBEROS_PRINCIPAL: "user"
```

# Ranger Audit Log
# Authorization plugin

By default rokku uses Apache Ranger for authorization but you can change it.
To provide other implementation you need:

1. implement [AccessControl](./src/main/java/com/ing/wbaa/rokku/proxy/security/AccessControl.java) - example is [Ranger](./src/main/scala/com/ing/wbaa/rokku/proxy/provider/AccessControlProviderRanger.scala)
2. configure
* set the access control class in the config [access-control.class-name](./src/main/resources/reference.conf) or environment ```ROKKU_ACCESS_CONTROL_CLASS_NAME=...```
* if you need any specific configuration for the plugin add it to [access-control.plugin-param](./src/main/resources/reference.conf)
3. add the access control class to rokku classpath

# Authorization Audit Log

To enable the log set:

```bash
ROKKU_RANGER_ENABLED_AUDIT="true"
ROKKU_ENABLED_AUDIT="true"
```

and provide on the classpath the ranger-s3-audit.xml configuration.
For AccessControlProviderRanger you need to provide on the classpath the ranger-s3-audit.xml configuration.

# ECS multi namespace support

Expand Down
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ val metricVersion = "4.2.12"

libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
"ch.qos.logback" % "logback-classic" % "1.4.5",
"ch.qos.logback" % "logback-classic" % "1.4.7",
"ch.qos.logback.contrib" % "logback-json-classic" % logbackJson,
"ch.qos.logback.contrib" % "logback-jackson" % logbackJson exclude("com.fasterxml.jackson.core", "jackson-databind"),
"com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1",
Expand All @@ -33,19 +33,19 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
"com.typesafe.akka" %% "akka-http-xml" % akkaHttpVersion,
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.470",
"org.apache.kafka" % "kafka-clients" % "3.3.2",
"org.apache.kafka" % "kafka-clients" % "3.4.0",
"org.apache.ranger" % "ranger-plugins-common" % "2.4.0" exclude("org.eclipse.jetty", "jetty-io") exclude("com.amazonaws", "aws-java-sdk-bundle") exclude("org.elasticsearch", "elasticsearch-x-content") exclude("org.elasticsearch", "elasticsearch") exclude("org.apache.hadoop", "hadoop-common"),
"org.apache.hadoop" % "hadoop-common" % "3.3.5" exclude("org.apache.hadoop.thirdparty", "hadoop-shaded-protobuf_3_7") exclude("org.eclipse.jetty", "jetty-io") exclude("org.apache.zookeeper", "zookeeper") exclude("com.google.protobuf", "protobuf-java"), //needed for ranger 2.3.0 - if vulnerabilities are fixed remove this
"com.lightbend.akka" %% "akka-stream-alpakka-xml"% "3.0.4",
"io.dropwizard.metrics" % "metrics-core" % metricVersion,
"io.dropwizard.metrics" % "metrics-jmx" % metricVersion,
"com.auth0" % "java-jwt" % "4.2.1",
"com.auth0" % "java-jwt" % "4.3.0",
"com.github.cb372" %% "scalacache-core" % "0.28.0",
"com.github.cb372" %% "scalacache-caffeine" % "0.28.0",
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % Test,
"org.scalatest" %% "scalatest" % "3.2.16" % "it,test",
"com.amazonaws" % "aws-java-sdk-sts" % "1.12.470" % IntegrationTest,
"com.amazonaws" % "aws-java-sdk-sts" % "1.12.471" % IntegrationTest,
)
dependencyOverrides ++= Seq(
"net.minidev" % "json-smart" % "2.4.11",
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.7.1
sbt.version =1.8.3
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class RokkuS3ProxyItTest extends AsyncWordSpec with Diagrams
def withSdkToMockProxy(testCode: (AWSSecurityTokenService, Authority) => Future[Assertion]): Future[Assertion] = {
val proxy: RokkuS3Proxy = new RokkuS3Proxy with RequestHandlerS3
with FilterRecursiveListBucketHandler with AuthenticationProviderSTS
with AuthorizationProviderRanger with SignatureProviderAws
with AccessControlProvider with SignatureProviderAws
with MessageProviderKafka with AuditLogProvider with MemoryUserRequestQueue with RequestParser {
override implicit lazy val system: ActorSystem = testSystem
override def materializer: Materializer = Materializer(system)
Expand All @@ -87,7 +87,7 @@ class RokkuS3ProxyItTest extends AsyncWordSpec with Diagrams
override val stsSettings: StsSettings = StsSettings(testSystem)
override val kafkaSettings: KafkaSettings = KafkaSettings(testSystem)

override protected def rangerSettings: RangerSettings = RangerSettings(testSystem)
override protected def accessControlProviderSettings: AccessControlProviderSettings = AccessControlProviderSettings(testSystem)

override def isUserAuthorizedForRequest(request: S3Request, user: User)(implicit id: RequestId): Boolean = {
user match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.ing.wbaa.rokku.proxy.provider
import java.net.InetAddress
import akka.actor.ActorSystem
import akka.http.scaladsl.model.RemoteAddress
import com.ing.wbaa.rokku.proxy.config.RangerSettings
import com.ing.wbaa.rokku.proxy.config.AccessControlProviderSettings
import com.ing.wbaa.rokku.proxy.data._
import com.ing.wbaa.rokku.proxy.handler.exception.RokkuListingBucketsException
import org.scalatest.Assertion
Expand All @@ -12,7 +12,7 @@ import org.scalatest.wordspec.AsyncWordSpec

import scala.concurrent.Future

class AuthorizationProviderRangerItTest extends AsyncWordSpec with Diagrams {
class AccessControlProviderRangerItTest extends AsyncWordSpec with Diagrams {
final implicit val testSystem: ActorSystem = ActorSystem.create("test-system")

implicit val requestId: RequestId = RequestId("test")
Expand Down Expand Up @@ -57,11 +57,11 @@ class AuthorizationProviderRangerItTest extends AsyncWordSpec with Diagrams {
* @param testCode Code that accepts the created authorization provider
* @return Assertion
*/
def withAuthorizationProviderRanger(rangerTestSettings: RangerSettings =
RangerSettings(testSystem))
(testCode: AuthorizationProviderRanger => Future[Assertion]): Future[Assertion] = {
testCode(new AuthorizationProviderRanger {
override def rangerSettings: RangerSettings = rangerTestSettings
def withAuthorizationProviderRanger(rangerTestSettings: AccessControlProviderSettings =
AccessControlProviderSettings(testSystem))
(testCode: AccessControlProvider => Future[Assertion]): Future[Assertion] = {
testCode(new AccessControlProvider {
override def accessControlProviderSettings: AccessControlProviderSettings = rangerTestSettings
})
}

Expand Down Expand Up @@ -108,14 +108,14 @@ class AuthorizationProviderRangerItTest extends AsyncWordSpec with Diagrams {
accessType = Read(), clientIPAddress = clientIPAddress, headerIPs = headerIPs), user))
}

"throw exception allow-list-buckets set to false" in withAuthorizationProviderRanger(new RangerSettings(testSystem.settings.config) {
"throw exception allow-list-buckets set to false" in withAuthorizationProviderRanger(new AccessControlProviderSettings(testSystem.settings.config) {
override val listBucketsEnabled: Boolean = false
}) { apr =>
assertThrows[RokkuListingBucketsException](apr.isUserAuthorizedForRequest(s3Request.copy(s3BucketPath = None, s3Object = None,
accessType = Read(), clientIPAddress = clientIPAddress, headerIPs = headerIPs), user))
}

"does not authorize allow-create-delete-buckets set to false" in withAuthorizationProviderRanger(new RangerSettings(testSystem.settings.config) {
"does not authorize allow-create-delete-buckets set to false" in withAuthorizationProviderRanger(new AccessControlProviderSettings(testSystem.settings.config) {
override val createDeleteBucketsEnabled: Boolean = false
}) { apr =>
assert(!apr.isUserAuthorizedForRequest(s3Request.copy(s3Object = None, accessType = Put(),
Expand All @@ -124,25 +124,25 @@ class AuthorizationProviderRangerItTest extends AsyncWordSpec with Diagrams {
clientIPAddress = clientIPAddress, headerIPs = headerIPs), user))
}

"does authorize creating bucket for an admin" in withAuthorizationProviderRanger(new RangerSettings(testSystem.settings.config) {
"does authorize creating bucket for an admin" in withAuthorizationProviderRanger(new AccessControlProviderSettings(testSystem.settings.config) {
}) { apr =>
assert(apr.isUserAuthorizedForRequest(s3Request.copy(s3Object = None, accessType = Put(),
clientIPAddress = clientIPAddress, headerIPs = headerIPs), adminUser))
}

"does authorize deleting bucket for an admin" in withAuthorizationProviderRanger(new RangerSettings(testSystem.settings.config) {
"does authorize deleting bucket for an admin" in withAuthorizationProviderRanger(new AccessControlProviderSettings(testSystem.settings.config) {
}) { apr =>
assert(apr.isUserAuthorizedForRequest(s3Request.copy(s3Object = None, accessType = Delete(),
clientIPAddress = clientIPAddress, headerIPs = headerIPs), adminUser))
}

"does not authorize creating bucket for a user" in withAuthorizationProviderRanger(new RangerSettings(testSystem.settings.config) {
"does not authorize creating bucket for a user" in withAuthorizationProviderRanger(new AccessControlProviderSettings(testSystem.settings.config) {
}) { apr =>
assert(!apr.isUserAuthorizedForRequest(s3Request.copy(s3Object = None, accessType = Put(),
clientIPAddress = clientIPAddress, headerIPs = headerIPs), user))
}

"does not authorize deleting bucket for a user" in withAuthorizationProviderRanger(new RangerSettings(testSystem.settings.config) {
"does not authorize deleting bucket for a user" in withAuthorizationProviderRanger(new AccessControlProviderSettings(testSystem.settings.config) {
}) { apr =>
assert(!apr.isUserAuthorizedForRequest(s3Request.copy(s3Object = None, accessType = Delete(),
clientIPAddress = clientIPAddress, headerIPs = headerIPs), user))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.ing.wbaa.rokku.proxy.security;

public interface AccessControl {
String AUDIT_ENABLED_PARAM = "auditEnabled";

void init();

boolean isAccessAllowed(AccessControlRequest request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.ing.wbaa.rokku.proxy.security;

import java.util.List;
import java.util.Set;
public record AccessControlRequest(
String user,
Set<String> userGroups,
String userRole,
String path,
String accessType,
String action,
String clientIpAddress,
String remoteIpAddress,
List<String> forwardedIpAddresses) {
}
13 changes: 8 additions & 5 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ rokku {
bind = ${?ROKKU_HTTP_BIND}
port = ${?ROKKU_HTTP_PORT}
}
ranger {
access-control {
allow-list-buckets = ${?ROKKU_ALLOW_LIST_BUCKETS}
allow-create-delete-buckets = ${?ROKKU_ALLOW_CREATE_DELETE_BUCKETS}
user-domain-postfix = ${?ROKKU_RANGER_USER_DOMAIN_POSTFIX}
enabled-audit = ${?ROKKU_RANGER_ENABLED_AUDIT}
role-prefix = ${?ROKKU_RANGER_ROLE_PREFIX}
app_id = ${?ROKKU_RANGER_API_ID}
enabled-audit = ${?ROKKU_ENABLED_AUDIT}
class-name = ${?ROKKU_ACCESS_CONTROL_CLASS_NAME}
plugin-params {
appId = ${?ROKKU_RANGER_API_ID}
userDomainPostfix = ${?ROKKU_RANGER_USER_DOMAIN_POSTFIX}
rolePrefix = ${?ROKKU_RANGER_ROLE_PREFIX}
}
}
storage.s3 {
# Settings for reaching backing storage.
Expand Down
19 changes: 10 additions & 9 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ rokku {
port = 8987
}

ranger {
# Settings for reaching Ranger.

# make sure the service_type is equal to what is specified in
# ranger-s3-security.xml
service_type = "s3"
app_id = "testservice"
access-control {
allow-list-buckets = true
allow-create-delete-buckets = true
user-domain-postfix = ""
enabled-audit = false
role-prefix = "role_"
class-name = "com.ing.wbaa.rokku.proxy.provider.AccessControlProviderRanger"
plugin-params {
appId = "testservice"
# make sure the service_type is equal to what is specified in
# ranger-s3-security.xml
serviceType = "s3"
userDomainPostfix = ""
rolePrefix = "role_"
}
}

storage.s3 {
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/com/ing/wbaa/rokku/proxy/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ import com.ing.wbaa.rokku.proxy.queue.MemoryUserRequestQueue

object Server extends App {

new RokkuS3Proxy with AuthorizationProviderRanger with RequestHandlerS3WithNamespaces with AuthenticationCachedProviderSTS with SignatureProviderAws with KerberosLoginProvider with FilterRecursiveListBucketHandler with MessageProviderKafka with AuditLogProvider with MemoryUserRequestQueue with RequestParser {
new RokkuS3Proxy with AccessControlProvider with RequestHandlerS3WithNamespaces with AuthenticationCachedProviderSTS with SignatureProviderAws with KerberosLoginProvider with FilterRecursiveListBucketHandler with MessageProviderKafka with AuditLogProvider with MemoryUserRequestQueue with RequestParser {

override implicit lazy val system: ActorSystem = ActorSystem.create("rokku")
override implicit def materializer: Materializer = Materializer(system)

override def kerberosSettings: KerberosSettings = KerberosSettings(system)

override val httpSettings: HttpSettings = HttpSettings(system)
override val rangerSettings: RangerSettings = RangerSettings(system)
override val accessControlProviderSettings: AccessControlProviderSettings = AccessControlProviderSettings(system)
override val storageS3Settings: StorageS3Settings = StorageS3Settings(system)
override val stsSettings: StsSettings = StsSettings(system)
override val kafkaSettings: KafkaSettings = KafkaSettings(system)
override val namespaceSettings: NamespaceSettings = NamespaceSettings(system)

// Force Ranger plugin to initialise on startup
rangerPluginForceInit
init()
}.startup

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.ing.wbaa.rokku.proxy.config

import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import com.typesafe.config.Config

import scala.jdk.CollectionConverters._

class AccessControlProviderSettings(config: Config) extends Extension {
val listBucketsEnabled: Boolean = config.getBoolean("rokku.access-control.allow-list-buckets")
val createDeleteBucketsEnabled: Boolean = config.getBoolean("rokku.access-control.allow-create-delete-buckets")
val auditEnabled: Boolean = config.getBoolean("rokku.access-control.enabled-audit")
val className: String = config.getString("rokku.access-control.class-name")
val pluginParams: Map[String, String] = config.getConfig("rokku.access-control.plugin-params")
.entrySet().asScala.map(e => e.getKey -> e.getValue.unwrapped().toString).toMap
}

object AccessControlProviderSettings extends ExtensionId[AccessControlProviderSettings] with ExtensionIdProvider {
override def createExtension(system: ExtendedActorSystem): AccessControlProviderSettings = new AccessControlProviderSettings(system.settings.config)

override def lookup: ExtensionId[AccessControlProviderSettings] = AccessControlProviderSettings
}

This file was deleted.

Loading

0 comments on commit 8ad56ba

Please sign in to comment.