Skip to content

Commit

Permalink
small code tide-up
Browse files Browse the repository at this point in the history
adding unit test for the assigned tables
  • Loading branch information
stheppi committed Apr 6, 2017
1 parent 77753c3 commit 02304bc
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.apache.kafka.connect.source.SourceConnector
import org.apache.kafka.connect.util.ConnectorUtils

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

/**
* <h1>CassandraSourceConnector</h1>
Expand All @@ -37,7 +36,7 @@ import scala.collection.JavaConverters._
*/
class CassandraSourceConnector extends SourceConnector with StrictLogging {

private var configProps : Option[util.Map[String, String]] = None
private var configProps: Option[util.Map[String, String]] = None
private val configDef = CassandraConfigSource.sourceConfig

/**
Expand All @@ -52,13 +51,11 @@ class CassandraSourceConnector extends SourceConnector with StrictLogging {
*
* @param maxTasks The max number of task workers be can spawn.
* @return a List of configuration properties per worker.
* */
**/
override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = {
val raw = configProps.get.get(CassandraConfigConstants.SOURCE_KCQL_QUERY).split(";")

val tables = raw.map({
r => Config.parse(r).getSource
}).toList
val tables = raw.map { r => Config.parse(r).getSource }.toList

val numGroups = Math.min(tables.size, maxTasks)

Expand All @@ -67,20 +64,19 @@ class CassandraSourceConnector extends SourceConnector with StrictLogging {

//setup the config for each task and set assigned tables
groups
.filterNot(g => g.isEmpty)
.map(g=> {
val taskConfigs = new java.util.HashMap[String,String]
.withFilter(g => g.nonEmpty)
.map { g =>
val taskConfigs = new java.util.HashMap[String, String](configProps.get)
taskConfigs.put(CassandraConfigConstants.ASSIGNED_TABLES, g.mkString(","))
taskConfigs.putAll(configProps.get)
taskConfigs.toMap.asJava
})
taskConfigs
}
}

/**
* Start the sink and set to configuration.
*
* @param props A map of properties for the connector and worker.
* */
**/
override def start(props: util.Map[String, String]): Unit = {
logger.info(s"Starting Cassandra source task with ${props.toString}.")
configProps = Some(props)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package com.datamountaineer.streamreactor.connect.cassandra.config
import com.datamountaineer.streamreactor.connect.cassandra.TestConfig
import org.scalatest.{Matchers, WordSpec}

import scala.collection.JavaConversions._

/**
* Created by [email protected] on 28/04/16.
* stream-reactor
*/
class TestCassandraSourceSettings extends WordSpec with Matchers with TestConfig {
"CassandraSettings should return setting for a source" in {
val taskConfig = CassandraConfigSource(getCassandraConfigSourcePropsBulk)
val taskConfig = CassandraConfigSource(getCassandraConfigSourcePropsBulk)
val assigned = List(TABLE1, TABLE2)
val settings = CassandraSettings.configureSource(taskConfig).toList
settings.size shouldBe 2
Expand All @@ -36,4 +38,19 @@ class TestCassandraSourceSettings extends WordSpec with Matchers with TestConfig
settings(1).routes.getTarget shouldBe TOPIC2
settings(1).bulkImportMode shouldBe true
}

"CassandraSettings should return setting for a source with one table" in {
val map = Map(
CassandraConfigConstants.CONTACT_POINTS -> CONTACT_POINT,
CassandraConfigConstants.KEY_SPACE -> CASSANDRA_KEYSPACE,
CassandraConfigConstants.USERNAME -> USERNAME,
CassandraConfigConstants.PASSWD -> PASSWD,
CassandraConfigConstants.SOURCE_KCQL_QUERY -> "INSERT INTO cassandra-source SELECT * FROM orders PK created",
CassandraConfigConstants.IMPORT_MODE -> CassandraConfigConstants.INCREMENTAL,
CassandraConfigConstants.POLL_INTERVAL -> "1000"
)
val taskConfig = CassandraConfigSource(map)
val settings = CassandraSettings.configureSource(taskConfig).toList
settings.size shouldBe 1
}
}

0 comments on commit 02304bc

Please sign in to comment.