From 76d84e2ebb40cf5eda5bce01759ff6c4812bb683 Mon Sep 17 00:00:00 2001 From: Guillaume Martres Date: Fri, 23 Feb 2024 17:12:19 +0100 Subject: [PATCH 1/5] Initial attempt to parallelize This seems to be 20% faster on datalog.benchmarks.examples.cspa10k_optimized.interpreted_indexed_sel__0____EOL --- src/main/scala/datalog/execution/ir/IROp.scala | 9 ++++++++- .../scala/datalog/execution/ir/IRTreeGenerator.scala | 10 +++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/main/scala/datalog/execution/ir/IROp.scala b/src/main/scala/datalog/execution/ir/IROp.scala index cc3eb1e1..ff3421df 100644 --- a/src/main/scala/datalog/execution/ir/IROp.scala +++ b/src/main/scala/datalog/execution/ir/IROp.scala @@ -124,6 +124,10 @@ case class DoWhileOp(toCmp: DB, override val children:IROp[Any]*)(using JITOptio }) () } +import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.duration.Duration +given ExecutionContext = ExecutionContext.global + /** * @param code * @param children: [Any*] @@ -132,7 +136,10 @@ case class SequenceOp(override val code: OpCode, override val children:IROp[Any] override def run_continuation(storageManager: StorageManager, opFns: Seq[CompiledFn[Any]]): Any = opFns.map(o => o(storageManager)) override def run(storageManager: StorageManager): Any = - children.map(o => o.run(storageManager)) + if code == OpCode.EVAL_SN then + Await.result(Future.sequence(children.map(o => Future(o.run(storageManager)))), Duration.Inf) + else + children.map(o => o.run(storageManager)) } case class UpdateDiscoveredOp()(using JITOptions) extends IROp[Any] { diff --git a/src/main/scala/datalog/execution/ir/IRTreeGenerator.scala b/src/main/scala/datalog/execution/ir/IRTreeGenerator.scala index d4c069a0..906b13a8 100644 --- a/src/main/scala/datalog/execution/ir/IRTreeGenerator.scala +++ b/src/main/scala/datalog/execution/ir/IRTreeGenerator.scala @@ -29,11 +29,15 @@ class IRTreeGenerator(using val ctx: InterpreterContext)(using JITOptions) { .map(r => val res = semiNaiveEvalRule(ruleMap(r)) ResetDeltaOp(r, res.asInstanceOf[IROp[Any]]) - ) :+ InsertDeltaNewIntoDerived() + ) SequenceOp( - OpCode.EVAL_SN, - queries:_*, + OpCode.SEQ, + SequenceOp( + OpCode.EVAL_SN, + queries:_*, + ), + InsertDeltaNewIntoDerived() ) } From b979510ac3e2ad8ea672af6dfd5b1cba45d71ab8 Mon Sep 17 00:00:00 2001 From: Guillaume Martres Date: Fri, 23 Feb 2024 17:41:50 +0100 Subject: [PATCH 2/5] Refactor parallelism logic to make it easier to experiment Enabling inParallel in other ops doesn't improve performance on datalog.benchmarks.examples.cspa10k_optimized.interpreted_indexed_sel__0____EOL on my laptop and in fact can make it worse. This needs to be tested more thoroughly on a bigger dataset on a machine where all cores are the same (my laptop has a mix of performance and efficiency cores which will likely add a lot of noise to any benchmark result). --- .../scala/datalog/execution/ir/IROp.scala | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/main/scala/datalog/execution/ir/IROp.scala b/src/main/scala/datalog/execution/ir/IROp.scala index ff3421df..059de8c8 100644 --- a/src/main/scala/datalog/execution/ir/IROp.scala +++ b/src/main/scala/datalog/execution/ir/IROp.scala @@ -10,7 +10,7 @@ import datalog.tools.Debug.debug import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} +import scala.concurrent.{Await, ExecutionContext, Future} import scala.reflect.ClassTag import scala.quoted.* import scala.util.{Failure, Success} @@ -51,7 +51,17 @@ abstract class IROp[T](val children: IROp[T]*)(using val jitOptions: JITOptions, */ def run(storageManager: StorageManager): T = throw new Exception(s"Error: calling run on likely rel op: $code") + +} +object IROp { + given ExecutionContext = ExecutionContext.global + def runSequence[T](storageManager: StorageManager, seq: Seq[IROp[T]], inParallel: Boolean = false): Seq[T] = + if inParallel == false then + seq.map(o => o.run(storageManager)) + else + Await.result(Future.sequence(seq.map(o => Future(o.run(storageManager)))), Duration.Inf) } +import IROp.* /** * @param children: SequenceOp[SequenceOp.NaiveEval, DoWhileOp] @@ -124,10 +134,6 @@ case class DoWhileOp(toCmp: DB, override val children:IROp[Any]*)(using JITOptio }) () } -import scala.concurrent.{ ExecutionContext, Future } -import scala.concurrent.duration.Duration -given ExecutionContext = ExecutionContext.global - /** * @param code * @param children: [Any*] @@ -136,10 +142,7 @@ case class SequenceOp(override val code: OpCode, override val children:IROp[Any] override def run_continuation(storageManager: StorageManager, opFns: Seq[CompiledFn[Any]]): Any = opFns.map(o => o(storageManager)) override def run(storageManager: StorageManager): Any = - if code == OpCode.EVAL_SN then - Await.result(Future.sequence(children.map(o => Future(o.run(storageManager)))), Duration.Inf) - else - children.map(o => o.run(storageManager)) + runSequence(storageManager, children, inParallel = code == OpCode.EVAL_SN) } case class UpdateDiscoveredOp()(using JITOptions) extends IROp[Any] { @@ -265,7 +268,7 @@ case class UnionOp(override val code: OpCode, override val children:IROp[EDB]*)( override def run_continuation(storageManager: StorageManager, opFns: Seq[CompiledFn[EDB]]): EDB = storageManager.union(opFns.map(o => o(storageManager))) override def run(storageManager: StorageManager): EDB = - storageManager.union(children.map(o => o.run(storageManager))) + storageManager.union(runSequence(storageManager, children, inParallel = false)) } /** @@ -296,7 +299,7 @@ case class UnionSPJOp(rId: RelationId, var k: JoinIndexes, override val children // ) // TODO: change children.length from 3 if (jitOptions.sortOrder == SortOrder.Unordered || jitOptions.sortOrder == SortOrder.Badluck || children.length < 3 || jitOptions.granularity.flag != OpCode.OTHER) // If not only interpreting, then don't optimize since we are waiting for the optimized version to compile - storageManager.union(children.map((s: ProjectJoinFilterOp) => s.run(storageManager))) + storageManager.union(runSequence(storageManager, children, inParallel = false)) else val (sortedChildren, newK) = JoinIndexes.getPresort( children, @@ -305,7 +308,7 @@ case class UnionSPJOp(rId: RelationId, var k: JoinIndexes, override val children k, storageManager ) - storageManager.union(sortedChildren.map((s: ProjectJoinFilterOp) => s.run(storageManager))) + storageManager.union(runSequence(storageManager, sortedChildren, inParallel = false)) } /** * @param children: [Union|Scan, Scan] From bc13fc50a18f6ec527d68296b9c4255f809f5724 Mon Sep 17 00:00:00 2001 From: Guillaume Martres Date: Fri, 23 Feb 2024 19:48:09 +0100 Subject: [PATCH 3/5] Only run N-1 children in separate threads. --- .../scala/datalog/execution/ir/IROp.scala | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/main/scala/datalog/execution/ir/IROp.scala b/src/main/scala/datalog/execution/ir/IROp.scala index 059de8c8..a1b88121 100644 --- a/src/main/scala/datalog/execution/ir/IROp.scala +++ b/src/main/scala/datalog/execution/ir/IROp.scala @@ -8,7 +8,7 @@ import datalog.tools.Debug import datalog.tools.Debug.debug import java.util.concurrent.atomic.AtomicReference -import scala.collection.mutable +import scala.collection.{immutable, mutable} import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future} import scala.reflect.ClassTag @@ -55,11 +55,20 @@ abstract class IROp[T](val children: IROp[T]*)(using val jitOptions: JITOptions, } object IROp { given ExecutionContext = ExecutionContext.global - def runSequence[T](storageManager: StorageManager, seq: Seq[IROp[T]], inParallel: Boolean = false): Seq[T] = + def runSequence[T: ClassTag](storageManager: StorageManager, seq: Seq[IROp[T]], inParallel: Boolean = false): Seq[T] = + if seq.length == 1 then + return immutable.ArraySeq.unsafeWrapArray(Array(seq.head.run(storageManager))) if inParallel == false then - seq.map(o => o.run(storageManager)) - else - Await.result(Future.sequence(seq.map(o => Future(o.run(storageManager)))), Duration.Inf) + return seq.map(_.run(storageManager)) + val futures = immutable.ArraySeq.newBuilder[Future[T]] + futures.sizeHint(seq.length) + // Spawn threads for the N - 1 first children + seq.view.init.foreach: irOp => + futures += Future(irOp.run(storageManager)) + // Run the last child on the current thread. + val last = seq.last.run(storageManager) + futures += Future(last) + Await.result(Future.sequence(futures.result()), Duration.Inf) } import IROp.* From c2f128a387b8d3514d1f866bc94e818c1f368f06 Mon Sep 17 00:00:00 2001 From: Guillaume Martres Date: Fri, 23 Feb 2024 20:07:58 +0100 Subject: [PATCH 4/5] wip --- .../datalog/execution/LambdaCompiler.scala | 17 +++++++++- .../scala/datalog/execution/ir/IROp.scala | 32 +++++++++++-------- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/src/main/scala/datalog/execution/LambdaCompiler.scala b/src/main/scala/datalog/execution/LambdaCompiler.scala index 0361ed35..ffef0fd0 100644 --- a/src/main/scala/datalog/execution/LambdaCompiler.scala +++ b/src/main/scala/datalog/execution/LambdaCompiler.scala @@ -9,6 +9,8 @@ import org.glavo.classfile.CodeBuilder import java.lang.invoke.MethodType import java.util.concurrent.atomic.AtomicInteger import scala.collection.{immutable, mutable} +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} import scala.quoted.* /** @@ -90,7 +92,20 @@ class LambdaCompiler(val storageManager: StorageManager)(using JITOptions) exten case SequenceOp(label, children:_*) => val cOps: Array[CompiledFn[Any]] = children.map(compile).toArray - cOps.length match + assert(false, "This is never triggered") + if label == OpCode.EVAL_SN then + // TODO: optimize by directly using the underlying Java stuff. + sm => { + given ExecutionContext = ExecutionContext.global + val futures = new mutable.ArraySeq.ofRef(new Array[Future[Any]](cOps.length)) + var i = 0 + while (i < cOps.length) { + futures(i) = Future(cOps(i)(sm)) + i += 1 + } + Await.result(Future.sequence(futures), Duration.Inf) + } + else cOps.length match case 1 => cOps(0) case 2 => diff --git a/src/main/scala/datalog/execution/ir/IROp.scala b/src/main/scala/datalog/execution/ir/IROp.scala index a1b88121..a551eb83 100644 --- a/src/main/scala/datalog/execution/ir/IROp.scala +++ b/src/main/scala/datalog/execution/ir/IROp.scala @@ -55,18 +55,18 @@ abstract class IROp[T](val children: IROp[T]*)(using val jitOptions: JITOptions, } object IROp { given ExecutionContext = ExecutionContext.global - def runSequence[T: ClassTag](storageManager: StorageManager, seq: Seq[IROp[T]], inParallel: Boolean = false): Seq[T] = + def runFns[T: ClassTag](storageManager: StorageManager, seq: Seq[StorageManager => T], inParallel: Boolean = false): Seq[T] = if seq.length == 1 then - return immutable.ArraySeq.unsafeWrapArray(Array(seq.head.run(storageManager))) + return immutable.ArraySeq.unsafeWrapArray(Array(seq.head(storageManager))) if inParallel == false then - return seq.map(_.run(storageManager)) + return seq.map(_(storageManager)) val futures = immutable.ArraySeq.newBuilder[Future[T]] futures.sizeHint(seq.length) // Spawn threads for the N - 1 first children - seq.view.init.foreach: irOp => - futures += Future(irOp.run(storageManager)) + seq.view.init.foreach: op => + futures += Future(op(storageManager)) // Run the last child on the current thread. - val last = seq.last.run(storageManager) + val last = seq.last(storageManager) futures += Future(last) Await.result(Future.sequence(futures.result()), Duration.Inf) } @@ -148,10 +148,12 @@ case class DoWhileOp(toCmp: DB, override val children:IROp[Any]*)(using JITOptio * @param children: [Any*] */ case class SequenceOp(override val code: OpCode, override val children:IROp[Any]*)(using JITOptions) extends IROp[Any](children:_*) { + val inParallel: Boolean = code == OpCode.EVAL_SN + override def run_continuation(storageManager: StorageManager, opFns: Seq[CompiledFn[Any]]): Any = - opFns.map(o => o(storageManager)) + runFns(storageManager, opFns, inParallel = inParallel) override def run(storageManager: StorageManager): Any = - runSequence(storageManager, children, inParallel = code == OpCode.EVAL_SN) + runFns(storageManager, children.map(_.run), inParallel = inParallel) } case class UpdateDiscoveredOp()(using JITOptions) extends IROp[Any] { @@ -274,10 +276,12 @@ case class UnionOp(override val code: OpCode, override val children:IROp[EDB]*)( var compiledFnIndexed: Future[CompiledFnIndexed[EDB]] = null var blockingCompiledFnIndexed: CompiledFnIndexed[EDB] = null + val inParallel = false + override def run_continuation(storageManager: StorageManager, opFns: Seq[CompiledFn[EDB]]): EDB = - storageManager.union(opFns.map(o => o(storageManager))) + storageManager.union(runFns(storageManager, opFns, inParallel = inParallel)) override def run(storageManager: StorageManager): EDB = - storageManager.union(runSequence(storageManager, children, inParallel = false)) + storageManager.union(runFns(storageManager, children.map(_.run), inParallel = inParallel)) } /** @@ -290,8 +294,10 @@ case class UnionSPJOp(rId: RelationId, var k: JoinIndexes, override val children var compiledFnIndexed: Future[CompiledFnIndexed[EDB]] = null // var compiledFnIndexed: java.util.concurrent.Future[CompiledFnIndexed[EDB]] = null // for now not filled out bc not planning on compiling higher than this + val inParallel: Boolean = false + override def run_continuation(storageManager: StorageManager, opFns: Seq[CompiledFn[EDB]]): EDB = - storageManager.union(opFns.map(o => o(storageManager))) + storageManager.union(runFns(storageManager, opFns, inParallel = inParallel)) override def run(storageManager: StorageManager): EDB = @@ -308,7 +314,7 @@ case class UnionSPJOp(rId: RelationId, var k: JoinIndexes, override val children // ) // TODO: change children.length from 3 if (jitOptions.sortOrder == SortOrder.Unordered || jitOptions.sortOrder == SortOrder.Badluck || children.length < 3 || jitOptions.granularity.flag != OpCode.OTHER) // If not only interpreting, then don't optimize since we are waiting for the optimized version to compile - storageManager.union(runSequence(storageManager, children, inParallel = false)) + storageManager.union(runFns(storageManager, children.map(_.run), inParallel = inParallel)) else val (sortedChildren, newK) = JoinIndexes.getPresort( children, @@ -317,7 +323,7 @@ case class UnionSPJOp(rId: RelationId, var k: JoinIndexes, override val children k, storageManager ) - storageManager.union(runSequence(storageManager, sortedChildren, inParallel = false)) + storageManager.union(runFns(storageManager, sortedChildren.map(_.run), inParallel = inParallel)) } /** * @param children: [Union|Scan, Scan] From dfba8efae15bfe7e266edc14acf8eab7b106fd94 Mon Sep 17 00:00:00 2001 From: Guillaume Martres Date: Fri, 23 Feb 2024 20:23:17 +0100 Subject: [PATCH 5/5] Parallelize the lambda backend This is 15% faster on datalog.benchmarks.examples.cspa10k_optimized.jit_indexed_sel__0_blocking_DELTA_lambda_EOL for me. --- .../datalog/execution/LambdaCompiler.scala | 22 +++++++---------- .../scala/datalog/execution/ir/IROp.scala | 24 +++++++++++-------- 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/src/main/scala/datalog/execution/LambdaCompiler.scala b/src/main/scala/datalog/execution/LambdaCompiler.scala index ffef0fd0..379bfb26 100644 --- a/src/main/scala/datalog/execution/LambdaCompiler.scala +++ b/src/main/scala/datalog/execution/LambdaCompiler.scala @@ -12,6 +12,7 @@ import scala.collection.{immutable, mutable} import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future} import scala.quoted.* +import scala.reflect.{classTag, ClassTag} /** * Separate out compile logic from StagedExecutionEngine @@ -19,7 +20,9 @@ import scala.quoted.* class LambdaCompiler(val storageManager: StorageManager)(using JITOptions) extends StagedCompiler(storageManager) { given staging.Compiler = jitOptions.dotty /** Convert a Seq of lambdas into a lambda returning a Seq. */ - def seqToLambda[T](seq: Seq[StorageManager => T]): StorageManager => Seq[T] = + def seqToLambda[T](seq: Seq[StorageManager => T], inParallel: Boolean = false): StorageManager => Seq[T] = + if inParallel then + return sm => IROp.runFns(sm, seq, inParallel)(using classTag[AnyRef].asInstanceOf[ClassTag[T]]) seq match case seq: immutable.ArraySeq.ofRef[_] => val arr = unsafeArrayToLambda(seq.unsafeArray) @@ -93,18 +96,9 @@ class LambdaCompiler(val storageManager: StorageManager)(using JITOptions) exten case SequenceOp(label, children:_*) => val cOps: Array[CompiledFn[Any]] = children.map(compile).toArray assert(false, "This is never triggered") - if label == OpCode.EVAL_SN then + if irTree.runInParallel then // TODO: optimize by directly using the underlying Java stuff. - sm => { - given ExecutionContext = ExecutionContext.global - val futures = new mutable.ArraySeq.ofRef(new Array[Future[Any]](cOps.length)) - var i = 0 - while (i < cOps.length) { - futures(i) = Future(cOps(i)(sm)) - i += 1 - } - Await.result(Future.sequence(futures), Duration.Inf) - } + sm => IROp.runFns(sm, immutable.ArraySeq.unsafeWrapArray(cOps), inParallel = true) else cOps.length match case 1 => cOps(0) @@ -187,11 +181,11 @@ class LambdaCompiler(val storageManager: StorageManager)(using JITOptions) exten else (children, k) - val compiledOps = seqToLambda(sortedChildren.map(compile)) + val compiledOps = seqToLambda(sortedChildren.map(compile), inParallel = irTree.runInParallel) sm => sm.union(compiledOps(sm)) case UnionOp(label, children: _*) => - val compiledOps = seqToLambda(children.map(compile)) + val compiledOps = seqToLambda(children.map(compile), inParallel = irTree.runInParallel) sm => sm.union(compiledOps(sm)) case DiffOp(children: _*) => diff --git a/src/main/scala/datalog/execution/ir/IROp.scala b/src/main/scala/datalog/execution/ir/IROp.scala index a551eb83..94c0829c 100644 --- a/src/main/scala/datalog/execution/ir/IROp.scala +++ b/src/main/scala/datalog/execution/ir/IROp.scala @@ -40,6 +40,9 @@ abstract class IROp[T](val children: IROp[T]*)(using val jitOptions: JITOptions, var blockingCompiledFn: CompiledFn[T] = null // for when we're blocking and not ahead-of-time, so might as well skip the future var compiledSnippetContinuationFn: (StorageManager, Seq[StorageManager => T]) => T = null + /** Should the children of this op be run in parallel? */ + val runInParallel: Boolean = false + /** * Add continuation to revert control flow to the interpret method, which checks for optimizations/deoptimizations */ @@ -148,12 +151,12 @@ case class DoWhileOp(toCmp: DB, override val children:IROp[Any]*)(using JITOptio * @param children: [Any*] */ case class SequenceOp(override val code: OpCode, override val children:IROp[Any]*)(using JITOptions) extends IROp[Any](children:_*) { - val inParallel: Boolean = code == OpCode.EVAL_SN + override val runInParallel: Boolean = code == OpCode.EVAL_SN override def run_continuation(storageManager: StorageManager, opFns: Seq[CompiledFn[Any]]): Any = - runFns(storageManager, opFns, inParallel = inParallel) + runFns(storageManager, opFns, inParallel = runInParallel) override def run(storageManager: StorageManager): Any = - runFns(storageManager, children.map(_.run), inParallel = inParallel) + runFns(storageManager, children.map(_.run), inParallel = runInParallel) } case class UpdateDiscoveredOp()(using JITOptions) extends IROp[Any] { @@ -276,12 +279,12 @@ case class UnionOp(override val code: OpCode, override val children:IROp[EDB]*)( var compiledFnIndexed: Future[CompiledFnIndexed[EDB]] = null var blockingCompiledFnIndexed: CompiledFnIndexed[EDB] = null - val inParallel = false + override val runInParallel = true override def run_continuation(storageManager: StorageManager, opFns: Seq[CompiledFn[EDB]]): EDB = - storageManager.union(runFns(storageManager, opFns, inParallel = inParallel)) + storageManager.union(runFns(storageManager, opFns, inParallel = runInParallel)) override def run(storageManager: StorageManager): EDB = - storageManager.union(runFns(storageManager, children.map(_.run), inParallel = inParallel)) + storageManager.union(runFns(storageManager, children.map(_.run), inParallel = runInParallel)) } /** @@ -294,10 +297,11 @@ case class UnionSPJOp(rId: RelationId, var k: JoinIndexes, override val children var compiledFnIndexed: Future[CompiledFnIndexed[EDB]] = null // var compiledFnIndexed: java.util.concurrent.Future[CompiledFnIndexed[EDB]] = null // for now not filled out bc not planning on compiling higher than this - val inParallel: Boolean = false + + override val runInParallel = true override def run_continuation(storageManager: StorageManager, opFns: Seq[CompiledFn[EDB]]): EDB = - storageManager.union(runFns(storageManager, opFns, inParallel = inParallel)) + storageManager.union(runFns(storageManager, opFns, inParallel = runInParallel)) override def run(storageManager: StorageManager): EDB = @@ -314,7 +318,7 @@ case class UnionSPJOp(rId: RelationId, var k: JoinIndexes, override val children // ) // TODO: change children.length from 3 if (jitOptions.sortOrder == SortOrder.Unordered || jitOptions.sortOrder == SortOrder.Badluck || children.length < 3 || jitOptions.granularity.flag != OpCode.OTHER) // If not only interpreting, then don't optimize since we are waiting for the optimized version to compile - storageManager.union(runFns(storageManager, children.map(_.run), inParallel = inParallel)) + storageManager.union(runFns(storageManager, children.map(_.run), inParallel = runInParallel)) else val (sortedChildren, newK) = JoinIndexes.getPresort( children, @@ -323,7 +327,7 @@ case class UnionSPJOp(rId: RelationId, var k: JoinIndexes, override val children k, storageManager ) - storageManager.union(runFns(storageManager, sortedChildren.map(_.run), inParallel = inParallel)) + storageManager.union(runFns(storageManager, sortedChildren.map(_.run), inParallel = runInParallel)) } /** * @param children: [Union|Scan, Scan]