diff --git a/.gitignore b/.gitignore
index 29bb61545c5..9f9ba715480 100644
--- a/.gitignore
+++ b/.gitignore
@@ -67,6 +67,7 @@ embedded_zookeeper/
/externals/kyuubi-spark-sql-engine/engine_operation_logs/
/externals/kyuubi-spark-sql-engine/spark-warehouse/
/work/
+/upload/
/docs/_build/
/kyuubi-common/metrics/
/kyuubi-server/metrics/
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 68014a5cf3b..525316b8e06 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -433,24 +433,25 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
### Server
-| Key | Default | Meaning | Type | Since |
-|----------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
-| kyuubi.server.administrators || Comma-separated list of Kyuubi service administrators. We use this config to grant admin permission to any service accounts when security mechanism is enabled. Note, when kyuubi.authentication is configured to NOSASL or NONE, everyone is treated as administrator. | set | 1.8.0 |
-| kyuubi.server.info.provider | ENGINE | The server information provider name, some clients may rely on this information to check the server compatibilities and functionalities.
SERVER: Return Kyuubi server information. ENGINE: Return Kyuubi engine information. | string | 1.6.1 |
-| kyuubi.server.limit.batch.connections.per.ipaddress | <undefined> | Maximum kyuubi server batch connections per ipaddress. Any user exceeding this limit will not be allowed to connect. | int | 1.7.0 |
-| kyuubi.server.limit.batch.connections.per.user | <undefined> | Maximum kyuubi server batch connections per user. Any user exceeding this limit will not be allowed to connect. | int | 1.7.0 |
-| kyuubi.server.limit.batch.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server batch connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.7.0 |
-| kyuubi.server.limit.client.fetch.max.rows | <undefined> | Max rows limit for getting result row set operation. If the max rows specified by client-side is larger than the limit, request will fail directly. | int | 1.8.0 |
-| kyuubi.server.limit.connections.ip.deny.list || The client ip in the deny list will be denied to connect to kyuubi server. | set | 1.9.1 |
-| kyuubi.server.limit.connections.per.ipaddress | <undefined> | Maximum kyuubi server connections per ipaddress. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 |
-| kyuubi.server.limit.connections.per.user | <undefined> | Maximum kyuubi server connections per user. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 |
-| kyuubi.server.limit.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.6.0 |
-| kyuubi.server.limit.connections.user.deny.list || The user in the deny list will be denied to connect to kyuubi server, if the user has configured both user.unlimited.list and user.deny.list, the priority of the latter is higher. | set | 1.8.0 |
-| kyuubi.server.limit.connections.user.unlimited.list || The maximum connections of the user in the white list will not be limited. | set | 1.7.0 |
-| kyuubi.server.name | <undefined> | The name of Kyuubi Server. | string | 1.5.0 |
-| kyuubi.server.periodicGC.interval | PT30M | How often to trigger a garbage collection. | duration | 1.7.0 |
-| kyuubi.server.redaction.regex | <undefined> | Regex to decide which Kyuubi contain sensitive information. When this regex matches a property key or value, the value is redacted from the various logs. || 1.6.0 |
-| kyuubi.server.thrift.resultset.default.fetch.size | 1000 | The number of rows sent in one Fetch RPC call by the server to the client, if not specified by the client. Respect `hive.server2.thrift.resultset.default.fetch.size` hive conf. | int | 1.9.1 |
+| Key | Default | Meaning | Type | Since |
+|----------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------|
+| kyuubi.server.administrators || Comma-separated list of Kyuubi service administrators. We use this config to grant admin permission to any service accounts when security mechanism is enabled. Note, when kyuubi.authentication is configured to NOSASL or NONE, everyone is treated as administrator. | set | 1.8.0 |
+| kyuubi.server.info.provider | ENGINE | The server information provider name, some clients may rely on this information to check the server compatibilities and functionalities. SERVER: Return Kyuubi server information. ENGINE: Return Kyuubi engine information. | string | 1.6.1 |
+| kyuubi.server.limit.batch.connections.per.ipaddress | <undefined> | Maximum kyuubi server batch connections per ipaddress. Any user exceeding this limit will not be allowed to connect. | int | 1.7.0 |
+| kyuubi.server.limit.batch.connections.per.user | <undefined> | Maximum kyuubi server batch connections per user. Any user exceeding this limit will not be allowed to connect. | int | 1.7.0 |
+| kyuubi.server.limit.batch.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server batch connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.7.0 |
+| kyuubi.server.limit.client.fetch.max.rows | <undefined> | Max rows limit for getting result row set operation. If the max rows specified by client-side is larger than the limit, request will fail directly. | int | 1.8.0 |
+| kyuubi.server.limit.connections.ip.deny.list || The client ip in the deny list will be denied to connect to kyuubi server. | set | 1.9.1 |
+| kyuubi.server.limit.connections.per.ipaddress | <undefined> | Maximum kyuubi server connections per ipaddress. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 |
+| kyuubi.server.limit.connections.per.user | <undefined> | Maximum kyuubi server connections per user. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 |
+| kyuubi.server.limit.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.6.0 |
+| kyuubi.server.limit.connections.user.deny.list || The user in the deny list will be denied to connect to kyuubi server, if the user has configured both user.unlimited.list and user.deny.list, the priority of the latter is higher. | set | 1.8.0 |
+| kyuubi.server.limit.connections.user.unlimited.list || The maximum connections of the user in the white list will not be limited. | set | 1.7.0 |
+| kyuubi.server.name | <undefined> | The name of Kyuubi Server. | string | 1.5.0 |
+| kyuubi.server.periodicGC.interval | PT30M | How often to trigger a garbage collection. | duration | 1.7.0 |
+| kyuubi.server.redaction.regex | <undefined> | Regex to decide which Kyuubi contain sensitive information. When this regex matches a property key or value, the value is redacted from the various logs. || 1.6.0 |
+| kyuubi.server.tempFile.expireTime | P14D | Expiration timout for cleanup server-side temporary files, e.g. operation logs. | duration | 1.10.0 |
+| kyuubi.server.thrift.resultset.default.fetch.size | 1000 | The number of rows sent in one Fetch RPC call by the server to the client, if not specified by the client. Respect `hive.server2.thrift.resultset.default.fetch.size` hive conf. | int | 1.9.1 |
### Session
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index d58a22e45ab..771bb65ee2d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -43,6 +43,7 @@ import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle, Operati
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.TempFileCleanupUtils
import org.apache.kyuubi.util.reflect.DynFields
class ExecutePython(
@@ -398,7 +399,7 @@ object ExecutePython extends Logging {
val source = getClass.getClassLoader.getResourceAsStream(s"python/$pyfile")
val file = new File(pythonPath.toFile, pyfile)
- file.deleteOnExit()
+ TempFileCleanupUtils.deleteOnExit(file)
val sink = new FileOutputStream(file)
val buf = new Array[Byte](1024)
diff --git a/kyuubi-common/pom.xml b/kyuubi-common/pom.xml
index 747351e41b5..57c5a27fdf7 100644
--- a/kyuubi-common/pom.xml
+++ b/kyuubi-common/pom.xml
@@ -123,6 +123,11 @@
HikariCP
+
+ com.google.guava
+ guava
+
+
org.apache.kyuubi
kyuubi-util-scala_${scala.binary.version}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 326b1601fff..f58bff3a37a 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -38,6 +38,7 @@ import org.apache.hadoop.util.ShutdownHookManager
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.internal.Tests.IS_TESTING
+import org.apache.kyuubi.util.TempFileCleanupUtils
import org.apache.kyuubi.util.command.CommandLineUtils._
object Utils extends Logging {
@@ -138,6 +139,10 @@ object Utils extends Logging {
* Delete a directory recursively.
*/
def deleteDirectoryRecursively(f: File, ignoreException: Boolean = true): Unit = {
+ if (f == null || !f.exists()) {
+ return
+ }
+
if (f.isDirectory) {
val files = f.listFiles
if (files != null && files.nonEmpty) {
@@ -164,7 +169,7 @@ object Utils extends Logging {
prefix: String = "kyuubi",
root: String = System.getProperty("java.io.tmpdir")): Path = {
val dir = createDirectory(root, prefix)
- dir.toFile.deleteOnExit()
+ TempFileCleanupUtils.deleteOnExit(dir)
dir
}
@@ -211,9 +216,8 @@ object Utils extends Logging {
} finally {
source.close()
}
- val file = filePath.toFile
- file.deleteOnExit()
- file
+ TempFileCleanupUtils.deleteOnExit(filePath)
+ filePath.toFile
} catch {
case e: Exception =>
error(
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index a88b5f615e7..ebb28d41503 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -3113,6 +3113,14 @@ object KyuubiConf {
.timeConf
.createWithDefaultString("PT30M")
+ val SERVER_TEMP_FILE_EXPIRE_TIME: ConfigEntry[Long] =
+ buildConf("kyuubi.server.tempFile.expireTime")
+ .doc("Expiration timout for cleanup server-side temporary files, e.g. operation logs.")
+ .version("1.10.0")
+ .serverOnly
+ .timeConf
+ .createWithDefaultString("P14D")
+
val SERVER_ADMINISTRATORS: ConfigEntry[Set[String]] =
buildConf("kyuubi.server.administrators")
.doc("Comma-separated list of Kyuubi service administrators. " +
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index b3bd46d35a4..e77a726d8c4 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, Fe
import org.apache.kyuubi.operation.OperationHandle
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn}
-import org.apache.kyuubi.util.ThriftUtils
+import org.apache.kyuubi.util.{TempFileCleanupUtils, ThriftUtils}
object OperationLog extends Logging {
final private val OPERATION_LOG: InheritableThreadLocal[OperationLog] = {
@@ -49,19 +49,23 @@ object OperationLog extends Logging {
def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()
/**
- * The operation log root directory, this directory will delete when JVM exit.
+ * The operation log root directory, this directory will be deleted
+ * either after the duration of `kyuubi.server.tempFile.expireTime`
+ * or when JVM exit.
*/
- def createOperationLogRootDirectory(session: Session): Unit = {
- session.sessionManager.operationLogRoot.foreach { operationLogRoot =>
+ def createOperationLogRootDirectory(session: Session): Path = {
+ session.sessionManager.operationLogRoot.map { operationLogRoot =>
val path = Paths.get(operationLogRoot, session.handle.identifier.toString)
try {
Files.createDirectories(path)
- path.toFile.deleteOnExit()
+ TempFileCleanupUtils.deleteOnExit(path)
+ path
} catch {
case e: IOException =>
error(s"Failed to create operation log root directory: $path", e)
+ null
}
- }
+ }.orNull
}
/**
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TempFileService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TempFileService.scala
new file mode 100644
index 00000000000..a53259e7a13
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TempFileService.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.service
+
+import java.nio.file.{Path, Paths}
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.TempFileService.tempFileCounter
+import org.apache.kyuubi.util.{TempFileCleanupUtils, ThreadUtils}
+
+class TempFileService(name: String) extends AbstractService(name) {
+ def this() = this(classOf[TempFileService].getSimpleName)
+
+ final private var expiringFiles: Cache[String, String] = _
+ private lazy val cleanupScheduler =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-cleanup-scheduler")
+
+ override def initialize(conf: KyuubiConf): Unit = {
+ super.initialize(conf)
+ val expireTimeInMs = conf.get(KyuubiConf.SERVER_TEMP_FILE_EXPIRE_TIME)
+ expiringFiles = CacheBuilder.newBuilder()
+ .expireAfterWrite(expireTimeInMs, TimeUnit.MILLISECONDS)
+ .removalListener((notification: RemovalNotification[String, String]) => {
+ val pathStr = notification.getValue
+ debug(s"Remove expired temp file: $pathStr")
+ cleanupFilePath(pathStr)
+ })
+ .build[String, String]()
+
+ cleanupScheduler.scheduleAtFixedRate(
+ () => expiringFiles.cleanUp(),
+ 0,
+ Math.max(expireTimeInMs / 10, 100),
+ TimeUnit.MILLISECONDS)
+ }
+
+ override def stop(): Unit = {
+ expiringFiles.asMap().values().forEach(cleanupFilePath)
+ super.stop()
+ }
+
+ private def cleanupFilePath(pathStr: String): Unit = {
+ try {
+ val path = Paths.get(pathStr)
+ TempFileCleanupUtils.cancelDeleteOnExit(path)
+ Utils.deleteDirectoryRecursively(path.toFile)
+ } catch {
+ case e: Throwable => error(s"Failed to delete file $pathStr", e)
+ }
+ }
+
+ /**
+ * add the file path to the expiration list
+ * ensuring the path will be deleted
+ * either after duration
+ * or on the JVM exit
+ *
+ * @param path the path of file or directory
+ */
+ def addPathToExpiration(path: Path): Unit = {
+ require(path != null)
+ expiringFiles.put(
+ s"${tempFileCounter.incrementAndGet()}-${System.currentTimeMillis()}",
+ path.toString)
+ TempFileCleanupUtils.deleteOnExit(path)
+ }
+}
+
+object TempFileService {
+ private lazy val tempFileCounter = new AtomicLong(0)
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index 14e59078fb4..1fe5188ad6a 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.session
+import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
@@ -259,8 +260,10 @@ abstract class AbstractSession(
}
}
+ protected var operationalLogRootDir: Option[Path] = None
+
override def open(): Unit = {
- OperationLog.createOperationLogRootDirectory(this)
+ operationalLogRootDir = Option(OperationLog.createOperationLogRootDirectory(this))
}
val isForAliveProbe: Boolean =
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/TempFileCleanupUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/TempFileCleanupUtils.scala
new file mode 100644
index 00000000000..9e27fe42698
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/TempFileCleanupUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.util
+
+import java.io.File
+import java.nio.file.{Path, Paths}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.kyuubi.Utils
+
+object TempFileCleanupUtils {
+ private lazy val deleteTargets = ConcurrentHashMap.newKeySet[String]()
+
+ private lazy val isCleanupShutdownHookInstalled = {
+ installFilesCleanupOnExitShutdownHook()
+ new AtomicBoolean(true)
+ }
+
+ private def installFilesCleanupOnExitShutdownHook(): Unit = {
+ Utils.addShutdownHook(() => {
+ deleteTargets.forEach { pathStr =>
+ try {
+ Utils.deleteDirectoryRecursively(Paths.get(pathStr).toFile)
+ } catch {
+ case _: Exception =>
+ }
+ }
+ deleteTargets.clear()
+ })
+ }
+
+ def deleteOnExit(file: File): Unit = {
+ require(file != null)
+ deleteOnExit(file.toPath)
+ }
+
+ def deleteOnExit(path: Path): Unit = {
+ require(path != null)
+ isCleanupShutdownHookInstalled.get()
+ deleteTargets.add(path.toString)
+ }
+
+ def cancelDeleteOnExit(path: Path): Unit = {
+ require(path != null)
+ isCleanupShutdownHookInstalled.get()
+ deleteTargets.remove(path.toString)
+ }
+
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index ace7ba9d46e..f8c9ffa1b4c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -32,7 +32,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{AuthTypes, ServiceDiscovery}
import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem}
import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf
-import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState}
+import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState, TempFileService}
import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
@@ -202,6 +202,8 @@ class KyuubiServer(name: String) extends Serverable(name) {
throw new UnsupportedOperationException(s"Frontend protocol $other is not supported yet.")
}
+ final var tempFileService: TempFileService = _
+
override def initialize(conf: KyuubiConf): Unit = synchronized {
val kinit = new KinitAuxiliaryService()
addService(kinit)
@@ -209,6 +211,9 @@ class KyuubiServer(name: String) extends Serverable(name) {
val periodicGCService = new PeriodicGCService
addService(periodicGCService)
+ tempFileService = new TempFileService
+ addService(tempFileService)
+
if (conf.get(MetricsConf.METRICS_ENABLED)) {
addService(new MetricsSystem)
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index b5e98845e6e..de69cf37711 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -44,6 +44,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys._
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, ApplicationState, KillResponse, KyuubiApplicationManager}
import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState}
import org.apache.kyuubi.server.KyuubiServer
+import org.apache.kyuubi.server.KyuubiServer.kyuubiServer
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource._
import org.apache.kyuubi.server.metadata.MetadataManager
@@ -568,6 +569,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
uploadFileFolderPath: JPath): Unit = {
try {
val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath, fileName)
+ kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath)
request.setResource(tempFile.getPath)
} catch {
case e: Exception =>
@@ -599,10 +601,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
val tempFilePaths = fileParts.map { filePart =>
val fileName = filePart.getContentDisposition.getFileName
try {
- Utils.writeToTempFile(
+ val tempFile = Utils.writeToTempFile(
filePart.getValueAs(classOf[InputStream]),
uploadFileFolderPath,
- fileName).getPath
+ fileName)
+ kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath)
+ tempFile.getPath
} catch {
case e: Exception =>
throw new RuntimeException(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index d0e8e042f7b..6b4e2a2e7d5 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -118,6 +118,8 @@ class KyuubiSessionImpl(
// we should call super.open before running launch engine operation
super.open()
+ sessionManager.tempFileService.addPathToExpiration(operationalLogRootDir.get)
+
runOperation(launchEngineOp)
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 9edc8218eb1..a20b4bc97c7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -36,8 +36,10 @@ import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState}
import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader, SessionConfAdvisor}
+import org.apache.kyuubi.server.KyuubiServer.kyuubiServer
import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetryRef}
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
+import org.apache.kyuubi.service.TempFileService
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.sql.parser.server.KyuubiParser
import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
@@ -71,6 +73,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
private val engineConnectionAliveChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
+ def tempFileService: TempFileService = kyuubiServer.tempFileService
+
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
addService(applicationManager)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
new file mode 100644
index 00000000000..4b7568c1c79
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server
+
+import java.io.ByteArrayInputStream
+import java.time.Duration
+
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.{Utils, WithKyuubiServer}
+import org.apache.kyuubi.Utils.writeToTempFile
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.SERVER_TEMP_FILE_EXPIRE_TIME
+
+class TempFileServiceSuite extends WithKyuubiServer {
+ private val expirationInMs = 100
+
+ override protected val conf: KyuubiConf = KyuubiConf()
+ .set(SERVER_TEMP_FILE_EXPIRE_TIME, Duration.ofMillis(expirationInMs).toMillis)
+
+ test("file cleaned up after expiration") {
+ val tempFileService = KyuubiServer.kyuubiServer.tempFileService
+ (0 until 3).map { i =>
+ val dir = Utils.createTempDir()
+ writeToTempFile(new ByteArrayInputStream(s"$i".getBytes()), dir, s"$i.txt")
+ dir.toFile
+ }.map { dirFile =>
+ assert(dirFile.exists())
+ tempFileService.addPathToExpiration(dirFile.toPath)
+ dirFile
+ }.foreach { f =>
+ eventually(Timeout((expirationInMs * 2).millis)) {
+ assert(!f.exists())
+ }
+ }
+ }
+}