diff --git a/pom.xml b/pom.xml index 059137a0..62a52dfc 100644 --- a/pom.xml +++ b/pom.xml @@ -12,13 +12,13 @@ org.apache.spark spark-core_2.11 - 2.3.0 + 2.3.2 provided org.apache.spark spark-network-common_2.11 - 2.3.0 + 2.3.2 com.intel.hpnl diff --git a/src/main/scala/org/apache/spark/network/pmof/RdmaTransferService.scala b/src/main/scala/org/apache/spark/network/pmof/RdmaTransferService.scala index 11d9ac63..a35e663d 100644 --- a/src/main/scala/org/apache/spark/network/pmof/RdmaTransferService.scala +++ b/src/main/scala/org/apache/spark/network/pmof/RdmaTransferService.scala @@ -5,7 +5,7 @@ import java.util.Random import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import org.apache.spark.network.BlockDataManager -import org.apache.spark.network.shuffle.{BlockFetchingListener, TempFileManager} +import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager} import org.apache.spark.serializer.JavaSerializer import org.apache.spark.shuffle.pmof.{MetadataResolver, PmofShuffleManager} import org.apache.spark.storage.{BlockId, BlockManager, ShuffleBlockId} @@ -30,7 +30,7 @@ class RdmaTransferService(conf: SparkConf, val shuffleManager: PmofShuffleManage executId: String, blockIds: Array[String], blockFetchingListener: BlockFetchingListener, - tempFileManager: TempFileManager): Unit = {} + tempFileManager: DownloadFileManager): Unit = {} def fetchBlock(reqHost: String, reqPort: Int, rmaAddress: Long, rmaLength: Int, rmaRkey: Long, localAddress: Int, shuffleBuffer: ShuffleBuffer, diff --git a/src/main/scala/org/apache/spark/storage/pmof/RdmaShuffleBlockFetcherIterator.scala b/src/main/scala/org/apache/spark/storage/pmof/RdmaShuffleBlockFetcherIterator.scala index 1b3e7f32..cf292a1e 100644 --- a/src/main/scala/org/apache/spark/storage/pmof/RdmaShuffleBlockFetcherIterator.scala +++ b/src/main/scala/org/apache/spark/storage/pmof/RdmaShuffleBlockFetcherIterator.scala @@ -23,11 +23,12 @@ import java.util.concurrent.LinkedBlockingQueue import javax.annotation.concurrent.GuardedBy import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} -import org.apache.spark.network.shuffle.{ShuffleClient, TempFileManager} +import org.apache.spark.network.shuffle.{ShuffleClient, DownloadFileManager, DownloadFile, SimpleDownloadFile} import org.apache.spark.network.pmof._ import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage._ import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.network.util.TransportConf import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -70,7 +71,7 @@ final class RdmaShuffleBlockFetcherIterator( maxBlocksInFlightPerAddress: Int, maxReqSizeShuffleToMem: Long, detectCorrupt: Boolean) - extends Iterator[(BlockId, InputStream)] with TempFileManager with Logging { + extends Iterator[(BlockId, InputStream)] with DownloadFileManager with Logging { import RdmaShuffleBlockFetcherIterator._ @@ -126,7 +127,7 @@ final class RdmaShuffleBlockFetcherIterator( * deleted when cleanup. This is a layer of defensiveness against disk file leaks. */ @GuardedBy("this") - private[this] val shuffleFilesSet = mutable.HashSet[File]() + private[this] val shuffleFilesSet = mutable.HashSet[DownloadFile]() private[this] val remoteRdmaRequestQueue = new LinkedBlockingQueue[RdmaRequest]() @@ -256,11 +257,13 @@ final class RdmaShuffleBlockFetcherIterator( currentResult = null } - override def createTempFile(): File = { - blockManager.diskBlockManager.createTempLocalBlock()._2 + //override def createTempFile(): DownloadFile = { + override def createTempFile(transportConf: TransportConf): DownloadFile = { + new SimpleDownloadFile( + blockManager.diskBlockManager.createTempLocalBlock()._2, transportConf) } - override def registerTempFileToClean(file: File): Boolean = synchronized { + override def registerTempFileToClean(file: DownloadFile): Boolean = synchronized { if (isZombie) { false } else { @@ -296,7 +299,7 @@ final class RdmaShuffleBlockFetcherIterator( } shuffleFilesSet.foreach { file => if (!file.delete()) { - logWarning("Failed to cleanup shuffle fetch temp file " + file.getAbsolutePath) + //logWarning("Failed to cleanup shuffle fetch temp file " + file.getAbsolutePath) } } }