Skip to content

Commit

Permalink
es event logger
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenliang123 committed Oct 27, 2023
1 parent cfd90e0 commit 39adfff
Show file tree
Hide file tree
Showing 13 changed files with 422 additions and 17 deletions.
10 changes: 10 additions & 0 deletions dev/dependencyList
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ commons-collections/3.2.2//commons-collections-3.2.2.jar
commons-lang/2.6//commons-lang-2.6.jar
commons-lang3/3.13.0//commons-lang3-3.13.0.jar
commons-logging/1.1.3//commons-logging-1.1.3.jar
config/1.4.2//config-1.4.2.jar
derby/10.14.2.0//derby-10.14.2.0.jar
elastic4s-client-core_2.12/7.17.4//elastic4s-client-core_2.12-7.17.4.jar
elastic4s-client-esjava_2.12/7.17.4//elastic4s-client-esjava_2.12-7.17.4.jar
elastic4s-core_2.12/7.17.4//elastic4s-core_2.12-7.17.4.jar
elastic4s-domain_2.12/7.17.4//elastic4s-domain_2.12-7.17.4.jar
elastic4s-handlers_2.12/7.17.4//elastic4s-handlers_2.12-7.17.4.jar
elastic4s-json-builder_2.12/7.17.4//elastic4s-json-builder_2.12-7.17.4.jar
elasticsearch-rest-client/8.3.3//elasticsearch-rest-client-8.3.3.jar
error_prone_annotations/2.14.0//error_prone_annotations-2.14.0.jar
failsafe/2.4.4//failsafe-2.4.4.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
Expand Down Expand Up @@ -61,7 +69,9 @@ hive-storage-api/2.7.0//hive-storage-api-2.7.0.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
hk2-locator/2.6.1//hk2-locator-2.6.1.jar
hk2-utils/2.6.1//hk2-utils-2.6.1.jar
httpasyncclient/4.1.4//httpasyncclient-4.1.4.jar
httpclient/4.5.14//httpclient-4.5.14.jar
httpcore-nio/4.4.12//httpcore-nio-4.4.12.jar
httpcore/4.4.16//httpcore-4.4.16.jar
httpmime/4.5.14//httpmime-4.5.14.jar
j2objc-annotations/1.3//j2objc-annotations-1.3.jar
Expand Down
33 changes: 19 additions & 14 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2206,6 +2206,49 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofMillis(5000).toMillis)

val SERVER_EVENT_ELASTICSEARCH_SERVER_URL: OptionalConfigEntry[String] =
buildConf("kyuubi.backend.server.event.elasticsearch.url")
.doc("The url of ElasticSearch endpoints" +
" that server events go for the built-in ElasticSearch logger")
.version("1.9.0")
.serverOnly
.stringConf
.createOptional

val SERVER_EVENT_ELASTICSEARCH_INDEX: OptionalConfigEntry[String] =
buildConf("kyuubi.backend.server.event.elasticsearch.index")
.doc("The index of server events go for the built-in ElasticSearch logger")
.version("1.9.0")
.serverOnly
.stringConf
.createOptional

val SERVER_EVENT_ELASTICSEARCH_INDEX_AUTOCREATE_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.backend.server.event.elasticsearch.index.autocreate.enabled")
.doc("Whether auto create the index of server events go for the built-in ElasticSearch" +
" logger")
.version("1.9.0")
.serverOnly
.booleanConf
.createWithDefault(true)

val SERVER_EVENT_ELASTICSEARCH_USER: OptionalConfigEntry[String] =
buildConf("kyuubi.backend.server.event.elasticsearch.user")
.doc("The user for ElasticSearch that server events go for the built-in ElasticSearch logger")
.version("1.9.0")
.serverOnly
.stringConf
.createOptional

val SERVER_EVENT_ELASTICSEARCH_PASSWORD: OptionalConfigEntry[String] =
buildConf("kyuubi.backend.server.event.elasticsearch.password")
.doc("The password for ElasticSearch that server events go for" +
" the built-in ElasticSearch logger")
.version("1.9.0")
.serverOnly
.stringConf
.createOptional

val SERVER_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
buildConf("kyuubi.backend.server.event.loggers")
.doc("A comma-separated list of server history loggers, where session/operation etc" +
Expand All @@ -2218,6 +2261,7 @@ object KyuubiConf {
s" please specify them with the prefix: `kyuubi.backend.server.event.kafka.`." +
s" For example, `kyuubi.backend.server.event.kafka.bootstrap.servers=127.0.0.1:9092`" +
s" </li>" +
s" <li>ELASTICSEARCH: (experimental)</li>" +
s" <li>JDBC: to be done</li>" +
s" <li>CUSTOM: User-defined event handlers.</li></ul>" +
" Note that: Kyuubi supports custom event handlers with the Java SPI." +
Expand All @@ -2231,7 +2275,7 @@ object KyuubiConf {
.transformToUpperCase
.toSequence()
.checkValue(
_.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM", "KAFKA")),
_.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM", "KAFKA", "ELASTICSEARCH")),
"Unsupported event loggers")
.createWithDefault(Nil)

Expand Down
10 changes: 10 additions & 0 deletions kyuubi-events/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>com.sksamuel.elastic4s</groupId>
<artifactId>elastic4s-client-esjava_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ trait EventHandlerRegister extends Logging {
throw new KyuubiException(s"Unsupported kafka event logger.")
}

protected def createElasticSearchEventHandler(kyuubiConf: KyuubiConf)
: EventHandler[KyuubiEvent] = {
throw new KyuubiException(s"Unsupported elasticsearch event logger.")
}

private def loadEventHandler(
eventLoggerType: EventLoggerType,
kyuubiConf: KyuubiConf): Seq[EventHandler[KyuubiEvent]] = {
Expand All @@ -71,6 +76,9 @@ trait EventHandlerRegister extends Logging {
case EventLoggerType.KAFKA =>
createKafkaEventHandler(kyuubiConf) :: Nil

case EventLoggerType.ELASTICSEARCH =>
createElasticSearchEventHandler(kyuubiConf) :: Nil

case EventLoggerType.CUSTOM =>
EventHandlerLoader.loadCustom(kyuubiConf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ object EventLoggerType extends Enumeration {

type EventLoggerType = Value

val SPARK, JSON, JDBC, CUSTOM, KAFKA = Value
val SPARK, JSON, JDBC, CUSTOM, KAFKA, ELASTICSEARCH = Value
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.events.handler

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.sksamuel.elastic4s._
import com.sksamuel.elastic4s.ElasticApi.indexInto
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http._
import com.sksamuel.elastic4s.requests.common.RefreshPolicy
import com.sksamuel.elastic4s.requests.mappings.MappingDefinition
import com.sksamuel.elastic4s.requests.mappings.dynamictemplate.DynamicMapping
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.util.Preconditions.checkArgument
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.events.KyuubiEvent

/**
* This event logger logs events to ElasticSearch.
*/
class ElasticSearchLoggingEventHandler(
indexId: String,
serverUrl: String,
username: Option[String] = None,
password: Option[String] = None,
kyuubiConf: KyuubiConf) extends EventHandler[KyuubiEvent] with Logging {

private val isAutoCreateIndex: Boolean =
kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_INDEX_AUTOCREATE_ENABLED)

private val esClient: ElasticClient = {
val props = ElasticProperties(serverUrl)
lazy val provider = {
val provider = new BasicCredentialsProvider
val credentials =
new UsernamePasswordCredentials(username.getOrElse(""), password.getOrElse(""))
provider.setCredentials(AuthScope.ANY, credentials)
provider
}
val javaClient = JavaClient(
props,
(requestConfigBuilder: RequestConfig.Builder) => requestConfigBuilder,
(httpClientBuilder: HttpAsyncClientBuilder) => {
httpClientBuilder.setDefaultCredentialsProvider(provider)
})
ElasticClient(javaClient)
}

private def checkIndexExisted: String => Boolean = (indexId: String) => {
checkArgument(
StringUtils.isNotBlank(indexId),
"index name must be configured for ElasticSearchEventHandler, please ensure %s is set",
SERVER_EVENT_ELASTICSEARCH_INDEX.key)
// check
esClient.execute {
indexExists(indexId)
}.await.result.isExists
}

private def checkAndEnsureIndex(indexId: String): Unit = {
if (!checkIndexExisted(indexId) && isAutoCreateIndex) {
esClient.execute {
createIndex(indexId)
.mapping(MappingDefinition(dynamic = Some(DynamicMapping.Dynamic)))
}.await
if (!checkIndexExisted(indexId)) {
throwExceptionIndexNotfound(indexId)
}
} else {
throwExceptionIndexNotfound(indexId)
}
}

checkAndEnsureIndex(indexId)

private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

override def apply(event: KyuubiEvent): Unit = {

try {
val fields = {
val entries = objectMapper.readValue(event.toJson, classOf[Map[String, Any]])
entries.map(p => {
val key = p._1
val value = p._2
val isNumber = (obj: Any) => {
(obj.isInstanceOf[Byte] || obj.isInstanceOf[Short] ||
obj.isInstanceOf[Int] ||
obj.isInstanceOf[Long]).||(obj.isInstanceOf[Float]) ||
obj.isInstanceOf[Double]
}
if (value.isInstanceOf[String] || isNumber(value)) {
(key, value)
} else {
(key, objectMapper.writeValueAsString(value))
}
})
}
esClient.execute {
indexInto(indexId)
.fields(fields)
.refresh(RefreshPolicy.Immediate)
}.await
} catch {
case e: Exception =>
error("Failed to send event in ElasticSearchEventHandler", e)
}
}

override def close(): Unit = {
esClient.close()
}

private def throwExceptionIndexNotfound(indexId: String) =
throw new RuntimeException(s"the index '$indexId' is not found on ElasticSearch")
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.kyuubi.events

package object handler {
trait EventHandler[T <: KyuubiEvent] extends AutoCloseable {

def apply(event: T): Unit

def close(): Unit = {}
Expand Down
11 changes: 11 additions & 0 deletions kyuubi-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>com.sksamuel.elastic4s</groupId>
<artifactId>elastic4s-client-esjava_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>
Expand All @@ -417,6 +422,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-elasticsearch_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.net.InetAddress

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.events.handler.{EventHandler, ServerJsonLoggingEventHandler, ServerKafkaLoggingEventHandler}
import org.apache.kyuubi.events.handler._
import org.apache.kyuubi.events.handler.ServerKafkaLoggingEventHandler.KAFKA_SERVER_EVENT_HANDLER_PREFIX
import org.apache.kyuubi.util.KyuubiHadoopUtils

Expand Down Expand Up @@ -53,6 +53,26 @@ object ServerEventHandlerRegister extends EventHandlerRegister {
closeTimeoutInMs)
}

override def createElasticSearchEventHandler(kyuubiConf: KyuubiConf)
: EventHandler[KyuubiEvent] = {
val indexId = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_INDEX).getOrElse {
throw new IllegalArgumentException(
s"${SERVER_EVENT_ELASTICSEARCH_INDEX.key} must be configured")
}
val serverUrl = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_SERVER_URL).getOrElse {
throw new IllegalArgumentException(
s"${SERVER_EVENT_ELASTICSEARCH_SERVER_URL.key} must be configured")
}
val username = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_USER)
val password = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_PASSWORD)
ServerElasticSearchLoggingEventHandler(
indexId,
serverUrl,
username,
password,
kyuubiConf)
}

override protected def getLoggers(conf: KyuubiConf): Seq[String] = {
conf.get(SERVER_EVENT_LOGGERS)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.events.handler

import org.apache.kyuubi.config.KyuubiConf

case class ServerElasticSearchLoggingEventHandler(
indexId: String,
serverUrl: String,
username: Option[String],
password: Option[String],
kyuubiConf: KyuubiConf)
extends ElasticSearchLoggingEventHandler(indexId, serverUrl, username, password, kyuubiConf)
Loading

0 comments on commit 39adfff

Please sign in to comment.