Introduce ElasticSearch event logger for server events #5488

Open
wants to merge 2 commits into
base: master
2 changes: 2 additions & 0 deletions LICENSE-binary
Expand Up @@ -309,6 +309,8 @@ org.apache.kafka:kafka-clients
org.lz4:lz4-java
org.xerial.snappy:snappy-java
org.xerial:sqlite-jdbc
com.sksamuel.elastic4s:elastic4s-client-esjava
org.elasticsearch.client:elasticsearch-rest-client

BSD
------------
16 changes: 16 additions & 0 deletions NOTICE-binary
Expand Up @@ -1079,3 +1079,19 @@ decompression for Java, which can be obtained at:
* license/LICENSE.lz4-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/lz4/lz4-java

This product depends on 'elastic4s', Scala client for ElasticSearch,
which can be obtained at:

* LICENSE:
* license/LICENSE.elastic4s.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/sksamuel/elastic4s

This product depends on 'elasticsearch-rest-client', the low-level Java REST client for ElasticSearch,
which can be obtained at:

* LICENSE:
* license/LICENSE.elasticsearch.txt (Apache License 2.0)
* HOMEPAGE:
* https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/java-rest-low.html
11 changes: 11 additions & 0 deletions dev/dependencyList
Expand Up @@ -31,6 +31,15 @@ checker-qual/3.42.0//checker-qual-3.42.0.jar
classgraph/4.8.138//classgraph-4.8.138.jar
commons-codec/1.15//commons-codec-1.15.jar
commons-lang3/3.13.0//commons-lang3-3.13.0.jar
commons-logging/1.2//commons-logging-1.2.jar
config/1.4.3//config-1.4.3.jar
elastic4s-client-core_2.12/8.11.5//elastic4s-client-core_2.12-8.11.5.jar
elastic4s-client-esjava_2.12/8.11.5//elastic4s-client-esjava_2.12-8.11.5.jar
elastic4s-core_2.12/8.11.5//elastic4s-core_2.12-8.11.5.jar
elastic4s-domain_2.12/8.11.5//elastic4s-domain_2.12-8.11.5.jar
elastic4s-handlers_2.12/8.11.5//elastic4s-handlers_2.12-8.11.5.jar
elastic4s-json-builder_2.12/8.11.5//elastic4s-json-builder_2.12-8.11.5.jar
elasticsearch-rest-client/8.11.2//elasticsearch-rest-client-8.11.2.jar
error_prone_annotations/2.20.0//error_prone_annotations-2.20.0.jar
failsafe/3.3.2//failsafe-3.3.2.jar
failureaccess/1.0.1//failureaccess-1.0.1.jar
Expand All @@ -52,7 +61,9 @@ hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
hk2-locator/2.6.1//hk2-locator-2.6.1.jar
hk2-utils/2.6.1//hk2-utils-2.6.1.jar
httpasyncclient/4.1.5//httpasyncclient-4.1.5.jar
httpclient/4.5.14//httpclient-4.5.14.jar
httpcore-nio/4.4.13//httpcore-nio-4.4.13.jar
httpcore/4.4.16//httpcore-4.4.16.jar
httpmime/4.5.14//httpmime-4.5.14.jar
j2objc-annotations/1.3//j2objc-annotations-1.3.jar
33 changes: 19 additions & 14 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Expand Up @@ -2437,6 +2437,49 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofMillis(5000).toMillis)

val SERVER_EVENT_ELASTICSEARCH_SERVER_URL: OptionalConfigEntry[String] =
buildConf("kyuubi.backend.server.event.elasticsearch.url")
.doc("The URL of the ElasticSearch endpoint to which the built-in ElasticSearch logger" +
" sends server events")
.version("1.9.0")
.serverOnly
.stringConf
.createOptional

val SERVER_EVENT_ELASTICSEARCH_INDEX: OptionalConfigEntry[String] =
buildConf("kyuubi.backend.server.event.elasticsearch.index")
.doc("The ElasticSearch index to which the built-in ElasticSearch logger writes" +
" server events")
.version("1.9.0")
.serverOnly
.stringConf
.createOptional

val SERVER_EVENT_ELASTICSEARCH_INDEX_AUTOCREATE_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.backend.server.event.elasticsearch.index.autocreate.enabled")
.doc("Whether to automatically create the index used by the built-in ElasticSearch" +
" logger for server events")
.version("1.9.0")
.serverOnly
.booleanConf
.createWithDefault(true)

val SERVER_EVENT_ELASTICSEARCH_USER: OptionalConfigEntry[String] =
buildConf("kyuubi.backend.server.event.elasticsearch.user")
.doc("The ElasticSearch user that the built-in ElasticSearch logger authenticates as")
.version("1.9.0")
.serverOnly
.stringConf
.createOptional

val SERVER_EVENT_ELASTICSEARCH_PASSWORD: OptionalConfigEntry[String] =
buildConf("kyuubi.backend.server.event.elasticsearch.password")
.doc("The ElasticSearch password that the built-in ElasticSearch logger" +
" authenticates with")
.version("1.9.0")
.serverOnly
.stringConf
.createOptional

val SERVER_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
buildConf("kyuubi.backend.server.event.loggers")
.doc("A comma-separated list of server history loggers, where session/operation etc" +
Expand All @@ -2449,6 +2492,9 @@ object KyuubiConf {
s" please specify them with the prefix: `kyuubi.backend.server.event.kafka.`." +
s" For example, `kyuubi.backend.server.event.kafka.bootstrap.servers=127.0.0.1:9092`" +
s" </li>" +
s" <li>ELASTICSEARCH: the events will be sent to the " +
s" index `${SERVER_EVENT_ELASTICSEARCH_INDEX.key}` on ElasticSearch" +
s"</li>" +
s" <li>JDBC: to be done</li>" +
s" <li>CUSTOM: User-defined event handlers.</li></ul>" +
" Note that: Kyuubi supports custom event handlers with the Java SPI." +
Expand All @@ -2462,7 +2508,7 @@ object KyuubiConf {
.transformToUpperCase
.toSequence()
.checkValue(
_.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM", "KAFKA")),
_.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM", "KAFKA", "ELASTICSEARCH")),
"Unsupported event loggers")
.createWithDefault(Nil)

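As a usage sketch, the new entries could be set as shown below. Only the keys come from the diff above; the URL, index name, and credentials are placeholder values, and `renderConf` is a hypothetical helper that is not part of Kyuubi.

```scala
// Sketch only: keys are taken from the diff above; every value is a placeholder.
object ElasticSearchLoggerConfExample {
  val settings: Seq[(String, String)] = Seq(
    "kyuubi.backend.server.event.loggers" -> "ELASTICSEARCH",
    "kyuubi.backend.server.event.elasticsearch.url" -> "http://127.0.0.1:9200",
    "kyuubi.backend.server.event.elasticsearch.index" -> "kyuubi-server-events",
    "kyuubi.backend.server.event.elasticsearch.index.autocreate.enabled" -> "true",
    "kyuubi.backend.server.event.elasticsearch.user" -> "elastic",
    "kyuubi.backend.server.event.elasticsearch.password" -> "changeme")

  // Hypothetical helper: renders the pairs as `key=value` lines,
  // the same form the Kafka example in the doc string uses.
  def renderConf(pairs: Seq[(String, String)]): String =
    pairs.map { case (k, v) => s"$k=$v" }.mkString("\n")

  def main(args: Array[String]): Unit = println(renderConf(settings))
}
```

The URL and index have no defaults in the diff, so both must be set whenever `ELASTICSEARCH` is listed among the loggers; user and password are optional.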
10 changes: 10 additions & 0 deletions kyuubi-events/pom.xml
Expand Up @@ -48,6 +48,16 @@
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>com.sksamuel.elastic4s</groupId>
<artifactId>elastic4s-client-esjava_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
Expand Up @@ -55,6 +55,11 @@ trait EventHandlerRegister extends Logging {
throw new KyuubiException(s"Unsupported kafka event logger.")
}

protected def createElasticSearchEventHandler(kyuubiConf: KyuubiConf)
: EventHandler[KyuubiEvent] = {
throw new KyuubiException(s"Unsupported elasticsearch event logger.")
}

private def loadEventHandler(
eventLoggerType: EventLoggerType,
kyuubiConf: KyuubiConf): Seq[EventHandler[KyuubiEvent]] = {
Expand All @@ -71,6 +76,9 @@ trait EventHandlerRegister extends Logging {
case EventLoggerType.KAFKA =>
createKafkaEventHandler(kyuubiConf) :: Nil

case EventLoggerType.ELASTICSEARCH =>
createElasticSearchEventHandler(kyuubiConf) :: Nil

case EventLoggerType.CUSTOM =>
EventHandlerLoader.loadCustom(kyuubiConf)

Expand Up @@ -21,5 +21,5 @@ object EventLoggerType extends Enumeration {

type EventLoggerType = Value

val SPARK, JSON, JDBC, CUSTOM, KAFKA = Value
val SPARK, JSON, JDBC, CUSTOM, KAFKA, ELASTICSEARCH = Value
}
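The two hunks above follow a common pattern: a new enumeration value plus a factory method that throws by default and is overridden where the logger is supported. A minimal self-contained sketch of that pattern follows; `LoggerType`, `HandlerRegisterSketch`, and `ServerRegisterSketch` are hypothetical stand-ins, and handlers are represented by plain strings rather than Kyuubi's `EventHandler`.

```scala
import java.util.Locale

// Sketch only: stand-ins for EventLoggerType and the register's dispatch; not Kyuubi code.
object LoggerType extends Enumeration {
  type LoggerType = Value
  val SPARK, JSON, JDBC, CUSTOM, KAFKA, ELASTICSEARCH = Value
}

trait HandlerRegisterSketch {
  import LoggerType._

  // Default: unsupported, mirroring the throwing default in the trait above.
  protected def createElasticSearchHandler(): String =
    throw new UnsupportedOperationException("Unsupported elasticsearch event logger.")

  // Resolves the configured name to an enumeration value and dispatches on it.
  def load(name: String): Seq[String] =
    LoggerType.withName(name.toUpperCase(Locale.ROOT)) match {
      case ELASTICSEARCH => createElasticSearchHandler() :: Nil
      case other =>
        throw new UnsupportedOperationException(s"Not covered by this sketch: $other")
    }
}

// A register that supports the logger overrides the factory method.
object ServerRegisterSketch extends HandlerRegisterSketch {
  override protected def createElasticSearchHandler(): String = "elasticsearch-handler"
}
```

Keeping the throwing default in the base trait means registers that do not override the method reject the `ELASTICSEARCH` logger type explicitly.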
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.events.handler

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.Failure

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.ElasticApi.indexInto
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.requests.common.RefreshPolicy
import com.sksamuel.elastic4s.requests.mappings.MappingDefinition
import com.sksamuel.elastic4s.requests.mappings.dynamictemplate.DynamicMapping
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.util.Preconditions.checkArgument
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback

import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.events.KyuubiEvent
import org.apache.kyuubi.events.handler.ElasticSearchLoggingEventHandler.getElasticClient
import org.apache.kyuubi.util.ThreadUtils

/**
* This event logger logs events to ElasticSearch.
*/
class ElasticSearchLoggingEventHandler(
indexId: String,
serverUrl: String,
username: Option[String] = None,
password: Option[String] = None,
kyuubiConf: KyuubiConf) extends EventHandler[KyuubiEvent] with Logging {

private val isAutoCreateIndex: Boolean =
kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_INDEX_AUTOCREATE_ENABLED)

private val esClient: ElasticClient = getElasticClient(serverUrl, username, password)

private val executorService =
ThreadUtils.newDaemonFixedThreadPool(1, "elasticsearch-event-handler-execution")
implicit private val executionContext: ExecutionContextExecutor =
ExecutionContext.fromExecutor(executorService)

private def checkIndexExisted: String => Boolean = (indexId: String) => {
checkArgument(
StringUtils.isNotBlank(indexId),
"index name must be configured for ElasticSearchEventHandler, please ensure %s is set",
SERVER_EVENT_ELASTICSEARCH_INDEX.key)
// check
esClient.execute {
indexExists(indexId)
}.await.result.isExists
}

private def checkAndEnsureIndex(indexId: String): Unit = {
// An index that already exists needs no further action.
if (!checkIndexExisted(indexId)) {
if (isAutoCreateIndex) {
esClient.execute {
createIndex(indexId)
.mapping(MappingDefinition(dynamic = Some(DynamicMapping.Dynamic)))
}.await
// Verify that the creation actually took effect.
if (!checkIndexExisted(indexId)) {
throwExceptionIndexNotfound(indexId)
}
} else {
throwExceptionIndexNotfound(indexId)
}
}
}

checkAndEnsureIndex(indexId)

private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

override def apply(event: KyuubiEvent): Unit = {
val fields = {
Member

Serializing the event into a JSON string and then deserializing it back into a map does not seem like a good solution. elastic4s provides the indexInto(indexId).doc(XXX) method to index a JSON string directly.

Contributor Author

I tried that, but it did not work: because of the nested fields, I have to serialize, deserialize, and map again. I also tried to implement a method for generating flat JSON, but it did not serialize properly.

Member

> but it did not serialize properly.

Are there any specific errors?

objectMapper.readValue(event.toJson, classOf[Map[String, Any]]).map {
case (key: String, value: Any) =>
val isNumber = (obj: Any) => {
obj.isInstanceOf[Byte] || obj.isInstanceOf[Short] ||
obj.isInstanceOf[Int] || obj.isInstanceOf[Long] ||
obj.isInstanceOf[Float] || obj.isInstanceOf[Double]
}
if (value.isInstanceOf[String] || isNumber(value)) {
(key, value)
} else {
(key, objectMapper.writeValueAsString(value))
}
}
}
esClient.execute {
indexInto(indexId).fields(fields).refresh(RefreshPolicy.Immediate)
}.onComplete {
case Failure(f) =>
error(s"Failed to send event in ElasticSearchLoggingEventHandler, ${f.getMessage}", f)
case _ =>
}
}

override def close(): Unit = {
executorService.shutdown()
esClient.close()
}

private def throwExceptionIndexNotfound(indexId: String) =
throw new KyuubiException(s"the index '$indexId' is not found on ElasticSearch")
}
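The index check at construction time can be read as a small decision table. The sketch below isolates that decision as a pure function, assuming that an index which already exists is accepted regardless of the auto-create setting; the `Action` type and `decide` are hypothetical names, and no ElasticSearch calls are made.

```scala
// Sketch only: pure decision logic with hypothetical names; no ElasticSearch calls.
object IndexCheckSketch {
  sealed trait Action
  case object UseExisting extends Action      // index is present, nothing to do
  case object CreateThenVerify extends Action // create it, then re-check existence
  case object FailNotFound extends Action     // absent and auto-create is disabled

  def decide(indexExists: Boolean, autoCreate: Boolean): Action =
    if (indexExists) UseExisting
    else if (autoCreate) CreateThenVerify
    else FailNotFound
}
```

Separating the decision from the client calls makes all four input combinations easy to unit-test without a running cluster.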

object ElasticSearchLoggingEventHandler {
def getElasticClient(
serverUrl: String,
user: Option[String],
password: Option[String]): ElasticClient = {
val props = ElasticProperties(serverUrl)
val configCallback: HttpClientConfigCallback =
(httpAsyncClientBuilder: HttpAsyncClientBuilder) => {
(user, password) match {
case (Some(userStr), Some(passwordStr)) =>
val provider = new BasicCredentialsProvider
val credentials = new UsernamePasswordCredentials(userStr, passwordStr)
provider.setCredentials(AuthScope.ANY, credentials)
httpAsyncClientBuilder.setDefaultCredentialsProvider(provider)
case _ => httpAsyncClientBuilder
}
}
val javaClient = JavaClient.apply(props, configCallback)
ElasticClient(javaClient)
}
}
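The field handling in `apply`, which the review thread discusses, keeps top-level strings and numbers as they are and stores every other value as a JSON string. The sketch below reproduces that shape without Jackson so that it runs standalone: `toJson` is a deliberately minimal renderer written for this example (it does not escape special characters), whereas the handler in the diff uses `ObjectMapper.writeValueAsString`.

```scala
// Sketch only: mirrors the flattening in `apply`; `toJson` is a simplified stand-in for Jackson.
object FlattenFieldsSketch {
  private def isNumber(v: Any): Boolean = v match {
    case _: Byte | _: Short | _: Int | _: Long | _: Float | _: Double => true
    case _ => false
  }

  // Minimal JSON renderer for illustration; no escaping of quotes or control characters.
  def toJson(v: Any): String = v match {
    case null => "null"
    case m: Map[_, _] =>
      m.map { case (k, x) => "\"" + k + "\":" + toJson(x) }.mkString("{", ",", "}")
    case s: Seq[_] => s.map(toJson).mkString("[", ",", "]")
    case s: String => "\"" + s + "\""
    case other => other.toString
  }

  // Top-level strings and numbers pass through; everything else becomes a JSON string.
  def flatten(fields: Map[String, Any]): Map[String, Any] = fields.map {
    case (key, value) if value.isInstanceOf[String] || isNumber(value) => key -> value
    case (key, value) => key -> toJson(value)
  }
}
```

For example, `flatten(Map("id" -> "s1", "conf" -> Map("k" -> "v")))` leaves `id` untouched and turns `conf` into the string `{"k":"v"}`, so nested structures arrive in the index as a single string field rather than as nested objects.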
11 changes: 11 additions & 0 deletions kyuubi-server/pom.xml
Expand Up @@ -288,6 +288,11 @@
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>com.sksamuel.elastic4s</groupId>
<artifactId>elastic4s-client-esjava_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>
Expand All @@ -300,6 +305,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-elasticsearch_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
Expand Up @@ -20,7 +20,7 @@ import java.net.InetAddress

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.events.handler.{EventHandler, ServerJsonLoggingEventHandler, ServerKafkaLoggingEventHandler}
import org.apache.kyuubi.events.handler._
import org.apache.kyuubi.events.handler.ServerKafkaLoggingEventHandler.KAFKA_SERVER_EVENT_HANDLER_PREFIX
import org.apache.kyuubi.util.KyuubiHadoopUtils

Expand Down Expand Up @@ -53,6 +53,26 @@ object ServerEventHandlerRegister extends EventHandlerRegister {
closeTimeoutInMs)
}

override def createElasticSearchEventHandler(kyuubiConf: KyuubiConf)
: EventHandler[KyuubiEvent] = {
val indexId = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_INDEX).getOrElse {
throw new IllegalArgumentException(
s"${SERVER_EVENT_ELASTICSEARCH_INDEX.key} must be configured")
}
val serverUrl = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_SERVER_URL).getOrElse {
throw new IllegalArgumentException(
s"${SERVER_EVENT_ELASTICSEARCH_SERVER_URL.key} must be configured")
}
val username = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_USER)
val password = kyuubiConf.get(SERVER_EVENT_ELASTICSEARCH_PASSWORD)
ServerElasticSearchLoggingEventHandler(
indexId,
serverUrl,
username,
password,
kyuubiConf)
}

override protected def getLoggers(conf: KyuubiConf): Seq[String] = {
conf.get(SERVER_EVENT_LOGGERS)
}
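The override above fails fast when a required setting is missing and passes optional settings through as `Option`. A minimal sketch of the same pattern, using a plain `Map[String, String]` as a hypothetical stand-in for `KyuubiConf` and an illustrative `EsSettings` case class that does not exist in the PR:

```scala
// Sketch only: a Map stands in for KyuubiConf; EsSettings is illustrative, not part of the PR.
object RequiredConfSketch {
  final case class EsSettings(
      index: String,
      url: String,
      user: Option[String],
      password: Option[String])

  // Returns the value for `key`, or fails with a message that names the missing key.
  def required(conf: Map[String, String], key: String): String =
    conf.getOrElse(key, throw new IllegalArgumentException(s"$key must be configured"))

  def fromConf(conf: Map[String, String]): EsSettings = {
    val prefix = "kyuubi.backend.server.event.elasticsearch"
    EsSettings(
      required(conf, s"$prefix.index"),
      required(conf, s"$prefix.url"),
      conf.get(s"$prefix.user"),
      conf.get(s"$prefix.password"))
  }
}
```

Naming the missing key in the exception message, as the diff does, lets an operator correct the configuration without reading the source.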