From 852180296c35b6ea7c69406625db3b8f1965d8f0 Mon Sep 17 00:00:00 2001 From: Robert Kenny Date: Thu, 9 Jan 2025 13:51:32 +0000 Subject: [PATCH] Add config builder for DownstreamTarget to avoid ambiguity (#2804) * Add config builder for DownstreamTarget to avoid ambiguity * Apply auto-formatting rules --------- Co-authored-by: Github on behalf of Wellcome Collection --- .../main/scala/weco/lambda/Downstream.scala | 27 +++++++++ .../scala/weco/lambda/DownstreamTest.scala | 55 +++++++++++++++++++ .../src/main/resources/application.conf | 4 +- .../pipeline/batcher/lib/BatcherConfig.scala | 17 +----- .../src/main/resources/application.conf | 3 +- .../lib/RelationEmbedderConfig.scala | 9 +-- 6 files changed, 91 insertions(+), 24 deletions(-) create mode 100644 common/lambda/src/test/scala/weco/lambda/DownstreamTest.scala diff --git a/common/lambda/src/main/scala/weco/lambda/Downstream.scala b/common/lambda/src/main/scala/weco/lambda/Downstream.scala index fed30504d5..1680eca0d0 100644 --- a/common/lambda/src/main/scala/weco/lambda/Downstream.scala +++ b/common/lambda/src/main/scala/weco/lambda/Downstream.scala @@ -1,9 +1,12 @@ package weco.lambda +import com.typesafe.config.Config +import grizzled.slf4j.Logging import io.circe.Encoder import software.amazon.awssdk.services.sns.SnsClient import weco.messaging.sns.{SNSConfig, SNSMessageSender} import weco.json.JsonUtil.toJson +import weco.messaging.typesafe.SNSBuilder.buildSNSConfig import scala.util.Try @@ -44,3 +47,27 @@ object Downstream { } def apply(): Downstream = STDIODownstream } + +// Typesafe specific configuration builder +object DownstreamBuilder extends Logging { + import weco.typesafe.config.builders.EnrichConfig._ + + def buildDownstreamTarget(config: Config): DownstreamTarget = { + config.getStringOption("downstream.target") match { + case Some("sns") => + val snsConfig = buildSNSConfig(config) + info(s"Building SNS downstream with config: $snsConfig") + SNS(snsConfig) + case Some("stdio") => + info("Building StdOut downstream") + StdOut + case Some(unknownTarget) => + throw new IllegalArgumentException( + s"Invalid downstream target: $unknownTarget" + ) + case None => + warn("No downstream target specified, defaulting to StdOut") + StdOut + } + } +} diff --git a/common/lambda/src/test/scala/weco/lambda/DownstreamTest.scala b/common/lambda/src/test/scala/weco/lambda/DownstreamTest.scala new file mode 100644 index 0000000000..8d91d4a87e --- /dev/null +++ b/common/lambda/src/test/scala/weco/lambda/DownstreamTest.scala @@ -0,0 +1,55 @@ +package weco.lambda + +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers +import weco.lambda.helpers.ConfigurationTestHelpers +import weco.messaging.sns.SNSConfig + +class DownstreamTest extends AnyFunSpec + with ConfigurationTestHelpers + with Matchers { + + describe("DownstreamBuilder") { + it("builds a StdOut downstream target") { + val config = + """ + |downstream.target = "stdio" + |""".asConfig + + DownstreamBuilder.buildDownstreamTarget(config) shouldBe StdOut + } + + it("builds an SNS downstream target") { + val config = + """ + |downstream.target = "sns" + |aws.sns.topic.arn = "arn:aws:sns:eu-west-1:123456789012:my-topic" + |""".asConfig + + DownstreamBuilder.buildDownstreamTarget(config) shouldBe SNS( + config = SNSConfig( + topicArn = "arn:aws:sns:eu-west-1:123456789012:my-topic" + ) + ) + } + + it("builds a StdOut downstream target if no downstream target is specified") { + val config = + """ + |""".asConfig + + DownstreamBuilder.buildDownstreamTarget(config) shouldBe StdOut + } + + it("throws an exception if the downstream target is not recognised") { + val config = + """ + |downstream.target = "invalid" + |""".asConfig + + intercept[IllegalArgumentException] { + DownstreamBuilder.buildDownstreamTarget(config) + } + } + } +} diff --git a/pipeline/relation_embedder/batcher/src/main/resources/application.conf b/pipeline/relation_embedder/batcher/src/main/resources/application.conf index e32a2178ad..5721558aaa 100644 --- a/pipeline/relation_embedder/batcher/src/main/resources/application.conf +++ b/pipeline/relation_embedder/batcher/src/main/resources/application.conf @@ -5,5 +5,5 @@ aws.sns.topic.arn=${?output_topic_arn} batcher.flush_interval_minutes=${?flush_interval_minutes} batcher.max_processed_paths=${?max_processed_paths} batcher.max_batch_size=${?max_batch_size} -batcher.use_downstream=sns -batcher.use_downstream=${?use_downstream} \ No newline at end of file +downstream.target=sns +downstream.target=${?use_downstream} diff --git a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/BatcherConfig.scala b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/BatcherConfig.scala index 3bbc515ec2..39e3bf40b6 100644 --- a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/BatcherConfig.scala +++ b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/BatcherConfig.scala @@ -1,14 +1,8 @@ package weco.pipeline.batcher.lib import com.typesafe.config.Config -import weco.lambda.{ - ApplicationConfig, - DownstreamTarget, - LambdaConfigurable, - SNS, - StdOut -} -import weco.messaging.typesafe.SNSBuilder.buildSNSConfig +import weco.lambda.DownstreamBuilder.buildDownstreamTarget +import weco.lambda.{ApplicationConfig, DownstreamTarget, LambdaConfigurable} case class BatcherConfig( maxBatchSize: Int, @@ -21,11 +15,6 @@ trait BatcherConfigurable extends LambdaConfigurable[BatcherConfig] { def build(rawConfig: Config): BatcherConfig = BatcherConfig( maxBatchSize = rawConfig.requireInt("batcher.max_batch_size"), - downstreamTarget = { - rawConfig.requireString("batcher.use_downstream") match { - case "sns" => SNS(buildSNSConfig(rawConfig)) - case "stdio" => StdOut - } - } + downstreamTarget = buildDownstreamTarget(rawConfig) ) } diff --git a/pipeline/relation_embedder/relation_embedder/src/main/resources/application.conf b/pipeline/relation_embedder/relation_embedder/src/main/resources/application.conf index 873912dfa9..d037e53780 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/resources/application.conf +++ b/pipeline/relation_embedder/relation_embedder/src/main/resources/application.conf @@ -11,4 +11,5 @@ es.host=${?es_host} es.port=${?es_port} es.protocol=${?es_protocol} es.apikey=${?es_apikey} -relation_embedder.use_downstream=${?use_downstream} +downstream.target=sns +downstream.target=${?use_downstream} diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala index 9926af9592..0afc571b67 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala @@ -2,9 +2,9 @@ package weco.pipeline.relation_embedder.lib import com.typesafe.config.Config import weco.elasticsearch.typesafe.ElasticBuilder.buildElasticClientConfig +import weco.lambda.DownstreamBuilder.buildDownstreamTarget import weco.elasticsearch.typesafe.ElasticConfig import weco.lambda._ -import weco.messaging.typesafe.SNSBuilder.buildSNSConfig case class RelationEmbedderConfig( mergedWorkIndex: String, @@ -31,11 +31,6 @@ trait RelationEmbedderConfigurable affectedWorksScroll = rawConfig.requireInt("es.works.scroll.affected_works"), elasticConfig = buildElasticClientConfig(rawConfig), - downstreamTarget = { - rawConfig.requireString("relation_embedder.use_downstream") match { - case "sns" => SNS(buildSNSConfig(rawConfig)) - case "stdio" => StdOut - } - } + downstreamTarget = buildDownstreamTarget(rawConfig) ) }