Skip to content

Commit

Permalink
removed debugging statements
Browse files Browse the repository at this point in the history
  • Loading branch information
geertvandeweyer committed Jan 6, 2025
1 parent 0aa58fd commit 3e09608
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,10 @@ private case object PartialS3BatchCommandBuilder extends PartialIoCommandBuilder
}

/** Builds a touch command for an S3 path (used by the IO subsystem to bump
  * timestamps / confirm writability). Debug println removed from the hot path.
  */
override def touchCommand: PartialFunction[Path, Try[S3BatchTouchCommand]] = { case path: S3Path =>
  Try(S3BatchTouchCommand(path))
}

/** Builds an existence-check command for an S3 path. Debug println removed
  * from the hot path, consistent with `touchCommand`.
  */
override def existsCommand: PartialFunction[Path, Try[S3BatchExistsCommand]] = { case path: S3Path =>
  Try(S3BatchExistsCommand(path))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,28 @@ import cromwell.backend.ReadLikeFunctions
//import cromwell.core.path.PathFactory
import scala.concurrent.Future
import scala.util.Try
import cromwell.core.path.{DefaultPathBuilder, PathBuilder, PathFactory}
import cromwell.core.path.{DefaultPathBuilder}
//import cromwell.backend.impl.aws.AwsBatchBackendInitializationData

//import cromwell.backend.BackendInitializationData
import cromwell.backend.standard.StandardExpressionFunctionsParams

trait AwsReadLikeFunctions extends ReadLikeFunctions {
  // NOTE(review): standardParams does not expose the backend configuration
  // here, so the EFS mount point cannot be read from config (same limitation
  // as the AWS globbing functions) — hence the hard-coded default below.
  def standardParams: StandardExpressionFunctionsParams

  /** Prefix identifying files that live on the locally mounted EFS filesystem
    * rather than on S3. Overridable for non-default mounts; the default
    * preserves the previous hard-coded behavior.
    */
  protected def efsMountPrefix: String = "/mnt/efs/"

  /**
    * Reads the content of `path` as a String.
    *
    * Paths under the EFS mount prefix are resolved with the default (local)
    * path builder; anything else goes through the standard `buildPath`
    * resolution (e.g. S3 URIs).
    *
    * @param path           raw path or URI of the file to read
    * @param maxBytes       optional upper bound on the number of bytes to read
    * @param failOnOverflow if true, fail the read when the file exceeds `maxBytes`
    */
  override def readFile(path: String, maxBytes: Option[Int], failOnOverflow: Boolean): Future[String] = {
    val awsPath =
      if (path.startsWith(efsMountPrefix)) DefaultPathBuilder.get(path)
      else buildPath(path)
    // Path construction may throw (bad URI etc.); capture that in the Future
    // instead of letting it escape synchronously.
    Future.fromTry(Try(awsPath)) flatMap { p => asyncIo.contentAsStringAsync(p, maxBytes, failOnOverflow) }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,34 +145,13 @@ class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyin
return isOptional
}

/** Returns true when `womFile` is on the locally mounted EFS filesystem,
  * i.e. it starts with the configured EFS mount point. When no mount point is
  * configured, "--" is used as a sentinel prefix no real path starts with, so
  * the result is always false (the file is treated as remote/S3).
  */
def is_efs(womFile: String): Boolean =
  womFile.startsWith(configuration.efsMntPoint.getOrElse("--"))
// original implementation:
//protected def processSimpletons(womValueSimpletons: Seq[WomValueSimpleton],
// sourceCallRootPath: Path
// ): Try[(CallOutputs, Set[IoCommand[_]])] = Try {
// val (destinationSimpletons, ioCommands): (List[WomValueSimpleton], Set[IoCommand[_]]) =
// womValueSimpletons.toList.foldMap {
// case WomValueSimpleton(key, wdlFile: WomSingleFile) =>
// val sourcePath = getPath(wdlFile.value).get
// val destinationPath =
// PathCopier.getDestinationFilePath(sourceCallRootPath, sourcePath, destinationCallRootPath)
//
// val destinationSimpleton = WomValueSimpleton(key, WomSingleFile(destinationPath.pathAsString))
//
// // PROD-444: Keep It Short and Simple: Throw on the first error and let the outer Try catch-and-re-wrap
// List(destinationSimpleton) -> Set(commandBuilder.copyCommand(sourcePath, destinationPath).get)
// case nonFileSimpleton => (List(nonFileSimpleton), Set.empty[IoCommand[_]])
// }
//
// (WomValueBuilder.toJobOutputs(jobDescriptor.taskCall.outputPorts, destinationSimpletons), ioCommands)
// }



override def processSimpletons(womValueSimpletons: Seq[WomValueSimpleton],
sourceCallRootPath: Path
Expand All @@ -191,20 +170,14 @@ class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyin
val destinationSimpleton = WomValueSimpleton(key, WomSingleFile(destinationPath.pathAsString))
if (is_optional(wdlFile.value,womFileMap)) {
// can I use this instead of noopCommand (from super) : case nonFileSimpleton => (List(nonFileSimpleton), Set.empty[IoCommand[_]])
//getPath(wdlFile.value).flatMap(S3BatchCommandBuilder.noopCommand)
Try(destinationSimpleton -> S3BatchCommandBuilder.noopCommand(destinationPath).get)
} else {
//getPath(wdlFile.value).flatMap(S3BatchCommandBuilder.existsOrThrowCommand)
Try(destinationSimpleton -> S3BatchCommandBuilder.existsOrThrowCommand(destinationPath).get)
}
case nonFileSimpleton =>
// case nonFileSimpleton => (List(nonFileSimpleton), Set.empty[IoCommand[_]])
Try(nonFileSimpleton -> S3BatchCommandBuilder.noopCommand(getPath("").get).get)
}
// group touchcommands
//TryUtil.sequence(touchCommands) map {
// WomValueBuilder.toJobOutputs(jobDescriptor.taskCall.outputPorts, womValueSimpletons) -> _.toSet
//}
TryUtil.sequence(touchCommands) map { simpletonsAndCommands =>
val (destinationSimpletons, ioCommands) = simpletonsAndCommands.unzip
WomValueBuilder.toJobOutputs(jobDescriptor.taskCall.outputPorts, destinationSimpletons) -> ioCommands.toSet
Expand All @@ -221,13 +194,10 @@ class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyin
if (is_efs(wdlFile.value)) {
// on efs : source == destination
val destinationPath = sourcePath
//val destinationPath = PathCopier.getDestinationFilePath(sourceCallRootPath, sourcePath, destinationCallRootPath)
val destinationSimpleton = WomValueSimpleton(key, WomSingleFile(destinationPath.pathAsString))
if (is_optional(wdlFile.value,womFileMap)) {
//getPath(wdlFile.value).flatMap(S3BatchCommandBuilder.noopCommand)
Try(destinationSimpleton -> S3BatchCommandBuilder.noopCommand(destinationPath).get)
} else {
//getPath(wdlFile.value).flatMap(S3BatchCommandBuilder.existsOrThrowCommand)
Try(destinationSimpleton -> S3BatchCommandBuilder.existsOrThrowCommand(destinationPath).get)
}
}
Expand All @@ -238,22 +208,10 @@ class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyin

// optional
if (is_optional(wdlFile.value,womFileMap)) {
//println(s"copying optional s3 file: ${wdlFile.value}")
//getPath(wdlFile.value).flatMap(S3BatchCommandBuilder.noopCommand)
Try(destinationSimpleton -> S3BatchCommandBuilder.noopCommand(destinationPath).get)
// mandatory
} else {
//val sourcePath = getPath(wdlFile.value).get
//val destinationPath =
// PathCopier.getDestinationFilePath(sourceCallRootPath, sourcePath, destinationCallRootPath)
//val destinationSimpleton = WomValueSimpleton(key, WomSingleFile(destinationPath.pathAsString))
//println(s"copying mandatory s3 file: ${wdlFile.value}" )
//println(s"sourcePath: ${sourcePath}")
//println(s"destinationPath: ${destinationPath}")
//S3BatchCommandBuilder.copyCommand(sourcePath, destinationPath)
Try(destinationSimpleton -> S3BatchCommandBuilder.copyCommand(sourcePath, destinationPath).get)


}
}
case nonFileSimpleton =>
Expand All @@ -262,10 +220,8 @@ class AwsBatchBackendCacheHitCopyingActor(standardParams: StandardCacheHitCopyin
// get copycommands
TryUtil.sequence(copyCommands) map { simpletonsAndCommands =>
val (destinationSimpletons, ioCommands) = simpletonsAndCommands.unzip
//WomValueBuilder.toJobOutputs(jobDescriptor.taskCall.outputPorts, womValueSimpletons) -> _.toSet
WomValueBuilder.toJobOutputs(jobDescriptor.taskCall.outputPorts, destinationSimpletons) -> ioCommands.toSet
}
//super.processSimpletons(womValueSimpletons, sourceCallRootPath)
///////////////////////
// NON-S3 FILESYSTEM //
///////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,41 +124,33 @@ class AwsBatchBackendFileHashingActor(standardParams: StandardFileHashingActorPa
file.toString -> isOptional
}
}.toMap

println(s"Hashing file : ${file.toString}")


// optional files are allowed to be missing
if (callInputFiles.contains(fileRequest.file.toString) && callInputFiles(fileRequest.file.toString) && ! file.exists) {
// return hash of empty string
println(s"Optional File Missing: ${file.toString}. Return hash of empty string")
Some("".md5Sum).map(str => Try(str))
// the file is an efs file and sibling md5 is enabled
} else if (file.toString.startsWith(aws_config.efsMntPoint.getOrElse("--")) && aws_config.checkSiblingMd5.getOrElse(false)) {
val md5 = file.sibling(s"${file.toString}.md5")
// check existence of the main file:
if (!file.exists) {
// cache hit is invalid; return invalid (random) md5
println(s"Non Optional File Missing: ${file.toString}")
Some(Random.alphanumeric.take(32).mkString.md5Sum).map(str => Try(str))
}
// check existence of the sibling file and make sure it's newer than main file
else if (md5.exists && md5.lastModifiedTime.isAfter(file.lastModifiedTime)) {
// read the file.
println("Found valid sibling file for " + file.toString)
val md5_value: Option[String] = Some(md5.contentAsString.split("\\s+")(0))
md5_value.map(str => Try(str))
} else if (md5.exists && md5.lastModifiedTime.isBefore(file.lastModifiedTime)) {
// sibling file is outdated, return invalid (random) string as md5
println("Found outdated sibling file for " + file.toString)
Some(Random.alphanumeric.take(32).mkString.md5Sum).map(str => Try(str))
} else {
// File present, but no sibling found, fall back to default.
println("Found no sibling file for " + file.toString)
None
}
} else {
// non-efs file or sibling md5 is disabled : fall back to default
println(s"non-efs file or sibling md5 is disabled for : ${file.toString}")
None
}
}
Expand Down

0 comments on commit 3e09608

Please sign in to comment.