From 0f7ce52ed0de11194c8a264fdf665b2f901d7a6e Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Wed, 8 May 2024 09:57:23 +0200 Subject: [PATCH] wip --- .github/workflows/main.yml | 2 +- .../spark/connector/cluster/Fixtures.scala | 12 ++++++-- .../spark/connector/ccm/CcmBridge.scala | 17 ++++++++--- .../spark/connector/ccm/CcmConfig.scala | 16 ++-------- .../ccm/mode/ClusterModeExecutor.scala | 19 +++++++----- .../ccm/mode/DebugModeExecutor.scala | 2 -- .../ccm/mode/DeveloperModeExecutor.scala | 2 -- .../ccm/mode/ExistingModeExecutor.scala | 2 -- .../ccm/mode/StandardModeExecutor.scala | 29 +++++++++++-------- 9 files changed, 54 insertions(+), 47 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index c9a72fe1f..bdc777a07 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -24,7 +24,7 @@ jobs: - name: ccm pip installation uses: BSFishy/pip-action@v1 with: - packages: git+https://github.com/riptano/ccm.git@435f3210e16d0b648fbf33d6390d5ab4c9e630d4 + packages: git+https://github.com/jacek-lewandowski/ccm.git@b0d329a7e112042f58a18634a8de8023c5afb4ec - name: Setup Java uses: actions/setup-java@v4 diff --git a/connector/src/it/scala/com/datastax/spark/connector/cluster/Fixtures.scala b/connector/src/it/scala/com/datastax/spark/connector/cluster/Fixtures.scala index ccd26118b..da6c42ec4 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/cluster/Fixtures.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/cluster/Fixtures.scala @@ -136,9 +136,15 @@ trait AuthCluster extends SingleClusterFixture { "authentication_options.enabled" -> "true" ))) } else { - Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map( - "authenticator" -> "PasswordAuthenticator" - ))) + if (defaultConfig.getCassandraVersion.compareTo(CcmConfig.V5_0_0) >= 0) { + Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map( + "authenticator.class_name" -> "PasswordAuthenticator" + ))) + } else { + Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map( + "authenticator" -> "PasswordAuthenticator" + ))) + } } } diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala index df9b9ed68..f81692ba3 100644 --- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala +++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala @@ -71,7 +71,13 @@ object CcmBridge { private val logger: Logger = LoggerFactory.getLogger(classOf[CcmBridge]) - def execute(cli: CommandLine, javaHome: Option[String] = None): Seq[String] = { + def execute(cli: Seq[String]): Seq[String] = { + val cmdLine = new CommandLine(cli.head) + cli.tail.foreach(cmdLine.addArgument) + execute(cmdLine) + } + + def execute(cli: CommandLine): Seq[String] = { logger.info("Executing: " + cli) val watchDog: ExecuteWatchdog = new ExecuteWatchdog(TimeUnit.MINUTES.toMillis(10)) @@ -92,9 +98,12 @@ object CcmBridge { val streamHandler = new PumpStreamHandler(outStream, errStream) executor.setStreamHandler(streamHandler) executor.setWatchdog(watchDog) - val env = sys.env ++ - (if (sys.env.contains("CCM_JAVA_HOME")) Map("JAVA_HOME" -> sys.env("CCM_JAVA_HOME")) else Map.empty) ++ - javaHome.map(jh => Map("JAVA_HOME" -> jh)).getOrElse(Map.empty) + val env = + if (sys.env.contains("CCM_JAVA_HOME")) { + sys.env + ("JAVA_HOME" -> sys.env("CCM_JAVA_HOME")) + } else { + sys.env + } logger.info(s"Running CCM with JAVA_HOME=${env.get("JAVA_HOME")}") val retValue = executor.execute(cli, env.asJava) diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmConfig.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmConfig.scala index 38cb65f5c..33f28db23 100644 --- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmConfig.scala +++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmConfig.scala @@ -14,7 +14,7 @@ case class CcmConfig( cassandraConfiguration: Map[String, Object] = Map("auto_snapshot" -> "false"), dseConfiguration: Map[String, Object] = Map(), dseRawYaml: Seq[String] = List(), - jvmArgs: Seq[String] = List(), + jvmArgs: Seq[String] = List("-Dcassandra.superuser_setup_delay_ms=0"), ipPrefix: String = "127.0.0.", createOptions: List[String] = List(), dseWorkloads: List[String] = List(), @@ -23,7 +23,7 @@ case class CcmConfig( installDirectory: Option[String] = Option(System.getProperty("ccm.directory")), installBranch: Option[String] = Option(System.getProperty("ccm.branch")), dseEnabled: Boolean = Option(System.getProperty("ccm.dse")).exists(_.toLowerCase == "true"), - javaHome: Option[String] = None, + javaVersion: Option[Int] = None, mode: ClusterMode = ClusterModes.fromEnvVar) { def withSsl(keystorePath: String, keystorePassword: String): CcmConfig = { @@ -69,17 +69,6 @@ case class CcmConfig( } } - private val javaVersion = if (dseEnabled) 8 else if (version.compareTo(V5_0_0) >= 0) 11 else 8 - - def getJavaHome: Option[String] = { - javaHome.orElse { - if (sys.env.contains(s"JAVA${javaVersion}_HOME")) - Some(sys.env(s"JAVA${javaVersion}_HOME")) - else - None - } - } - def ipOfNode(n: Int): String = { ipPrefix + n } @@ -137,6 +126,7 @@ object CcmConfig { // C* versions val V5_0_0: Version = Version.parse("5.0-beta1") + val V4_1_0: Version = Version.parse("4.1.0") val V4_0_0: Version = Version.parse("4.0.0") val V3_6_0: Version = Version.parse("3.6.0") val V3_10: Version = Version.parse("3.10") diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ClusterModeExecutor.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ClusterModeExecutor.scala index f02e32182..272faafd3 100644 --- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ClusterModeExecutor.scala +++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ClusterModeExecutor.scala @@ -13,7 +13,10 @@ private[ccm] trait ClusterModeExecutor { protected val dir: Path - protected val javaHome: Option[String] + protected val javaVersion: Option[Int] = config.javaVersion match { + case None if config.dseEnabled => Some(8) + case other => other + } def create(clusterName: String): Unit @@ -22,8 +25,8 @@ private[ccm] trait ClusterModeExecutor { def remove(): Unit def execute(args: String*): Seq[String] = synchronized { - val command = s"ccm ${args.mkString(" ")} --config-dir=${dir.toFile.getAbsolutePath}" - CcmBridge.execute(CommandLine.parse(command), javaHome) + val command = "ccm" +: args :+ s"--config-dir=${dir.toFile.getAbsolutePath}" + CcmBridge.execute(command) } def executeUnsanitized(args: String*): Seq[String] = synchronized { @@ -33,7 +36,7 @@ private[ccm] trait ClusterModeExecutor { } cli.addArgument("--config-dir=" + dir.toFile.getAbsolutePath) - CcmBridge.execute(cli, javaHome) + CcmBridge.execute(cli) } def getLastRepositoryLogLines(linesCount: Int): Seq[String] = synchronized { @@ -47,20 +50,20 @@ private[ccm] trait ClusterModeExecutor { } def getLastLogLines(path: String, linesCount: Int): Seq[String] = synchronized { - val command = s"tail -$linesCount $path" - CcmBridge.execute(CommandLine.parse(command), javaHome) + val command = Seq("tail", s"-$linesCount", path) + CcmBridge.execute(command) } /** * Waits for the node to become alive. The first check is performed after the first interval. */ - def waitForNode(nodeNo: Int, timeout: FiniteDuration, interval: Duration = 5.seconds): Boolean = { + def waitForNode(nodeNo: Int, timeout: FiniteDuration, interval: Duration = 1.seconds): Boolean = { val deadline = timeout.fromNow while (!deadline.isOverdue()) { - Thread.sleep(interval.toMillis) if (isAlive(nodeNo, interval)) { return true } + Thread.sleep(interval.toMillis) } false; } diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/DebugModeExecutor.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/DebugModeExecutor.scala index df8a66a4d..ee559c19a 100644 --- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/DebugModeExecutor.scala +++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/DebugModeExecutor.scala @@ -29,8 +29,6 @@ private[ccm] class DebugModeExecutor(val config: CcmConfig) extends DefaultExecu } } - override val javaHome: Option[String] = config.getJavaHome - // stop nodes, don't remove logs override def remove(): Unit = { execute("stop") diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/DeveloperModeExecutor.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/DeveloperModeExecutor.scala index 7760952d5..f13f268cd 100644 --- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/DeveloperModeExecutor.scala +++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/DeveloperModeExecutor.scala @@ -10,8 +10,6 @@ class DeveloperModeExecutor(val config: CcmConfig) extends DefaultExecutor with override val dir: Path = Files.createDirectories(Paths.get("/tmp/scc_ccm")) - override val javaHome: Option[String] = config.getJavaHome - Runtime.getRuntime.addShutdownHook(new Thread("Serial shutdown hooks thread") { override def run(): Unit = { println(s"\nCCM is running in developer mode. Cluster $clusterName will not be shutdown. It has to be shutdown " + diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ExistingModeExecutor.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ExistingModeExecutor.scala index 6995c631d..76e6432e0 100644 --- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ExistingModeExecutor.scala +++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ExistingModeExecutor.scala @@ -10,8 +10,6 @@ import java.nio.file.{Files, Path} private[ccm] class ExistingModeExecutor(val config: CcmConfig) extends ClusterModeExecutor { override protected val dir: Path = Files.createTempDirectory("test") - override val javaHome: Option[String] = config.getJavaHome - override def create(clusterName: String): Unit = { // do nothing } diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/StandardModeExecutor.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/StandardModeExecutor.scala index cf9c49df1..7094ae0f2 100644 --- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/StandardModeExecutor.scala +++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/StandardModeExecutor.scala @@ -1,6 +1,6 @@ package com.datastax.spark.connector.ccm.mode -import java.io.{File, FileFilter} +import java.io.File import java.nio.file.{Files, Path, Paths} import java.util.concurrent.atomic.AtomicBoolean import com.datastax.oss.driver.api.core.Version @@ -32,16 +32,19 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor { if (logsDir.exists() && logsDir.isDirectory) { val stdErrFile = logsDir.listFiles().filter(_.getName.endsWith("stderr.log")).head - logger.error(s"Start command failed, here is the last $linesCount lines of startup-stderr file: \n" + - getLastLogLines(stdErrFile.getAbsolutePath, linesCount).mkString("\n")) + val stdOutFile = logsDir.listFiles().filter(_.getName.endsWith("stdout.log")).head + logger.error(s"logs dir: \n" + logsDir.listFiles().map(_.getName).mkString("\n")) + logger.error(s"Start command failed, here is the last $linesCount lines of startup-stderr and startup-stdout files: \n" + + getLastLogLines(stdErrFile.getAbsolutePath, linesCount).mkString("\n") + getLastLogLines(stdOutFile.getAbsolutePath, linesCount).mkString("\n")) } } } override def start(nodeNo: Int): Unit = { - val formattedJvmArgs = config.jvmArgs.map(arg => s" --jvm_arg=$arg").mkString(" ") + val formattedJvmArgs = config.jvmArgs.map(arg => s"--jvm_arg=$arg") + val formattedJvmVersion = javaVersion.map(v => s"--jvm-version=$v").toSeq try { - execute(s"node$nodeNo", "start", formattedJvmArgs + "-v", "--skip-wait-other-notice") + execute(Seq(s"node$nodeNo", "start", "-v", "--skip-wait-other-notice") ++ formattedJvmArgs ++ formattedJvmVersion :_*) waitForNode(nodeNo) } catch { case NonFatal(e) => @@ -83,12 +86,12 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor { if (created.compareAndSet(false, true)) { val options = config.installDirectory .map(dir => config.createOptions :+ s"--install-dir=${new File(dir).getAbsolutePath}") - .orElse(config.installBranch.map(branch => config.createOptions :+ s"-v git:${branch.trim().replaceAll("\"", "")}")) - .getOrElse(config.createOptions :+ s"-v ${adjustCassandraBetaVersion(config.version.toString)}") + .orElse(config.installBranch.map(branch => config.createOptions ++ Seq("-v", s"git:${branch.trim().replaceAll("\"", "")}"))) + .getOrElse(config.createOptions ++ Seq("-v", adjustCassandraBetaVersion(config.version.toString))) val dseFlag = if (config.dseEnabled) Some("--dse") else None - val createArgs = Seq("create", clusterName, "-i", config.ipPrefix, (options ++ dseFlag).mkString(" ")) + val createArgs = Seq("create", clusterName, "-i", config.ipPrefix) ++ options ++ dseFlag // Check installed Directory val repositoryDir = Paths.get( @@ -137,8 +140,10 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor { config.cassandraConfiguration.foreach { case (key, value) => execute("updateconf", s"$key:$value") } - if (config.getCassandraVersion.compareTo(Version.V2_2_0) >= 0) { + if (config.getCassandraVersion.compareTo(Version.V2_2_0) >= 0 && config.getCassandraVersion.compareTo(CcmConfig.V4_1_0) < 0) { execute("updateconf", "enable_user_defined_functions:true") + } else if (config.getCassandraVersion.compareTo(CcmConfig.V4_1_0) >= 0) { + execute("updateconf", "user_defined_functions_enabled:true") } if (config.dseEnabled) { config.dseConfiguration.foreach { case (key, value) => @@ -152,8 +157,10 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor { } } else { // C* 4.0.0 has materialized views disabled by default - if (config.getCassandraVersion.compareTo(Version.parse("4.0-beta1")) >= 0) { + if (config.getCassandraVersion.compareTo(Version.V4_0_0) >= 0 && config.getCassandraVersion.compareTo(CcmConfig.V4_1_0) < 0) { execute("updateconf", "enable_materialized_views:true") + } else if (config.getCassandraVersion.compareTo(CcmConfig.V4_1_0) >= 0) { + execute("updateconf", "materialized_views_enabled:true") } } } @@ -163,8 +170,6 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor { private[ccm] class StandardModeExecutor(val config: CcmConfig) extends DefaultExecutor { override val dir: Path = Files.createTempDirectory("ccm") - override val javaHome: Option[String] = config.getJavaHome - // remove config directory on shutdown dir.toFile.deleteOnExit() // remove db artifacts