diff --git a/LICENSE-binary b/LICENSE-binary
index 748842a6191..d54692f7e6f 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -322,6 +322,8 @@ org.apache.kafka:kafka-clients
org.lz4:lz4-java
org.xerial.snappy:snappy-java
org.xerial:sqlite-jdbc
+com.sksamuel.elastic4s:elastic4s-client-esjava
+org.elasticsearch.client:elasticsearch-rest-client
BSD
------------
diff --git a/NOTICE-binary b/NOTICE-binary
index 40ec15010c4..07a28cc8f4e 100644
--- a/NOTICE-binary
+++ b/NOTICE-binary
@@ -1384,3 +1384,19 @@ decompression for Java, which can be obtained at:
* license/LICENSE.lz4-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/lz4/lz4-java
+
+This product depends on 'elastic4s', Scala client for ElasticSearch,
+which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.elastic4s.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/sksamuel/elastic4s
+
+This product depends on 'elasticsearch-rest-client', the low-level Java REST client for ElasticSearch,
+which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.elasticsearch.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/java-rest-low.html
diff --git a/dev/dependencyList b/dev/dependencyList
index ede67c96173..978f4804ebb 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -32,7 +32,15 @@ commons-collections/3.2.2//commons-collections-3.2.2.jar
commons-lang/2.6//commons-lang-2.6.jar
commons-lang3/3.13.0//commons-lang3-3.13.0.jar
commons-logging/1.1.3//commons-logging-1.1.3.jar
+config/1.4.2//config-1.4.2.jar
derby/10.14.2.0//derby-10.14.2.0.jar
+elastic4s-client-core_2.12/8.9.1//elastic4s-client-core_2.12-8.9.1.jar
+elastic4s-client-esjava_2.12/8.9.1//elastic4s-client-esjava_2.12-8.9.1.jar
+elastic4s-core_2.12/8.9.1//elastic4s-core_2.12-8.9.1.jar
+elastic4s-domain_2.12/8.9.1//elastic4s-domain_2.12-8.9.1.jar
+elastic4s-handlers_2.12/8.9.1//elastic4s-handlers_2.12-8.9.1.jar
+elastic4s-json-builder_2.12/8.9.1//elastic4s-json-builder_2.12-8.9.1.jar
+elasticsearch-rest-client/8.9.0//elasticsearch-rest-client-8.9.0.jar
error_prone_annotations/2.14.0//error_prone_annotations-2.14.0.jar
failsafe/2.4.4//failsafe-2.4.4.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
@@ -61,7 +69,9 @@ hive-storage-api/2.7.0//hive-storage-api-2.7.0.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
hk2-locator/2.6.1//hk2-locator-2.6.1.jar
hk2-utils/2.6.1//hk2-utils-2.6.1.jar
+httpasyncclient/4.1.5//httpasyncclient-4.1.5.jar
httpclient/4.5.14//httpclient-4.5.14.jar
+httpcore-nio/4.4.13//httpcore-nio-4.4.13.jar
httpcore/4.4.16//httpcore-4.4.16.jar
httpmime/4.5.14//httpmime-4.5.14.jar
j2objc-annotations/1.3//j2objc-annotations-1.3.jar
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 1bf1230e12a..21344d94e9b 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -58,20 +58,25 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
### Backend
-| Key | Default | Meaning | Type | Since |
-|--------------------------------------------------|---------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
-| kyuubi.backend.engine.exec.pool.keepalive.time | PT1M | Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in SQL engine applications | duration | 1.0.0 |
-| kyuubi.backend.engine.exec.pool.shutdown.timeout | PT10S | Timeout(ms) for the operation execution thread pool to terminate in SQL engine applications | duration | 1.0.0 |
-| kyuubi.backend.engine.exec.pool.size | 100 | Number of threads in the operation execution thread pool of SQL engine applications | int | 1.0.0 |
-| kyuubi.backend.engine.exec.pool.wait.queue.size | 100 | Size of the wait queue for the operation execution thread pool in SQL engine applications | int | 1.0.0 |
-| kyuubi.backend.server.event.json.log.path | file:///tmp/kyuubi/events | The location of server events go for the built-in JSON logger | string | 1.4.0 |
-| kyuubi.backend.server.event.kafka.close.timeout | PT5S | Period to wait for Kafka producer of server event handlers to close. | duration | 1.8.0 |
-| kyuubi.backend.server.event.kafka.topic | <undefined> | The topic of server events go for the built-in Kafka logger | string | 1.8.0 |
-| kyuubi.backend.server.event.loggers || A comma-separated list of server history loggers, where session/operation etc events go.
- JSON: the events will be written to the location of kyuubi.backend.server.event.json.log.path
- KAFKA: the events will be serialized in JSON format and sent to topic of `kyuubi.backend.server.event.kafka.topic`. Note: For the configs of Kafka producer, please specify them with the prefix: `kyuubi.backend.server.event.kafka.`. For example, `kyuubi.backend.server.event.kafka.bootstrap.servers=127.0.0.1:9092`
- JDBC: to be done
- CUSTOM: User-defined event handlers.
Note that: Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a class which is a child of org.apache.kyuubi.events.handler.CustomEventHandlerProvider which has a zero-arg constructor. | seq | 1.4.0 |
-| kyuubi.backend.server.exec.pool.keepalive.time | PT1M | Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in Kyuubi server | duration | 1.0.0 |
-| kyuubi.backend.server.exec.pool.shutdown.timeout | PT10S | Timeout(ms) for the operation execution thread pool to terminate in Kyuubi server | duration | 1.0.0 |
-| kyuubi.backend.server.exec.pool.size | 100 | Number of threads in the operation execution thread pool of Kyuubi server | int | 1.0.0 |
-| kyuubi.backend.server.exec.pool.wait.queue.size | 100 | Size of the wait queue for the operation execution thread pool of Kyuubi server | int | 1.0.0 |
+| Key | Default | Meaning | Type | Since |
+|--------------------------------------------------------------------|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
+| kyuubi.backend.engine.exec.pool.keepalive.time | PT1M | Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in SQL engine applications | duration | 1.0.0 |
+| kyuubi.backend.engine.exec.pool.shutdown.timeout | PT10S | Timeout(ms) for the operation execution thread pool to terminate in SQL engine applications | duration | 1.0.0 |
+| kyuubi.backend.engine.exec.pool.size | 100 | Number of threads in the operation execution thread pool of SQL engine applications | int | 1.0.0 |
+| kyuubi.backend.engine.exec.pool.wait.queue.size | 100 | Size of the wait queue for the operation execution thread pool in SQL engine applications | int | 1.0.0 |
+| kyuubi.backend.server.event.elasticsearch.index | <undefined> | The index of server events go for the built-in ElasticSearch logger | string | 1.9.0 |
+| kyuubi.backend.server.event.elasticsearch.index.autocreate.enabled | true | Whether auto create the index of server events go for the built-in ElasticSearch logger | boolean | 1.9.0 |
+| kyuubi.backend.server.event.elasticsearch.password | <undefined> | The password for ElasticSearch that server events go for the built-in ElasticSearch logger | string | 1.9.0 |
+| kyuubi.backend.server.event.elasticsearch.url | <undefined> | The url of ElasticSearch endpoints that server events go for the built-in ElasticSearch logger | string | 1.9.0 |
+| kyuubi.backend.server.event.elasticsearch.user | <undefined> | The user for ElasticSearch that server events go for the built-in ElasticSearch logger | string | 1.9.0 |
+| kyuubi.backend.server.event.json.log.path | file:///tmp/kyuubi/events | The location of server events go for the built-in JSON logger | string | 1.4.0 |
+| kyuubi.backend.server.event.kafka.close.timeout | PT5S | Period to wait for Kafka producer of server event handlers to close. | duration | 1.8.0 |
+| kyuubi.backend.server.event.kafka.topic | <undefined> | The topic of server events go for the built-in Kafka logger | string | 1.8.0 |
+| kyuubi.backend.server.event.loggers || A comma-separated list of server history loggers, where session/operation etc events go. - JSON: the events will be written to the location of kyuubi.backend.server.event.json.log.path
- KAFKA: the events will be serialized in JSON format and sent to topic of `kyuubi.backend.server.event.kafka.topic`. Note: For the configs of Kafka producer, please specify them with the prefix: `kyuubi.backend.server.event.kafka.`. For example, `kyuubi.backend.server.event.kafka.bootstrap.servers=127.0.0.1:9092`
- ELASTICSEARCH: the events will be sent to the index `kyuubi.backend.server.event.elasticsearch.index` on ElasticSearch
- JDBC: to be done
- CUSTOM: User-defined event handlers.
Note that: Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a class which is a child of org.apache.kyuubi.events.handler.CustomEventHandlerProvider which has a zero-arg constructor. | seq | 1.4.0 |
+| kyuubi.backend.server.exec.pool.keepalive.time | PT1M | Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in Kyuubi server | duration | 1.0.0 |
+| kyuubi.backend.server.exec.pool.shutdown.timeout | PT10S | Timeout(ms) for the operation execution thread pool to terminate in Kyuubi server | duration | 1.0.0 |
+| kyuubi.backend.server.exec.pool.size | 100 | Number of threads in the operation execution thread pool of Kyuubi server | int | 1.0.0 |
+| kyuubi.backend.server.exec.pool.wait.queue.size | 100 | Size of the wait queue for the operation execution thread pool of Kyuubi server | int | 1.0.0 |
### Batch
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index a5c0aee0a32..b67e59b3939 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2206,6 +2206,49 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofMillis(5000).toMillis)
+ val SERVER_EVENT_ELASTICSEARCH_SERVER_URL: OptionalConfigEntry[String] =
+ buildConf("kyuubi.backend.server.event.elasticsearch.url")
+ .doc("The url of ElasticSearch endpoints" +
+ " that server events go for the built-in ElasticSearch logger")
+ .version("1.9.0")
+ .serverOnly
+ .stringConf
+ .createOptional
+
+ val SERVER_EVENT_ELASTICSEARCH_INDEX: OptionalConfigEntry[String] =
+ buildConf("kyuubi.backend.server.event.elasticsearch.index")
+ .doc("The index of server events go for the built-in ElasticSearch logger")
+ .version("1.9.0")
+ .serverOnly
+ .stringConf
+ .createOptional
+
+ val SERVER_EVENT_ELASTICSEARCH_INDEX_AUTOCREATE_ENABLED: ConfigEntry[Boolean] =
+ buildConf("kyuubi.backend.server.event.elasticsearch.index.autocreate.enabled")
+ .doc("Whether auto create the index of server events go for the built-in ElasticSearch" +
+ " logger")
+ .version("1.9.0")
+ .serverOnly
+ .booleanConf
+ .createWithDefault(true)
+
+ val SERVER_EVENT_ELASTICSEARCH_USER: OptionalConfigEntry[String] =
+ buildConf("kyuubi.backend.server.event.elasticsearch.user")
+ .doc("The user for ElasticSearch that server events go for the built-in ElasticSearch logger")
+ .version("1.9.0")
+ .serverOnly
+ .stringConf
+ .createOptional
+
+ val SERVER_EVENT_ELASTICSEARCH_PASSWORD: OptionalConfigEntry[String] =
+ buildConf("kyuubi.backend.server.event.elasticsearch.password")
+ .doc("The password for ElasticSearch that server events go for" +
+ " the built-in ElasticSearch logger")
+ .version("1.9.0")
+ .serverOnly
+ .stringConf
+ .createOptional
+
val SERVER_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
buildConf("kyuubi.backend.server.event.loggers")
.doc("A comma-separated list of server history loggers, where session/operation etc" +
@@ -2218,6 +2261,9 @@ object KyuubiConf {
s" please specify them with the prefix: `kyuubi.backend.server.event.kafka.`." +
s" For example, `kyuubi.backend.server.event.kafka.bootstrap.servers=127.0.0.1:9092`" +
s" " +
+ s" ELASTICSEARCH: the events will be sent to the " +
+ s" index `${SERVER_EVENT_ELASTICSEARCH_INDEX.key}` on ElasticSearch" +
+ s"" +
s" JDBC: to be done" +
s" CUSTOM: User-defined event handlers." +
" Note that: Kyuubi supports custom event handlers with the Java SPI." +
@@ -2231,7 +2277,7 @@ object KyuubiConf {
.transformToUpperCase
.toSequence()
.checkValue(
- _.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM", "KAFKA")),
+ _.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM", "KAFKA", "ELASTICSEARCH")),
"Unsupported event loggers")
.createWithDefault(Nil)
diff --git a/kyuubi-events/pom.xml b/kyuubi-events/pom.xml
index 9b30b575017..343018857de 100644
--- a/kyuubi-events/pom.xml
+++ b/kyuubi-events/pom.xml
@@ -48,6 +48,16 @@
kafka-clients
+
+ com.sksamuel.elastic4s
+ elastic4s-client-esjava_${scala.binary.version}
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
org.apache.kyuubi
kyuubi-common_${scala.binary.version}
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
index f75e4be4f51..a05974bf9ec 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
@@ -55,6 +55,11 @@ trait EventHandlerRegister extends Logging {
throw new KyuubiException(s"Unsupported kafka event logger.")
}
+ protected def createElasticSearchEventHandler(kyuubiConf: KyuubiConf)
+ : EventHandler[KyuubiEvent] = {
+ throw new KyuubiException(s"Unsupported elasticsearch event logger.")
+ }
+
private def loadEventHandler(
eventLoggerType: EventLoggerType,
kyuubiConf: KyuubiConf): Seq[EventHandler[KyuubiEvent]] = {
@@ -71,6 +76,9 @@ trait EventHandlerRegister extends Logging {
case EventLoggerType.KAFKA =>
createKafkaEventHandler(kyuubiConf) :: Nil
+ case EventLoggerType.ELASTICSEARCH =>
+ createElasticSearchEventHandler(kyuubiConf) :: Nil
+
case EventLoggerType.CUSTOM =>
EventHandlerLoader.loadCustom(kyuubiConf)
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
index 987982371e7..55f75fc0c1c 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
@@ -21,5 +21,5 @@ object EventLoggerType extends Enumeration {
type EventLoggerType = Value
- val SPARK, JSON, JDBC, CUSTOM, KAFKA = Value
+ val SPARK, JSON, JDBC, CUSTOM, KAFKA, ELASTICSEARCH = Value
}
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/ElasticSearchLoggingEventHandler.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/ElasticSearchLoggingEventHandler.scala
new file mode 100644
index 00000000000..79c96d5a09e
--- /dev/null
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/ElasticSearchLoggingEventHandler.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.events.handler
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, RequestFailure}
+import com.sksamuel.elastic4s.ElasticApi.indexInto
+import com.sksamuel.elastic4s.ElasticDsl._
+import com.sksamuel.elastic4s.http.JavaClient
+import com.sksamuel.elastic4s.requests.common.RefreshPolicy
+import com.sksamuel.elastic4s.requests.mappings.MappingDefinition
+import com.sksamuel.elastic4s.requests.mappings.dynamictemplate.DynamicMapping
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.util.Preconditions.checkArgument
+import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
+import org.apache.http.impl.client.BasicCredentialsProvider
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.events.KyuubiEvent
+import org.apache.kyuubi.events.handler.ElasticSearchLoggingEventHandler.getElasticClient
+
+/**
+ * This event logger logs events to ElasticSearch.
+ */
+class ElasticSearchLoggingEventHandler(
+ indexId: String,
+ serverUrl: String,
+ username: Option[String] = None,
+ password: Option[String] = None,
+ kyuubiConf: KyuubiConf) extends EventHandler[KyuubiEvent] with Logging {
+
+ private val isAutoCreateIndex: Boolean =
+ kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_INDEX_AUTOCREATE_ENABLED)
+
+ private val esClient: ElasticClient = getElasticClient(serverUrl, username, password)
+
+ private def checkIndexExisted: String => Boolean = (indexId: String) => {
+ checkArgument(
+ StringUtils.isNotBlank(indexId),
+ "index name must be configured for ElasticSearchEventHandler, please ensure %s is set",
+ SERVER_EVENT_ELASTICSEARCH_INDEX.key)
+ // check
+ esClient.execute {
+ indexExists(indexId)
+ }.await.result.isExists
+ }
+
+ private def checkAndEnsureIndex(indexId: String): Unit = {
+ if (!checkIndexExisted(indexId) && isAutoCreateIndex) {
+ esClient.execute {
+ createIndex(indexId)
+ .mapping(MappingDefinition(dynamic = Some(DynamicMapping.Dynamic)))
+ }.await
+ if (!checkIndexExisted(indexId)) {
+ throwExceptionIndexNotfound(indexId)
+ }
+ } else {
+ throwExceptionIndexNotfound(indexId)
+ }
+ }
+
+ checkAndEnsureIndex(indexId)
+
+ private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
+ override def apply(event: KyuubiEvent): Unit = {
+ val fields = {
+ objectMapper.readValue(event.toJson, classOf[Map[String, Any]]).map {
+ case (key: String, value: Any) =>
+ val isNumber = (obj: Any) => {
+ obj.isInstanceOf[Byte] || obj.isInstanceOf[Short] ||
+ obj.isInstanceOf[Int] || obj.isInstanceOf[Long] ||
+ obj.isInstanceOf[Float] || obj.isInstanceOf[Double]
+ }
+ if (value.isInstanceOf[String] || isNumber(value)) {
+ (key, value)
+ } else {
+ (key, objectMapper.writeValueAsString(value))
+ }
+ }
+ }
+ esClient.execute {
+ indexInto(indexId).fields(fields).refresh(RefreshPolicy.Immediate)
+ }.await match {
+ case f: RequestFailure =>
+ error(s"Failed to send event in ElasticSearchLoggingEventHandler, ${f.error}")
+ case _ =>
+ }
+ }
+
+ override def close(): Unit = {
+ esClient.close()
+ }
+
+ private def throwExceptionIndexNotfound(indexId: String) =
+ throw new RuntimeException(s"the index '$indexId' is not found on ElasticSearch")
+}
+
+object ElasticSearchLoggingEventHandler {
+ def getElasticClient(
+ serverUrl: String,
+ user: Option[String],
+ password: Option[String]): ElasticClient = {
+ val props = ElasticProperties(serverUrl)
+ val credentialsProvider = {
+ val provider = new BasicCredentialsProvider
+ val credentials = new UsernamePasswordCredentials(user.orNull, password.orNull)
+ provider.setCredentials(AuthScope.ANY, credentials)
+ provider
+ }
+ val javaClient = JavaClient.apply(
+ props,
+ (httpClientBuilder: HttpAsyncClientBuilder) => {
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
+ })
+ ElasticClient(javaClient)
+ }
+}
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 56155a27bec..cf663379652 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -405,6 +405,11 @@
kafka-clients
+
+ com.sksamuel.elastic4s
+ elastic4s-client-esjava_${scala.binary.version}
+
+
com.dimafeng
testcontainers-scala-scalatest_${scala.binary.version}
@@ -417,6 +422,12 @@
test
+
+ com.dimafeng
+ testcontainers-scala-elasticsearch_${scala.binary.version}
+ test
+
+
org.apache.hive
hive-exec
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
index ca6c776ac8c..9968449a12a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
@@ -20,7 +20,7 @@ import java.net.InetAddress
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.events.handler.{EventHandler, ServerJsonLoggingEventHandler, ServerKafkaLoggingEventHandler}
+import org.apache.kyuubi.events.handler._
import org.apache.kyuubi.events.handler.ServerKafkaLoggingEventHandler.KAFKA_SERVER_EVENT_HANDLER_PREFIX
import org.apache.kyuubi.util.KyuubiHadoopUtils
@@ -53,6 +53,26 @@ object ServerEventHandlerRegister extends EventHandlerRegister {
closeTimeoutInMs)
}
+ override def createElasticSearchEventHandler(kyuubiConf: KyuubiConf)
+ : EventHandler[KyuubiEvent] = {
+ val indexId = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_INDEX).getOrElse {
+ throw new IllegalArgumentException(
+ s"${SERVER_EVENT_ELASTICSEARCH_INDEX.key} must be configured")
+ }
+ val serverUrl = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_SERVER_URL).getOrElse {
+ throw new IllegalArgumentException(
+ s"${SERVER_EVENT_ELASTICSEARCH_SERVER_URL.key} must be configured")
+ }
+ val username = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_USER)
+ val password = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_PASSWORD)
+ ServerElasticSearchLoggingEventHandler(
+ indexId,
+ serverUrl,
+ username,
+ password,
+ kyuubiConf)
+ }
+
override protected def getLoggers(conf: KyuubiConf): Seq[String] = {
conf.get(SERVER_EVENT_LOGGERS)
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/handler/ServerElasticSearchLoggingEventHandler.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/handler/ServerElasticSearchLoggingEventHandler.scala
new file mode 100644
index 00000000000..63e0d5c394d
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/handler/ServerElasticSearchLoggingEventHandler.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.events.handler
+
+import org.apache.kyuubi.config.KyuubiConf
+
+case class ServerElasticSearchLoggingEventHandler(
+ indexId: String,
+ serverUrl: String,
+ username: Option[String],
+ password: Option[String],
+ kyuubiConf: KyuubiConf)
+ extends ElasticSearchLoggingEventHandler(indexId, serverUrl, username, password, kyuubiConf)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerElasticSearchLoggingEventHandlerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerElasticSearchLoggingEventHandlerSuite.scala
new file mode 100644
index 00000000000..6ed52717de3
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerElasticSearchLoggingEventHandlerSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.events.handler
+
+import java.util.UUID
+
+import com.dimafeng.testcontainers.ElasticsearchContainer
+import com.dimafeng.testcontainers.ElasticsearchContainer.defaultImage
+import com.dimafeng.testcontainers.scalatest.TestContainerForAll
+import com.sksamuel.elastic4s.ElasticDsl._
+
+import org.apache.kyuubi._
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.events.handler.ElasticSearchLoggingEventHandler.getElasticClient
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+abstract class ServerElasticSearchLoggingEventHandlerSuite extends WithKyuubiServer
+ with HiveJDBCTestHelper
+ with BatchTestHelper with TestContainerForAll {
+
+ protected val imageTag: String
+ override lazy val containerDef = ElasticsearchContainer.Def(s"$defaultImage:$imageTag")
+
+ override def startContainers(): containerDef.Container = {
+ val esContainer = containerDef.createContainer()
+ val jContainer = esContainer.container
+ jContainer.withPassword(esPassword)
+ jContainer.withEnv("xpack.security.enabled", "true")
+ esContainer.start()
+ esContainer
+ }
+
+ private val destIndex = "server-event-index"
+ override protected def jdbcUrl: String = getJdbcUrl
+
+ private var esServerUrl: String = _
+ private val esUser = "elastic"
+ private val esPassword = UUID.randomUUID().toString
+
+ override protected val conf: KyuubiConf = {
+ KyuubiConf()
+ .set(KyuubiConf.SERVER_EVENT_LOGGERS, Seq("ELASTICSEARCH"))
+ .set(KyuubiConf.SERVER_EVENT_ELASTICSEARCH_INDEX, destIndex)
+ .set(KyuubiConf.SERVER_EVENT_ELASTICSEARCH_INDEX_AUTOCREATE_ENABLED, true)
+ .set(KyuubiConf.SERVER_EVENT_ELASTICSEARCH_USER, esUser)
+ .set(KyuubiConf.SERVER_EVENT_ELASTICSEARCH_PASSWORD, esPassword)
+ }
+
+ override def beforeAll(): Unit = withContainers { esContainer =>
+ esServerUrl = s"http://${esContainer.httpHostAddress}"
+ conf.set(SERVER_EVENT_ELASTICSEARCH_SERVER_URL, esServerUrl)
+ super.beforeAll()
+ }
+
+ test("check server events sent to ElasticSearch index") {
+ withContainers { _ =>
+ val esClient = getElasticClient(esServerUrl, Some(esUser), Some(esPassword))
+ try {
+ val hits = esClient.execute {
+ search(destIndex).matchAllQuery()
+ }.await.result.hits.hits
+ assert(hits.length > 0)
+ } finally {
+ esClient.close()
+ }
+ }
+ }
+
+}
+
+class ServerElasticSearchLoggingEventHandlerSuiteForEs7
+ extends ServerElasticSearchLoggingEventHandlerSuite {
+ override val imageTag = "7.17.14"
+}
+
+class ServerElasticSearchLoggingEventHandlerSuiteForEs8
+ extends ServerElasticSearchLoggingEventHandlerSuite {
+ override val imageTag = "8.10.4"
+}
diff --git a/licenses-binary/LICENSE-elasticsearch.txt b/licenses-binary/LICENSE-elasticsearch.txt
new file mode 100644
index 00000000000..3c8b7f6ae5c
--- /dev/null
+++ b/licenses-binary/LICENSE-elasticsearch.txt
@@ -0,0 +1,6 @@
+Copyright 2013-2019 Elasticsearch
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
diff --git a/pom.xml b/pom.xml
index 75bfd86502d..f42d09f14b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,7 @@
2.6
3.13.0
0.7.3
+ 8.9.1
delta-core
2.4.0
2.4.4
@@ -577,6 +578,12 @@
${testcontainers-scala.version}
+
+ com.dimafeng
+ testcontainers-scala-elasticsearch_${scala.binary.version}
+ ${testcontainers-scala.version}
+
+
io.fabric8
kubernetes-client
@@ -1240,6 +1247,13 @@
true
+
+ com.sksamuel.elastic4s
+ elastic4s-client-esjava_${scala.binary.version}
+ ${elastic4s.version}
+ true
+
+
com.github.scopt
scopt_${scala.binary.version}