From c327a73ce1d09759aa0132d88a3f81af24df810d Mon Sep 17 00:00:00 2001 From: Alex Archambault Date: Wed, 21 Jun 2023 19:09:18 +0200 Subject: [PATCH] Add reproducible ID support like in Almond --- .../apache/spark/sql/almondinternals/Id.scala | 21 +++++++++++++++++++ .../ProgressSparkListener.scala | 3 +-- .../spark/sql/almondinternals/SendLog.scala | 7 +++---- .../spark/sql/almondinternals/StageElem.scala | 3 +-- 4 files changed, 26 insertions(+), 8 deletions(-) create mode 100644 modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/Id.scala diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/Id.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/Id.scala new file mode 100644 index 0000000..92c65c6 --- /dev/null +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/Id.scala @@ -0,0 +1,21 @@ +package org.apache.spark.sql.almondinternals + +import java.util.concurrent.atomic.AtomicInteger +import java.util.{Locale, UUID} + +object Id { + + def generate(): String = + if (useRandomIds) + UUID.randomUUID().toString + else + idCount.incrementAndGet().toString + + private lazy val useRandomIds: Boolean = + Option(System.getenv("ALMOND_USE_RANDOM_IDS")) + .orElse(sys.props.get("almond.ids.random")) + .forall(s => s == "1" || s.toLowerCase(Locale.ROOT) == "true") + + private val idCount = new AtomicInteger(222222) + +} diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala index f01ccae..9cb27e1 100644 --- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala @@ -1,6 +1,5 @@ package org.apache.spark.sql.almondinternals -import java.util.UUID import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ ConcurrentHashMap, @@ -29,7 +28,7 @@ final class ProgressSparkListener( private val elems = new ConcurrentHashMap[Int, StageElem] - private val commTargetName = s"cancel-stage-${UUID.randomUUID()}" + private val commTargetName = s"cancel-stage-${Id.generate()}" private var sentInitCode = false diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/SendLog.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/SendLog.scala index eb1cae7..7154d50 100644 --- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/SendLog.scala +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/SendLog.scala @@ -2,7 +2,6 @@ package org.apache.spark.sql.almondinternals import java.io.{BufferedReader, File, FileReader} import java.nio.file.{Files, StandardOpenOption} -import java.util.UUID import almond.interpreter.api.{CommHandler, CommTarget, DisplayData, OutputHandler} @@ -27,8 +26,8 @@ final class SendLog( assert(backOffFactor >= 1.0) - private val commTarget = UUID.randomUUID().toString - private val commId = UUID.randomUUID().toString + private val commTarget = Id.generate() + private val commId = Id.generate() @volatile private var gotAck = false @volatile private var keepReading = true @@ -194,7 +193,7 @@ object SendLog { lazy val sendLog = new SendLog(f, commHandler, prefix = Option(prefix)) - val id = UUID.randomUUID().toString + val id = Id.generate() val data = DisplayData( Map(DisplayData.ContentType.text -> "", DisplayData.ContentType.html -> ""), idOpt = Some(id) diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala index 37ecc59..2cc866f 100644 --- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala @@ -4,7 +4,6 @@ import almond.interpreter.api.OutputHandler import java.util.concurrent.ThreadFactory import java.util.concurrent.ScheduledThreadPoolExecutor -import java.util.UUID import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.duration.DurationInt @@ -25,7 +24,7 @@ final class StageElem( else s"${name.take(idx)} at ${name.drop(idx + sep.length)}" } - val displayId = s"stage-info-${UUID.randomUUID()}" + val displayId = s"stage-info-${Id.generate()}" val titleDisplayId = s"$displayId-title" val startedTasks = new AtomicInteger