diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala index 61b4732f77..2d60765616 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala @@ -137,6 +137,5 @@ object BatchProcessor { bulkWriter = batchWriter, downstream = Downstream(Some(config)) ) - } } diff --git a/pipeline/transformer/transformer_calm/src/test/scala/weco/pipeline/transformer/calm/services/CalmTransformerEndToEndTest.scala b/pipeline/transformer/transformer_calm/src/test/scala/weco/pipeline/transformer/calm/services/CalmTransformerEndToEndTest.scala index 4ea59f3adb..9cc89a91ca 100644 --- a/pipeline/transformer/transformer_calm/src/test/scala/weco/pipeline/transformer/calm/services/CalmTransformerEndToEndTest.scala +++ b/pipeline/transformer/transformer_calm/src/test/scala/weco/pipeline/transformer/calm/services/CalmTransformerEndToEndTest.scala @@ -87,7 +87,7 @@ class CalmTransformerEndToEndTest new TransformerWorker[CalmSourcePayload, CalmSourceData, String]( transformer = CalmTransformer, pipelineStream = pipelineStream, - retriever = retriever, + transformedWorkRetriever = retriever, sourceDataRetriever = new CalmSourceDataRetriever(store) ) worker.run() diff --git a/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerMain.scala b/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerMain.scala index d4e8223c28..c52ccca65e 100644 --- a/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerMain.scala +++ b/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerMain.scala @@ -55,7 +55,7 @@ class TransformerMain[Payload <: SourcePayload, SourceData]( new TransformerWorker( transformer = transformer, pipelineStream = pipelineStream, - retriever = sourceWorkRetriever, + transformedWorkRetriever = sourceWorkRetriever, sourceDataRetriever = sourceDataRetriever ) } diff --git a/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerWorker.scala b/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerWorker.scala index 0ea2007e02..e2362cdb66 100644 --- a/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerWorker.scala +++ b/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerWorker.scala @@ -36,61 +36,22 @@ trait SourceDataRetriever[Payload, SourceData] { ): Either[ReadError, Identified[Version[String, Int], SourceData]] } -/** A TransformerWorker: - * - Takes an SQS stream that emits VHS keys - * - Gets the record of type `SourceData` - * - Runs it through a transformer and transforms the `SourceData` to - * `Work[Source]` - * - Emits the message via `MessageSender` to SNS - */ -final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( - transformer: Transformer[SourceData], - retriever: Retriever[Work[Source]], - pipelineStream: PipelineStorageStream[NotificationMessage, Work[ - Source - ], SenderDest], - sourceDataRetriever: SourceDataRetriever[Payload, SourceData] -)(implicit ec: ExecutionContext, decoder: Decoder[Payload]) - extends Logging - with Runnable { +trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] extends Logging { type Result[T] = Either[TransformerWorkerError, T] type StoreKey = Version[String, Int] - def name: String = this.getClass.getSimpleName + implicit val ec: ExecutionContext + implicit val decoder: Decoder[Payload] - def run(): Future[Done] = - pipelineStream.foreach( - name, - (notification: NotificationMessage) => - process(notification).map { - case Left(err) => - // We do some slightly nicer logging here to give context to the errors - err match { - case DecodePayloadError(_, notificationMsg) => - error(s"$name: DecodePayloadError from $notificationMsg") - case StoreReadError(_, key) => - error(s"$name: StoreReadError on $key") - case TransformerError(t, sourceData, key) => - error(s"$name: TransformerError on $sourceData with $key ($t)") - } - - throw err + val sourceDataRetriever: SourceDataRetriever[Payload, SourceData] + val transformer: Transformer[SourceData] + val transformedWorkRetriever: Retriever[Work[Source]] - case Right(None) => - debug( - s"$name: no transformed Work returned for $notification (this means the Work is already in the pipeline)" - ) - Nil + val transformerName: String - case Right(Some((work, key))) => - info(s"$name: from $key transformed work with id ${work.id}") - List(work) - } - ) - - def process( - message: NotificationMessage - ): Future[Result[Option[(Work[Source], StoreKey)]]] = + def processEvent( + message: NotificationMessage + ): Future[Result[Option[(Work[Source], StoreKey)]]] = Future { for { payload <- decodePayload(message) @@ -107,8 +68,8 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( }.flatMap { compareToStored } private def compareToStored( - workResult: Result[(Work[Source], StoreKey)] - ): Future[Result[Option[(Work[Source], StoreKey)]]] = + workResult: Result[(Work[Source], StoreKey)] + ): Future[Result[Option[(Work[Source], StoreKey)]]] = workResult match { // Once we've transformed the Work, we query forward -- is this a work we've @@ -123,7 +84,7 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( // Calm. The records get a new modifiedDate from Sierra, but none of the data // we care about for the pipeline is changed. case Right((transformedWork, key)) => - retriever + transformedWorkRetriever .apply(workIndexable.id(transformedWork)) .map { storedWork => @@ -131,7 +92,7 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( Right(Some((transformedWork, key))) } else { info( - s"$name: from $key transformed work with id ${transformedWork.id}; already in pipeline so not re-sending" + s"$transformerName: from $key transformed work with id ${transformedWork.id}; already in pipeline so not re-sending" ) Right(None) } @@ -146,10 +107,10 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( } private def work( - sourceData: SourceData, - version: Int, - key: StoreKey - ): Result[Work[Source]] = + sourceData: SourceData, + version: Int, + key: StoreKey + ): Result[Work[Source]] = transformer(id = key.id, sourceData, version) match { case Right(result) => Right(result) case Left(err) => @@ -182,9 +143,9 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( } private def shouldSend( - transformedWork: Work[Source], - storedWork: Work[Source] - ): Boolean = { + transformedWork: Work[Source], + storedWork: Work[Source] + ): Boolean = { if (transformedWork.version < storedWork.version) { debug( s"${transformedWork.id}: transformed Work is older than the stored Work" @@ -226,9 +187,9 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( } private def areEquivalent( - transformedWork: Work[Source], - storedWork: Work[Source] - ): Boolean = { + transformedWork: Work[Source], + storedWork: Work[Source] + ): Boolean = { // Sometimes we get updates from our sources even though the data hasn't necessarily changed. // One example of that is the Sierra Calm sync script that triggers an update to // every sierra catalogued in calm every night. It can be very expensive if we let @@ -247,3 +208,59 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( modifiedTransformedWork == modifiedSourceWork } } + +/** A TransformerWorker: + * - Takes an SQS stream that emits VHS keys + * - Gets the record of type `SourceData` + * - Runs it through a transformer and transforms the `SourceData` to + * `Work[Source]` + * - Emits the message via `MessageSender` to SNS + */ +final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( + val transformer: Transformer[SourceData], + val transformedWorkRetriever: Retriever[Work[Source]], + pipelineStream: PipelineStorageStream[NotificationMessage, Work[ + Source + ], SenderDest], + val sourceDataRetriever: SourceDataRetriever[Payload, SourceData] +)(implicit val ec: ExecutionContext, val decoder: Decoder[Payload]) + extends Logging + with TransformerEventProcessor[Payload, SourceData] + with Runnable { + + lazy val transformerName: String = this.getClass.getSimpleName + + def run(): Future[Done] = + pipelineStream.foreach( + transformerName, + (notification: NotificationMessage) => + process(notification).map { + case Left(err) => + // We do some slightly nicer logging here to give context to the errors + err match { + case DecodePayloadError(_, notificationMsg) => + error(s"$transformerName: DecodePayloadError from $notificationMsg") + case StoreReadError(_, key) => + error(s"$transformerName: StoreReadError on $key") + case TransformerError(t, sourceData, key) => + error(s"$transformerName: TransformerError on $sourceData with $key ($t)") + } + + throw err + + case Right(None) => + debug( + s"$transformerName: no transformed Work returned for $notification (this means the Work is already in the pipeline)" + ) + Nil + + case Right(Some((work, key))) => + info(s"$transformerName: from $key transformed work with id ${work.id}") + List(work) + } + ) + + def process( + message: NotificationMessage + ): Future[Result[Option[(Work[Source], StoreKey)]]] = processEvent(message) +} diff --git a/pipeline/transformer/transformer_common/src/test/scala/weco/pipeline/transformer/TransformerWorkerTest.scala b/pipeline/transformer/transformer_common/src/test/scala/weco/pipeline/transformer/TransformerWorkerTest.scala index e42184c126..fea6c93859 100644 --- a/pipeline/transformer/transformer_common/src/test/scala/weco/pipeline/transformer/TransformerWorkerTest.scala +++ b/pipeline/transformer/transformer_common/src/test/scala/weco/pipeline/transformer/TransformerWorkerTest.scala @@ -451,7 +451,7 @@ class TransformerWorkerTest val worker = new TransformerWorker( transformer = transformer, pipelineStream = pipelineStream, - retriever = retriever, + transformedWorkRetriever = retriever, sourceDataRetriever = new ExampleSourcePayloadLookup(sourceStore = store) ) diff --git a/pipeline/transformer/transformer_mets/src/test/scala/weco/pipeline/transformer/mets/services/MetsTransformerEndToEndTest.scala b/pipeline/transformer/transformer_mets/src/test/scala/weco/pipeline/transformer/mets/services/MetsTransformerEndToEndTest.scala index dfa4d5285a..6df06b87ea 100644 --- a/pipeline/transformer/transformer_mets/src/test/scala/weco/pipeline/transformer/mets/services/MetsTransformerEndToEndTest.scala +++ b/pipeline/transformer/transformer_mets/src/test/scala/weco/pipeline/transformer/mets/services/MetsTransformerEndToEndTest.scala @@ -94,7 +94,7 @@ class MetsTransformerEndToEndTest new TransformerWorker[MetsSourcePayload, MetsSourceData, String]( transformer = new MetsXmlTransformer(store), pipelineStream = pipelineStream, - retriever = retriever, + transformedWorkRetriever = retriever, sourceDataRetriever = new MetsSourceDataRetriever ) worker.run() diff --git a/pipeline/transformer/transformer_miro/src/test/scala/weco/pipeline/transformer/miro/services/MiroTransformerEndToEndTest.scala b/pipeline/transformer/transformer_miro/src/test/scala/weco/pipeline/transformer/miro/services/MiroTransformerEndToEndTest.scala index 9171a74b32..5d39379858 100644 --- a/pipeline/transformer/transformer_miro/src/test/scala/weco/pipeline/transformer/miro/services/MiroTransformerEndToEndTest.scala +++ b/pipeline/transformer/transformer_miro/src/test/scala/weco/pipeline/transformer/miro/services/MiroTransformerEndToEndTest.scala @@ -98,7 +98,7 @@ class MiroTransformerEndToEndTest ]( transformer = new MiroRecordTransformer, pipelineStream = pipelineStream, - retriever = retriever, + transformedWorkRetriever = retriever, sourceDataRetriever = new MiroSourceDataRetriever(store) ) worker.run() diff --git a/pipeline/transformer/transformer_sierra/src/test/scala/weco/pipeline/transformer/sierra/services/SierraTransformerEndToEndTest.scala b/pipeline/transformer/transformer_sierra/src/test/scala/weco/pipeline/transformer/sierra/services/SierraTransformerEndToEndTest.scala index 16a88d0976..b623092b00 100644 --- a/pipeline/transformer/transformer_sierra/src/test/scala/weco/pipeline/transformer/sierra/services/SierraTransformerEndToEndTest.scala +++ b/pipeline/transformer/transformer_sierra/src/test/scala/weco/pipeline/transformer/sierra/services/SierraTransformerEndToEndTest.scala @@ -102,7 +102,7 @@ class SierraTransformerEndToEndTest version: Int ) => SierraTransformer(transformable, version).toEither, pipelineStream = pipelineStream, - retriever = retriever, + transformedWorkRetriever = retriever, sourceDataRetriever = new SierraSourceDataRetriever(store) ) worker.run() diff --git a/pipeline/transformer/transformer_tei/src/test/scala/weco/pipeline/transformer/tei/service/TeiTransformerEndToEndTest.scala b/pipeline/transformer/transformer_tei/src/test/scala/weco/pipeline/transformer/tei/service/TeiTransformerEndToEndTest.scala index f18cd06446..262e22a11f 100644 --- a/pipeline/transformer/transformer_tei/src/test/scala/weco/pipeline/transformer/tei/service/TeiTransformerEndToEndTest.scala +++ b/pipeline/transformer/transformer_tei/src/test/scala/weco/pipeline/transformer/tei/service/TeiTransformerEndToEndTest.scala @@ -94,7 +94,7 @@ class TeiTransformerEndToEndTest new TransformerWorker[TeiSourcePayload, TeiMetadata, String]( transformer = new TeiTransformer(store), pipelineStream = pipelineStream, - retriever = retriever, + transformedWorkRetriever = retriever, sourceDataRetriever = new TeiSourceDataRetriever ) worker.run()