Skip to content

Add metrics for batch pending max elapse time #6829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

101 changes: 51 additions & 50 deletions docs/monitor/metrics.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2023,6 +2023,16 @@ object KyuubiConf {
.intConf
.createWithDefault(65536)

val METADATA_SEARCH_WINDOW: OptionalConfigEntry[Long] =
buildConf("kyuubi.metadata.search.window")
.doc("The time window to restrict user queries to metadata within a specific period. " +
"For example, if the window is set to P7D, only metadata from the past 7 days can be " +
"queried. If not set, it allows searching all metadata information in the metadata store.")
.version("1.10.1")
.timeConf
.checkValue(_ > 0, "must be positive number")
.createOptional

val ENGINE_EXEC_WAIT_QUEUE_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.backend.engine.exec.pool.wait.queue.size")
.doc("Size of the wait queue for the operation execution thread pool in SQL engine" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ object MetricsConstants {
final val OPERATION_TOTAL: String = OPERATION + "total"
final val OPERATION_STATE: String = OPERATION + "state"
final val OPERATION_EXEC_TIME: String = OPERATION + "exec_time"
final val OPERATION_BATCH_PENDING_MAX_ELAPSE: String = OPERATION + "batch_pending_max_elapse"

final private val BACKEND_SERVICE = KYUUBI + "backend_service."
final val BS_FETCH_LOG_ROWS_RATE = BACKEND_SERVICE + "fetch_log_rows_rate"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,14 @@ class BatchJobSubmission(
Utils.deleteDirectoryRecursively(session.resourceUploadFolderPath.toFile)
}
}

def getPendingElapsedTime: Long = {
if (state == OperationState.PENDING) {
System.currentTimeMillis() - createTime
} else {
0L
}
}
}

object BatchJobSubmission {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ import org.eclipse.jetty.servlet.{ErrorPageErrorHandler, FilterHolder}
import org.apache.kyuubi.{KyuubiException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_BATCH_PENDING_MAX_ELAPSE
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.server.api.v1.ApiRootResource
import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
import org.apache.kyuubi.server.ui.{JettyServer, JettyUtils}
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service, ServiceUtils}
import org.apache.kyuubi.service.authentication.{AuthTypes, AuthUtils}
import org.apache.kyuubi.session.{KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.util.{JavaUtils, ThreadUtils}
import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay

Expand Down Expand Up @@ -202,6 +204,14 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
}
}

private def getBatchPendingMaxElapse(): Long = {
val batchPendingElapseTimes = sessionManager.allSessions().map {
case session: KyuubiBatchSession => session.batchJobSubmissionOp.getPendingElapsedTime
case _ => 0L
}
if (batchPendingElapseTimes.isEmpty) 0L else batchPendingElapseTimes.max
}

def waitForServerStarted(): Unit = {
// block until the HTTP server is started, otherwise, we may get
// the wrong HTTP server port -1
Expand All @@ -220,6 +230,9 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
isStarted.set(true)
startBatchChecker()
recoverBatchSessions()
MetricsSystem.tracing { ms =>
ms.registerGauge(OPERATION_BATCH_PENDING_MAX_ELAPSE, getBatchPendingMaxElapse, 0)
}
} catch {
case e: Exception => throw new KyuubiException(s"Cannot start $getName", e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
fe.getConf.get(ENGINE_SECURITY_ENABLED)
private lazy val resourceFileMaxSize = fe.getConf.get(BATCH_RESOURCE_FILE_MAX_SIZE)
private lazy val extraResourceFileMaxSize = fe.getConf.get(BATCH_EXTRA_RESOURCE_FILE_MAX_SIZE)
private lazy val metadataSearchWindow = fe.getConf.get(METADATA_SEARCH_WINDOW)

private def batchV2Enabled(reqConf: Map[String, String]): Boolean = {
fe.getConf.get(BATCH_SUBMITTER_ENABLED) &&
Expand Down Expand Up @@ -420,13 +421,15 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
s"The valid batch state can be one of the following: ${VALID_BATCH_STATES.mkString(",")}")
}

val createTimeFilter =
math.max(createTime, metadataSearchWindow.map(System.currentTimeMillis() - _).getOrElse(0L))
val filter = MetadataFilter(
sessionType = SessionType.BATCH,
engineType = batchType,
username = batchUser,
state = batchState,
requestName = batchName,
createTime = createTime,
createTime = createTimeFilter,
endTime = endTime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not related to this PR, but createTime and endTime are confusing, should be minCreateTime and maxEndTime, and say if they are inclusive or exclusive in the comments, with an explanation for specific values

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update: the predication name might be more intuitive if we use greatThanXXX lessThanOrEqualsXXX

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will address it in another PR

val batches = sessionManager.getBatchesFromMetadataStore(filter, from, size, desc)
new GetBatchesResponse(from, batches.size, batches.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,13 @@ class MetadataManager extends AbstractService("MetadataManager") {
from: Int,
size: Int,
desc: Boolean = false): Seq[Batch] = {
withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, size, desc)).map(
buildBatch)
withMetadataRequestMetrics(_metadataStore.getMetadataList(
filter,
from,
size,
// if create_file field is set, order by create_time, which is faster, otherwise by key_id
orderBy = if (filter.createTime > 0) Some("create_time") else Some("key_id"),
direction = if (desc) "DESC" else "ASC")).map(buildBatch)
}

def countBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,16 @@ trait MetadataStore extends Closeable {
* @param from the metadata offset.
* @param size the size to get.
* @param desc the order of metadata list.
* @param orderBy the order by column, default is the auto increment primary key, `key_id`.
* @param direction the order direction, default is `ASC`.
* @return selected metadata list.
*/
def getMetadataList(
filter: MetadataFilter,
from: Int,
size: Int,
desc: Boolean = false): Seq[Metadata]
orderBy: Option[String] = Some("key_id"),
direction: String = "ASC"): Seq[Metadata]

/**
* Count the metadata list with filter conditions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,15 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
filter: MetadataFilter,
from: Int,
size: Int,
desc: Boolean = false): Seq[Metadata] = {
orderBy: Option[String] = Some("key_id"),
direction: String = "ASC"): Seq[Metadata] = {
val queryBuilder = new StringBuilder
val params = ListBuffer[Any]()
queryBuilder.append("SELECT ")
queryBuilder.append(METADATA_COLUMNS)
queryBuilder.append(s" FROM $METADATA_TABLE")
queryBuilder.append(s" ${assembleWhereClause(filter, params)}")
queryBuilder.append(" ORDER BY key_id ")
queryBuilder.append(if (desc) "DESC " else "ASC ")
orderBy.foreach(o => queryBuilder.append(s" ORDER BY $o $direction "))
queryBuilder.append(dialect.limitClause(size, from))
val query = queryBuilder.toString
JdbcUtils.withConnection { connection =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
.queryParam("from", "0")
.queryParam("size", "1")
.queryParam("desc", "true")
.queryParam("createTime", "1")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
.get()
Expand Down
Loading