diff --git a/hive/build.gradle.kts b/hive/build.gradle.kts
index ee07fea..0b32959 100644
--- a/hive/build.gradle.kts
+++ b/hive/build.gradle.kts
@@ -2,10 +2,9 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
 import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
 
 group = "io.tellery.connectors"
-version = "0.0.1-SNAPSHOT"
+version = "0.5.0"
 
 repositories {
-    mavenLocal()
     maven {
         url = uri("https://maven.pkg.github.com/tellery/tellery")
         credentials {
@@ -18,12 +17,12 @@ repositories {
 
 plugins {
     idea
-    kotlin("jvm") version "1.4.20"
+    kotlin("jvm") version "1.5.21"
     id("com.github.johnrengelman.shadow") version "5.2.0"
 }
 
 dependencies {
-    implementation("io.tellery:connector-interface:0.4.0-SNAPSHOT")
+    implementation("io.tellery:connector-interface:0.5.0-SNAPSHOT")
     implementation("org.apache.hive:hive-jdbc:2.1.0") {
         exclude(group = "org.slf4j", module = "")
         exclude(group = "log4j", module = "log4j")
diff --git a/hive/src/HiveConnector.kt b/hive/src/HiveConnector.kt
index 679a0fc..1e30d37 100644
--- a/hive/src/HiveConnector.kt
+++ b/hive/src/HiveConnector.kt
@@ -1,9 +1,9 @@
 package io.tellery.connectors.contrib
 
-import io.tellery.annotations.Connector
-import io.tellery.annotations.HandleImport
 import io.tellery.annotations.Config
 import io.tellery.annotations.Config.ConfigType
+import io.tellery.annotations.Connector
+import io.tellery.annotations.HandleImport
 import io.tellery.connectors.JDBCConnector
 import io.tellery.entities.*
 import io.tellery.utils.S3Storage
@@ -15,16 +15,65 @@ import java.sql.Types
 
 
 @Connector(
-    type="Hive/SparkSQL",
+    type = "Hive/SparkSQL",
     configs = [
-    Config(name="Endpoint", type= ConfigType.STRING, description="The endpoint of your hive / sparkSQL thrift server", hint="localhost",required=true),
-    Config(name="Port", type= ConfigType.NUMBER, description="the port number", hint="10001",required=true),
-    Config(name="S3AccessKey", type=ConfigType.STRING, description="S3 Access Key ID(for uploading csv)"),
-    Config(name="S3SecretKey", type=ConfigType.STRING, description="S3 Secret Access Key (for uploading csv)", secret=true),
-    Config(name="S3Region", type=ConfigType.STRING, description="S3 region (be the same as your Redshift cluster", hint="us-east-1"),
-    Config(name="S3Bucket", type=ConfigType.STRING, description="S3 bucket (where uploaded csv stores)", hint="tellery"),
-    Config(name="S3KeyPrefix", type=ConfigType.STRING, description="S3 key prefix prepends to uploaded csv"),
-])
+        Config(
+            name = "Endpoint",
+            type = ConfigType.STRING,
+            description = "The endpoint of your Hive / SparkSQL thrift server",
+            hint = "localhost",
+            required = true
+        ),
+        Config(
+            name = "Port",
+            type = ConfigType.NUMBER,
+            description = "The port number",
+            hint = "10001",
+            required = true
+        ),
+        Config(
+            name = "Username",
+            type = ConfigType.STRING,
+            description = "Your Hadoop username",
+            hint = "your_username",
+        ),
+        Config(
+            name = "Password",
+            type = ConfigType.STRING,
+            description = "Your Hadoop password",
+            hint = "",
+            secret = true,
+        ),
+        Config(
+            name = "S3 Access Key",
+            type = ConfigType.STRING,
+            description = "S3 Access Key ID (for uploading csv)"
+        ),
+        Config(
+            name = "S3 Secret Key",
+            type = ConfigType.STRING,
+            description = "S3 Secret Access Key (for uploading csv)",
+            secret = true
+        ),
+        Config(
+            name = "S3 Region",
+            type = ConfigType.STRING,
+            description = "S3 region (the region of your S3 bucket)",
+            hint = "us-east-1"
+        ),
+        Config(
+            name = "S3 Bucket",
+            type = ConfigType.STRING,
+            description = "S3 bucket (where uploaded csv files are stored)",
+            hint = "tellery"
+        ),
+        Config(
+            name = "S3 Key Prefix",
+            type = ConfigType.STRING,
+            description = "S3 key prefix prepended to uploaded csv files"
+        ),
+    ]
+)
 class HiveConnector : JDBCConnector() {
 
     override val driverClassName = "org.apache.hive.jdbc.HiveDriver"
@@ -81,9 +130,13 @@ class HiveConnector : JDBCConnector() {
         return dbConnection.use { conn ->
             conn.createStatement().use { stmt ->
                 try {
-                    stmt.executeQuery(queryRemark(remark, "SHOW TABLES FROM `${
-                        dbName.replace("`", "``")
-                    }`")).use {
+                    stmt.executeQuery(
+                        queryRemark(
+                            remark, "SHOW TABLES FROM `${
+                                dbName.replace("`", "``")
+                            }`"
+                        )
+                    ).use {
                         generateSequence {
                             if (it.next()) {
                                 try {
@@ -116,16 +169,22 @@ class HiveConnector : JDBCConnector() {
         return dbConnection.use { conn ->
             conn.createStatement().use { stmt ->
                 try {
-                    stmt.executeQuery(queryRemark(
-                        remark,
-                        "DESC `${
-                            dbName.replace("`", "``")
-                        }`.`${
-                            collectionName.replace("`", "``")
-                        }`")).use {
+                    stmt.executeQuery(
+                        queryRemark(
+                            remark,
+                            "DESC `${
+                                dbName.replace("`", "``")
+                            }`.`${
+                                collectionName.replace("`", "``")
+                            }`"
+                        )
+                    ).use {
                         generateSequence {
                             if (it.next()) {
-                                val (colName, dataType) = arrayOf(1, 2).map { colIndex -> it.getString(colIndex) }
+                                val (colName, dataType) = arrayOf(
+                                    1,
+                                    2
+                                ).map { colIndex -> it.getString(colIndex) }
                                 if (colName.isNullOrBlank() && dataType.isNullOrBlank()) {
                                     null
                                 } else {
@@ -184,7 +243,12 @@ class HiveConnector : JDBCConnector() {
     }
 
     @HandleImport("text/csv")
-    suspend fun importFromCSV(database: String, collection: String, schema: String?, content: ByteArray) {
+    suspend fun importFromCSV(
+        database: String,
+        collection: String,
+        schema: String?,
+        content: ByteArray
+    ) {
         val filename = "$database/${if (schema != null) "$schema." else ""}$collection.csv"
         val s3Path = this.s3Client!!.uploadFile(filename, content, "text/csv")
 
@@ -197,7 +261,8 @@ class HiveConnector : JDBCConnector() {
                    |OPTIONS(
                    | path "$s3Path",
                    | header "true",
-                   | inferSchema "true"
+                   | inferSchema "true",
+                   | encoding "UTF-8"
                    |)
                """.trimMargin()
                stmt.execute(sql)