Skip to content

Commit

Permalink
es event logger
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenliang123 committed Oct 28, 2023
1 parent cfd90e0 commit 8ead671
Show file tree
Hide file tree
Showing 15 changed files with 425 additions and 17 deletions.
2 changes: 2 additions & 0 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ org.apache.kafka:kafka-clients
org.lz4:lz4-java
org.xerial.snappy:snappy-java
org.xerial:sqlite-jdbc
com.sksamuel.elastic4s:elastic4s-client-esjava
org.elasticsearch.client:elasticsearch-rest-client

BSD
------------
Expand Down
16 changes: 16 additions & 0 deletions NOTICE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -1384,3 +1384,19 @@ decompression for Java, which can be obtained at:
* license/LICENSE.lz4-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/lz4/lz4-java

This product depends on 'elastic4s', Scala client for ElasticSearch,
which can be obtained at:

* LICENSE:
* license/LICENSE.elastic4s.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/sksamuel/elastic4s

This product depends on 'elasticsearch-rest-client', the low-level Java REST client for ElasticSearch,
which can be obtained at:

* LICENSE:
* license/LICENSE.elasticsearch.txt (Apache License 2.0)
* HOMEPAGE:
* https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/java-rest-low.html
10 changes: 10 additions & 0 deletions dev/dependencyList
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ commons-collections/3.2.2//commons-collections-3.2.2.jar
commons-lang/2.6//commons-lang-2.6.jar
commons-lang3/3.13.0//commons-lang3-3.13.0.jar
commons-logging/1.1.3//commons-logging-1.1.3.jar
config/1.4.2//config-1.4.2.jar
derby/10.14.2.0//derby-10.14.2.0.jar
elastic4s-client-core_2.12/8.9.1//elastic4s-client-core_2.12-8.9.1.jar
elastic4s-client-esjava_2.12/8.9.1//elastic4s-client-esjava_2.12-8.9.1.jar
elastic4s-core_2.12/8.9.1//elastic4s-core_2.12-8.9.1.jar
elastic4s-domain_2.12/8.9.1//elastic4s-domain_2.12-8.9.1.jar
elastic4s-handlers_2.12/8.9.1//elastic4s-handlers_2.12-8.9.1.jar
elastic4s-json-builder_2.12/8.9.1//elastic4s-json-builder_2.12-8.9.1.jar
elasticsearch-rest-client/8.9.0//elasticsearch-rest-client-8.9.0.jar
error_prone_annotations/2.14.0//error_prone_annotations-2.14.0.jar
failsafe/2.4.4//failsafe-2.4.4.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
Expand Down Expand Up @@ -61,7 +69,9 @@ hive-storage-api/2.7.0//hive-storage-api-2.7.0.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
hk2-locator/2.6.1//hk2-locator-2.6.1.jar
hk2-utils/2.6.1//hk2-utils-2.6.1.jar
httpasyncclient/4.1.5//httpasyncclient-4.1.5.jar
httpclient/4.5.14//httpclient-4.5.14.jar
httpcore-nio/4.4.13//httpcore-nio-4.4.13.jar
httpcore/4.4.16//httpcore-4.4.16.jar
httpmime/4.5.14//httpmime-4.5.14.jar
j2objc-annotations/1.3//j2objc-annotations-1.3.jar
Expand Down
33 changes: 19 additions & 14 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2206,6 +2206,49 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofMillis(5000).toMillis)

val SERVER_EVENT_ELASTICSEARCH_SERVER_URL: OptionalConfigEntry[String] =
buildConf("kyuubi.backend.server.event.elasticsearch.url")
.doc("The url of ElasticSearch endpoints" +
" that server events go for the built-in ElasticSearch logger")
.version("1.9.0")
.serverOnly
.stringConf
.createOptional

val SERVER_EVENT_ELASTICSEARCH_INDEX: OptionalConfigEntry[String] =
buildConf("kyuubi.backend.server.event.elasticsearch.index")
.doc("The index of server events go for the built-in ElasticSearch logger")
.version("1.9.0")
.serverOnly
.stringConf
.createOptional

val SERVER_EVENT_ELASTICSEARCH_INDEX_AUTOCREATE_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.backend.server.event.elasticsearch.index.autocreate.enabled")
.doc("Whether auto create the index of server events go for the built-in ElasticSearch" +
" logger")
.version("1.9.0")
.serverOnly
.booleanConf
.createWithDefault(true)

val SERVER_EVENT_ELASTICSEARCH_USER: OptionalConfigEntry[String] =
buildConf("kyuubi.backend.server.event.elasticsearch.user")
.doc("The user for ElasticSearch that server events go for the built-in ElasticSearch logger")
.version("1.9.0")
.serverOnly
.stringConf
.createOptional

val SERVER_EVENT_ELASTICSEARCH_PASSWORD: OptionalConfigEntry[String] =
buildConf("kyuubi.backend.server.event.elasticsearch.password")
.doc("The password for ElasticSearch that server events go for" +
" the built-in ElasticSearch logger")
.version("1.9.0")
.serverOnly
.stringConf
.createOptional

val SERVER_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
buildConf("kyuubi.backend.server.event.loggers")
.doc("A comma-separated list of server history loggers, where session/operation etc" +
Expand All @@ -2218,6 +2261,9 @@ object KyuubiConf {
s" please specify them with the prefix: `kyuubi.backend.server.event.kafka.`." +
s" For example, `kyuubi.backend.server.event.kafka.bootstrap.servers=127.0.0.1:9092`" +
s" </li>" +
s" <li>ELASTICSEARCH: the events will be sent to the " +
s" index `${SERVER_EVENT_ELASTICSEARCH_INDEX.key}` on ElasticSearch" +
s"</li>" +
s" <li>JDBC: to be done</li>" +
s" <li>CUSTOM: User-defined event handlers.</li></ul>" +
" Note that: Kyuubi supports custom event handlers with the Java SPI." +
Expand All @@ -2231,7 +2277,7 @@ object KyuubiConf {
.transformToUpperCase
.toSequence()
.checkValue(
_.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM", "KAFKA")),
_.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM", "KAFKA", "ELASTICSEARCH")),
"Unsupported event loggers")
.createWithDefault(Nil)

Expand Down
10 changes: 10 additions & 0 deletions kyuubi-events/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>com.sksamuel.elastic4s</groupId>
<artifactId>elastic4s-client-esjava_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ trait EventHandlerRegister extends Logging {
throw new KyuubiException(s"Unsupported kafka event logger.")
}

protected def createElasticSearchEventHandler(kyuubiConf: KyuubiConf)
: EventHandler[KyuubiEvent] = {
throw new KyuubiException(s"Unsupported elasticsearch event logger.")
}

private def loadEventHandler(
eventLoggerType: EventLoggerType,
kyuubiConf: KyuubiConf): Seq[EventHandler[KyuubiEvent]] = {
Expand All @@ -71,6 +76,9 @@ trait EventHandlerRegister extends Logging {
case EventLoggerType.KAFKA =>
createKafkaEventHandler(kyuubiConf) :: Nil

case EventLoggerType.ELASTICSEARCH =>
createElasticSearchEventHandler(kyuubiConf) :: Nil

case EventLoggerType.CUSTOM =>
EventHandlerLoader.loadCustom(kyuubiConf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ object EventLoggerType extends Enumeration {

type EventLoggerType = Value

val SPARK, JSON, JDBC, CUSTOM, KAFKA = Value
val SPARK, JSON, JDBC, CUSTOM, KAFKA, ELASTICSEARCH = Value
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.events.handler

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, RequestFailure}
import com.sksamuel.elastic4s.ElasticApi.indexInto
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.requests.common.RefreshPolicy
import com.sksamuel.elastic4s.requests.mappings.MappingDefinition
import com.sksamuel.elastic4s.requests.mappings.dynamictemplate.DynamicMapping
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.util.Preconditions.checkArgument
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.events.KyuubiEvent
import org.apache.kyuubi.events.handler.ElasticSearchLoggingEventHandler.getElasticClient

/**
* This event logger logs events to ElasticSearch.
*/
class ElasticSearchLoggingEventHandler(
indexId: String,
serverUrl: String,
username: Option[String] = None,
password: Option[String] = None,
kyuubiConf: KyuubiConf) extends EventHandler[KyuubiEvent] with Logging {

private val isAutoCreateIndex: Boolean =
kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_INDEX_AUTOCREATE_ENABLED)

private val esClient: ElasticClient = getElasticClient(serverUrl, username, password)

private def checkIndexExisted: String => Boolean = (indexId: String) => {
checkArgument(
StringUtils.isNotBlank(indexId),
"index name must be configured for ElasticSearchEventHandler, please ensure %s is set",
SERVER_EVENT_ELASTICSEARCH_INDEX.key)
// check
esClient.execute {
indexExists(indexId)
}.await.result.isExists
}

private def checkAndEnsureIndex(indexId: String): Unit = {
if (!checkIndexExisted(indexId) && isAutoCreateIndex) {
esClient.execute {
createIndex(indexId)
.mapping(MappingDefinition(dynamic = Some(DynamicMapping.Dynamic)))
}.await
if (!checkIndexExisted(indexId)) {
throwExceptionIndexNotfound(indexId)
}
} else {
throwExceptionIndexNotfound(indexId)
}
}

checkAndEnsureIndex(indexId)

private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

override def apply(event: KyuubiEvent): Unit = {
val fields = {
objectMapper.readValue(event.toJson, classOf[Map[String, Any]]).map {
case (key: String, value: Any) =>
val isNumber = (obj: Any) => {
obj.isInstanceOf[Byte] || obj.isInstanceOf[Short] ||
obj.isInstanceOf[Int] || obj.isInstanceOf[Long] ||
obj.isInstanceOf[Float] || obj.isInstanceOf[Double]
}
if (value.isInstanceOf[String] || isNumber(value)) {
(key, value)
} else {
(key, objectMapper.writeValueAsString(value))
}
}
}
esClient.execute {
indexInto(indexId).fields(fields).refresh(RefreshPolicy.Immediate)
}.await match {
case f: RequestFailure =>
error(s"Failed to send event in ElasticSearchLoggingEventHandler, ${f.error}")
case _ =>
}
}

override def close(): Unit = {
esClient.close()
}

private def throwExceptionIndexNotfound(indexId: String) =
throw new RuntimeException(s"the index '$indexId' is not found on ElasticSearch")
}

object ElasticSearchLoggingEventHandler {
def getElasticClient(
serverUrl: String,
user: Option[String],
password: Option[String]): ElasticClient = {
val props = ElasticProperties(serverUrl)
val credentialsProvider = {
val provider = new BasicCredentialsProvider
val credentials = new UsernamePasswordCredentials(user.orNull, password.orNull)
provider.setCredentials(AuthScope.ANY, credentials)
provider
}
val javaClient = JavaClient.apply(
props,
(httpClientBuilder: HttpAsyncClientBuilder) => {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
})
ElasticClient(javaClient)
}
}
11 changes: 11 additions & 0 deletions kyuubi-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>com.sksamuel.elastic4s</groupId>
<artifactId>elastic4s-client-esjava_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>
Expand All @@ -417,6 +422,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-elasticsearch_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.net.InetAddress

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.events.handler.{EventHandler, ServerJsonLoggingEventHandler, ServerKafkaLoggingEventHandler}
import org.apache.kyuubi.events.handler._
import org.apache.kyuubi.events.handler.ServerKafkaLoggingEventHandler.KAFKA_SERVER_EVENT_HANDLER_PREFIX
import org.apache.kyuubi.util.KyuubiHadoopUtils

Expand Down Expand Up @@ -53,6 +53,26 @@ object ServerEventHandlerRegister extends EventHandlerRegister {
closeTimeoutInMs)
}

override def createElasticSearchEventHandler(kyuubiConf: KyuubiConf)
: EventHandler[KyuubiEvent] = {
val indexId = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_INDEX).getOrElse {
throw new IllegalArgumentException(
s"${SERVER_EVENT_ELASTICSEARCH_INDEX.key} must be configured")
}
val serverUrl = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_SERVER_URL).getOrElse {
throw new IllegalArgumentException(
s"${SERVER_EVENT_ELASTICSEARCH_SERVER_URL.key} must be configured")
}
val username = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_USER)
val password = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_PASSWORD)
ServerElasticSearchLoggingEventHandler(
indexId,
serverUrl,
username,
password,
kyuubiConf)
}

override protected def getLoggers(conf: KyuubiConf): Seq[String] = {
conf.get(SERVER_EVENT_LOGGERS)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.events.handler

import org.apache.kyuubi.config.KyuubiConf

case class ServerElasticSearchLoggingEventHandler(
indexId: String,
serverUrl: String,
username: Option[String],
password: Option[String],
kyuubiConf: KyuubiConf)
extends ElasticSearchLoggingEventHandler(indexId, serverUrl, username, password, kyuubiConf)
Loading

0 comments on commit 8ead671

Please sign in to comment.