Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0] feat: SingularityCE: looking for the platform-aware image in CVMFS lo… #7589

Draft
wants to merge 3 commits into
base: rel-v8r0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ console_scripts =
dirac-configuration-shell = DIRAC.ConfigurationSystem.scripts.dirac_configuration_shell:main [admin]
# Core
dirac-agent = DIRAC.Core.scripts.dirac_agent:main [server,pilot]
dirac-apptainer-exec = DIRAC.Core.scripts.dirac_apptainer_exec:main [server,pilot]
dirac-configure = DIRAC.Core.scripts.dirac_configure:main
dirac-executor = DIRAC.Core.scripts.dirac_executor:main [server]
dirac-info = DIRAC.Core.scripts.dirac_info:main
Expand Down
35 changes: 31 additions & 4 deletions src/DIRAC/Core/Utilities/File.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
By default on Error they return None.
"""

import os
import errno
import glob
import hashlib
import os
import random
import glob
import sys
import re
import errno
import stat
import sys
import tempfile
import threading
from contextlib import contextmanager

# Translation table of a given unit to Bytes
Expand Down Expand Up @@ -277,6 +278,32 @@ def secureOpenForWrite(filename=None, *, text=True):
yield fd, filename


def safe_listdir(directory, timeout=60):
"""This is a "safe" list directory,
for lazily-loaded File Systems like CVMFS.
There's by default a 60 seconds timeout.

:param str directory: directory to list
:param int timeout: optional timeout, in seconds. Defaults to 60.
"""

def listdir(directory):
try:
return os.listdir(directory)
except FileNotFoundError:
print(f"{directory} not found")
return []

contents = []
t = threading.Thread(target=lambda: contents.extend(listdir(directory)))
t.daemon = True # don't delay program's exit
t.start()
t.join(timeout)
if t.is_alive():
return None # timeout
return contents


if __name__ == "__main__":
for p in sys.argv[1:]:
print(f"{p} : {getGlobbedTotalSize(p)} bytes")
45 changes: 43 additions & 2 deletions src/DIRAC/Core/Utilities/Os.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
by default on Error they return None
"""
import os
import platform
import shutil

import DIRAC
from DIRAC.Core.Utilities import List
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.Core.Utilities.File import safe_listdir
from DIRAC.Core.Utilities.Subprocess import shellCall, systemCall
from DIRAC.Core.Utilities import List

DEBUG = 0

Expand Down Expand Up @@ -147,6 +149,45 @@ def sourceEnv(timeout, cmdTuple, inputEnv=None):
return result


@deprecated("Will be removed in DIRAC 8.1", onlyOnce=True)
@deprecated("Will be removed in DIRAC 9.0", onlyOnce=True)
def which(executable):
return shutil.which(executable)


def findImage(container_root="/cvmfs/unpacked.cern.ch/"): # FIXME: this might not be needed iff we use multi-platform locations
"""Finds the image for the current platform

This looks into location "${container_root}"
and expects to find one of the following platforms:
- amd64
- arm64
- ppc64le
"""
plat = DIRAC.gConfig.getValue("LocalSite/Platform", platform.machine())
DIRAC.gLogger.info(f"Platform: {plat}")

# NB: platform compatibility is more complex than the following simple identification.
#
# Given that, on Linux, platform.machine() returns the same values as uname -m,
# and this is already "confusing", e.g. see
# https://stackoverflow.com/questions/45125516/possible-values-for-uname-m
# https://en.wikipedia.org/wiki/Uname
# Since here we are using the architecture specification defined by opencontainers initiative:
# https://github.com/opencontainers/image-spec/blob/main/image-index.md#platform-variants
# we need to make some simple "conversions" to get the right values:

if plat.lower() == "x86_64":
plat = "amd64"
if plat.lower().startswith("arm") or plat.lower() == "aarch64":
plat = "arm64"
if plat.lower().startswith("ppc64"):
plat = "ppc64le"

if plat not in ["amd64", "arm64", "ppc64le"]:
DIRAC.gLogger.error(f"Platform {plat} not supported")
return None

rootImage = f"{container_root}:{plat}"
DIRAC.gLogger.verbose(f"Checking {rootImage} existence")
if safe_listdir(rootImage):
return rootImage
85 changes: 85 additions & 0 deletions src/DIRAC/Core/scripts/dirac_apptainer_exec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
""" Starts a DIRAC command inside an apptainer container.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great idea!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed this the other day but have yet to do a single test with it.

Anyway, a use case for this is clearly running, from the pilot, the dirac-platfom (dirac-architecture) script.

"""

import os
import shutil
import sys

import DIRAC
from DIRAC import S_ERROR, gLogger
from DIRAC.Core.Base.Script import Script
from DIRAC.Core.Utilities.Os import findImage
from DIRAC.Core.Utilities.Subprocess import systemCall

CONTAINER_WRAPPER = """#!/bin/bash

echo "Starting inner container wrapper scripts (no install) at `date`."
export DIRAC=%(dirac_env_var)s
export DIRACOS=%(diracos_env_var)s
# In any case we need to find a bashrc, and a cfg
source %(rc_script)s
%(command)s
echo "Finishing inner container wrapper scripts at `date`."
"""

CONTAINER_DEFROOT = "/cvmfs/dirac.egi.eu/container/apptainer/alma9/x86_64"


def getEnv():
"""Gets the environment for use within the container.
We blank almost everything to prevent contamination from the host system.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm struggling to reconcile the docstring with the implementation...it doesn't clear anything at the moment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just a copy-paste. I will fix it once I can test this script.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure we should clean the environment and apply the allow list that is in SingularityCE.

"""

payloadEnv = {k: v for k, v in os.environ.items()}
payloadEnv["TMP"] = "/tmp"
payloadEnv["TMPDIR"] = "/tmp"
payloadEnv["X509_USER_PROXY"] = os.path.join("tmp", "proxy")
payloadEnv["DIRACSYSCONFIG"] = os.path.join("tmp", "dirac.cfg")

return payloadEnv


@Script()
def main():
Script.registerArgument(" command: Command to execute inside the container")
command = Script.getPositionalArgs(group=True)

wrapSubs = {
"dirac_env_var": os.environ.get("DIRAC", os.getcwd()),
"diracos_env_var": os.environ.get("DIRACOS", os.getcwd()),
}
wrapSubs["rc_script"] = os.path.join(os.path.realpath(sys.base_prefix), "diracosrc")
wrapSubs["command"] = command
shutil.copyfile("dirac.cfg", os.path.join("tmp", "dirac.cfg"))

wrapLoc = os.path.join("tmp", "dirac_container.sh")
rawfd = os.open(wrapLoc, os.O_WRONLY | os.O_CREAT, 0o700)
fd = os.fdopen(rawfd, "w")
fd.write(CONTAINER_WRAPPER % wrapSubs)
fd.close()

innerCmd = os.path.join("tmp", "dirac_container.sh")
cmd = ["apptainer", "exec"]
cmd.extend(["--contain"]) # use minimal /dev and empty other directories (e.g. /tmp and $HOME)
cmd.extend(["--ipc"]) # run container in a new IPC namespace
cmd.extend(["--workdir", "/tmp"]) # working directory to be used for /tmp, /var/tmp and $HOME
cmd.extend(["--home", "/tmp"]) # Avoid using small tmpfs for default $HOME and use scratch /tmp instead
cmd.extend(["--bind", "{0}:{0}:ro".format(os.path.join(os.path.realpath(sys.base_prefix)))])

rootImage = findImage() or CONTAINER_DEFROOT

if os.path.isdir(rootImage) or os.path.isfile(rootImage):
cmd.extend([rootImage, innerCmd])
else:
# if we are here is because there's no image, or it is not accessible (e.g. not on CVMFS)
gLogger.error("Apptainer image to exec not found: ", rootImage)
return S_ERROR("Failed to find Apptainer image to exec")

gLogger.debug(f"Execute Apptainer command: {cmd}")
result = systemCall(0, cmd, env=getEnv())
if not result["OK"]:
DIRAC.exit(1)


if __name__ == "__main__":
main()
24 changes: 11 additions & 13 deletions src/DIRAC/Resources/Computing/SingularityComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
See the Configuration/Resources/Computing documention for details on
where to set the option parameters.
"""

import io
import json
import os
import re
Expand All @@ -21,16 +19,17 @@
import tempfile

import DIRAC
from DIRAC import S_OK, S_ERROR, gConfig, gLogger
from DIRAC.Core.Utilities.Subprocess import systemCall
from DIRAC import S_ERROR, S_OK, gConfig, gLogger
from DIRAC.ConfigurationSystem.Client.Helpers import Operations
from DIRAC.Core.Utilities.Os import findImage
from DIRAC.Core.Utilities.Subprocess import systemCall
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
from DIRAC.Resources.Storage.StorageElement import StorageElement
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createRelocatedJobWrapper

# Default container to use if it isn't specified in the CE options
CONTAINER_DEFROOT = "/cvmfs/cernvm-prod.cern.ch/cvm4"
CONTAINER_DEFROOT = "/cvmfs/unpacked.cern.ch/something" # FIXME
CONTAINER_WORKDIR = "DIRAC_containers"
CONTAINER_INNERDIR = "/tmp"

Expand Down Expand Up @@ -107,9 +106,6 @@ def __init__(self, ceUniqueID):
super().__init__(ceUniqueID)
self.__submittedJobs = 0
self.__runningJobs = 0
self.__root = CONTAINER_DEFROOT
if "ContainerRoot" in self.ceParameters:
self.__root = self.ceParameters["ContainerRoot"]
self.__workdir = CONTAINER_WORKDIR
self.__innerdir = CONTAINER_INNERDIR
self.__singularityBin = "singularity"
Expand Down Expand Up @@ -147,7 +143,7 @@ def __hasSingularity(self):
self.log.debug(f'Use singularity from "{self.__singularityBin}"')
return True
if "PATH" not in os.environ:
return False # Hmm, PATH not set? How unusual...
return False # PATH might not be set (e.g. HTCondorCE)
searchPaths = os.environ["PATH"].split(os.pathsep)
# We can use CVMFS as a last resort if userNS is enabled
if self.__hasUserNS():
Expand Down Expand Up @@ -359,8 +355,6 @@ def submitJob(self, executableFile, proxy=None, **kwargs):

:return: S_OK(payload exit code) / S_ERROR() if submission issue
"""
rootImage = self.__root
renewTask = None
# Check that singularity is available
if not self.__hasSingularity():
self.log.error("Singularity is not installed on PATH.")
Expand All @@ -375,6 +369,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
baseDir = ret["baseDir"]
tmpDir = ret["tmpDir"]

renewTask = False
if proxy:
payloadProxyLoc = ret["proxyLocation"]

Expand Down Expand Up @@ -449,10 +444,13 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
containerOpts = self.ceParameters["ContainerOptions"].split(",")
for opt in containerOpts:
cmd.extend([opt.strip()])
if os.path.isdir(rootImage) or os.path.isfile(rootImage):

rootImage = findImage() or self.ceParameters.get("ContainerRoot") or CONTAINER_DEFROOT

if rootImage and os.path.isdir(rootImage) or os.path.isfile(rootImage):
cmd.extend([rootImage, innerCmd])
else:
# if we are here is because there's no image, or it is not accessible (e.g. not on CVMFS)
# if we are here it is because there's no image, or it is not accessible (e.g. not on CVMFS)
self.log.error("Singularity image to exec not found: ", rootImage)
return S_ERROR("Failed to find singularity image to exec")

Expand Down
Loading