Skip to content

Commit

Permalink
Merge pull request populse#320 from DimitriPapadopoulos/pre-commit
Browse files Browse the repository at this point in the history
Run pre-commit over latest 3.0 changes
  • Loading branch information
sapetnioc committed Nov 13, 2023
2 parents 88ac90a + 3ba30b5 commit 8d5ebc8
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 21 deletions.
20 changes: 8 additions & 12 deletions capsul/database/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,24 +715,23 @@ def job_json(self, engine_id, execution_id, job_uuid):
return job

def kill_jobs(self, engine_id, execution_id, job_ids):
""" Request killing of jobs
"""
"""Request killing of jobs"""
# we just set a flag to 1 associated with the jobs to be killed.
# Workers will poll for it while jobs are running, and react
# accordingly.
# TODO:
# - atomicity: ensure jobs still exist and are running
# - skip or error if a job is not running
key = f'capsul:{engine_id}:{execution_id}'
key = f"capsul:{engine_id}:{execution_id}"
if job_ids is None:
job_ids = self.redis.hget(f'capsul:{engine_id}:{execution_id}',
'ongoing')
job_ids = self.redis.hget(f"capsul:{engine_id}:{execution_id}", "ongoing")
for job_id in job_ids:
self.redis.hset(key, f'kill_job:{job_id}', 1)
self.redis.hset(key, f"kill_job:{job_id}", 1)

def job_kill_requested(self, engine_id, execution_id, job_id):
return self.redis.hget(f'capsul:{engine_id}:{execution_id}',
f'kill_job:{job_id}')
return self.redis.hget(
f"capsul:{engine_id}:{execution_id}", f"kill_job:{job_id}"
)

def execution_report_json(self, engine_id, execution_id):
(
Expand Down Expand Up @@ -781,10 +780,7 @@ def execution_report_json(self, engine_id, execution_id):

def dispose(self, engine_id, execution_id, bypass_persistence=False):
keys = [f"capsul:{engine_id}", f"capsul:{engine_id}:{execution_id}"]
args = [
execution_id,
int(bool(bypass_persistence))
]
args = [execution_id, int(bool(bypass_persistence))]
self._dispose(keys=keys, args=args)

def check_shutdown(self):
Expand Down
4 changes: 2 additions & 2 deletions capsul/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def start_workers(self):
for i in range(start_count):
workers_command = self.database.workers_command(self.engine_id)
try:
print('workers_command:', workers_command)
print("workers_command:", workers_command)
subprocess.run(
workers_command,
capture_output=False,
Expand Down Expand Up @@ -344,7 +344,7 @@ def dispose(self, *args, **kwargs):
"""Remove the given execution from the database and the associated
resources (temporary files etc.)
"""
print('Dispose:', self.engine_id, args, kwargs)
print("Dispose:", self.engine_id, args, kwargs)
self.database.dispose(self.engine_id, *args, **kwargs)

def run(self, executable, timeout=None, print_report=False, debug=False, **kwargs):
Expand Down
8 changes: 4 additions & 4 deletions capsul/engine/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(self, database, engine_id):
self.current_execution = None
self.current_job = None
self.job_pid = None
self.poll_interval = 5.
self.poll_interval = 5.0
self.stop_poll_interval = 0.1

def run(self):
Expand All @@ -35,11 +35,11 @@ def run(self):
execution_id = self.current_execution
job_id = self.current_job
job_pid = self.job_pid
if job_id is not None and execution_id is not None \
and job_pid is not None:
if job_id is not None and execution_id is not None and job_pid is not None:
# poll the database
kill_job = self.database.job_kill_requested(
engine_id, execution_id, job_id)
engine_id, execution_id, job_id
)
if kill_job:
os.kill(job_pid, signal.SIGTERM)
os.kill(job_pid, signal.SIGKILL)
Expand Down
11 changes: 8 additions & 3 deletions capsul/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@


def run_job(
database, engine_id, execution_id, job_uuid, same_python=False, debug=False, set_pid_function=None
database,
engine_id,
execution_id,
job_uuid,
same_python=False,
debug=False,
set_pid_function=None,
):
if same_python:
stdout = io.StringIO()
Expand All @@ -35,8 +41,7 @@ def run_job(
env["CAPSUL_DEBUG"] = "1"

proc = subprocess.Popen(
[sys.executable, "-m", "capsul.run", engine_id, execution_id,
job_uuid],
[sys.executable, "-m", "capsul.run", engine_id, execution_id, job_uuid],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
Expand Down

0 comments on commit 8d5ebc8

Please sign in to comment.