Avro serialization working in Glue #34

Draft: wants to merge 11 commits into base: main
13 changes: 13 additions & 0 deletions build.sbt
@@ -8,9 +8,11 @@ scalaVersion := "2.12.17"
val sparkVersion = "3.3.2"
val awsVersion = "1.12.682"
val testContainersVersion = "0.40.12"
val confluentVersion = "7.2.3"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"org.apache.spark" %% "spark-avro" % sparkVersion,
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
"org.apache.kafka" % "kafka-clients" % "3.3.2",

@@ -40,6 +42,17 @@ resolvers += "aws-glue-etl-artifacts" at "https://aws-glue-etl-artifacts.s3.amaz
libraryDependencies += "com.amazonaws" % "AWSGlueETL" % "1.0.0" % Provided
*/

resolvers += "confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
("io.confluent" % "kafka-avro-serializer" % confluentVersion)
.exclude("com.fasterxml.jackson.module","jackson-module-scala_2.13")
.exclude("org.scala-lang.modules", "scala-collection-compat_2.13"),
("io.confluent" % "kafka-schema-registry" % confluentVersion)
.exclude("com.fasterxml.jackson.module", "jackson-module-scala_2.13")
.exclude("org.scala-lang.modules", "scala-collection-compat_2.13")
)

assembly / assemblyMergeStrategy := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
@@ -0,0 +1,128 @@
package com.metabolic.data.core.services.schema

import io.confluent.kafka.schemaregistry.avro.AvroSchema
import org.apache.logging.log4j.scala.Logging
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.avro.functions.{from_avro, to_avro}
import org.apache.spark.sql.functions.{col, expr, struct, udf}
import org.json.JSONObject
import scalaj.http.{Http, HttpResponse}

import java.nio.ByteBuffer
import java.util
import java.util.Base64
import scala.collection.JavaConverters._
class CCloudSchemaRegistryService(schemaRegistryUrl: String, srApiKey: String, srApiSecret: String) extends Logging {

private val props: util.Map[String, String] = Map(
"basic.auth.credentials.source" -> "USER_INFO",
"schema.registry.basic.auth.user.info" -> s"$srApiKey:$srApiSecret"
).asJava

// UDF that reads the 4-byte schema id following the Confluent wire-format magic byte
private val binaryToStringUDF = udf((x: Array[Byte]) => ByteBuffer.wrap(x, 1, 4).getInt.toString)

def deserialize(topic: String, df: DataFrame): DataFrame = {
// Get latest schema
val avroSchema = getLastSchemaVersion(topic + "-value")

// Strip the 5-byte Confluent wire-format header (magic byte + 4-byte schema id)
val dfFixed = df.withColumn("fixedValue", expr("substring(value, 6)"))

// Get schema id from value
val dfFixedId = dfFixed.withColumn("valueSchemaId", binaryToStringUDF(expr("value")))

// Deserialize data
val decoded_output = dfFixedId.select(
from_avro(col("fixedValue"), avroSchema.get)
.alias("value")
)
decoded_output.select("value.*")
}


def serialize(topic: String, df: DataFrame): DataFrame = {

val schemaAvro = new AvroSchema(SchemaConverters.toAvroType(df.schema, recordName = "Envelope", nameSpace = topic))
val schemaId = register(topic + "-value", schemaAvro.toString)


// Serialize data to Avro format
val serializedDF = df.select(to_avro(struct(df.columns.map(col): _*), schemaAvro.toString).alias("value"))

// Add magic byte & schema id to the serialized data
val addHeaderUDF = udf { (value: Array[Byte]) =>
val magicByte: Byte = 0x0 // Confluent wire-format magic byte
val idBytes: Array[Byte] = ByteBuffer.allocate(4).putInt(schemaId.get).array()
ByteBuffer.allocate(1 + idBytes.length + value.length)
.put(magicByte)
.put(idBytes)
.put(value)
.array()
}

// Apply the UDF to add header to each row
val finalDF = serializedDF.withColumn("value", addHeaderUDF(col("value")))

finalDF

}

private def register(subject: String, schema: String): Option[Int] = {
val body = schema
val request = s"$schemaRegistryUrl/subjects/$subject/versions"
logger.info(s"Register schema for subject $subject")
val credentials = s"$srApiKey:$srApiSecret"
val base64Credentials = Base64.getEncoder.encodeToString(credentials.getBytes("utf-8"))

try {
val httpResponse: HttpResponse[String] = Http(request)
.header("content-type", "application/octet-stream")
.header("Authorization", s"Basic $base64Credentials")
.postData(body.getBytes)
.asString

if (httpResponse.code == 200) {
val jsonResponse = new JSONObject(httpResponse.body)
val id = jsonResponse.getInt("id")
logger.info(s"Schema registered for subject $subject with id: $id")
Some(id)
} else {
logger.error(s"Error registering subject $subject: ${httpResponse.code} ${httpResponse.body}")
throw new RuntimeException(s"Error registering subject $subject: ${httpResponse.code} ${httpResponse.body}")
}
} catch {
case e: Exception =>
logger.error("Error registering schema: " + e.getMessage)
throw e
}
}

private def getLastSchemaVersion(subject: String): Option[String] = {
val request = s"$schemaRegistryUrl/subjects/$subject/versions/latest"
logger.info(s"Getting schema for subject $subject")
val credentials = s"$srApiKey:$srApiSecret"
val base64Credentials = Base64.getEncoder.encodeToString(credentials.getBytes("utf-8"))

try {
val httpResponse: HttpResponse[String] = Http(request)
.header("Authorization", s"Basic $base64Credentials")
.asString

if (httpResponse.code == 200) {
val jsonResponse = new JSONObject(httpResponse.body)
val schema = jsonResponse.getString("schema")
Some(schema)
} else {
logger.error(s"Error getting subject $subject: ${httpResponse.code} ${httpResponse.body}")
throw new RuntimeException(s"Error getting latest schema for subject $subject: ${httpResponse.code} ${httpResponse.body}")
}
} catch {
case e: Exception =>
logger.error("Error getting schema: " + e.getMessage)
throw e
}
}
}
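
For reference, a minimal usage sketch of the new service; this is not part of the diff, and the registry URL, credentials, and topic name below are placeholders:

import org.apache.spark.sql.SparkSession

object AvroRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("avro-roundtrip").getOrCreate()
    import spark.implicits._

    // Placeholder endpoint and credentials for a Confluent Cloud schema registry
    val registry = new CCloudSchemaRegistryService(
      "https://psrc-xxxxx.us-east-2.aws.confluent.cloud",
      sys.env("SR_API_KEY"),
      sys.env("SR_API_SECRET"))

    val df = Seq(("id-1", 42)).toDF("id", "amount")

    // serialize registers "<topic>-value" and returns a single binary "value"
    // column prefixed with the Confluent wire-format header (magic byte + schema id)
    val serialized = registry.serialize("my-topic", df)

    // deserialize strips the 5-byte header and decodes the payload with the
    // latest schema registered under "<topic>-value"
    val decoded = registry.deserialize("my-topic", serialized)
    decoded.show()
  }
}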

@@ -1,7 +1,6 @@
package com.metabolic.data.core.services.spark.filter

import DataLakeDepth.DataLakeDepth
import org.apache.commons.lang.NotImplementedException
import org.apache.logging.log4j.scala.Logging
import org.joda.time.DateTime

@@ -1,12 +1,13 @@
package com.metabolic.data.core.services.spark.reader.stream

import com.metabolic.data.core.services.schema.CCloudSchemaRegistryService
import com.metabolic.data.core.services.spark.reader.DataframeUnifiedReader
import org.apache.spark.sql.functions.{col, schema_of_json}
import org.apache.spark.sql.streaming.DataStreamReader
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}

class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, topic: String, consumerGroup: String = "spark")
class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, topic: String,
schemaRegistryUrl: String, srApiKey: String, srApiSecret: String, schemaRegistry: Option[String], consumerGroup: String = "spark")
extends DataframeUnifiedReader {

override val input_identifier: String = topic
@@ -73,7 +74,7 @@ class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, t
val input = setStreamAuthentication(plain)
.load()

input.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
deserialize(input)

}

@@ -91,12 +92,24 @@ class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, t
val input = setDFAuthentication(plain)
.load()

input.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
deserialize(input)
}

private def deserialize(input: DataFrame): DataFrame = {
schemaRegistry match {
case Some("avro") => {
new CCloudSchemaRegistryService(schemaRegistryUrl, srApiKey, srApiSecret).deserialize(topic, input)
}
case _ => {
input.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
}
}
}

}

object KafkaReader {
def apply(servers: Seq[String], apiKey: String, apiSecret: String, topic: String) = new KafkaReader(servers, apiKey, apiSecret, topic)
def apply(servers: Seq[String], apiKey: String, apiSecret: String, topic: String, schemaRegistryUrl: String,
srApiKey: String, srApiSecret: String, schemaRegistry: Option[String]) =
new KafkaReader(servers, apiKey, apiSecret, topic, schemaRegistryUrl, srApiKey, srApiSecret, schemaRegistry)
}
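
For orientation, a sketch of constructing the extended reader; brokers, credentials, and topic are placeholders, and passing anything other than Some("avro") for schemaRegistry keeps the previous CAST-to-string behaviour:

// Sketch only: broker address, credentials and topic are placeholders
val reader = KafkaReader(
  servers = Seq("pkc-xxxxx.us-east-2.aws.confluent.cloud:9092"),
  apiKey = sys.env("KAFKA_API_KEY"),
  apiSecret = sys.env("KAFKA_API_SECRET"),
  topic = "my-topic",
  schemaRegistryUrl = "https://psrc-xxxxx.us-east-2.aws.confluent.cloud",
  srApiKey = sys.env("SR_API_KEY"),
  srApiSecret = sys.env("SR_API_SECRET"),
  schemaRegistry = Some("avro")) // enables schema-registry Avro decoding
// reader.read(spark, mode) then yields the decoded DataFrame, as in MetabolicReader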
@@ -1,13 +1,15 @@
package com.metabolic.data.core.services.spark.writer.stream

import com.metabolic.data.core.services.schema.CCloudSchemaRegistryService
import com.metabolic.data.core.services.spark.writer.DataframeUnifiedWriter
import com.metabolic.data.mapper.domain.io.WriteMode
import com.metabolic.data.mapper.domain.io.WriteMode.WriteMode
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SaveMode}

class KafkaWriter(servers: Seq[String], apiKey: String, apiSecret: String, topic: String,
idColumnName: Option[String] = None, val checkpointLocation: String)
idColumnName: Option[String] = None, val checkpointLocation: String,
schemaRegistryUrl: String, srApiKey: String, srApiSecret: String, schemaRegistry: Option[String])
extends DataframeUnifiedWriter {

override val output_identifier: String = topic
@@ -16,9 +18,15 @@ class KafkaWriter(servers: Seq[String], apiKey: String, apiSecret: String, topic

override def writeStream(df: DataFrame): StreamingQuery = {

val kafkaDf = idColumnName match {
case Some(c) => df.selectExpr(s"$c as key", "to_json(struct(*)) as value")
case None => df.selectExpr("to_json(struct(*)) as value")
val kafkaDf = schemaRegistry match {
case Some("avro") =>
new CCloudSchemaRegistryService(schemaRegistryUrl, srApiKey, srApiSecret).serialize(output_identifier, df)
case _ => {
idColumnName match {
case Some(c) => df.selectExpr(s"$c as key", "to_json(struct(*)) as value")
case None => df.selectExpr("to_json(struct(*)) as value")
}
}
}

kafkaDf
@@ -39,9 +47,15 @@ class KafkaWriter(servers: Seq[String], apiKey: String, apiSecret: String, topic

override def writeBatch(df: DataFrame): Unit = {

val kafkaDf = idColumnName match {
case Some(c) => df.selectExpr(s"$c as key", "to_json(struct(*)) as value")
case None => df.selectExpr("to_json(struct(*)) as value")
val kafkaDf = schemaRegistry match {
case Some("avro") =>
new CCloudSchemaRegistryService(schemaRegistryUrl, srApiKey, srApiSecret).serialize(output_identifier, df)
case _ => {
idColumnName match {
case Some(c) => df.selectExpr(s"$c as key", "to_json(struct(*)) as value")
case None => df.selectExpr("to_json(struct(*)) as value")
}
}
}

kafkaDf
@@ -31,7 +31,9 @@ object MetabolicReader extends Logging {
logger.info(s"Reading stream source ${streamSource.name} from ${streamSource.topic}")

streamSource.format match {
case IOFormat.KAFKA => new KafkaReader(streamSource.servers, streamSource.key, streamSource.secret, streamSource.topic, jobName)
case IOFormat.KAFKA => new KafkaReader(streamSource.servers, streamSource.key, streamSource.secret, streamSource.topic,
streamSource.schemaRegistryUrl, streamSource.srApiKey,
streamSource.srApiSecret, streamSource.schemaRegistry, jobName)
.read(spark, mode)
}
}
@@ -81,7 +81,9 @@ object MetabolicWriter extends Logging {
logger.info(s"Writing Kafka sink ${streamSink.topic}")

new KafkaWriter(streamSink.servers, streamSink.apiKey, streamSink.apiSecret,
streamSink.topic, streamSink.idColumnName, checkpointPath)
streamSink.topic, streamSink.idColumnName, checkpointPath,
streamSink.schemaRegistryUrl, streamSink.srApiKey,
streamSink.srApiSecret, streamSink.schemaRegistry)
.write(_df, mode)

}
@@ -1,3 +1,4 @@
package com.metabolic.data.mapper.domain

case class KafkaConnection(servers: Option[Seq[String]], key: Option[String], secret: Option[String])
case class KafkaConnection(servers: Option[Seq[String]], key: Option[String], secret: Option[String],
schemaRegistryUrl: Option[String], srKey: Option[String], srSecret: Option[String])
Original file line number Diff line number Diff line change
@@ -11,6 +11,10 @@ case class StreamSink(name: String,
apiSecret: String,
topic: String,
idColumnName: Option[String],
schemaRegistryUrl: String,
srApiKey: String,
srApiSecret: String,
schemaRegistry: Option[String],
format: IOFormat = KAFKA,
ops: Seq[SinkOp])
extends Sink {
@@ -8,6 +8,10 @@ case class StreamSource(name: String,
key: String,
secret: String,
topic: String,
schemaRegistryUrl: String,
srApiKey: String,
srApiSecret: String,
schemaRegistry: Option[String],
format: IOFormat = KAFKA,
ops: Seq[SourceOp] = Seq.empty)
extends Source
@@ -126,6 +126,13 @@ case class SinkFormatParser()(implicit val region: Regions) extends FormatParser
val servers = kafkaConfig.servers.get
val apiKey = kafkaConfig.key.get
val apiSecret = kafkaConfig.secret.get
val schemaRegistryUrl = kafkaConfig.schemaRegistryUrl.getOrElse("")
val srApiKey = kafkaConfig.srKey.getOrElse("")
val srApiSecret = kafkaConfig.srSecret.getOrElse("")
val schemaRegistry = if (config.hasPath("schemaRegistry")) {
Option(config.getString("schemaRegistry").toLowerCase())
} else None


val topic = config.getString("topic")

@@ -135,7 +142,9 @@ case class SinkFormatParser()(implicit val region: Regions) extends FormatParser
Option.empty
}

StreamSink(name, servers, apiKey, apiSecret, topic, idColumnName, IOFormat.KAFKA, ops = ops )
StreamSink(name, servers, apiKey, apiSecret, topic, idColumnName,
schemaRegistryUrl, srApiKey, srApiSecret, schemaRegistry,
IOFormat.KAFKA, ops = ops)
}

private def checkWriteMode(config: HoconConfig): WriteMode = {
@@ -73,10 +73,19 @@ case class SourceFormatParser()(implicit val region: Regions) extends FormatPars
val servers = kafkaConfig.servers.get
val apiKey = kafkaConfig.key.get
val apiSecret = kafkaConfig.secret.get
val schemaRegistryUrl = kafkaConfig.schemaRegistryUrl.getOrElse("")
val srApiKey = kafkaConfig.srKey.getOrElse("")
val srApiSecret = kafkaConfig.srSecret.getOrElse("")
val schemaRegistry = if (config.hasPath("schemaRegistry")) {
Option(config.getString("schemaRegistry").toLowerCase())
} else None


val topic = config.getString("topic")

StreamSource(name, servers, apiKey, apiSecret, topic, IOFormat.KAFKA, ops)
StreamSource(name, servers, apiKey, apiSecret, topic,
schemaRegistryUrl, srApiKey, srApiSecret, schemaRegistry,
IOFormat.KAFKA, ops = ops)
}

private def parseMetastoreSource(name: String, config: HoconConfig, ops: Seq[SourceOp]): Source = {
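
Finally, a sketch of the source/sink block these parsers now read; only the topic and schemaRegistry keys come from this block, while the registry URL and credentials are taken from KafkaConnection, so nothing else about the existing configuration is assumed here:

// Sketch only: shows the new optional "schemaRegistry" switch
import com.typesafe.config.ConfigFactory

val sourceBlock = ConfigFactory.parseString(
  """
    |topic = "my-topic"
    |schemaRegistry = "avro"   // omit this key to keep the existing JSON/string path
    |""".stripMargin)

val schemaRegistry =
  if (sourceBlock.hasPath("schemaRegistry"))
    Option(sourceBlock.getString("schemaRegistry").toLowerCase())
  else None // anything other than Some("avro") falls back to the to_json / CAST behaviour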