Skip to content

Commit

Permalink
Add reproducible ID support like in Almond
Browse files Browse the repository at this point in the history
  • Loading branch information
alexarchambault committed Jun 21, 2023
1 parent 18dbad8 commit c327a73
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.spark.sql.almondinternals

import java.util.concurrent.atomic.AtomicInteger
import java.util.{Locale, UUID}

object Id {

def generate(): String =
if (useRandomIds)
UUID.randomUUID().toString
else
idCount.incrementAndGet().toString

private lazy val useRandomIds: Boolean =
Option(System.getenv("ALMOND_USE_RANDOM_IDS"))
.orElse(sys.props.get("almond.ids.random"))
.forall(s => s == "1" || s.toLowerCase(Locale.ROOT) == "true")

private val idCount = new AtomicInteger(222222)

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.apache.spark.sql.almondinternals

import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{
ConcurrentHashMap,
Expand Down Expand Up @@ -29,7 +28,7 @@ final class ProgressSparkListener(

private val elems = new ConcurrentHashMap[Int, StageElem]

private val commTargetName = s"cancel-stage-${UUID.randomUUID()}"
private val commTargetName = s"cancel-stage-${Id.generate()}"

private var sentInitCode = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package org.apache.spark.sql.almondinternals

import java.io.{BufferedReader, File, FileReader}
import java.nio.file.{Files, StandardOpenOption}
import java.util.UUID

import almond.interpreter.api.{CommHandler, CommTarget, DisplayData, OutputHandler}

Expand All @@ -27,8 +26,8 @@ final class SendLog(

assert(backOffFactor >= 1.0)

private val commTarget = UUID.randomUUID().toString
private val commId = UUID.randomUUID().toString
private val commTarget = Id.generate()
private val commId = Id.generate()

@volatile private var gotAck = false
@volatile private var keepReading = true
Expand Down Expand Up @@ -194,7 +193,7 @@ object SendLog {

lazy val sendLog = new SendLog(f, commHandler, prefix = Option(prefix))

val id = UUID.randomUUID().toString
val id = Id.generate()
val data = DisplayData(
Map(DisplayData.ContentType.text -> "", DisplayData.ContentType.html -> ""),
idOpt = Some(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import almond.interpreter.api.OutputHandler

import java.util.concurrent.ThreadFactory
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.duration.DurationInt
Expand All @@ -25,7 +24,7 @@ final class StageElem(
else s"<code>${name.take(idx)}</code> at <code>${name.drop(idx + sep.length)}</code>"
}

val displayId = s"stage-info-${UUID.randomUUID()}"
val displayId = s"stage-info-${Id.generate()}"
val titleDisplayId = s"$displayId-title"

val startedTasks = new AtomicInteger
Expand Down

0 comments on commit c327a73

Please sign in to comment.