Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/pip/sierra_adapter/sierra_progres…
Browse files Browse the repository at this point in the history
…s_reporter/urllib3-1.26.19
  • Loading branch information
paul-butcher authored Jan 9, 2025
2 parents 48105bd + 8521802 commit 30b6543
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 24 deletions.
27 changes: 27 additions & 0 deletions common/lambda/src/main/scala/weco/lambda/Downstream.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package weco.lambda

import com.typesafe.config.Config
import grizzled.slf4j.Logging
import io.circe.Encoder
import software.amazon.awssdk.services.sns.SnsClient
import weco.messaging.sns.{SNSConfig, SNSMessageSender}
import weco.json.JsonUtil.toJson
import weco.messaging.typesafe.SNSBuilder.buildSNSConfig

import scala.util.Try

Expand Down Expand Up @@ -44,3 +47,27 @@ object Downstream {
}
def apply(): Downstream = STDIODownstream
}

// Typesafe specific configuration builder
object DownstreamBuilder extends Logging {
import weco.typesafe.config.builders.EnrichConfig._

def buildDownstreamTarget(config: Config): DownstreamTarget = {
config.getStringOption("downstream.target") match {
case Some("sns") =>
val snsConfig = buildSNSConfig(config)
info(s"Building SNS downstream with config: $snsConfig")
SNS(snsConfig)
case Some("stdio") =>
info("Building StdOut downstream")
StdOut
case Some(unknownTarget) =>
throw new IllegalArgumentException(
s"Invalid downstream target: $unknownTarget"
)
case None =>
warn("No downstream target specified, defaulting to StdOut")
StdOut
}
}
}
55 changes: 55 additions & 0 deletions common/lambda/src/test/scala/weco/lambda/DownstreamTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package weco.lambda

import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import weco.lambda.helpers.ConfigurationTestHelpers
import weco.messaging.sns.SNSConfig

class DownstreamTest extends AnyFunSpec
with ConfigurationTestHelpers
with Matchers {

describe("DownstreamBuilder") {
it("builds a StdOut downstream target") {
val config =
"""
|downstream.target = "stdio"
|""".asConfig

DownstreamBuilder.buildDownstreamTarget(config) shouldBe StdOut
}

it("builds an SNS downstream target") {
val config =
"""
|downstream.target = "sns"
|aws.sns.topic.arn = "arn:aws:sns:eu-west-1:123456789012:my-topic"
|""".asConfig

DownstreamBuilder.buildDownstreamTarget(config) shouldBe SNS(
config = SNSConfig(
topicArn = "arn:aws:sns:eu-west-1:123456789012:my-topic"
)
)
}

it("builds a StdOut downstream target if no downstream target is specified") {
val config =
"""
|""".asConfig

DownstreamBuilder.buildDownstreamTarget(config) shouldBe StdOut
}

it("throws an exception if the downstream target is not recognised") {
val config =
"""
|downstream.target = "invalid"
|""".asConfig

intercept[IllegalArgumentException] {
DownstreamBuilder.buildDownstreamTarget(config)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ aws.sns.topic.arn=${?output_topic_arn}
batcher.flush_interval_minutes=${?flush_interval_minutes}
batcher.max_processed_paths=${?max_processed_paths}
batcher.max_batch_size=${?max_batch_size}
batcher.use_downstream=sns
batcher.use_downstream=${?use_downstream}
downstream.target=sns
downstream.target=${?use_downstream}
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
package weco.pipeline.batcher.lib

import com.typesafe.config.Config
import weco.lambda.{
ApplicationConfig,
DownstreamTarget,
LambdaConfigurable,
SNS,
StdOut
}
import weco.messaging.typesafe.SNSBuilder.buildSNSConfig
import weco.lambda.DownstreamBuilder.buildDownstreamTarget
import weco.lambda.{ApplicationConfig, DownstreamTarget, LambdaConfigurable}

case class BatcherConfig(
maxBatchSize: Int,
Expand All @@ -21,11 +15,6 @@ trait BatcherConfigurable extends LambdaConfigurable[BatcherConfig] {
def build(rawConfig: Config): BatcherConfig =
BatcherConfig(
maxBatchSize = rawConfig.requireInt("batcher.max_batch_size"),
downstreamTarget = {
rawConfig.requireString("batcher.use_downstream") match {
case "sns" => SNS(buildSNSConfig(rawConfig))
case "stdio" => StdOut
}
}
downstreamTarget = buildDownstreamTarget(rawConfig)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ es.host=${?es_host}
es.port=${?es_port}
es.protocol=${?es_protocol}
es.apikey=${?es_apikey}
relation_embedder.use_downstream=${?use_downstream}
downstream.target=sns
downstream.target=${?use_downstream}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package weco.pipeline.relation_embedder.lib

import com.typesafe.config.Config
import weco.elasticsearch.typesafe.ElasticBuilder.buildElasticClientConfig
import weco.lambda.DownstreamBuilder.buildDownstreamTarget
import weco.elasticsearch.typesafe.ElasticConfig
import weco.lambda._
import weco.messaging.typesafe.SNSBuilder.buildSNSConfig

case class RelationEmbedderConfig(
mergedWorkIndex: String,
Expand All @@ -31,11 +31,6 @@ trait RelationEmbedderConfigurable
affectedWorksScroll =
rawConfig.requireInt("es.works.scroll.affected_works"),
elasticConfig = buildElasticClientConfig(rawConfig),
downstreamTarget = {
rawConfig.requireString("relation_embedder.use_downstream") match {
case "sns" => SNS(buildSNSConfig(rawConfig))
case "stdio" => StdOut
}
}
downstreamTarget = buildDownstreamTarget(rawConfig)
)
}

0 comments on commit 30b6543

Please sign in to comment.