Skip to content

Commit

Permalink
emd
Browse files Browse the repository at this point in the history
  • Loading branch information
berna396 committed Apr 4, 2024
1 parent 427fe7e commit 7089016
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 27 deletions.
8 changes: 2 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,8 @@ libraryDependencies += "com.amazonaws" % "AWSGlueETL" % "1.0.0" % Provided
resolvers +="confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
("io.confluent" % "kafka-avro-serializer" % confluentVersion)
.exclude("com.fasterxml.jackson.module","jackson-module-scala_2.13")
.exclude("org.scala-lang.modules", "scala-collection-compat_2.13"),
("io.confluent" % "kafka-schema-registry" % confluentVersion)
.exclude("com.fasterxml.jackson.module", "jackson-module-scala_2.13")
.exclude("org.scala-lang.modules", "scala-collection-compat_2.13")
"io.confluent" % "kafka-avro-serializer" % confluentVersion,
"io.confluent" % "kafka-schema-registry" % confluentVersion
)

assembly / assemblyMergeStrategy := {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,29 @@
package com.metabolic.data.core.services.schema

import com.amazonaws.services.glue.model.AlreadyExistsException
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.RestService
import org.apache.logging.log4j.scala.Logging
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.{BinaryType, DataType, StringType}
import org.apache.spark.sql.{DataFrame, functions}
import org.apache.spark.sql.avro.functions.{from_avro, to_avro}
import org.apache.spark.sql.functions.{col, expr, lit, struct, udf}
import org.apache.spark.sql.functions.{col, expr, struct, udf}
import org.json.JSONObject
import scalaj.http.{Http, HttpResponse}

import java.nio.ByteBuffer
import java.util
import scala.collection.JavaConverters._
import scalaj.http.{Http, HttpResponse}

import java.util.Base64
import scala.compat.java8.JFunction.func
import scala.collection.JavaConverters._
class CCloudSchemaRegistryService(schemaRegistryUrl: String, srApiKey: String, srApiSecret: String) extends Logging {

private val props: util.Map[String, String] = Map(
"basic.auth.credentials.source" -> "USER_INFO",
"schema.registry.basic.auth.user.info" -> s"$srApiKey:$srApiSecret"
).asJava


private val schemaRegistryClient = new CachedSchemaRegistryClient(new RestService(schemaRegistryUrl), 100, props)

// UDF function
private val binaryToStringUDF = udf((x: Array[Byte]) => BigInt(x).toString())

def deserializeWithAbris(topic: String, df: DataFrame): DataFrame = {
def deserialize(topic: String, df: DataFrame): DataFrame = {
// Get latest schema
val avroSchema = getLastSchemaVersion(topic + "-value")

Expand Down Expand Up @@ -78,7 +69,7 @@ class CCloudSchemaRegistryService(schemaRegistryUrl: String, srApiKey: String, s

}

def register(subject: String, schema: String): Option[Int] = {
private def register(subject: String, schema: String): Option[Int] = {
val body = schema
val request = s"$schemaRegistryUrl/subjects/$subject/versions"
logger.info(s"Register schema for subject $subject")
Expand All @@ -99,16 +90,16 @@ class CCloudSchemaRegistryService(schemaRegistryUrl: String, srApiKey: String, s
Some(id)
} else {
logger.info(s"Error registering subject $subject: ${httpResponse.code} ${httpResponse.body}")
None
throw new RuntimeException(s"Error registering subject $subject: ${httpResponse.code} ${httpResponse.body}")
}
} catch {
case e: Exception =>
logger.info("Error in registering schema: " + e.getMessage)
None
throw e
}
}

def getLastSchemaVersion(subject: String): Option[String] = {
private def getLastSchemaVersion(subject: String): Option[String] = {
val request = s"$schemaRegistryUrl/subjects/$subject/versions/latest"
logger.info(s"Getting schema for subject $subject")
val credentials = s"$srApiKey:$srApiSecret"
Expand All @@ -125,12 +116,12 @@ class CCloudSchemaRegistryService(schemaRegistryUrl: String, srApiKey: String, s
Some(schema)
} else {
logger.info(s"Error getting subject $subject: ${httpResponse.code} ${httpResponse.body}")
None
throw new RuntimeException(s"Error getting subject $subject: ${httpResponse.code} ${httpResponse.body}")
}
} catch {
case e: Exception =>
logger.info("Error in getting schema: " + e.getMessage)
None
throw e
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, t
private def deserialize(input: DataFrame): DataFrame = {
schemaRegistry match {
case Some("avro") => {
new CCloudSchemaRegistryService(schemaRegistryUrl, srApiKey, srApiSecret).deserializeWithAbris(topic, input)
new CCloudSchemaRegistryService(schemaRegistryUrl, srApiKey, srApiSecret).deserialize(topic, input)
}
case _ => {
input.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Expand Down

0 comments on commit 7089016

Please sign in to comment.