From 88794af941de62774572cc0d7f86791da775e4f9 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Tue, 9 Jul 2024 12:43:30 +0200 Subject: [PATCH] fix TW entrypoint (#8541) Use the same pattern as Rucio ASO script to allow MasterWorker.py running from GH --- cicd/crabtaskworker_pypi/Dockerfile | 2 + cicd/crabtaskworker_pypi/TaskWorker/manage.sh | 27 ++--- cicd/crabtaskworker_pypi/bin/crab-taskworker | 8 ++ cicd/gitlab/deployTW.sh | 2 +- src/python/TaskWorker/Main.py | 102 ++++++++++++++++++ src/python/TaskWorker/MasterWorker.py | 72 ------------- src/python/TaskWorker/SequentialWorker.py | 34 ------ 7 files changed, 128 insertions(+), 119 deletions(-) create mode 100755 cicd/crabtaskworker_pypi/bin/crab-taskworker create mode 100644 src/python/TaskWorker/Main.py delete mode 100644 src/python/TaskWorker/SequentialWorker.py diff --git a/cicd/crabtaskworker_pypi/Dockerfile b/cicd/crabtaskworker_pypi/Dockerfile index 4b24dc91a0..79dae0553f 100644 --- a/cicd/crabtaskworker_pypi/Dockerfile +++ b/cicd/crabtaskworker_pypi/Dockerfile @@ -121,6 +121,8 @@ COPY cicd/crabtaskworker_pypi/TaskWorker/start.sh \ cicd/crabtaskworker_pypi/updateDatafiles.sh \ ${WDIR}/srv/TaskManager/ +COPY cicd/crabtaskworker_pypi/bin/crab-taskworker /usr/local/bin/crab-taskworker + ## publisher COPY cicd/crabtaskworker_pypi/Publisher/start.sh \ cicd/crabtaskworker_pypi/Publisher/env.sh \ diff --git a/cicd/crabtaskworker_pypi/TaskWorker/manage.sh b/cicd/crabtaskworker_pypi/TaskWorker/manage.sh index cee378cf86..eac466383e 100755 --- a/cicd/crabtaskworker_pypi/TaskWorker/manage.sh +++ b/cicd/crabtaskworker_pypi/TaskWorker/manage.sh @@ -24,29 +24,32 @@ SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) ## some variable use in start_srv CONFIG="${SCRIPT_DIR}"/current/TaskWorkerConfig.py +# path where we install crab code +APP_PATH="${APP_PATH:-/data/srv/current/lib/python/site-packages/}" # CRABTASKWORKER_ROOT is a mandatory variable for getting data directory in `DagmanCreator.getLocation()` # Hardcoded the path and use new_updateTMRuntime.sh to build it from source and copy to this path. -export CRABTASKWORKER_ROOT=/data/srv/current/lib/python/site-packages/ +export CRABTASKWORKER_ROOT="${APP_PATH}" helpFunction() { grep "^##H" "${0}" | sed -r "s/##H(| )//g" } +_getMasterWorkerPid() { + pid=$(pgrep -f 'crab-taskworker' | grep -v grep | head -1 ) || true + echo "${pid}" +} + start_srv() { # Check require env - # shellcheck disable=SC2269 - DEBUG="${DEBUG}" - export PYTHONPATH="${PYTHONPATH}" - - # hardcode APP_DIR, but if debug mode, APP_DIR can be override - if [[ "$DEBUG" = 'true' ]]; then - APP_DIR="${APP_DIR:-/data/repos/CRABServer/src/python}" - python3 -m pdb "${APP_DIR}"/TaskWorker/SequentialWorker.py "${CONFIG}" --logDebug + export PYTHONPATH + echo "Starting TaskWorker..." + if [[ $DEBUG ]]; then + crab-taskworker --config "${CONFIG}" --logDebug --pdb else - APP_DIR=/data/srv/current/lib/python/site-packages - python3 "${APP_DIR}"/TaskWorker/MasterWorker.py --config "${CONFIG}" --logDebug & + crab-taskworker --config "${CONFIG}" --logDebug & fi + echo "Started TaskWorker with MasterWorker pid $(_getMasterWorkerPid)" } stop_srv() { @@ -57,7 +60,7 @@ stop_srv() { checkTimes=12 timeout=15 #that will give 12*15=180 seconds (3min) for the TW to finish work - TaskMasterPid=$(ps exfww | grep MasterWorker | grep -v grep | head -1 | awk '{print $1}') || true + TaskMasterPid=$(_getMasterWorkerPid) if [[ -z $TaskMasterPid ]]; then echo "No master process running." return; diff --git a/cicd/crabtaskworker_pypi/bin/crab-taskworker b/cicd/crabtaskworker_pypi/bin/crab-taskworker new file mode 100755 index 0000000000..6221d2380b --- /dev/null +++ b/cicd/crabtaskworker_pypi/bin/crab-taskworker @@ -0,0 +1,8 @@ +#!/usr/bin/env python +""" +Entrypoint to TaskWorker process +""" + +from TaskWorker.Main import main + +main() diff --git a/cicd/gitlab/deployTW.sh b/cicd/gitlab/deployTW.sh index 02544e2f37..beb5f97552 100644 --- a/cicd/gitlab/deployTW.sh +++ b/cicd/gitlab/deployTW.sh @@ -10,7 +10,7 @@ echo "(DEBUG) Environment: ${Environment}" WORK_DIR=$PWD if [ "X${Service}" == "XTaskWorker" ] ; then - processName=MasterWorker + processName=crab-taskworker else processName=RunPublisher fi diff --git a/src/python/TaskWorker/Main.py b/src/python/TaskWorker/Main.py new file mode 100644 index 0000000000..8e4f0bb9dc --- /dev/null +++ b/src/python/TaskWorker/Main.py @@ -0,0 +1,102 @@ +from optparse import OptionParser +import signal +import os +import logging +import sys + +from WMCore.Configuration import loadConfigurationFile +from TaskWorker.WorkerExceptions import ConfigException +from TaskWorker.MasterWorker import MasterWorker +import HTCondorLocator + +def validateConfig(config): + """Verify that the input configuration contains all needed info + + :arg WMCore.Configuration config: input configuration + :return bool, string: flag for validation result and a message.""" + if getattr(config, 'TaskWorker', None) is None: + return False, "Configuration problem: Task worker section is missing. " + if not hasattr(config.TaskWorker, 'scheddPickerFunction'): + config.TaskWorker.scheddPickerFunction = HTCondorLocator.memoryBasedChoices + return True, 'Ok' + + +def main(): + """ + parse args and run. + """ + usage = "usage: %prog [options] [args]" + parser = OptionParser(usage=usage) + + parser.add_option("-d", "--logDebug", + action="store_true", + dest="logDebug", + default=False, + help="print extra messages to stdout") + parser.add_option("-w", "--logWarning", + action="store_true", + dest="logWarning", + default=False, + help="don't print any messages to stdout") + parser.add_option("-s", "--sequential", + action="store_true", + dest="sequential", + default=False, + help="run in sequential (no subprocesses) mode") + parser.add_option("-c", "--console", + action="store_true", + dest="console", + default=False, + help="log to console") + parser.add_option("--config", + dest="config", + default=None, + metavar="FILE", + help="configuration file path") + parser.add_option("--pdb", + action="store_true", + dest="pdb", + default=False, + help="Enter pdb mode. Set up TW to run sequential mode and invoke pdb.") + + (options, args) = parser.parse_args() + + + if not options.config: + raise ConfigException("Configuration not found") + + configuration = loadConfigurationFile(os.path.abspath(options.config)) + status_, msg_ = validateConfig(configuration) + if not status_: + raise ConfigException(msg_) + + if options.pdb: + # override root loglevel to debug + logging.getLogger().setLevel(logging.DEBUG) + # need to force a single thread + configuration.TaskWorker.nslaves = 1 + configuration.FeatureFlags.childWorker = False + # start with pdb + import pdb #pylint: disable=import-outside-toplevel + pdb.set_trace() #pylint: disable=forgotten-debug-statement + mc = MasterWorker(config=configuration, logWarning=False, logDebug=True, sequential=True, console=True) + mc.algorithm() + # exit program + sys.exit(0) + + # main + mw = None + try: + mw = MasterWorker(configuration, logWarning=options.logWarning, logDebug=options.logDebug, sequential=options.sequential, console=options.console) + signal.signal(signal.SIGINT, mw.quit_) + signal.signal(signal.SIGTERM, mw.quit_) + mw.algorithm() + except: + if mw: + mw.logger.exception("Unexpected and fatal error. Exiting task worker") + #don't really wanna miss this, propagating the exception and exiting really bad + raise + finally: + #there can be an exception before slaves are created, e.g. in the __init__ + if hasattr(mw, 'slaves'): + mw.slaves.end() diff --git a/src/python/TaskWorker/MasterWorker.py b/src/python/TaskWorker/MasterWorker.py index d1004a2ce0..2adb8717be 100644 --- a/src/python/TaskWorker/MasterWorker.py +++ b/src/python/TaskWorker/MasterWorker.py @@ -23,7 +23,6 @@ #CRAB dependencies from RESTInteractions import CRABRest -import HTCondorLocator from ServerUtilities import newX509env from ServerUtilities import SERVICE_INSTANCES from TaskWorker import __version__ @@ -48,17 +47,6 @@ 'private': {'host': None, 'instance': 'dev'}, } -def validateConfig(config): - """Verify that the input configuration contains all needed info - - :arg WMCore.Configuration config: input configuration - :return bool, string: flag for validation result and a message.""" - if getattr(config, 'TaskWorker', None) is None: - return False, "Configuration problem: Task worker section is missing. " - if not hasattr(config.TaskWorker, 'scheddPickerFunction'): - config.TaskWorker.scheddPickerFunction = HTCondorLocator.memoryBasedChoices - return True, 'Ok' - def getRESTParams(config, logger): """ get REST host name and db instance from a config object returns a tuple of strings (host, dbinstance). If can't, raises exception""" @@ -459,63 +447,3 @@ def algorithm(self): dummyFinished = self.slaves.checkFinished() self.logger.debug("Master Worker Exiting Main Cycle.") - - -if __name__ == '__main__': - from optparse import OptionParser - - usage = "usage: %prog [options] [args]" - parser = OptionParser(usage=usage) - - parser.add_option("-d", "--logDebug", - action="store_true", - dest="logDebug", - default=False, - help="print extra messages to stdout") - parser.add_option("-w", "--logWarning", - action="store_true", - dest="logWarning", - default=False, - help="don't print any messages to stdout") - parser.add_option("-s", "--sequential", - action="store_true", - dest="sequential", - default=False, - help="run in sequential (no subprocesses) mode") - parser.add_option("-c", "--console", - action="store_true", - dest="console", - default=False, - help="log to console") - - parser.add_option("--config", - dest="config", - default=None, - metavar="FILE", - help="configuration file path") - - (options, args) = parser.parse_args() - - if not options.config: - raise ConfigException("Configuration not found") - - configuration = loadConfigurationFile(os.path.abspath(options.config)) - status_, msg_ = validateConfig(configuration) - if not status_: - raise ConfigException(msg_) - - mw = None - try: - mw = MasterWorker(configuration, logWarning=options.logWarning, logDebug=options.logDebug, sequential=options.sequential, console=options.console) - signal.signal(signal.SIGINT, mw.quit_) - signal.signal(signal.SIGTERM, mw.quit_) - mw.algorithm() - except: - if mw: - mw.logger.exception("Unexpected and fatal error. Exiting task worker") - #don't really wanna miss this, propagating the exception and exiting really bad - raise - finally: - #there can be an exception before slaves are created, e.g. in the __init__ - if hasattr(mw, 'slaves'): - mw.slaves.end() diff --git a/src/python/TaskWorker/SequentialWorker.py b/src/python/TaskWorker/SequentialWorker.py deleted file mode 100644 index 177ee24c5b..0000000000 --- a/src/python/TaskWorker/SequentialWorker.py +++ /dev/null @@ -1,34 +0,0 @@ -""" This worker can be used for testing purposes. Just run: - "python SequentialWorker.py /path/to/config" - and have fun! - If you want to immedately start a pdb session add a 2nd argument (any word will do) - "python SequentialWorker.py /path/to/config d" - - More details: it instantiates the MasterWorker with the TEST flag True. This makes the MasterWorker - sequential (it does not instantiate new threads) and the logging is done at the console and not on - a file. -""" - -from WMCore.Configuration import loadConfigurationFile -from TaskWorker.MasterWorker import MasterWorker, validateConfig - -import logging -import sys - -logging.getLogger().setLevel(logging.DEBUG) - -config = loadConfigurationFile(sys.argv[1]) -config.TaskWorker.nslaves = 1 - -validateConfig(config) - -usePdb = (len(sys.argv) == 3) -if usePdb: - import pdb - pdb.set_trace() - -# no childWorker when running with pdb -config.FeatureFlags.childWorker = False - -mc = MasterWorker(config=config, logWarning=False, logDebug=True, sequential=True, console=True) -mc.algorithm()