Skip to content

Commit

Permalink
resolve merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
kenoir committed Jan 6, 2025
2 parents ac0edb4 + a25e871 commit f1f6c14
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ class BatcherWorkerService[MsgDestination](
source
.map {
case (msg: SQSMessage, notificationMessage: NotificationMessage) =>
(msg, notificationMessage.body)
PathFromSQS(notificationMessage.body, msg)
}
.groupedWithin(maxProcessedPaths, flushInterval)
.map(_.toList.unzip)
.mapAsync(1) {
case (msgs, paths) =>
paths =>
info(s"Processing ${paths.size} input paths")
processPaths(msgs, paths)
processPaths(paths)
}
.flatMapConcat(identity)
}
Expand All @@ -46,16 +45,14 @@ class BatcherWorkerService[MsgDestination](
* corresponding batches have been successfully sent.
*/
private def processPaths(
msgs: List[SQSMessage],
paths: List[String]
paths: Seq[PathFromSQS]
): Future[Source[SQSMessage, NotUsed]] =
pathsProcessor(paths)
.map {
failedIndices =>
val failedIdxSet = failedIndices.toSet
Source(msgs).zipWithIndex
.collect {
case (msg, idx) if !failedIdxSet.contains(idx) => msg
}
failedPaths =>
val failedPathSet = failedPaths.toSet
Source(paths.collect {
case path if !failedPathSet.contains(path) => path.referent
}.toList)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@ object CLIMain extends App {
Flow[ByteString].map(_.utf8String)

private val pathsProcessor = new PathsProcessor(STDIODownstream, 40)
private val pathsProcessorFlow
: Flow[Seq[String], Future[Seq[Long]], NotUsed] =
Flow[Seq[String]].map {
paths: Seq[String] =>
pathsProcessor(
paths.toList
)
// Hands each group of paths to the PathsProcessor. Every element emitted by
// this flow is the Future returned by that call, not its completed result.
private val pathsProcessorFlow: Flow[Seq[Path], Future[Seq[Path]], NotUsed] =
  Flow[Seq[Path]].map(pathGroup => pathsProcessor(pathGroup))

// Wraps each incoming string in a PathFromString so it can be handled as a Path.
private val toPathFlow: Flow[String, Path, NotUsed] =
  Flow[String].map(line => PathFromString(line))

stdinSource
.via(lineDelimiter)
.via(toStringFlow)
.via(toPathFlow)
// this number is pretty arbitrary, but grouping of some kind is needed in order to
// provide a list to the next step, rather than individual paths
.grouped(10000)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package weco.pipeline.batcher
import software.amazon.awssdk.services.sqs.model.{Message => SQSMessage}

/** A path to be processed by the batcher.
  *
  * A Path renders as, and is ordered by, its underlying `path` string.
  */
sealed trait Path extends Ordered[Path] {
  val path: String

  override def toString: String = path

  // Only the path string takes part in the comparison; anything else a
  // subtype carries (e.g. a referent) is ignored here.
  override def compare(that: Path): Int = path.compareTo(that.path)
}

/** A [[Path]] that additionally carries an associated value of type `T`
  * (its `referent`), so that a caller holding the path can get back to the
  * object the path is associated with.
  */
sealed trait PathWithReferent[T] extends Path {
val referent: T
}

/** A path paired with the SQS message that is its referent.
  *
  * Case class parameters are already public vals, so no explicit `val` is
  * needed for them to implement the abstract members of the parent traits.
  */
case class PathFromSQS(path: String, referent: SQSMessage)
    extends PathWithReferent[SQSMessage]

/** A path built from a bare string, with no referent attached. */
case class PathFromString(path: String) extends Path
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package weco.pipeline.batcher
import grizzled.slf4j.Logging
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import weco.lambda.Downstream
import weco.json.JsonUtil._

Expand All @@ -18,8 +16,7 @@ import scala.util.{Failure, Success}
* The maximum number of selectors to include in a single Batch
*/
class PathsProcessor(downstream: Downstream, maxBatchSize: Int)(
implicit ec: ExecutionContext,
materializer: Materializer
implicit ec: ExecutionContext
) extends Logging {

/** Takes a list of strings, each representing a path to be processed by
Expand All @@ -35,24 +32,41 @@ class PathsProcessor(downstream: Downstream, maxBatchSize: Int)(
* SQS/SNS-driven. Should just be the actual failed paths, and the caller
* should build a map to work it out if it wants to)
*/
def apply(paths: List[String]): Future[Seq[Long]] = {
def apply[T <: Path](paths: Seq[T]): Future[Seq[Path]] = {
info(s"Processing ${paths.size} paths with max batch size $maxBatchSize")

generateBatches(maxBatchSize, paths)
.mapAsyncUnordered(10) {
case (batch, msgIndices) =>
Future
.sequence {
generateBatches(maxBatchSize, paths).map {
case (batch, msgPaths) =>
notifyDownstream(downstream, batch, msgPaths)
}
}
.flatMap {
results =>
Future {
downstream.notify(batch) match {
case Success(_) => None
case Failure(err) =>
error(s"Failed processing batch $batch with error: $err")
Some(msgIndices)
}
results.collect {
case Some(failedPaths) => failedPaths
}.flatten
}
}
.collect { case Some(failedIndices) => failedIndices }
.mapConcat(identity)
.runWith(Sink.seq)
}

/** Sends a single batch downstream, reporting the associated input paths
  * if the notification fails.
  *
  * @param downstream
  *   where the batch is sent
  * @param batch
  *   the batch to send
  * @param msgPaths
  *   the input paths associated with `batch`
  * @return
  *   a Future of `None` when the notification succeeded, or of
  *   `Some(msgPaths)` when it failed (the failure is logged)
  */
private def notifyDownstream(
  downstream: Downstream,
  batch: Batch,
  msgPaths: List[Path]
)(
  implicit ec: ExecutionContext
): Future[Option[List[Path]]] =
  Future {
    val outcome = downstream.notify(batch)
    outcome match {
      case Failure(err) =>
        error(s"Failed processing batch $batch with error: $err")
        Some(msgPaths)
      case Success(_) =>
        None
    }
  }

/** Given a list of input paths, generate the minimal set of selectors
Expand All @@ -61,10 +75,35 @@ class PathsProcessor(downstream: Downstream, maxBatchSize: Int)(
*/
private def generateBatches(
maxBatchSize: Int,
paths: List[String]
): Source[(Batch, List[Long]), NotUsed] = {
paths: Seq[Path]
): Seq[(Batch, List[Path])] = {
val selectors = Selector.forPaths(paths)
val groupedSelectors = selectors.groupBy(_._1.rootPath)

logSelectors(paths, selectors, groupedSelectors)

groupedSelectors.map {
case (rootPath, selectorsAndPaths) =>
// For batches consisting of a really large number of selectors, we
// should just send the whole tree: this avoids really long queries
// in the relation embedder, or duplicate work of creating the archives
// cache multiple times, and it is likely pretty much all the nodes will
// be denormalised anyway.
val (selectors, inputPaths) = selectorsAndPaths.unzip(identity)
val batch =
if (selectors.size > maxBatchSize)
Batch(rootPath, List(Selector.Tree(rootPath)))
else
Batch(rootPath, selectors)
batch -> inputPaths
}.toSeq
}

private def logSelectors(
paths: Seq[Path],
selectors: List[(Selector, Path)],
groupedSelectors: Map[String, List[(Selector, Path)]]
): Unit = {
info(
s"Generated ${selectors.size} selectors spanning ${groupedSelectors.size} trees from ${paths.size} paths."
)
Expand All @@ -81,21 +120,6 @@ class PathsProcessor(downstream: Downstream, maxBatchSize: Int)(
s"Selectors for root path $rootPath: ${selectors.map(_._1).mkString(", ")}"
)
}
Source(groupedSelectors.toList).map {
case (rootPath, selectorsAndIndices) =>
// For batches consisting of a really large number of selectors, we
// should just send the whole tree: this avoids really long queries
// in the relation embedder, or duplicate work of creating the archives
// cache multiple times, and it is likely pretty much all the nodes will
// be denormalised anyway.
val (selectors, msgIndices) = selectorsAndIndices.unzip(identity)
val batch =
if (selectors.size > maxBatchSize)
Batch(rootPath, List(Selector.Tree(rootPath)))
else
Batch(rootPath, selectors)
batch -> msgIndices
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ sealed trait Selector {
* use these to filter out any unnecessary selectors when another broader one
* already exists.
*/
def superSelectors: List[Selector] = {
private def superSelectors: List[Selector] = {
import Selector._
val ancestorPaths = ancestors(path)
val ancestorDescendents = ancestorPaths.map(Descendents(_))
Expand All @@ -34,21 +34,50 @@ sealed trait Selector {
}
}

def shouldSupress(otherSelectors: Set[Selector]): Boolean =
// True when at least one of this selector's superSelectors is already present
// in otherSelectors.
protected def shouldSuppress(otherSelectors: Set[Selector]): Boolean =
  superSelectors.exists(broader => otherSelectors(broader))
}

object Selector {

type Path = String
private type SelectorPath = String

case class Tree(path: Path) extends Selector
case class Tree(path: SelectorPath) extends Selector

case class Node(path: Path) extends Selector
case class Node(path: SelectorPath) extends Selector

case class Children(path: Path) extends Selector
case class Children(path: SelectorPath) extends Selector

case class Descendents(path: Path) extends Selector
case class Descendents(path: SelectorPath) extends Selector

/** Builds the selectors for a collection of input paths: the nodes which
  * should be sent to the `relation_embedder` for denormalisation.
  *
  * Every selector in the output is paired with an original input path, as
  * when sending a SNS notification we need to map this back to the original
  * input path in case of failure. Where several input paths give rise to the
  * same selector, only one of those paths is kept alongside it.
  *
  * Selectors are generated per path using the logic documented in `forPath`;
  * any selector that is already accounted for by another, broader selector
  * is then filtered out.
  *
  * For example, given the following tree:
  * {{{
  *                A
  *                |
  *     |----------|----------|
  *     B          C          E
  *     |       |--|--|   |--|--|--|
  *     D       X  Y  Z   1  2  3  4
  * }}}
  * We would not need to include selectors for `Node(X)` or `Children(Y)` if
  * for example either the selectors `Tree(A)` or `Descendents(C)` also
  * existed.
  */
def forPaths(paths: Seq[Path]): List[(Selector, Path)] = {
  val selectorsWithPaths = mapSelectorsToPaths(paths)
  val requiredSelectors = discardRedundantSelectors(selectorsWithPaths)
  requiredSelectors.toList
}

/** Given an input path, return a list of selectors representing the nodes
* which should be sent to the `relation_embedder` for denormalisation.
Expand All @@ -60,8 +89,16 @@ object Selector {
*
* To see why this is the case, consider the following tree:
*
* A \| \|------------- \| | | B C E \| |------ |--------- \| | | | | | | | D
* X Y Z 1 2 3 4
* {{{
* A
* |
* |-------------
* | | |
* B C E
* | |------ |---------
* | | | | | | | |
* D X Y Z 1 2 3 4
* }}}
*
* Given node `C` as an input, we need to denormalise the parent `A` as it
* contains `C` as a child relation. We need to denormalise the input `C`
Expand All @@ -75,66 +112,49 @@ object Selector {
* parent downwards this would result in the siblings descendents being
* denormalised unnecessarily.
*/
def forPath(path: Path): List[Selector] =
private def forPath(path: SelectorPath): List[Selector] =
  parent(path) match {
    case Some(parentPath) =>
      // The parent itself, the parent's children, and everything below
      // the input path.
      List(Node(parentPath), Children(parentPath), Descendents(path))
    case None =>
      // The path has no parent, so select the whole tree.
      List(Tree(path))
  }

/** Given a list of input paths, return a list of selectors representing the
* nodes which should be sent to the `relation_embedder` for denormalisation.
* The output also includes the index of the original path in the list, as
* when sending a SNS notification we need to map this back to the original
* input path in case of failure.
*
* The generation of selectors uses the logic documented in `forPath`,
* followed by filtering of any selectors which are already accounted for by
* other broader selectors.
*
* For example, given the following tree:
*
* A \| \|------------- \| | | B C E \| |------ |--------- \| | | | | | | | D
* X Y Z 1 2 3 4
*
* We would not need to include selectors for `Node(X)` or `Children(Y)` if
* for example either the selectors `Tree(A)` or `Descendents(C)` also
* existed.
*/
def forPaths(paths: List[Path]): List[(Selector, Long)] = {
val selectors = paths.zipWithIndex
.flatMap {
case (path, idx) =>
Selector.forPath(path).map(selector => (selector, idx.toLong))
}
// Generates the selectors for every input path and associates each distinct
// selector with a single path: the first path (in input order) that
// produced it.
// NOTE(review): other paths producing the same selector are dropped from the
// mapping here, so they can no longer be traced from that selector - confirm
// this is acceptable for failure reporting.
private def mapSelectorsToPaths(paths: Seq[Path]): Map[Selector, Path] = {
  val selectorPathPairs = for {
    inputPath <- paths
    selector <- Selector.forPath(inputPath.path)
  } yield (selector, inputPath)

  selectorPathPairs
    .groupBy(_._1)
    .map { case (_, pairsForSelector) => pairsForSelector.head }
}

val selectorSet = selectors.keySet
selectors.collect {
case (selector, idx) if !selector.shouldSupress(selectorSet) =>
(selector, idx)
}.toList
// Drops every selector for which shouldSuppress reports that a
// superSelector is also present among the map's keys.
private def discardRedundantSelectors(
  selectorMap: Map[Selector, Path]
): Map[Selector, Path] = {
  val allSelectors = selectorMap.keySet
  for {
    (selector, path) <- selectorMap
    if !selector.shouldSuppress(allSelectors)
  } yield (selector, path)
}

private def parent(path: Path): Option[Path] =
// The path made of every token except the last, or None when no tokens
// remain once the last one is removed.
private def parent(path: SelectorPath): Option[SelectorPath] = {
  val leadingTokens = tokenize(path).dropRight(1)
  if (leadingTokens.isEmpty) None
  else Some(join(leadingTokens))
}

private def ancestors(path: Path): List[Path] = {
// Every proper prefix of the path, shortest first: the first token alone,
// then the first two tokens, and so on up to (but excluding) the full path.
private def ancestors(path: SelectorPath): List[SelectorPath] = {
  val leadingTokens = tokenize(path).dropRight(1)
  List.tabulate(leadingTokens.length)(
    depth => join(leadingTokens.take(depth + 1))
  )
}

private def tokenize(path: Path): List[String] =
// Splits a path into its "/"-separated tokens.
private def tokenize(path: SelectorPath): List[String] =
path.split("/").toList

private def join(tokens: List[String]): Path =
// Inverse of tokenize: rebuilds a path string by joining tokens with "/".
private def join(tokens: List[String]): String =
tokens.mkString("/")
}
Loading

0 comments on commit f1f6c14

Please sign in to comment.