diff --git a/setup.cfg b/setup.cfg
index 718fb1dd3a0..f904496bc77 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -117,6 +117,7 @@ console_scripts =
     dirac-configuration-shell = DIRAC.ConfigurationSystem.scripts.dirac_configuration_shell:main [admin]
     # Core
     dirac-agent = DIRAC.Core.scripts.dirac_agent:main [server,pilot]
+    dirac-apptainer-exec = DIRAC.Core.scripts.dirac_apptainer_exec:main [server,pilot]
     dirac-configure = DIRAC.Core.scripts.dirac_configure:main
     dirac-executor = DIRAC.Core.scripts.dirac_executor:main [server]
     dirac-info = DIRAC.Core.scripts.dirac_info:main
diff --git a/src/DIRAC/Core/Utilities/File.py b/src/DIRAC/Core/Utilities/File.py
index 9548faea8a6..2141eaa041f 100755
--- a/src/DIRAC/Core/Utilities/File.py
+++ b/src/DIRAC/Core/Utilities/File.py
@@ -4,15 +4,16 @@
 By default on Error they return None.
 """

-import os
+import errno
+import glob
 import hashlib
+import os
 import random
-import glob
-import sys
 import re
-import errno
 import stat
+import sys
 import tempfile
+import threading
 from contextlib import contextmanager

 # Translation table of a given unit to Bytes
@@ -277,6 +278,32 @@ def secureOpenForWrite(filename=None, *, text=True):
         yield fd, filename


+def safe_listdir(directory, timeout=60):
+    """A "safe" directory listing for lazily-loaded file systems like CVMFS,
+    where a plain os.listdir() can hang indefinitely. The listing runs in a
+    daemon thread and gives up after the timeout (60 seconds by default).
+
+    :param str directory: directory to list
+    :param int timeout: optional timeout, in seconds. Defaults to 60.
+    """
+
+    def listdir(directory):
+        try:
+            return os.listdir(directory)
+        except FileNotFoundError:
+            print(f"{directory} not found")
+            return []
+
+    contents = []
+    t = threading.Thread(target=lambda: contents.extend(listdir(directory)))
+    t.daemon = True  # don't delay program's exit
+    t.start()
+    t.join(timeout)
+    if t.is_alive():
+        return None  # timeout
+    return contents
+
+
 if __name__ == "__main__":
     for p in sys.argv[1:]:
         print(f"{p} : {getGlobbedTotalSize(p)} bytes")
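A minimal usage sketch of the new helper (the CVMFS path and timeout are illustrative only):

```python
from DIRAC.Core.Utilities.File import safe_listdir

# A lazily-loaded CVMFS mount can hang a plain os.listdir();
# safe_listdir returns None on timeout instead of blocking forever.
contents = safe_listdir("/cvmfs/unpacked.cern.ch", timeout=10)  # illustrative path
if contents is None:
    print("listing timed out")
else:
    print(f"{len(contents)} entries found")
```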
see + # https://stackoverflow.com/questions/45125516/possible-values-for-uname-m + # https://en.wikipedia.org/wiki/Uname + # Since here we are using the architecture specification defined by opencontainers initiative: + # https://github.com/opencontainers/image-spec/blob/main/image-index.md#platform-variants + # we need to make some simple "conversions" to get the right values: + + if plat.lower() == "x86_64": + plat = "amd64" + if plat.lower().startswith("arm") or plat.lower() == "aarch64": + plat = "arm64" + if plat.lower().startswith("ppc64"): + plat = "ppc64le" + + if plat not in ["amd64", "arm64", "ppc64le"]: + DIRAC.gLogger.error(f"Platform {plat} not supported") + return None + + rootImage = f"{container_root}:{plat}" + DIRAC.gLogger.verbose(f"Checking {rootImage} existence") + if safe_listdir(rootImage): + return rootImage diff --git a/src/DIRAC/Core/scripts/dirac_apptainer_exec.py b/src/DIRAC/Core/scripts/dirac_apptainer_exec.py new file mode 100644 index 00000000000..c8dcf71d32a --- /dev/null +++ b/src/DIRAC/Core/scripts/dirac_apptainer_exec.py @@ -0,0 +1,85 @@ +""" Starts a DIRAC command inside an apptainer container. +""" + +import os +import shutil +import sys + +import DIRAC +from DIRAC import S_ERROR, gLogger +from DIRAC.Core.Base.Script import Script +from DIRAC.Core.Utilities.Os import findImage +from DIRAC.Core.Utilities.Subprocess import systemCall + +CONTAINER_WRAPPER = """#!/bin/bash + +echo "Starting inner container wrapper scripts (no install) at `date`." +export DIRAC=%(dirac_env_var)s +export DIRACOS=%(diracos_env_var)s +# In any case we need to find a bashrc, and a cfg +source %(rc_script)s +%(command)s +echo "Finishing inner container wrapper scripts at `date`." +""" + +CONTAINER_DEFROOT = "/cvmfs/dirac.egi.eu/container/apptainer/alma9/x86_64" + + +def getEnv(): + """Gets the environment for use within the container. + We blank almost everything to prevent contamination from the host system. + """ + + payloadEnv = {k: v for k, v in os.environ.items()} + payloadEnv["TMP"] = "/tmp" + payloadEnv["TMPDIR"] = "/tmp" + payloadEnv["X509_USER_PROXY"] = os.path.join("tmp", "proxy") + payloadEnv["DIRACSYSCONFIG"] = os.path.join("tmp", "dirac.cfg") + + return payloadEnv + + +@Script() +def main(): + Script.registerArgument(" command: Command to execute inside the container") + command = Script.getPositionalArgs(group=True) + + wrapSubs = { + "dirac_env_var": os.environ.get("DIRAC", os.getcwd()), + "diracos_env_var": os.environ.get("DIRACOS", os.getcwd()), + } + wrapSubs["rc_script"] = os.path.join(os.path.realpath(sys.base_prefix), "diracosrc") + wrapSubs["command"] = command + shutil.copyfile("dirac.cfg", os.path.join("tmp", "dirac.cfg")) + + wrapLoc = os.path.join("tmp", "dirac_container.sh") + rawfd = os.open(wrapLoc, os.O_WRONLY | os.O_CREAT, 0o700) + fd = os.fdopen(rawfd, "w") + fd.write(CONTAINER_WRAPPER % wrapSubs) + fd.close() + + innerCmd = os.path.join("tmp", "dirac_container.sh") + cmd = ["apptainer", "exec"] + cmd.extend(["--contain"]) # use minimal /dev and empty other directories (e.g. 
diff --git a/src/DIRAC/Core/scripts/dirac_apptainer_exec.py b/src/DIRAC/Core/scripts/dirac_apptainer_exec.py
new file mode 100644
index 00000000000..c8dcf71d32a
--- /dev/null
+++ b/src/DIRAC/Core/scripts/dirac_apptainer_exec.py
@@ -0,0 +1,85 @@
+""" Starts a DIRAC command inside an apptainer container.
+"""
+
+import os
+import shutil
+import sys
+
+import DIRAC
+from DIRAC import S_ERROR, gLogger
+from DIRAC.Core.Base.Script import Script
+from DIRAC.Core.Utilities.Os import findImage
+from DIRAC.Core.Utilities.Subprocess import systemCall
+
+CONTAINER_WRAPPER = """#!/bin/bash
+
+echo "Starting inner container wrapper scripts (no install) at `date`."
+export DIRAC=%(dirac_env_var)s
+export DIRACOS=%(diracos_env_var)s
+# In any case we need to find a bashrc, and a cfg
+source %(rc_script)s
+%(command)s
+echo "Finishing inner container wrapper scripts at `date`."
+"""
+
+CONTAINER_DEFROOT = "/cvmfs/dirac.egi.eu/container/apptainer/alma9/x86_64"
+
+
+def getEnv():
+    """Gets the environment for use within the container.
+    The host environment is kept, with temp, proxy and cfg locations overridden.
+    """
+
+    payloadEnv = dict(os.environ)
+    payloadEnv["TMP"] = "/tmp"
+    payloadEnv["TMPDIR"] = "/tmp"
+    payloadEnv["X509_USER_PROXY"] = os.path.join("tmp", "proxy")
+    payloadEnv["DIRACSYSCONFIG"] = os.path.join("tmp", "dirac.cfg")
+
+    return payloadEnv
+
+
+@Script()
+def main():
+    Script.registerArgument(" command: Command to execute inside the container")
+    command = Script.getPositionalArgs(group=True)
+
+    wrapSubs = {
+        "dirac_env_var": os.environ.get("DIRAC", os.getcwd()),
+        "diracos_env_var": os.environ.get("DIRACOS", os.getcwd()),
+    }
+    wrapSubs["rc_script"] = os.path.join(os.path.realpath(sys.base_prefix), "diracosrc")
+    wrapSubs["command"] = command
+    shutil.copyfile("dirac.cfg", os.path.join("tmp", "dirac.cfg"))
+
+    wrapLoc = os.path.join("tmp", "dirac_container.sh")
+    rawfd = os.open(wrapLoc, os.O_WRONLY | os.O_CREAT, 0o700)
+    fd = os.fdopen(rawfd, "w")
+    fd.write(CONTAINER_WRAPPER % wrapSubs)
+    fd.close()
+
+    innerCmd = os.path.join("tmp", "dirac_container.sh")
+    cmd = ["apptainer", "exec"]
+    cmd.extend(["--contain"])  # use minimal /dev and empty other directories (e.g. /tmp and $HOME)
+    cmd.extend(["--ipc"])  # run the container in a new IPC namespace
+    cmd.extend(["--workdir", "/tmp"])  # working directory to be used for /tmp, /var/tmp and $HOME
+    cmd.extend(["--home", "/tmp"])  # avoid the small default tmpfs for $HOME; use scratch /tmp instead
+    cmd.extend(["--bind", "{0}:{0}:ro".format(os.path.realpath(sys.base_prefix))])
+
+    rootImage = findImage() or CONTAINER_DEFROOT
+
+    if os.path.isdir(rootImage) or os.path.isfile(rootImage):
+        cmd.extend([rootImage, innerCmd])
+    else:
+        # if we are here, it is because there's no image, or it is not accessible (e.g. not on CVMFS)
+        gLogger.error("Apptainer image to exec not found: ", rootImage)
+        return S_ERROR("Failed to find Apptainer image to exec")
+
+    gLogger.debug(f"Execute Apptainer command: {cmd}")
+    result = systemCall(0, cmd, env=getEnv())
+    if not result["OK"]:
+        DIRAC.exit(1)
+
+
+if __name__ == "__main__":
+    main()
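To see what the script actually runs inside the container, the wrapper template can be expanded by hand. A hedged sketch; the paths and command are illustrative, and main() derives the real values from the environment:

```python
from DIRAC.Core.scripts.dirac_apptainer_exec import CONTAINER_WRAPPER

wrapSubs = {
    "dirac_env_var": "/opt/dirac",             # illustrative
    "diracos_env_var": "/opt/dirac/diracos",   # illustrative
    "rc_script": "/opt/dirac/diracos/diracosrc",
    "command": "dirac-version",
}
# This is the bash script that ends up in tmp/dirac_container.sh
print(CONTAINER_WRAPPER % wrapSubs)
```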
diff --git a/src/DIRAC/Resources/Computing/SingularityComputingElement.py b/src/DIRAC/Resources/Computing/SingularityComputingElement.py
index f0e153d11b4..bd50607dfb7 100644
--- a/src/DIRAC/Resources/Computing/SingularityComputingElement.py
+++ b/src/DIRAC/Resources/Computing/SingularityComputingElement.py
@@ -11,8 +11,6 @@
 See the Configuration/Resources/Computing documention for details on where to set the option parameters.
 """
-
-import io
 import json
 import os
 import re
@@ -21,16 +19,17 @@
 import tempfile

 import DIRAC
-from DIRAC import S_OK, S_ERROR, gConfig, gLogger
-from DIRAC.Core.Utilities.Subprocess import systemCall
+from DIRAC import S_ERROR, S_OK, gConfig, gLogger
 from DIRAC.ConfigurationSystem.Client.Helpers import Operations
+from DIRAC.Core.Utilities.Os import findImage
+from DIRAC.Core.Utilities.Subprocess import systemCall
 from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
 from DIRAC.Resources.Computing.ComputingElement import ComputingElement
 from DIRAC.Resources.Storage.StorageElement import StorageElement
 from DIRAC.WorkloadManagementSystem.Utilities.Utils import createRelocatedJobWrapper

 # Default container to use if it isn't specified in the CE options
-CONTAINER_DEFROOT = "/cvmfs/cernvm-prod.cern.ch/cvm4"
+CONTAINER_DEFROOT = "/cvmfs/unpacked.cern.ch/something"  # FIXME
 CONTAINER_WORKDIR = "DIRAC_containers"
 CONTAINER_INNERDIR = "/tmp"

@@ -107,9 +106,6 @@ def __init__(self, ceUniqueID):
         super().__init__(ceUniqueID)
         self.__submittedJobs = 0
         self.__runningJobs = 0
-        self.__root = CONTAINER_DEFROOT
-        if "ContainerRoot" in self.ceParameters:
-            self.__root = self.ceParameters["ContainerRoot"]
         self.__workdir = CONTAINER_WORKDIR
         self.__innerdir = CONTAINER_INNERDIR
         self.__singularityBin = "singularity"
@@ -147,7 +143,7 @@ def __hasSingularity(self):
             self.log.debug(f'Use singularity from "{self.__singularityBin}"')
             return True
         if "PATH" not in os.environ:
-            return False  # Hmm, PATH not set? How unusual...
+            return False  # PATH might not be set (e.g. on an HTCondorCE)
         searchPaths = os.environ["PATH"].split(os.pathsep)
         # We can use CVMFS as a last resort if userNS is enabled
         if self.__hasUserNS():
@@ -359,8 +355,6 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
         :return: S_OK(payload exit code) / S_ERROR() if submission issue
         """
-        rootImage = self.__root
-        renewTask = None
         # Check that singularity is available
         if not self.__hasSingularity():
             self.log.error("Singularity is not installed on PATH.")
@@ -375,6 +369,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
         baseDir = ret["baseDir"]
         tmpDir = ret["tmpDir"]

+        renewTask = False
         if proxy:
             payloadProxyLoc = ret["proxyLocation"]
@@ -449,10 +444,13 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
             containerOpts = self.ceParameters["ContainerOptions"].split(",")
             for opt in containerOpts:
                 cmd.extend([opt.strip()])
-        if os.path.isdir(rootImage) or os.path.isfile(rootImage):
+
+        rootImage = findImage() or self.ceParameters.get("ContainerRoot") or CONTAINER_DEFROOT
+
+        if rootImage and (os.path.isdir(rootImage) or os.path.isfile(rootImage)):
             cmd.extend([rootImage, innerCmd])
         else:
-            # if we are here is because there's no image, or it is not accessible (e.g. not on CVMFS)
+            # if we are here, it is because there's no image, or it is not accessible (e.g. not on CVMFS)
             self.log.error("Singularity image to exec not found: ", rootImage)
             return S_ERROR("Failed to find singularity image to exec")
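The image-resolution order in submitJob is now: the findImage() result, then the ContainerRoot CE option, then CONTAINER_DEFROOT. A minimal pytest-style sketch of the first fallback (it assumes a configured DIRAC client environment; the mock stands in for an absent or hanging CVMFS):

```python
from unittest import mock

from DIRAC.Core.Utilities import Os as dirac_os

def test_findImage_falls_back_when_cvmfs_absent():
    # safe_listdir returning None means the platform directory was unreachable,
    # so findImage yields None and the caller falls through to
    # ContainerRoot / CONTAINER_DEFROOT.
    with mock.patch.object(dirac_os, "safe_listdir", return_value=None):
        assert dirac_os.findImage() is None
```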