diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index d7bda5bbe721..49165b14429e 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -178,6 +178,13 @@ class HadoopMapReduceCommitProtocol( val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId) committer = setupCommitter(taskAttemptContext) committer.setupJob(jobContext) + try { + val fs = stagingDir.getFileSystem(jobContext.getConfiguration) + fs.deleteOnExit(stagingDir) + } catch { + case e: Throwable => + logWarning(log"Exception while setting clean logic ${MDC(JOB_ID, jobContext.getJobID)}", e) + } } override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {