Skip to content

Commit 1e93dd2

Browse files
committed
save
metrics pending doc limit fix
1 parent 2ab2789 commit 1e93dd2

File tree

10 files changed

+112
-59
lines changed

10 files changed

+112
-59
lines changed

docs/configuration/settings.md

+2
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
8383
| kyuubi.batch.application.starvation.timeout | PT3M | Threshold above which to warn batch application may be starved. | duration | 1.7.0 |
8484
| kyuubi.batch.conf.ignore.list || A comma-separated list of ignored keys for batch conf. If the batch conf contains any of them, the key and the corresponding value will be removed silently during batch job submission. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering. You can also pre-define some config for batch job submission with the prefix: kyuubi.batchConf.[batchType]. For example, you can pre-define `spark.master` for the Spark batch job with key `kyuubi.batchConf.spark.spark.master`. | set | 1.6.0 |
8585
| kyuubi.batch.extra.resource.file.max.size | 0 | The maximum size in bytes of each uploaded extra resource file when creating batch. 0 or negative value means no limit. | long | 1.10.0 |
86+
| kyuubi.batch.pending.check.window | PT24H | The time window to check the batch pending max elapse time from metadata store. | duration | 1.10.1 |
8687
| kyuubi.batch.resource.file.max.size | 0 | The maximum size in bytes of the uploaded resource file when creating batch. 0 or negative value means no limit. | long | 1.10.0 |
88+
| kyuubi.batch.search.window | PT72H | The time window to search the batch from metadata store. | duration | 1.10.1 |
8789
| kyuubi.batch.session.idle.timeout | PT6H | Batch session idle timeout, it will be closed when it's not accessed for this duration | duration | 1.6.2 |
8890

8991
### Credentials

docs/monitor/metrics.md

+51-50
Large diffs are not rendered by default.

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

+16
Original file line numberDiff line numberDiff line change
@@ -1930,6 +1930,22 @@ object KyuubiConf {
19301930
.stringConf
19311931
.createWithDefault("1")
19321932

1933+
// Time window used when checking the batch pending max elapse time from the metadata store.
// Values that are not strictly positive are rejected by checkValue; the default is one day,
// supplied as milliseconds (Duration.ofDays(1).toMillis).
val BATCH_PENDING_CHECK_WINDOW: ConfigEntry[Long] =
1934+
buildConf("kyuubi.batch.pending.check.window")
1935+
.doc("The time window to check the batch pending max elapse time from metadata store.")
1936+
.version("1.10.1")
1937+
.timeConf
1938+
.checkValue(_ > 0, "must be positive number")
1939+
.createWithDefault(Duration.ofDays(1).toMillis)
1940+
1941+
// Time window used when searching batches from the metadata store.
// Values that are not strictly positive are rejected by checkValue; the default is three days,
// supplied as milliseconds (Duration.ofDays(3).toMillis).
val BATCH_SEARCH_WINDOW: ConfigEntry[Long] =
1942+
buildConf("kyuubi.batch.search.window")
1943+
.doc("The time window to search the batch from metadata store.")
1944+
.version("1.10.1")
1945+
.timeConf
1946+
.checkValue(_ > 0, "must be positive number")
1947+
.createWithDefault(Duration.ofDays(3).toMillis)
1948+
19331949
val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
19341950
buildConf("kyuubi.backend.server.exec.pool.size")
19351951
.doc("Number of threads in the operation execution thread pool of Kyuubi server")

kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ object MetricsConstants {
6464
final val OPERATION_TOTAL: String = OPERATION + "total"
6565
final val OPERATION_STATE: String = OPERATION + "state"
6666
final val OPERATION_EXEC_TIME: String = OPERATION + "exec_time"
67+
final val OPERATION_BATCH_PENDING_MAX_ELAPSE: String = OPERATION + "batch_pending_max_elapse"
6768

6869
final private val BACKEND_SERVICE = KYUUBI + "backend_service."
6970
final val BS_FETCH_LOG_ROWS_RATE = BACKEND_SERVICE + "fetch_log_rows_rate"

kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala

+19
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,12 @@ import org.eclipse.jetty.servlet.{ErrorPageErrorHandler, FilterHolder}
3131
import org.apache.kyuubi.{KyuubiException, Utils}
3232
import org.apache.kyuubi.config.KyuubiConf
3333
import org.apache.kyuubi.config.KyuubiConf._
34+
import org.apache.kyuubi.metrics.MetricsConstants
35+
import org.apache.kyuubi.metrics.MetricsSystem
36+
import org.apache.kyuubi.operation.OperationState
3437
import org.apache.kyuubi.server.api.v1.ApiRootResource
3538
import org.apache.kyuubi.server.http.authentication.{AuthenticationFilter, KyuubiHttpAuthenticationFactory}
39+
import org.apache.kyuubi.server.metadata.api.MetadataFilter
3640
import org.apache.kyuubi.server.ui.{JettyServer, JettyUtils}
3741
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service, ServiceUtils}
3842
import org.apache.kyuubi.service.authentication.{AuthTypes, AuthUtils}
@@ -202,6 +206,15 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
202206
}
203207
}
204208

209+
/**
 * Reports how long, in milliseconds, a pending batch of this instance has been waiting.
 *
 * Builds a metadata filter for batches in PENDING state whose kyuubiInstance equals
 * `connectionUrl`, with createTime set to "now minus BATCH_PENDING_CHECK_WINDOW"
 * (presumably interpreted by the metadata store as a lower bound -- TODO confirm),
 * and fetches at most one matching batch in ascending order.
 *
 * NOTE(review): with orderByKeyId = false the store is expected to sort by creation time,
 * which would make the fetched batch the oldest pending one -- confirm against the
 * MetadataStore implementation in use.
 *
 * @return the current time minus the fetched batch's create time, or 0 when no batch matches
 */
private def getBatchPendingMaxElapse(): Long = {
210+
val filter = MetadataFilter(
211+
state = OperationState.PENDING.toString,
212+
kyuubiInstance = connectionUrl,
213+
createTime = System.currentTimeMillis() - conf.get(BATCH_PENDING_CHECK_WINDOW))
214+
// offset 0, size 1, ascending, not ordered by key id: only a single batch is fetched
sessionManager.getBatchesFromMetadataStore(filter, 0, 1, desc = false, orderByKeyId = false)
215+
.headOption.map { batch => System.currentTimeMillis() - batch.getCreateTime }.getOrElse(0L)
216+
}
217+
205218
def waitForServerStarted(): Unit = {
206219
// block until the HTTP server is started, otherwise, we may get
207220
// the wrong HTTP server port -1
@@ -220,6 +233,12 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
220233
isStarted.set(true)
221234
startBatchChecker()
222235
recoverBatchSessions()
236+
MetricsSystem.tracing { ms =>
237+
ms.registerGauge(
238+
MetricsConstants.OPERATION_BATCH_PENDING_MAX_ELAPSE,
239+
getBatchPendingMaxElapse,
240+
0)
241+
}
223242
} catch {
224243
case e: Exception => throw new KyuubiException(s"Cannot start $getName", e)
225244
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala

+5-2
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
6767
fe.getConf.get(ENGINE_SECURITY_ENABLED)
6868
private lazy val resourceFileMaxSize = fe.getConf.get(BATCH_RESOURCE_FILE_MAX_SIZE)
6969
private lazy val extraResourceFileMaxSize = fe.getConf.get(BATCH_EXTRA_RESOURCE_FILE_MAX_SIZE)
70+
private lazy val batchSearchWindow = fe.getConf.get(BATCH_SEARCH_WINDOW)
7071

7172
private def batchV2Enabled(reqConf: Map[String, String]): Boolean = {
7273
fe.getConf.get(BATCH_SUBMITTER_ENABLED) &&
@@ -426,9 +427,11 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
426427
username = batchUser,
427428
state = batchState,
428429
requestName = batchName,
429-
createTime = createTime,
430+
createTime = math.max(createTime, System.currentTimeMillis() - batchSearchWindow),
430431
endTime = endTime)
431-
val batches = sessionManager.getBatchesFromMetadataStore(filter, from, size, desc)
432+
// ordering by key_id (the auto-increment primary key column) is slower, so it is disabled here
433+
val batches =
434+
sessionManager.getBatchesFromMetadataStore(filter, from, size, desc, orderByKeyId = false)
432435
new GetBatchesResponse(from, batches.size, batches.asJava)
433436
}
434437

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala

+8-2
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,14 @@ class MetadataManager extends AbstractService("MetadataManager") {
138138
filter: MetadataFilter,
139139
from: Int,
140140
size: Int,
141-
desc: Boolean = false): Seq[Batch] = {
142-
withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, size, desc)).map(
141+
desc: Boolean = false,
142+
orderByKeyId: Boolean = true): Seq[Batch] = {
143+
withMetadataRequestMetrics(_metadataStore.getMetadataList(
144+
filter,
145+
from,
146+
size,
147+
desc,
148+
orderByKeyId)).map(
143149
buildBatch)
144150
}
145151

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala

+4-1
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,16 @@ trait MetadataStore extends Closeable {
5858
* @param from the metadata offset.
5959
* @param size the size to get.
6060
* @param desc the order of metadata list.
61+
* @param orderByKeyId the result order by auto increment key id,
62+
* which is stable but might be slow.
6163
* @return selected metadata list.
6264
*/
6365
def getMetadataList(
6466
filter: MetadataFilter,
6567
from: Int,
6668
size: Int,
67-
desc: Boolean = false): Seq[Metadata]
69+
desc: Boolean = false,
70+
orderByKeyId: Boolean = true): Seq[Metadata]
6871

6972
/**
7073
* Count the metadata list with filter conditions.

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -257,14 +257,15 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
257257
filter: MetadataFilter,
258258
from: Int,
259259
size: Int,
260-
desc: Boolean = false): Seq[Metadata] = {
260+
desc: Boolean = false,
261+
orderByKeyId: Boolean = true): Seq[Metadata] = {
261262
val queryBuilder = new StringBuilder
262263
val params = ListBuffer[Any]()
263264
queryBuilder.append("SELECT ")
264265
queryBuilder.append(METADATA_COLUMNS)
265266
queryBuilder.append(s" FROM $METADATA_TABLE")
266267
queryBuilder.append(s" ${assembleWhereClause(filter, params)}")
267-
queryBuilder.append(" ORDER BY key_id ")
268+
queryBuilder.append(" ORDER BY ").append(if (orderByKeyId) "key_id " else "create_time ")
268269
queryBuilder.append(if (desc) "DESC " else "ASC ")
269270
queryBuilder.append(dialect.limitClause(size, from))
270271
val query = queryBuilder.toString

kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
280280
filter: MetadataFilter,
281281
from: Int,
282282
size: Int,
283-
desc: Boolean = false): Seq[Batch] = {
284-
metadataManager.map(_.getBatches(filter, from, size, desc)).getOrElse(Seq.empty)
283+
desc: Boolean = false,
284+
orderByKeyId: Boolean = true): Seq[Batch] = {
285+
metadataManager.map(_.getBatches(filter, from, size, desc, orderByKeyId)).getOrElse(Seq.empty)
285286
}
286287

287288
def getBatchMetadata(batchId: String): Option[Metadata] = {

0 commit comments

Comments
 (0)