Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jacek-lewandowski committed May 8, 2024
1 parent e53cd59 commit 0f7ce52
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- name: ccm pip installation
uses: BSFishy/pip-action@v1
with:
packages: git+https://github.com/riptano/ccm.git@435f3210e16d0b648fbf33d6390d5ab4c9e630d4
packages: git+https://github.com/jacek-lewandowski/ccm.git@b0d329a7e112042f58a18634a8de8023c5afb4ec

- name: Setup Java
uses: actions/setup-java@v4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,15 @@ trait AuthCluster extends SingleClusterFixture {
"authentication_options.enabled" -> "true"
)))
} else {
Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map(
"authenticator" -> "PasswordAuthenticator"
)))
if (defaultConfig.getCassandraVersion.compareTo(CcmConfig.V5_0_0) >= 0) {
Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map(
"authenticator.class_name" -> "PasswordAuthenticator"
)))
} else {
Seq(sslConf.copy(cassandraConfiguration = sslConf.cassandraConfiguration ++ Map(
"authenticator" -> "PasswordAuthenticator"
)))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ object CcmBridge {

private val logger: Logger = LoggerFactory.getLogger(classOf[CcmBridge])

def execute(cli: CommandLine, javaHome: Option[String] = None): Seq[String] = {
def execute(cli: Seq[String]): Seq[String] = {
val cmdLine = new CommandLine(cli.head)
cli.tail.foreach(cmdLine.addArgument)
execute(cmdLine)
}

def execute(cli: CommandLine): Seq[String] = {
logger.info("Executing: " + cli)

val watchDog: ExecuteWatchdog = new ExecuteWatchdog(TimeUnit.MINUTES.toMillis(10))
Expand All @@ -92,9 +98,12 @@ object CcmBridge {
val streamHandler = new PumpStreamHandler(outStream, errStream)
executor.setStreamHandler(streamHandler)
executor.setWatchdog(watchDog)
val env = sys.env ++
(if (sys.env.contains("CCM_JAVA_HOME")) Map("JAVA_HOME" -> sys.env("CCM_JAVA_HOME")) else Map.empty) ++
javaHome.map(jh => Map("JAVA_HOME" -> jh)).getOrElse(Map.empty)
val env =
if (sys.env.contains("CCM_JAVA_HOME")) {
sys.env + ("JAVA_HOME" -> sys.env("CCM_JAVA_HOME"))
} else {
sys.env
}

logger.info(s"Running CCM with JAVA_HOME=${env.get("JAVA_HOME")}")
val retValue = executor.execute(cli, env.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ case class CcmConfig(
cassandraConfiguration: Map[String, Object] = Map("auto_snapshot" -> "false"),
dseConfiguration: Map[String, Object] = Map(),
dseRawYaml: Seq[String] = List(),
jvmArgs: Seq[String] = List(),
jvmArgs: Seq[String] = List("-Dcassandra.superuser_setup_delay_ms=0"),
ipPrefix: String = "127.0.0.",
createOptions: List[String] = List(),
dseWorkloads: List[String] = List(),
Expand All @@ -23,7 +23,7 @@ case class CcmConfig(
installDirectory: Option[String] = Option(System.getProperty("ccm.directory")),
installBranch: Option[String] = Option(System.getProperty("ccm.branch")),
dseEnabled: Boolean = Option(System.getProperty("ccm.dse")).exists(_.toLowerCase == "true"),
javaHome: Option[String] = None,
javaVersion: Option[Int] = None,
mode: ClusterMode = ClusterModes.fromEnvVar) {

def withSsl(keystorePath: String, keystorePassword: String): CcmConfig = {
Expand Down Expand Up @@ -69,17 +69,6 @@ case class CcmConfig(
}
}

private val javaVersion = if (dseEnabled) 8 else if (version.compareTo(V5_0_0) >= 0) 11 else 8

def getJavaHome: Option[String] = {
javaHome.orElse {
if (sys.env.contains(s"JAVA${javaVersion}_HOME"))
Some(sys.env(s"JAVA${javaVersion}_HOME"))
else
None
}
}

def ipOfNode(n: Int): String = {
ipPrefix + n
}
Expand Down Expand Up @@ -137,6 +126,7 @@ object CcmConfig {

// C* versions
val V5_0_0: Version = Version.parse("5.0-beta1")
val V4_1_0: Version = Version.parse("4.1.0")
val V4_0_0: Version = Version.parse("4.0.0")
val V3_6_0: Version = Version.parse("3.6.0")
val V3_10: Version = Version.parse("3.10")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ private[ccm] trait ClusterModeExecutor {

protected val dir: Path

protected val javaHome: Option[String]
protected val javaVersion: Option[Int] = config.javaVersion match {
case None if config.dseEnabled => Some(8)
case other => other
}

def create(clusterName: String): Unit

Expand All @@ -22,8 +25,8 @@ private[ccm] trait ClusterModeExecutor {
def remove(): Unit

def execute(args: String*): Seq[String] = synchronized {
val command = s"ccm ${args.mkString(" ")} --config-dir=${dir.toFile.getAbsolutePath}"
CcmBridge.execute(CommandLine.parse(command), javaHome)
val command = "ccm" +: args :+ s"--config-dir=${dir.toFile.getAbsolutePath}"
CcmBridge.execute(command)
}

def executeUnsanitized(args: String*): Seq[String] = synchronized {
Expand All @@ -33,7 +36,7 @@ private[ccm] trait ClusterModeExecutor {
}
cli.addArgument("--config-dir=" + dir.toFile.getAbsolutePath)

CcmBridge.execute(cli, javaHome)
CcmBridge.execute(cli)
}

def getLastRepositoryLogLines(linesCount: Int): Seq[String] = synchronized {
Expand All @@ -47,20 +50,20 @@ private[ccm] trait ClusterModeExecutor {
}

def getLastLogLines(path: String, linesCount: Int): Seq[String] = synchronized {
val command = s"tail -$linesCount $path"
CcmBridge.execute(CommandLine.parse(command), javaHome)
val command = Seq("tail", s"-$linesCount", path)
CcmBridge.execute(command)
}

/**
* Waits for the node to become alive. The first check is performed after the first interval.
*/
def waitForNode(nodeNo: Int, timeout: FiniteDuration, interval: Duration = 5.seconds): Boolean = {
def waitForNode(nodeNo: Int, timeout: FiniteDuration, interval: Duration = 1.seconds): Boolean = {
val deadline = timeout.fromNow
while (!deadline.isOverdue()) {
Thread.sleep(interval.toMillis)
if (isAlive(nodeNo, interval)) {
return true
}
Thread.sleep(interval.toMillis)
}
false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ private[ccm] class DebugModeExecutor(val config: CcmConfig) extends DefaultExecu
}
}

override val javaHome: Option[String] = config.getJavaHome

// stop nodes, don't remove logs
override def remove(): Unit = {
execute("stop")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ class DeveloperModeExecutor(val config: CcmConfig) extends DefaultExecutor with

override val dir: Path = Files.createDirectories(Paths.get("/tmp/scc_ccm"))

override val javaHome: Option[String] = config.getJavaHome

Runtime.getRuntime.addShutdownHook(new Thread("Serial shutdown hooks thread") {
override def run(): Unit = {
println(s"\nCCM is running in developer mode. Cluster $clusterName will not be shutdown. It has to be shutdown " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import java.nio.file.{Files, Path}
private[ccm] class ExistingModeExecutor(val config: CcmConfig) extends ClusterModeExecutor {
override protected val dir: Path = Files.createTempDirectory("test")

override val javaHome: Option[String] = config.getJavaHome

override def create(clusterName: String): Unit = {
// do nothing
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.datastax.spark.connector.ccm.mode

import java.io.{File, FileFilter}
import java.io.File
import java.nio.file.{Files, Path, Paths}
import java.util.concurrent.atomic.AtomicBoolean
import com.datastax.oss.driver.api.core.Version
Expand Down Expand Up @@ -32,16 +32,19 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {

if (logsDir.exists() && logsDir.isDirectory) {
val stdErrFile = logsDir.listFiles().filter(_.getName.endsWith("stderr.log")).head
logger.error(s"Start command failed, here is the last $linesCount lines of startup-stderr file: \n" +
getLastLogLines(stdErrFile.getAbsolutePath, linesCount).mkString("\n"))
val stdOutFile = logsDir.listFiles().filter(_.getName.endsWith("stdout.log")).head
logger.error(s"logs dir: \n" + logsDir.listFiles().map(_.getName).mkString("\n"))
logger.error(s"Start command failed, here is the last $linesCount lines of startup-stderr and startup-stdout files: \n" +
getLastLogLines(stdErrFile.getAbsolutePath, linesCount).mkString("\n") + getLastLogLines(stdOutFile.getAbsolutePath, linesCount).mkString("\n"))
}
}
}

override def start(nodeNo: Int): Unit = {
val formattedJvmArgs = config.jvmArgs.map(arg => s" --jvm_arg=$arg").mkString(" ")
val formattedJvmArgs = config.jvmArgs.map(arg => s"--jvm_arg=$arg")
val formattedJvmVersion = javaVersion.map(v => s"--jvm-version=$v").toSeq
try {
execute(s"node$nodeNo", "start", formattedJvmArgs + "-v", "--skip-wait-other-notice")
execute(Seq(s"node$nodeNo", "start", "-v", "--skip-wait-other-notice") ++ formattedJvmArgs ++ formattedJvmVersion :_*)
waitForNode(nodeNo)
} catch {
case NonFatal(e) =>
Expand Down Expand Up @@ -83,12 +86,12 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
if (created.compareAndSet(false, true)) {
val options = config.installDirectory
.map(dir => config.createOptions :+ s"--install-dir=${new File(dir).getAbsolutePath}")
.orElse(config.installBranch.map(branch => config.createOptions :+ s"-v git:${branch.trim().replaceAll("\"", "")}"))
.getOrElse(config.createOptions :+ s"-v ${adjustCassandraBetaVersion(config.version.toString)}")
.orElse(config.installBranch.map(branch => config.createOptions ++ Seq("-v", s"git:${branch.trim().replaceAll("\"", "")}")))
.getOrElse(config.createOptions ++ Seq("-v", adjustCassandraBetaVersion(config.version.toString)))

val dseFlag = if (config.dseEnabled) Some("--dse") else None

val createArgs = Seq("create", clusterName, "-i", config.ipPrefix, (options ++ dseFlag).mkString(" "))
val createArgs = Seq("create", clusterName, "-i", config.ipPrefix) ++ options ++ dseFlag

// Check installed Directory
val repositoryDir = Paths.get(
Expand Down Expand Up @@ -137,8 +140,10 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
config.cassandraConfiguration.foreach { case (key, value) =>
execute("updateconf", s"$key:$value")
}
if (config.getCassandraVersion.compareTo(Version.V2_2_0) >= 0) {
if (config.getCassandraVersion.compareTo(Version.V2_2_0) >= 0 && config.getCassandraVersion.compareTo(CcmConfig.V4_1_0) < 0) {
execute("updateconf", "enable_user_defined_functions:true")
} else if (config.getCassandraVersion.compareTo(CcmConfig.V4_1_0) >= 0) {
execute("updateconf", "user_defined_functions_enabled:true")
}
if (config.dseEnabled) {
config.dseConfiguration.foreach { case (key, value) =>
Expand All @@ -152,8 +157,10 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
}
} else {
// C* 4.0.0 has materialized views disabled by default
if (config.getCassandraVersion.compareTo(Version.parse("4.0-beta1")) >= 0) {
if (config.getCassandraVersion.compareTo(Version.V4_0_0) >= 0 && config.getCassandraVersion.compareTo(CcmConfig.V4_1_0) < 0) {
execute("updateconf", "enable_materialized_views:true")
} else if (config.getCassandraVersion.compareTo(CcmConfig.V4_1_0) >= 0) {
execute("updateconf", "materialized_views_enabled:true")
}
}
}
Expand All @@ -163,8 +170,6 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
private[ccm] class StandardModeExecutor(val config: CcmConfig) extends DefaultExecutor {
override val dir: Path = Files.createTempDirectory("ccm")

override val javaHome: Option[String] = config.getJavaHome

// remove config directory on shutdown
dir.toFile.deleteOnExit()
// remove db artifacts
Expand Down

0 comments on commit 0f7ce52

Please sign in to comment.