Skip to content

Commit

Permalink
feat: SingularityCE: looking for the platform-aware image in CVMFS lo…
Browse files Browse the repository at this point in the history
…cation
  • Loading branch information
fstagni committed Jun 5, 2024
1 parent c111713 commit 6faf080
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 18 deletions.
35 changes: 31 additions & 4 deletions src/DIRAC/Core/Utilities/File.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
By default on Error they return None.
"""

import os
import errno
import glob
import hashlib
import os
import random
import glob
import sys
import re
import errno
import stat
import sys
import tempfile
import threading
from contextlib import contextmanager

# Translation table of a given unit to Bytes
Expand Down Expand Up @@ -277,6 +278,32 @@ def secureOpenForWrite(filename=None, *, text=True):
yield fd, filename


def safe_listdir(directory, timeout=60):
"""This is a "safe" list directory,
for lazily-loaded File Systems like CVMFS.
There's by default a 60 seconds timeout.
:param str directory: directory to list
:param int timeout: optional timeout, in seconds. Defaults to 60.
"""

def listdir(directory):
try:
return os.listdir(directory)
except FileNotFoundError:
print(f"{directory} not found")
return []

contents = []
t = threading.Thread(target=lambda: contents.extend(listdir(directory)))
t.daemon = True # don't delay program's exit
t.start()
t.join(timeout)
if t.is_alive():
return None # timeout
return contents


if __name__ == "__main__":
for p in sys.argv[1:]:
print(f"{p} : {getGlobbedTotalSize(p)} bytes")
52 changes: 50 additions & 2 deletions src/DIRAC/Core/Utilities/Os.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
by default on Error they return None
"""
import os
import platform
import shutil

import DIRAC
from DIRAC.Core.Utilities import List
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.Core.Utilities.File import safe_listdir
from DIRAC.Core.Utilities.Subprocess import shellCall, systemCall
from DIRAC.Core.Utilities import List

DEBUG = 0

Expand Down Expand Up @@ -147,6 +149,52 @@ def sourceEnv(timeout, cmdTuple, inputEnv=None):
return result


@deprecated("Will be removed in DIRAC 8.1", onlyOnce=True)
@deprecated("Will be removed in DIRAC 9.0", onlyOnce=True)
def which(executable):
return shutil.which(executable)


def findImage(continer_root="container/apptainer/alma9/"):
"""Finds the image for the current platform
This looks into location "${CVMFS_locations}/${container_root}/${platform}/"
and expects to find one of the following platforms:
- x86_64
- aarch64
- ppc64le
"""
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations

plat = DIRAC.gConfig.getValue("LocalSite/Platform", platform.machine())
DIRAC.gLogger.info(f"Platform: {plat}")

# NB: platform compatibility is more complex than the following simple identification.
# sources:
# https://stackoverflow.com/questions/45125516/possible-values-for-uname-m
# https://en.wikipedia.org/wiki/Uname

if plat.lower() == "amd64":
plat = "x86_64"
if plat.lower().startswith("arm"):
plat = "aarch64"
if plat.lower().startswith("ppc64"):
plat = "ppc64le"

if plat not in ["x86_64", "aarch64", "ppc64le"]:
DIRAC.gLogger.error(f"Platform {plat} not supported")
return None

CVMFS_locations = DIRAC.gConfig.getValue(
"LocalSite/CVMFS_locations", Operations().getValue("Pilot/CVMFS_locations", [])
)

rootImage = None

for candidate in CVMFS_locations:
rootImage = os.path.join(candidate, continer_root, plat)
DIRAC.gLogger.verbose(f"Checking {rootImage} existence")
if safe_listdir(rootImage):
break

return rootImage
85 changes: 85 additions & 0 deletions src/DIRAC/Core/scripts/dirac_apptainer_exec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
""" Starts a DIRAC command inside an apptainer container.
"""

import os
import shutil
import sys

import DIRAC
from DIRAC import S_ERROR, gLogger
from DIRAC.Core.Base.Script import Script
from DIRAC.Core.Utilities.Os import findImage
from DIRAC.Core.Utilities.Subprocess import systemCall

CONTAINER_WRAPPER = """#!/bin/bash
echo "Starting inner container wrapper scripts (no install) at `date`."
export DIRAC=%(dirac_env_var)s
export DIRACOS=%(diracos_env_var)s
# In any case we need to find a bashrc, and a cfg
source %(rc_script)s
%(command)s
echo "Finishing inner container wrapper scripts at `date`."
"""

CONTAINER_DEFROOT = "/cvmfs/dirac.egi.eu/container/apptainer/alma9/x86_64"


def getEnv():
"""Gets the environment for use within the container.
We blank almost everything to prevent contamination from the host system.
"""

payloadEnv = {k: v for k, v in os.environ.items()}
payloadEnv["TMP"] = "/tmp"
payloadEnv["TMPDIR"] = "/tmp"
payloadEnv["X509_USER_PROXY"] = os.path.join("tmp", "proxy")
payloadEnv["DIRACSYSCONFIG"] = os.path.join("tmp", "dirac.cfg")

return payloadEnv


@Script()
def main():
Script.registerArgument(" command: Command to execute inside the container")
command = Script.getPositionalArgs(group=True)

wrapSubs = {
"dirac_env_var": os.environ.get("DIRAC", os.getcwd()),
"diracos_env_var": os.environ.get("DIRACOS", os.getcwd()),
}
wrapSubs["rc_script"] = os.path.join(os.path.realpath(sys.base_prefix), "diracosrc")
wrapSubs["command"] = command
shutil.copyfile("dirac.cfg", os.path.join("tmp", "dirac.cfg"))

wrapLoc = os.path.join("tmp", "dirac_container.sh")
rawfd = os.open(wrapLoc, os.O_WRONLY | os.O_CREAT, 0o700)
fd = os.fdopen(rawfd, "w")
fd.write(CONTAINER_WRAPPER % wrapSubs)
fd.close()

innerCmd = os.path.join("tmp", "dirac_container.sh")
cmd = ["apptainer", "exec"]
cmd.extend(["--contain"]) # use minimal /dev and empty other directories (e.g. /tmp and $HOME)
cmd.extend(["--ipc"]) # run container in a new IPC namespace
cmd.extend(["--workdir", "/tmp"]) # working directory to be used for /tmp, /var/tmp and $HOME
cmd.extend(["--home", "/tmp"]) # Avoid using small tmpfs for default $HOME and use scratch /tmp instead
cmd.extend(["--bind", "{0}:{0}:ro".format(os.path.join(os.path.realpath(sys.base_prefix)))])

rootImage = findImage() or CONTAINER_DEFROOT

if os.path.isdir(rootImage) or os.path.isfile(rootImage):
cmd.extend([rootImage, innerCmd])
else:
# if we are here is because there's no image, or it is not accessible (e.g. not on CVMFS)
gLogger.error("Apptainer image to exec not found: ", rootImage)
return S_ERROR("Failed to find Apptainer image to exec")

gLogger.debug(f"Execute Apptainer command: {cmd}")
result = systemCall(0, cmd, env=getEnv())
if not result["OK"]:
DIRAC.exit(1)


if __name__ == "__main__":
main()
22 changes: 10 additions & 12 deletions src/DIRAC/Resources/Computing/SingularityComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
See the Configuration/Resources/Computing documention for details on
where to set the option parameters.
"""

import io
import json
import os
import re
Expand All @@ -21,7 +19,8 @@
import tempfile

import DIRAC
from DIRAC import S_OK, S_ERROR, gConfig, gLogger
from DIRAC import S_ERROR, S_OK, gConfig, gLogger
from DIRAC.Core.Utilities.Os import findImage
from DIRAC.Core.Utilities.Subprocess import systemCall
from DIRAC.ConfigurationSystem.Client.Helpers import Operations
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
Expand All @@ -30,7 +29,7 @@
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createRelocatedJobWrapper

# Default container to use if it isn't specified in the CE options
CONTAINER_DEFROOT = "/cvmfs/cernvm-prod.cern.ch/cvm4"
CONTAINER_DEFROOT = "/cvmfs/dirac.egi.eu/container/apptainer/alma9/x86_64"
CONTAINER_WORKDIR = "DIRAC_containers"
CONTAINER_INNERDIR = "/tmp"

Expand Down Expand Up @@ -107,9 +106,6 @@ def __init__(self, ceUniqueID):
super().__init__(ceUniqueID)
self.__submittedJobs = 0
self.__runningJobs = 0
self.__root = CONTAINER_DEFROOT
if "ContainerRoot" in self.ceParameters:
self.__root = self.ceParameters["ContainerRoot"]
self.__workdir = CONTAINER_WORKDIR
self.__innerdir = CONTAINER_INNERDIR
self.__singularityBin = "singularity"
Expand Down Expand Up @@ -147,7 +143,7 @@ def __hasSingularity(self):
self.log.debug(f'Use singularity from "{self.__singularityBin}"')
return True
if "PATH" not in os.environ:
return False # Hmm, PATH not set? How unusual...
return False # PATH might not be set (e.g. HTCondorCE)
searchPaths = os.environ["PATH"].split(os.pathsep)
# We can use CVMFS as a last resort if userNS is enabled
if self.__hasUserNS():
Expand Down Expand Up @@ -359,8 +355,6 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
:return: S_OK(payload exit code) / S_ERROR() if submission issue
"""
rootImage = self.__root
renewTask = None
# Check that singularity is available
if not self.__hasSingularity():
self.log.error("Singularity is not installed on PATH.")
Expand All @@ -375,6 +369,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
baseDir = ret["baseDir"]
tmpDir = ret["tmpDir"]

renewTask = False
if proxy:
payloadProxyLoc = ret["proxyLocation"]

Expand Down Expand Up @@ -449,10 +444,13 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
containerOpts = self.ceParameters["ContainerOptions"].split(",")
for opt in containerOpts:
cmd.extend([opt.strip()])
if os.path.isdir(rootImage) or os.path.isfile(rootImage):

rootImage = findImage() or self.ceParameters.get("ContainerRoot") or CONTAINER_DEFROOT

if rootImage and os.path.isdir(rootImage) or os.path.isfile(rootImage):
cmd.extend([rootImage, innerCmd])
else:
# if we are here is because there's no image, or it is not accessible (e.g. not on CVMFS)
# if we are here it is because there's no image, or it is not accessible (e.g. not on CVMFS)
self.log.error("Singularity image to exec not found: ", rootImage)
return S_ERROR("Failed to find singularity image to exec")

Expand Down

0 comments on commit 6faf080

Please sign in to comment.