Skip to content

Commit

Permalink
fix TW entrypoint (#8541)
Browse files Browse the repository at this point in the history
Use the same pattern as Rucio ASO script to allow MasterWorker.py running from GH
  • Loading branch information
novicecpp authored Jul 9, 2024
1 parent c480002 commit 88794af
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 119 deletions.
2 changes: 2 additions & 0 deletions cicd/crabtaskworker_pypi/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ COPY cicd/crabtaskworker_pypi/TaskWorker/start.sh \
cicd/crabtaskworker_pypi/updateDatafiles.sh \
${WDIR}/srv/TaskManager/

COPY cicd/crabtaskworker_pypi/bin/crab-taskworker /usr/local/bin/crab-taskworker

## publisher
COPY cicd/crabtaskworker_pypi/Publisher/start.sh \
cicd/crabtaskworker_pypi/Publisher/env.sh \
Expand Down
27 changes: 15 additions & 12 deletions cicd/crabtaskworker_pypi/TaskWorker/manage.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,32 @@ SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

## some variables used in start_srv
CONFIG="${SCRIPT_DIR}"/current/TaskWorkerConfig.py
# path where we install crab code
APP_PATH="${APP_PATH:-/data/srv/current/lib/python/site-packages/}"

# CRABTASKWORKER_ROOT is a mandatory variable used to get the data directory in `DagmanCreator.getLocation()`.
# The path is hardcoded; use new_updateTMRuntime.sh to build it from source and copy it to this path.
export CRABTASKWORKER_ROOT=/data/srv/current/lib/python/site-packages/
export CRABTASKWORKER_ROOT="${APP_PATH}"

helpFunction() {
    # Print the built-in help text: every line of this script that begins with
    # the "##H" marker, with each "##H" (plus one optional following space)
    # stripped from it.
    grep '^##H' "$0" | sed -r 's/##H(| )//g'
}

_getMasterWorkerPid() {
    # Print the first PID reported by pgrep for a process whose full command
    # line matches 'crab-taskworker'; prints an empty line when none is found.
    #
    # pgrep outputs PIDs only and never reports itself, so no `grep -v grep`
    # filtering is needed (that idiom only applies to `ps | grep`).
    # `|| true` guards against a non-zero pipeline status, e.g. when the
    # caller runs with `set -e -o pipefail` and nothing matches.
    local pid
    pid=$(pgrep -f 'crab-taskworker' | head -1) || true
    echo "${pid}"
}

start_srv() {
    # Start the TaskWorker through the `crab-taskworker` entrypoint.
    #
    # Reads:
    #   CONFIG      configuration file path, passed to the entrypoint via --config
    #   DEBUG       debug switch (see below)
    #   PYTHONPATH  exported so the entrypoint process inherits it
    #
    # DEBUG handling:
    #   empty, or an explicit "off" value (false / 0 / no / off, any case)
    #       -> start the service in the background
    #   any other value (e.g. true, t, 1)
    #       -> run in the foreground with --pdb
    # A bare `[[ $DEBUG ]]` is true for ANY non-empty string, so DEBUG=false
    # would wrongly start the service under pdb.

    # Check required env
    # shellcheck disable=SC2269
    DEBUG="${DEBUG}"
    export PYTHONPATH
    echo "Starting TaskWorker..."
    case "${DEBUG,,}" in
        '' | false | 0 | no | off)
            crab-taskworker --config "${CONFIG}" --logDebug &
            ;;
        *)
            crab-taskworker --config "${CONFIG}" --logDebug --pdb
            ;;
    esac
    echo "Started TaskWorker with MasterWorker pid $(_getMasterWorkerPid)"
}

stop_srv() {
Expand All @@ -57,7 +60,7 @@ stop_srv() {
checkTimes=12
timeout=15 #that will give 12*15=180 seconds (3min) for the TW to finish work

TaskMasterPid=$(ps exfww | grep MasterWorker | grep -v grep | head -1 | awk '{print $1}') || true
TaskMasterPid=$(_getMasterWorkerPid)
if [[ -z $TaskMasterPid ]]; then
echo "No master process running."
return;
Expand Down
8 changes: 8 additions & 0 deletions cicd/crabtaskworker_pypi/bin/crab-taskworker
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env python3
"""
Entrypoint to TaskWorker process.

Thin launcher only: it imports main() from TaskWorker.Main and calls it.
The shebang names python3 explicitly, matching the `python3 .../MasterWorker.py`
command this script replaces, because an unversioned `python` may be missing
or may resolve to a different interpreter.
"""

from TaskWorker.Main import main

if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion cicd/gitlab/deployTW.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ echo "(DEBUG) Environment: ${Environment}"
WORK_DIR=$PWD

if [ "X${Service}" == "XTaskWorker" ] ; then
processName=MasterWorker
processName=crab-taskworker
else
processName=RunPublisher
fi
Expand Down
102 changes: 102 additions & 0 deletions src/python/TaskWorker/Main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from optparse import OptionParser
import signal
import os
import logging
import sys

from WMCore.Configuration import loadConfigurationFile
from TaskWorker.WorkerExceptions import ConfigException
from TaskWorker.MasterWorker import MasterWorker
import HTCondorLocator

def validateConfig(config):
    """Check that the input configuration carries what the TaskWorker needs.

    As a side effect, a TaskWorker section lacking ``scheddPickerFunction``
    gets it set to ``HTCondorLocator.memoryBasedChoices``.

    :arg WMCore.Configuration config: input configuration
    :return bool, string: flag for validation result and a message."""
    section = getattr(config, 'TaskWorker', None)
    if section is None:
        return False, "Configuration problem: Task worker section is missing. "
    if hasattr(section, 'scheddPickerFunction'):
        return True, 'Ok'
    # no picker configured: fall back to the default one
    section.scheddPickerFunction = HTCondorLocator.memoryBasedChoices
    return True, 'Ok'


def main():
    """
    Command-line entry point of the TaskWorker.

    Parses the options, loads the file given with --config and validates it,
    then builds a MasterWorker and runs its algorithm().

    With --pdb the root logger is set to DEBUG, the configuration is forced to
    one slave with childWorker disabled, pdb is entered just before the
    MasterWorker is built in sequential/console mode, and the process exits
    with status 0 once algorithm() returns.

    :raises ConfigException: if --config is not given or validation fails
    """
    parser = OptionParser(usage="usage: %prog [options] [args]")

    # on/off switches, registered in the order they are listed by --help
    switches = (
        ("-d", "--logDebug", "print extra messages to stdout"),
        ("-w", "--logWarning", "don't print any messages to stdout"),
        ("-s", "--sequential", "run in sequential (no subprocesses) mode"),
        ("-c", "--console", "log to console"),
    )
    for shortName, longName, helpText in switches:
        parser.add_option(shortName, longName,
                          action="store_true",
                          dest=longName[2:],
                          default=False,
                          help=helpText)
    parser.add_option("--config",
                      dest="config",
                      default=None,
                      metavar="FILE",
                      help="configuration file path")
    parser.add_option("--pdb",
                      action="store_true",
                      dest="pdb",
                      default=False,
                      help="Enter pdb mode. Set up TW to run sequential mode and invoke pdb.")

    # positional arguments are accepted but not used
    options, _ = parser.parse_args()

    if not options.config:
        raise ConfigException("Configuration not found")

    configuration = loadConfigurationFile(os.path.abspath(options.config))
    isValid, reason = validateConfig(configuration)
    if not isValid:
        raise ConfigException(reason)

    if options.pdb:
        # override root loglevel to debug
        logging.getLogger().setLevel(logging.DEBUG)
        # need to force a single thread
        configuration.TaskWorker.nslaves = 1
        configuration.FeatureFlags.childWorker = False
        # start with pdb
        import pdb  #pylint: disable=import-outside-toplevel
        pdb.set_trace()  #pylint: disable=forgotten-debug-statement
        debugMaster = MasterWorker(config=configuration, logWarning=False, logDebug=True,
                                   sequential=True, console=True)
        debugMaster.algorithm()
        # exit program
        sys.exit(0)

    # main
    master = None
    try:
        master = MasterWorker(configuration,
                              logWarning=options.logWarning,
                              logDebug=options.logDebug,
                              sequential=options.sequential,
                              console=options.console)
        # both interrupt and terminate are routed to the worker's quit_ handler
        for signum in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signum, master.quit_)
        master.algorithm()
    except BaseException:
        # catches everything, exactly like a bare except; always re-raised below
        if master:
            master.logger.exception("Unexpected and fatal error. Exiting task worker")
        #don't really wanna miss this, propagating the exception and exiting really bad
        raise
    finally:
        #there can be an exception before slaves are created, e.g. in the __init__
        if hasattr(master, 'slaves'):
            master.slaves.end()
72 changes: 0 additions & 72 deletions src/python/TaskWorker/MasterWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

#CRAB dependencies
from RESTInteractions import CRABRest
import HTCondorLocator
from ServerUtilities import newX509env
from ServerUtilities import SERVICE_INSTANCES
from TaskWorker import __version__
Expand All @@ -48,17 +47,6 @@
'private': {'host': None, 'instance': 'dev'},
}

def validateConfig(config):
"""Verify that the input configuration contains all needed info
:arg WMCore.Configuration config: input configuration
:return bool, string: flag for validation result and a message."""
if getattr(config, 'TaskWorker', None) is None:
return False, "Configuration problem: Task worker section is missing. "
if not hasattr(config.TaskWorker, 'scheddPickerFunction'):
config.TaskWorker.scheddPickerFunction = HTCondorLocator.memoryBasedChoices
return True, 'Ok'

def getRESTParams(config, logger):
""" get REST host name and db instance from a config object
returns a tuple of strings (host, dbinstance). If can't, raises exception"""
Expand Down Expand Up @@ -459,63 +447,3 @@ def algorithm(self):
dummyFinished = self.slaves.checkFinished()

self.logger.debug("Master Worker Exiting Main Cycle.")


if __name__ == '__main__':
from optparse import OptionParser

usage = "usage: %prog [options] [args]"
parser = OptionParser(usage=usage)

parser.add_option("-d", "--logDebug",
action="store_true",
dest="logDebug",
default=False,
help="print extra messages to stdout")
parser.add_option("-w", "--logWarning",
action="store_true",
dest="logWarning",
default=False,
help="don't print any messages to stdout")
parser.add_option("-s", "--sequential",
action="store_true",
dest="sequential",
default=False,
help="run in sequential (no subprocesses) mode")
parser.add_option("-c", "--console",
action="store_true",
dest="console",
default=False,
help="log to console")

parser.add_option("--config",
dest="config",
default=None,
metavar="FILE",
help="configuration file path")

(options, args) = parser.parse_args()

if not options.config:
raise ConfigException("Configuration not found")

configuration = loadConfigurationFile(os.path.abspath(options.config))
status_, msg_ = validateConfig(configuration)
if not status_:
raise ConfigException(msg_)

mw = None
try:
mw = MasterWorker(configuration, logWarning=options.logWarning, logDebug=options.logDebug, sequential=options.sequential, console=options.console)
signal.signal(signal.SIGINT, mw.quit_)
signal.signal(signal.SIGTERM, mw.quit_)
mw.algorithm()
except:
if mw:
mw.logger.exception("Unexpected and fatal error. Exiting task worker")
#don't really wanna miss this, propagating the exception and exiting really bad
raise
finally:
#there can be an exception before slaves are created, e.g. in the __init__
if hasattr(mw, 'slaves'):
mw.slaves.end()
34 changes: 0 additions & 34 deletions src/python/TaskWorker/SequentialWorker.py

This file was deleted.

0 comments on commit 88794af

Please sign in to comment.