diff --git a/build.sc b/build.sc index 486f542..9510672 100644 --- a/build.sc +++ b/build.sc @@ -268,7 +268,8 @@ class AlmondSpark(val crossScalaVersion: String) extends CrossSbtModule with Amm core() ) def ivyDeps = super.ivyDeps() ++ Agg( - Deps.jsoniterScalaCore + Deps.jsoniterScalaCore, + Deps.scalatags ) def compileIvyDeps = super.compileIvyDeps() ++ Agg( Deps.ammoniteReplApi, @@ -277,7 +278,6 @@ class AlmondSpark(val crossScalaVersion: String) extends CrossSbtModule with Amm Deps.scalaKernelApi .exclude(("com.lihaoyi", s"ammonite-compiler_$crossScalaVersion")) .exclude(("com.lihaoyi", s"ammonite-repl-api_$crossScalaVersion")), - Deps.scalatags, Deps.sparkSql(scalaVersion()) ) def repositoriesTask = T.task { diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/Id.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/Id.scala new file mode 100644 index 0000000..92c65c6 --- /dev/null +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/Id.scala @@ -0,0 +1,21 @@ +package org.apache.spark.sql.almondinternals + +import java.util.concurrent.atomic.AtomicInteger +import java.util.{Locale, UUID} + +object Id { + + def generate(): String = + if (useRandomIds) + UUID.randomUUID().toString + else + idCount.incrementAndGet().toString + + private lazy val useRandomIds: Boolean = + Option(System.getenv("ALMOND_USE_RANDOM_IDS")) + .orElse(sys.props.get("almond.ids.random")) + .forall(s => s == "1" || s.toLowerCase(Locale.ROOT) == "true") + + private val idCount = new AtomicInteger(222222) + +} diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala index f01ccae..9cb27e1 100644 --- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/ProgressSparkListener.scala @@ -1,6 +1,5 @@ package org.apache.spark.sql.almondinternals -import java.util.UUID import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ ConcurrentHashMap, @@ -29,7 +28,7 @@ final class ProgressSparkListener( private val elems = new ConcurrentHashMap[Int, StageElem] - private val commTargetName = s"cancel-stage-${UUID.randomUUID()}" + private val commTargetName = s"cancel-stage-${Id.generate()}" private var sentInitCode = false diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/SendLog.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/SendLog.scala index eb1cae7..7154d50 100644 --- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/SendLog.scala +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/SendLog.scala @@ -2,7 +2,6 @@ package org.apache.spark.sql.almondinternals import java.io.{BufferedReader, File, FileReader} import java.nio.file.{Files, StandardOpenOption} -import java.util.UUID import almond.interpreter.api.{CommHandler, CommTarget, DisplayData, OutputHandler} @@ -27,8 +26,8 @@ final class SendLog( assert(backOffFactor >= 1.0) - private val commTarget = UUID.randomUUID().toString - private val commId = UUID.randomUUID().toString + private val commTarget = Id.generate() + private val commId = Id.generate() @volatile private var gotAck = false @volatile private var keepReading = true @@ -194,7 +193,7 @@ object SendLog { lazy val sendLog = new SendLog(f, commHandler, prefix = Option(prefix)) - val id = UUID.randomUUID().toString + val id = Id.generate() val data = DisplayData( Map(DisplayData.ContentType.text -> "", DisplayData.ContentType.html -> ""), idOpt = Some(id) diff --git a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala index 37ecc59..2cc866f 100644 --- a/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala +++ b/modules/almond-spark/src/main/scala/org/apache/spark/sql/almondinternals/StageElem.scala @@ -4,7 +4,6 @@ import almond.interpreter.api.OutputHandler import java.util.concurrent.ThreadFactory import java.util.concurrent.ScheduledThreadPoolExecutor -import java.util.UUID import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.duration.DurationInt @@ -25,7 +24,7 @@ final class StageElem( else s"${name.take(idx)} at ${name.drop(idx + sep.length)}" } - val displayId = s"stage-info-${UUID.randomUUID()}" + val displayId = s"stage-info-${Id.generate()}" val titleDisplayId = s"$displayId-title" val startedTasks = new AtomicInteger diff --git a/modules/almond-toree-spark/src/main/scala/almond/spark/ToreeSql.scala b/modules/almond-toree-spark/src/main/scala/almond/spark/ToreeSql.scala index c38f843..e1b2724 100644 --- a/modules/almond-toree-spark/src/main/scala/almond/spark/ToreeSql.scala +++ b/modules/almond-toree-spark/src/main/scala/almond/spark/ToreeSql.scala @@ -6,7 +6,7 @@ object ToreeSql { private def sqlMagic(content: String): String = { val tq = "\"\"\"" - s"""_root_.almond.display.Html(_root_.almond.dfrenderer.AlmondDataFrameRenderer.render(spark.sql($tq$content$tq), limit = $sqlLimit))""" + s"""_root_.almond.display.Html(_root_.almond.spark.DataFrameRenderer.render(spark.sql($tq$content$tq), limit = $sqlLimit))""" } def setup(): Unit = {