Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test if cassandra sql enricher integration works on nu 1.13 #6754

Draft
wants to merge 12 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ val monocleV = "2.1.0"
val jmxPrometheusJavaagentV = "0.18.0"
val wireMockV = "2.35.0"
val findBugsV = "3.0.2"
val igniteV = "2.10.0"
val cassandraDriverV = "4.13.0"

// depending on scala version one of this jar lays in Flink lib dir
def flinkLibScalaDeps(scalaVersion: String, configurations: Option[String] = None) = forScalaVersion(
Expand Down Expand Up @@ -1647,13 +1649,19 @@ lazy val sqlComponents = (project in component("sql"))
.settings(
name := "nussknacker-sql",
libraryDependencies ++= Seq(
"com.zaxxer" % "HikariCP" % hikariCpV,
"com.zaxxer" % "HikariCP" % hikariCpV,
// It won't run on Java 16 as Hikari will fail while trying to load IgniteJdbcThinDriver https://issues.apache.org/jira/browse/IGNITE-14888
"org.apache.ignite" % "ignite-core" % "2.10.0" % Provided,
"org.apache.ignite" % "ignite-indexing" % "2.10.0" % Provided,
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"org.hsqldb" % "hsqldb" % hsqldbV % "test",
),
"org.apache.ignite" % "ignite-core" % igniteV % "test",
"org.apache.ignite" % "ignite-indexing" % igniteV % "test",
"com.ing.data" % "cassandra-jdbc-wrapper" % cassandraDriverV % Compile,
"org.apache.cassandra" % "java-driver-core" % "4.18.1" % Compile,
"org.postgresql" % "postgresql" % postgresV % "test",
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"org.hsqldb" % "hsqldb" % hsqldbV % "test",
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % "test",
"com.dimafeng" %% "testcontainers-scala-postgresql" % testContainersScalaV % "test",
"com.dimafeng" %% "testcontainers-scala-cassandra" % testContainersScalaV % "test",
)
)
.dependsOn(
componentsUtils % Provided,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package pl.touk.nussknacker.sql.db

import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.sql.db.schema.{DbParameterMetaData, TableDefinition, TableMetaData}

import java.sql.{Connection, PreparedStatement, ResultSet}
import scala.collection.mutable.ArrayBuffer
import scala.util.Using

object MetaDataProviderUtils extends LazyLogging {

def createTableMetaData(
tableName: String,
tableDefinition: TableDefinition,
getConnection: () => Connection
): TableMetaData = {
Using.resource(getConnection()) { connection =>
Using.resource(connection.prepareStatement(selectAllFromTableQuery(tableName))) { statement =>
TableMetaData(
Option(tableDefinition),
DbParameterMetaData(statement.getParameterMetaData.getParameterCount)
)
}
}
}

def getQueryResults[T](
connection: Connection,
query: String,
setArgs: List[PreparedStatement => Unit] = Nil
)(f: ResultSet => T): List[T] = {
Using.resource(connection.prepareStatement(query)) { statement =>
logger.debug(s"Executing query: $query")
setArgs.foreach(setArg => setArg(statement))
val resultSet = statement.executeQuery()
val arr = ArrayBuffer.empty[T]
while (resultSet.next()) {
arr += f(resultSet)
}
arr.toList
}
}

private def selectAllFromTableQuery(tableName: String) = s"SELECT * FROM $tableName"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package pl.touk.nussknacker.sql.db.cassandra

import pl.touk.nussknacker.sql.db.MetaDataProviderUtils
import pl.touk.nussknacker.sql.db.query.UpdateResultStrategy
import pl.touk.nussknacker.sql.db.schema._

import java.sql.Connection
import scala.util.Using

class CassandraMetaDataProvider(getConnection: () => Connection) extends JdbcMetaDataProvider(getConnection) {

private val queryHelper = new CassandraQueryHelper(getConnection)

override def getQueryMetaData(query: String, resultStrategyName: String): TableMetaData = {
val updateResultStrategyName = UpdateResultStrategy.name
if (resultStrategyName != updateResultStrategyName) {
// TODO_PAWEL maybe only this one option should be available in dropdown in ui in this case?
throw new NotImplementedError(
s"Generic query typing is not implemented for Cassandra. You can use only '$updateResultStrategyName' result strategy because it does not require typing"
)
}
Using.resource(getConnection()) { connection =>
Using.resource(connection.prepareStatement(query)) { statement =>
TableMetaData(
None, // standard implementation also puts here none when query is update
DbParameterMetaData(statement.getParameterMetaData.getParameterCount)
)
}
}
}

override def getTableMetaData(tableName: String): TableMetaData = {
val tableDefinition = queryHelper.fetchTableMeta(tableName)
MetaDataProviderUtils.createTableMetaData(tableName, tableDefinition, getConnection)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package pl.touk.nussknacker.sql.db.cassandra

import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.sql.db.MetaDataProviderUtils
import pl.touk.nussknacker.sql.db.schema.{ColumnDefinition, TableDefinition}

import java.sql.Connection
import scala.util.Using

class CassandraQueryHelper(getConnection: () => Connection) extends LazyLogging {

private val tableSchemaQuery =
"""
|SELECT * FROM system_schema.columns
|WHERE keyspace_name = ?
|AND table_name = ?
|""".stripMargin

def fetchTableMeta(tableName: String): TableDefinition = {
Using.resource(getConnection()) { connection =>
val columnTypes = MetaDataProviderUtils.getQueryResults(
connection = connection,
query = tableSchemaQuery,
setArgs = List(_.setString(1, connection.getSchema), _.setString(2, tableName))
) { r =>
ColumnType(columnName = r.getString("column_name"), cqlType = r.getString("type"))
}
val columnDefinitions = columnTypes.map(columnType => {
val javaClass = CassandraTypeToJavaTypeMapper.getJavaTypeFromCassandraType(columnType.cqlType)
ColumnDefinition(name = columnType.columnName, typing = Typed.typedClass(javaClass))
})
TableDefinition(columnDefinitions)
}
}

private case class ColumnType(
columnName: String,
// list of possible types is here https://cassandra.apache.org/doc/stable/cassandra/cql/types.html
cqlType: String
)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package pl.touk.nussknacker.sql.db.cassandra

import jdk.jshell.spi.ExecutionControl.NotImplementedException

// implementation based on documentation comment from the top of CassandraResultSet class.
// In case cassandra driver changes this class needs to be changed accordingly.
object CassandraTypeToJavaTypeMapper {

def getJavaTypeFromCassandraType(cassandraType: String): Class[_] = {
cassandraType match {
case "ascii" => classOf[java.lang.String]
case "bigint" => classOf[java.lang.Long]
case "blob" => classOf[java.nio.ByteBuffer]
case "boolean" => classOf[java.lang.Boolean]
case "counter" => classOf[java.lang.Long]
case "date" => classOf[java.sql.Date]
case "decimal" => classOf[java.math.BigDecimal]
case "double" => classOf[java.lang.Double]
case "duration" => classOf[com.datastax.oss.driver.api.core.data.CqlDuration]
case "float" => classOf[java.lang.Float]
case "inet" => classOf[java.net.InetAddress]
case "int" => classOf[java.lang.Integer]
case "list" => classOf[java.util.List[_]]
case "map" => classOf[java.util.Map[_, _]]
case "set" => classOf[java.util.Set[_]]
case "smallint" => classOf[java.lang.Short]
case "text" => classOf[java.lang.String]
case "time" => classOf[java.sql.Time]
case "timestamp" => classOf[java.sql.Timestamp]
case "timeuuid" => classOf[java.util.UUID]
case "tinyint" => classOf[java.lang.Byte]
case "tuple" => classOf[com.datastax.oss.driver.api.core.data.TupleValue]
case "udt" => classOf[com.datastax.oss.driver.api.core.data.UdtValue]
case "uuid" => classOf[java.util.UUID]
case "varchar" => classOf[java.lang.String]
case "varint" => classOf[java.math.BigInteger]
case "vector" => classOf[com.datastax.oss.driver.api.core.data.CqlVector[_]]
case _ => throw new NotImplementedException("Unknown or not implemented in cassandra driver cql type")
}
}

}
Original file line number Diff line number Diff line change
@@ -1,30 +1,24 @@
package pl.touk.nussknacker.sql.db.ignite

import pl.touk.nussknacker.sql.db.MetaDataProviderUtils
import pl.touk.nussknacker.sql.db.schema._

import java.sql.Connection
import scala.util.Using

class IgniteMetaDataProvider(getConnection: () => Connection) extends JdbcMetaDataProvider(getConnection) {
private def query(tableName: String) = s"SELECT * FROM $tableName"

private val queryHelper = new IgniteQueryHelper(getConnection)

override def getQueryMetaData(query: String): TableMetaData = throw new NotImplementedError(
"Generic query typing is not implemented for Ignite"
)
// TODO_PAWEL probably it can work for updates, look at implementation of this method for cassandra db
override def getQueryMetaData(query: String, resultStrategyName: String): TableMetaData =
throw new NotImplementedError(
"Generic query typing is not implemented for Ignite"
)

override def getTableMetaData(tableName: String): TableMetaData = {
val tableDefinition =
queryHelper.fetchTablesMeta.getOrElse(tableName, throw new IllegalArgumentException("Table metadata not present"))
Using.resource(getConnection()) { connection =>
Using.resource(connection.prepareStatement(query(tableName))) { statement =>
TableMetaData(
Option(tableDefinition),
DbParameterMetaData(statement.getParameterMetaData.getParameterCount)
)
}
}
MetaDataProviderUtils.createTableMetaData(tableName, tableDefinition, getConnection)
}

override def getSchemaDefinition(): SchemaDefinition = SchemaDefinition(queryHelper.fetchTablesMeta.keys.toList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package pl.touk.nussknacker.sql.db.ignite
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.api.typed.TypedObjectDefinition
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.sql.db.MetaDataProviderUtils
import pl.touk.nussknacker.sql.db.schema.TableDefinition

import java.sql.{Connection, PreparedStatement, ResultSet}
import scala.collection.mutable.ArrayBuffer
import java.sql.Connection
import scala.util.Using

class IgniteQueryHelper(getConnection: () => Connection) extends LazyLogging {
Expand All @@ -21,38 +21,23 @@ class IgniteQueryHelper(getConnection: () => Connection) extends LazyLogging {

def fetchTablesMeta: Map[String, TableDefinition] = {
Using.resource(getConnection()) { connection =>
getIgniteQueryResults(
connection = connection,
query = tablesInSchemaQuery,
setArgs = List(_.setString(1, connection.getSchema))
) { r =>
(r.getString("TABLE_NAME"), r.getString("COLUMN_NAME"), r.getString("TYPE"), r.getBoolean("AFFINITY_COLUMN"))
}.groupBy { case (tableName, _, _, _) => tableName }
MetaDataProviderUtils
.getQueryResults(
connection = connection,
query = tablesInSchemaQuery,
setArgs = List(_.setString(1, connection.getSchema))
) { r =>
(r.getString("TABLE_NAME"), r.getString("COLUMN_NAME"), r.getString("TYPE"), r.getBoolean("AFFINITY_COLUMN"))
}
.groupBy { case (tableName, _, _, _) => tableName }
.map { case (tableName, entries) =>
val columnTypings = entries.map { case (_, columnName, klassName, _) =>
columnName -> Typed.typedClass(Class.forName(klassName))
}

tableName -> TableDefinition(typedObjectDefinition = TypedObjectDefinition(columnTypings.toMap))
tableName -> TableDefinition.applyList(columnTypings)
}
}
}

private def getIgniteQueryResults[T](
connection: Connection,
query: String,
setArgs: List[PreparedStatement => Unit] = Nil
)(f: ResultSet => T): List[T] = {
Using.resource(connection.prepareStatement(query)) { statement =>
logger.debug(s"Executing query: $query")
setArgs.foreach(setArg => setArg(statement))
val resultSet = statement.executeQuery()
val arr = ArrayBuffer.empty[T]
while (resultSet.next()) {
arr += f(resultSet)
}
arr.toList
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ trait QueryExecutor {

protected def toTypedMap(tableDef: TableDefinition, resultSet: ResultSet): TypedMap = {
val fields = tableDef.columnDefs.map { columnDef =>
columnDef.name -> resultSet.getObject(columnDef.no)
// we could here use method resultSet.getObject(Int) and pass column number as argument
// but in case of ignite db it is not certain which column index corresponds to which column.
columnDef.name -> resultSet.getObject(columnDef.name)
}.toMap
TypedMap(fields)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,16 @@ object ColumnDefinition {

def apply(columnNo: Int, resultMeta: ResultSetMetaData): ColumnDefinition =
ColumnDefinition(
no = columnNo,
name = resultMeta.getColumnName(columnNo),
typing = Typed(Class.forName(resultMeta.getColumnClassName(columnNo)))
)

def apply(columnNo: Int, typing: (String, TypingResult)): ColumnDefinition =
def apply(typing: (String, TypingResult)): ColumnDefinition =
ColumnDefinition(
no = columnNo,
name = typing._1,
typing = typing._2
)

}

final case class ColumnDefinition(no: Int, name: String, typing: TypingResult)
final case class ColumnDefinition(name: String, typing: TypingResult)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ trait DbMetaDataProvider {

def getTableMetaData(tableName: String): TableMetaData

def getQueryMetaData(query: String): TableMetaData
// possible values of resultStrategyName are defined in QueryResultStrategy implementations
def getQueryMetaData(query: String, resultStrategyName: String): TableMetaData

def getSchemaDefinition(): SchemaDefinition
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class JdbcMetaDataProvider(getConnection: () => Connection) extends DbMetaDataPr
DialectMetaData(metaData.getIdentifierQuoteString)
}

def getTableMetaData(tableName: String): TableMetaData = getQueryMetaData(query(tableName))
def getTableMetaData(tableName: String): TableMetaData = getQueryMetaDataImpl(query(tableName))

def getSchemaDefinition(): SchemaDefinition =
Using.resource(getConnection()) { connection =>
Expand All @@ -27,7 +27,10 @@ class JdbcMetaDataProvider(getConnection: () => Connection) extends DbMetaDataPr
SchemaDefinition(results)
}

override def getQueryMetaData(query: String): TableMetaData =
override def getQueryMetaData(query: String, resultStrategyName: String): TableMetaData =
getQueryMetaDataImpl(query)

private def getQueryMetaDataImpl(query: String): TableMetaData = {
Using.resource(getConnection()) { connection =>
Using.resource(connection.prepareStatement(query)) { statement =>
TableMetaData( // For updates getMetaData return null, so TableDefinition is None
Expand All @@ -36,5 +39,6 @@ class JdbcMetaDataProvider(getConnection: () => Connection) extends DbMetaDataPr
)
}
}
}

}
Loading
Loading