Skip to content

Commit

Permalink
Merge branch 'main' into update/non_aws
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-butcher authored Jan 7, 2025
2 parents 96d56ca + c3e21d0 commit 6df0a66
Show file tree
Hide file tree
Showing 406 changed files with 266,902 additions and 313,545 deletions.
1 change: 1 addition & 0 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ steps:

matrix:
- "internal_model"
- "lambda"
- "display_model"
- "source_model"
- "pipeline_storage"
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/dependency-graph.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ jobs:
name: Update Dependency Graph
runs-on: ubuntu-latest
steps:
- name: Install sbt
uses: sbt/setup-sbt@v1
- uses: actions/checkout@v3
- uses: aws-actions/configure-aws-credentials@v4
with:
Expand Down
9 changes: 1 addition & 8 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -177,22 +177,15 @@ __pycache__/

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
target/

# PyInstaller
# Usually these files are written by a python script from a template
Expand Down
12 changes: 2 additions & 10 deletions CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,10 +1,2 @@
# These are default codeowners, later rules take precedence
* @wellcomecollection/scala-reviewers

# To ensure only properly audited changes are run in CI, we require reviews
# from @wellcomecollection/developers when updating pipeline config
/CODEOWNERS @wellcomecollection/developers
/.buildkite/ @wellcomecollection/developers

# Allow reviews from all developers on infrastructure changes
/pipeline/terraform @wellcomecollection/developers
* @wellcomecollection/digital-platform

55 changes: 8 additions & 47 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ lazy val display_model = setupProject(
externalDependencies = CatalogueDependencies.displayModelDependencies
)

lazy val lambda = setupProject(
project,
"common/lambda",
externalDependencies = CatalogueDependencies.lambdaDependencies
)

lazy val flows = setupProject(
project,
"common/flows",
Expand Down Expand Up @@ -135,8 +141,7 @@ lazy val path_concatenator = setupProject(
lazy val relation_embedder = setupProject(
project,
"pipeline/relation_embedder/relation_embedder",
localDependencies = Seq(internal_model, pipeline_storage_typesafe),
externalDependencies = CatalogueDependencies.relationEmbedderDependencies
localDependencies = Seq(internal_model, pipeline_storage_typesafe, lambda)
)

lazy val router = setupProject(
Expand All @@ -149,51 +154,7 @@ lazy val router = setupProject(
lazy val batcher = setupProject(
project,
"pipeline/relation_embedder/batcher",
localDependencies = Seq(
// Strictly speaking, the batcher doesn't need any of the internal model code,
// but for some reason the batcher tests fail if we don't include it in the
// class path:
//
// Cause: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: The connection was closed during the request. The request will usually succeed on a retry, but if it does not: consider disabling any proxies you have configured, enabling debug logging, or performing a TCP dump to identify the root cause. If this is a streaming operation, validate that data is being read or written in a timely manner. Channel Information: ChannelDiagnostics(channel=[id: 0x49117641, L:0.0.0.0/0.0.0.0:38210], channelAge=PT0.000813S, requestCount=1)
// at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:111)
// at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:47)
// at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:223)
// at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:218)
// at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:182)
// at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
// at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
// at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
// at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
// at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
// ...
// Cause: java.io.IOException: The connection was closed during the request. The request will usually succeed on a retry, but if it does not: consider disabling any proxies you have configured, enabling debug logging, or performing a TCP dump to identify the root cause. If this is a streaming operation, validate that data is being read or written in a timely manner. Channel Information: ChannelDiagnostics(channel=[id: 0x49117641, L:0.0.0.0/0.0.0.0:38210], channelAge=PT0.000813S, requestCount=1)
// at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.configurePipeline(NettyRequestExecutor.java:233)
// at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.lambda$makeRequestListener$10(NettyRequestExecutor.java:181)
// at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
// at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:106)
// at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
// at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
// at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
// at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
// at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
// at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
// ...
//
// We started seeing these issues when we upgraded our Buildkite CI Stack
// from 5.7.2 to 5.16.1.
//
// This is presumably an issue of dependency resolution somewhere, and the batcher
// is getting a different version of a dependency to all our other apps -- but
// I can't work out exactly where it is.
//
// You can see some experiments trying to find it in this PR, where I created a new
// copy of internal_model and started cutting bits out:
// https://github.com/wellcomecollection/catalogue-pipeline/pull/2327
//
// But ultimately it wasn't a good use of time to keep debugging this.
internal_model
),
externalDependencies = CatalogueDependencies.batcherDependencies
localDependencies = Seq(lambda)
)

lazy val reindex_worker = setupProject(
Expand Down
6 changes: 6 additions & 0 deletions builds/deploy_catalogue_pipeline.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ ENV_TAG="env.$PIPELINE_DATE" "$ROOT/builds/update_ecr_image_tag.sh" \

if [[ "$TASK" == "tag_images_and_deploy_services" ]]
then
echo "Deploying ECS pipeline services to catalogue-$PIPELINE_DATE"
CLUSTER="catalogue-$PIPELINE_DATE" "$ROOT/builds/deploy_ecs_services.sh" \
id_minter \
image_inferrer \
Expand All @@ -88,5 +89,10 @@ then
transformer_miro \
transformer_sierra \
transformer_tei

echo "Deploying λ pipeline services to catalogue-$PIPELINE_DATE"
"$ROOT/builds/deploy_lambda_services.sh" \
batcher \
relation_embedder
fi

41 changes: 41 additions & 0 deletions builds/deploy_lambda_services.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env bash

set -o errexit
set -o nounset
set -o pipefail

PIPELINE_NAMESPACE="catalogue-$PIPELINE_DATE"
REPOSITORY_URI="760097843905.dkr.ecr.eu-west-1.amazonaws.com"

for SERVICE_NAME in "$@"
do
IMAGE_URI="${REPOSITORY_URI}"/uk.ac.wellcome/"${SERVICE_NAME}":"env.${PIPELINE_DATE}"
FUNCTION_NAME="${PIPELINE_NAMESPACE}"-"${SERVICE_NAME}"

echo "Deploying ${IMAGE_URI} to ${FUNCTION_NAME}, @ $(date) ..."

echo "Current lambda configuration for ${FUNCTION_NAME}:"
aws lambda get-function-configuration \
--function-name "$FUNCTION_NAME" \
--no-cli-pager

echo "Updating lambda configuration ..."
echo "Using ${IMAGE_URI}:"
aws lambda update-function-code \
--function-name "$FUNCTION_NAME" \
--image-uri "${IMAGE_URI}" \
--no-cli-pager


echo "Updated lambda configuration, (waiting for update @ $(date)}):"
aws lambda wait function-updated \
--function-name "$FUNCTION_NAME" \
--no-cli-pager

echo "New lambda configuration complete (@ $(date)), config after change:"
aws lambda get-function-configuration \
--function-name "$FUNCTION_NAME" \
--no-cli-pager

echo "Done deploying ${FUNCTION_NAME} @ $(date)! 🚀"
done
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,15 @@ object ImageFsm {
}

case class InferredData(
// We split the feature vector so that it can fit into
// ES's dense vector type (max length 2048)
features1: List[Float],
features2: List[Float],
reducedFeatures: List[Float],
features: List[Float],
paletteEmbedding: List[Float],
averageColorHex: Option[String],
aspectRatio: Option[Float]
)

object InferredData {
def empty: InferredData = InferredData(
features1 = Nil,
features2 = Nil,
reducedFeatures = Nil,
features = Nil,
paletteEmbedding = Nil,
averageColorHex = None,
aspectRatio = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ object Format extends Enum[Format] {
// for all CALM sourced works, hence the id here is prefixed
// with a "h" to namespace it within "ArchivesAndManuscripts".
override val id: String = "hdig"
override val label: String = "Archives - Digital"
override val label: String = "Born-digital archives"
}

case object Film extends Unlinked with Audiovisual {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ trait ImagesIndexFixtures extends IndexFixturesBase {
def withLocalImagesIndex[R]: Fixture[Index, R] = {
withLocalElasticSearchIndex[R](config =
getConfig(
mappings = "mappings.images_indexed.2024-08-20.json",
analysis = "analysis.works_indexed.2024-08-20.json"
mappings = "mappings.images_indexed.2024-11-14.json",
analysis = "analysis.works_indexed.2024-11-14.json"
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ trait WorksIndexFixtures extends IndexFixturesBase {
def withLocalWorksIndex[R]: Fixture[Index, R] = {
withLocalElasticSearchIndex[R](config =
getConfig(
mappings = "mappings.works_indexed.2024-08-20.json",
analysis = "analysis.works_indexed.2024-08-20.json"
mappings = "mappings.works_indexed.2024-11-14.json",
analysis = "analysis.works_indexed.2024-11-14.json"
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,9 @@ trait ImageGenerators

def createInferredData: InferredData = {
val features = randomUnitLengthVector(4096)
val (features1, features2) = features.splitAt(features.size / 2)
val reducedFeatures = randomUnitLengthVector(1024)
val paletteEmbedding = randomUnitLengthVector(1000)
InferredData(
features1 = features1.toList,
features2 = features2.toList,
reducedFeatures = reducedFeatures.toList,
features = features.toList,
paletteEmbedding = paletteEmbedding.toList,
averageColorHex = Some(randomHexString),
aspectRatio = inferredDataAspectRatio
Expand Down
46 changes: 46 additions & 0 deletions common/lambda/src/main/scala/weco/lambda/Downstream.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package weco.lambda

import io.circe.Encoder
import software.amazon.awssdk.services.sns.SnsClient
import weco.messaging.sns.{SNSConfig, SNSMessageSender}
import weco.json.JsonUtil.toJson

import scala.util.Try

trait Downstream {
def notify(workId: String): Try[Unit]
def notify[T](batch: T)(implicit encoder: Encoder[T]): Try[Unit]
}

class SNSDownstream(snsConfig: SNSConfig) extends Downstream {
protected val msgSender = new SNSMessageSender(
snsClient = SnsClient.builder().build(),
snsConfig = snsConfig,
subject = "Sent from relation_embedder"
)

override def notify(workId: String): Try[Unit] = Try(msgSender.send(workId))
override def notify[T](batch: T)(implicit encoder: Encoder[T]): Try[Unit] =
msgSender.sendT(batch)
}

object STDIODownstream extends Downstream {
override def notify(workId: String): Try[Unit] = Try(println(workId))
override def notify[T](t: T)(implicit encoder: Encoder[T]): Try[Unit] = Try(
println(toJson(t))
)
}

sealed trait DownstreamTarget
case class SNS(config: SNSConfig) extends DownstreamTarget
case object StdOut extends DownstreamTarget

object Downstream {
def apply(downstreamTarget: DownstreamTarget): Downstream = {
downstreamTarget match {
case SNS(config) => new SNSDownstream(config)
case StdOut => STDIODownstream
}
}
def apply(): Downstream = STDIODownstream
}
Loading

0 comments on commit 6df0a66

Please sign in to comment.