From 15b4daa9e6fa75c76d5c1ef3b6fabaa1d8083499 Mon Sep 17 00:00:00 2001 From: Samudro Sinha Date: Mon, 29 Dec 2025 15:15:22 +0530 Subject: [PATCH 1/6] auth check part 2 --- .../src/main/resources/reference.conf | 2 + .../org/elasticmq/rest/sqs/SQSException.scala | 9 ++++ .../rest/sqs/SQSRestServerBuilder.scala | 41 +++++++++++++++---- .../directives/AWSCredentialDirectives.scala | 32 +++++++++++++++ 4 files changed, 76 insertions(+), 8 deletions(-) create mode 100644 rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala diff --git a/rest/rest-sqs/src/main/resources/reference.conf b/rest/rest-sqs/src/main/resources/reference.conf index 72ff4b99..8a0e50ea 100644 --- a/rest/rest-sqs/src/main/resources/reference.conf +++ b/rest/rest-sqs/src/main/resources/reference.conf @@ -17,6 +17,8 @@ rest-sqs { bind-hostname = "0.0.0.0" # Possible values: relaxed, strict sqs-limits = strict + aws-access-key-id = "" + aws-secret-access-key = "" } rest-stats { diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSException.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSException.scala index e1c2f0c7..7cbd97a9 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSException.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSException.scala @@ -130,6 +130,15 @@ object SQSException { errorMessage = Some("BatchEntryIdsNotDistinct") ) + def invalidClientTokenId(message: String): SQSException = { + new SQSException( + "InvalidClientTokenId", + 403, + "AuthFailure", + Some(message) + ) + } + def tooManyEntriesInBatchRequest: SQSException = new SQSException( "AWS.SimpleQueueService.TooManyEntriesInBatchRequest", errorType = "com.amazonaws.sqs#TooManyEntriesInBatchRequest", diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala index 6c9fad6b..9a9971c3 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala @@ -12,6 +12,7 @@ import org.elasticmq.metrics.QueuesMetrics import org.elasticmq.rest.sqs.Constants._ import org.elasticmq.rest.sqs.XmlNsVersion.extractXmlNs import org.elasticmq.rest.sqs.directives.{ + AWSCredentialDirectives, AWSProtocolDirectives, AnyParamDirectives, ElasticMQDirectives, @@ -127,6 +128,10 @@ case class TheSQSRestServerBuilder( this.copy(queueEventListener = Some(_queueEventListener)) def start(): SQSRestServer = { + val rootConfig = ConfigFactory.load() + val restSqsConfig = rootConfig.getConfig("elasticmq.rest-sqs") + val credentials = AWSCredentials.fromConfig(restSqsConfig) + val (theActorSystem, stopActorSystem) = getOrCreateActorSystem val theQueueManagerActor = getOrCreateQueueManagerActor(theActorSystem) val theServerAddress = @@ -173,7 +178,9 @@ case class TheSQSRestServerBuilder( with ListDeadLetterSourceQueuesDirectives with StartMessageMoveTaskDirectives with CancelMessageMoveTaskDirectives - with ListMessageMoveTasksDirectives { + with ListMessageMoveTasksDirectives + with AWSCredentialsModule + with AWSCredentialDirectives { def serverAddress = currentServerAddress.get() @@ -183,6 +190,7 @@ case class TheSQSRestServerBuilder( lazy val sqsLimits = theLimits lazy val timeout = Timeout(21, TimeUnit.SECONDS) // see application.conf lazy val contextPath = serverAddress.contextPathStripped + lazy val awsCredentials: AWSCredentials = credentials lazy val awsRegion: String = _awsRegion lazy val awsAccountId: String = _awsAccountId @@ -235,13 +243,15 @@ case class TheSQSRestServerBuilder( implicit val protocol: AWSProtocol = _protocol handleServerExceptions(protocol) { handleRejectionsWithSQSError(protocol) { - anyParamsMap(protocol) { p => - val marshallerDependencies = MarshallerDependencies(protocol, version) - if (config.debug) { - logRequestResult("") { - rawRoutes(p)(marshallerDependencies) - } - } else rawRoutes(p)(marshallerDependencies) + verifyAWSAccessKeyId(protocol) { + anyParamsMap(protocol) { p => + val marshallerDependencies = MarshallerDependencies(protocol, version) + if (config.debug) { + logRequestResult("") { + rawRoutes(p)(marshallerDependencies) + } + } else rawRoutes(p)(marshallerDependencies) + } } } } @@ -506,6 +516,21 @@ trait SQSLimitsModule { def sqsLimits: Limits } +trait AWSCredentialsModule { + def awsCredentials: AWSCredentials +} + +case class AWSCredentials(accessKey: String, secretKey: String) + +object AWSCredentials { + def fromConfig(config: com.typesafe.config.Config): AWSCredentials = { + AWSCredentials( + config.getString("aws-access-key-id"), + config.getString("aws-secret-access-key") + ) + } +} + class ElasticMQConfig { private lazy val rootConfig = ConfigFactory.load() private lazy val elasticMQConfig = rootConfig.getConfig("elasticmq") diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala new file mode 100644 index 00000000..1d476160 --- /dev/null +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala @@ -0,0 +1,32 @@ +package org.elasticmq.rest.sqs.directives + +import org.apache.pekko.http.scaladsl.server.{Directive0, Directives} +import org.elasticmq.rest.sqs.{AWSCredentialsModule, AWSProtocol, SQSException} + +trait AWSCredentialDirectives extends Directives { + this: AWSCredentialsModule with ElasticMQDirectives => + + private val accessKeyRegex = "Credential=([^/]+)/".r + + def verifyAWSAccessKeyId(protocol: AWSProtocol): Directive0 = { + if (awsCredentials.accessKey.nonEmpty) { + headerValueByName("Authorization").flatMap { authHeader => + accessKeyRegex.findFirstMatchIn(authHeader) match { + case Some(m) if m.group(1) == awsCredentials.accessKey => pass + case _ => + complete( + SQSException.invalidClientTokenId( + "The security token included in the request is invalid." + ) + ) + } + } | complete( + SQSException.invalidClientTokenId( + "The security token included in the request is invalid." + ) + ) + } else { + pass + } + } +} From 85611bee40df5cdf3fe469014b15061ded078157 Mon Sep 17 00:00:00 2001 From: Samudro Sinha Date: Mon, 29 Dec 2025 15:31:50 +0530 Subject: [PATCH 2/6] fix --- .../elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala index 1d476160..c391031b 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala @@ -14,7 +14,7 @@ trait AWSCredentialDirectives extends Directives { accessKeyRegex.findFirstMatchIn(authHeader) match { case Some(m) if m.group(1) == awsCredentials.accessKey => pass case _ => - complete( + failWith( SQSException.invalidClientTokenId( "The security token included in the request is invalid." ) From e6287dd0b29c024467c008e883f5b64f3d8b9220 Mon Sep 17 00:00:00 2001 From: Samudro Sinha Date: Mon, 29 Dec 2025 15:36:44 +0530 Subject: [PATCH 3/6] fix --- .../rest/sqs/directives/AWSCredentialDirectives.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala index c391031b..7aa3a1dd 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala @@ -13,12 +13,7 @@ trait AWSCredentialDirectives extends Directives { headerValueByName("Authorization").flatMap { authHeader => accessKeyRegex.findFirstMatchIn(authHeader) match { case Some(m) if m.group(1) == awsCredentials.accessKey => pass - case _ => - failWith( - SQSException.invalidClientTokenId( - "The security token included in the request is invalid." - ) - ) + case _ => reject } } | complete( SQSException.invalidClientTokenId( From 82eea2bd1bc5ad0a8dd72a5b4f0e86bc2f811652 Mon Sep 17 00:00:00 2001 From: Samudro Sinha Date: Mon, 29 Dec 2025 15:39:44 +0530 Subject: [PATCH 4/6] fix --- .../directives/AWSCredentialDirectives.scala | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala index 7aa3a1dd..500a87ee 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/directives/AWSCredentialDirectives.scala @@ -10,16 +10,27 @@ trait AWSCredentialDirectives extends Directives { def verifyAWSAccessKeyId(protocol: AWSProtocol): Directive0 = { if (awsCredentials.accessKey.nonEmpty) { - headerValueByName("Authorization").flatMap { authHeader => - accessKeyRegex.findFirstMatchIn(authHeader) match { - case Some(m) if m.group(1) == awsCredentials.accessKey => pass - case _ => reject - } - } | complete( - SQSException.invalidClientTokenId( - "The security token included in the request is invalid." - ) - ) + // Optional header in case it's missing + optionalHeaderValueByName("Authorization").flatMap { + case Some(authHeader) => + accessKeyRegex.findFirstMatchIn(authHeader) match { + case Some(m) if m.group(1) == awsCredentials.accessKey => + pass + case _ => + // Must return a Directive0 here + complete( + SQSException.invalidClientTokenId( + "The security token included in the request is invalid." + ) + ) + } + case None => + complete( + SQSException.invalidClientTokenId( + "The security token included in the request is invalid." + ) + ) + } } else { pass } From 338406b1c0ec042ba33b1c494755fbf1a092271d Mon Sep 17 00:00:00 2001 From: Samudro Sinha Date: Mon, 29 Dec 2025 15:44:38 +0530 Subject: [PATCH 5/6] fix --- .../scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala index 9a9971c3..c00e8f17 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/SQSRestServerBuilder.scala @@ -129,7 +129,7 @@ case class TheSQSRestServerBuilder( def start(): SQSRestServer = { val rootConfig = ConfigFactory.load() - val restSqsConfig = rootConfig.getConfig("elasticmq.rest-sqs") + val restSqsConfig = rootConfig.getConfig("rest-sqs") val credentials = AWSCredentials.fromConfig(restSqsConfig) val (theActorSystem, stopActorSystem) = getOrCreateActorSystem From a5517f7b932d74c1bc30e57509599b75e3ca1e40 Mon Sep 17 00:00:00 2001 From: Samudro Sinha Date: Mon, 29 Dec 2025 18:09:12 +0530 Subject: [PATCH 6/6] sql fix --- .../persistence/sql/MessageRepository.scala | 12 ++++++------ .../elasticmq/persistence/sql/QueueRepository.scala | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/MessageRepository.scala b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/MessageRepository.scala index 386e57d6..eff8031d 100644 --- a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/MessageRepository.scala +++ b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/MessageRepository.scala @@ -19,7 +19,7 @@ class MessageRepository(queueName: String, db: DB) extends Logging { sql""" create table if not exists $tableName ( - message_id varchar unique, + message_id varchar(255) not null unique, delivery_receipts blob, next_delivery bigint, content blob, @@ -27,11 +27,11 @@ class MessageRepository(queueName: String, db: DB) extends Logging { created bigint, received bigint, receive_count int, - group_id varchar, - deduplication_id varchar, - tracing_id varchar, - sequence_number varchar, - dead_letter_source_queue_name varchar + group_id varchar(255), + deduplication_id varchar(255), + tracing_id varchar(255), + sequence_number varchar(255), + dead_letter_source_queue_name varchar(255) )""".execute.apply() def drop(): Unit = { diff --git a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/QueueRepository.scala b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/QueueRepository.scala index 830125c9..500d31c5 100644 --- a/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/QueueRepository.scala +++ b/persistence/persistence-sql/src/main/scala/org/elasticmq/persistence/sql/QueueRepository.scala @@ -17,7 +17,7 @@ class QueueRepository(db: DB) extends Logging { sql""" create table if not exists $tableName ( - name varchar unique, + name varchar(255) not null unique, data blob )""".execute.apply()