diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index f119239d9f7..7861ee422e0 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -1289,6 +1289,27 @@ def add_api_endpoint_execution_to_queue( None, ) + def get_worker_pool_ref_by_name( + self, credentials: SyftVerifyKey, worker_pool_name: str | None = None + ) -> LinkedObject | SyftError: + # If worker pool id is not set, then use default worker pool + # Else, get the worker pool for given uid + if worker_pool_name is None: + worker_pool = self.get_default_worker_pool() + else: + result = self.pool_stash.get_by_name(credentials, worker_pool_name) + if result.is_err(): + return SyftError(message=f"{result.err()}") + worker_pool = result.ok() + + # Create a Worker pool reference object + worker_pool_ref = LinkedObject.from_obj( + worker_pool, + service_type=SyftWorkerPoolService, + node_uid=self.id, + ) + return worker_pool_ref + def add_action_to_queue( self, action: Action, @@ -1312,23 +1333,11 @@ def add_action_to_queue( user_code = result.ok() worker_pool_name = user_code.worker_pool_name - # If worker pool id is not set, then use default worker pool - # Else, get the worker pool for given uid - if worker_pool_name is None: - worker_pool = self.get_default_worker_pool() - else: - result = self.pool_stash.get_by_name(credentials, worker_pool_name) - if result.is_err(): - return SyftError(message=f"{result.err()}") - worker_pool = result.ok() - - # Create a Worker pool reference object - worker_pool_ref = LinkedObject.from_obj( - worker_pool, - service_type=SyftWorkerPoolService, - node_uid=self.id, + worker_pool_ref = self.get_worker_pool_ref_by_name( + credentials, worker_pool_name ) - + if isinstance(worker_pool_ref, SyftError): + return worker_pool_ref queue_item = ActionQueueItem( id=task_uid, node_uid=self.id, @@ -1473,12 +1482,10 @@ def add_api_call_to_queue( else: worker_settings = WorkerSettings.from_node(node=self) - default_worker_pool = self.get_default_worker_pool() - worker_pool = LinkedObject.from_obj( - default_worker_pool, - service_type=SyftWorkerPoolService, - node_uid=self.id, - ) + worker_pool_ref = self.get_worker_pool_ref_by_name(credentials=credentials) + if isinstance(worker_pool_ref, SyftError): + return worker_pool_ref + queue_item = QueueItem( id=UID(), node_uid=self.id, @@ -1490,7 +1497,7 @@ def add_api_call_to_queue( method=method_str, args=unsigned_call.args, kwargs=unsigned_call.kwargs, - worker_pool=worker_pool, + worker_pool=worker_pool_ref, ) return self.add_queueitem_to_queue( queue_item, diff --git a/packages/syft/src/syft/service/job/job_service.py b/packages/syft/src/syft/service/job/job_service.py index 6ad8b0b11cc..323dff99ae9 100644 --- a/packages/syft/src/syft/service/job/job_service.py +++ b/packages/syft/src/syft/service/job/job_service.py @@ -1,4 +1,7 @@ # stdlib +from collections.abc import Callable +import inspect +import time from typing import Any from typing import cast @@ -28,6 +31,18 @@ from .job_stash import JobStatus +def wait_until( + predicate: Callable[[], bool], timeout: int = 10 +) -> SyftSuccess | SyftError: + start = time.time() + code_string = inspect.getsource(predicate).strip() + while time.time() - start < timeout: + if predicate(): + return SyftSuccess(message=f"Predicate {code_string} is True") + time.sleep(1) + return SyftError(message=f"Timeout reached for predicate {code_string}") + + @instrument @serializable() class JobService(AbstractService): @@ -112,16 +127,31 @@ def get_by_result_id( def restart( self, context: AuthedServiceContext, uid: UID ) -> SyftSuccess | SyftError: - res = self.stash.get_by_uid(context.credentials, uid=uid) - if res.is_err(): - return SyftError(message=res.err()) + job_or_err = self.stash.get_by_uid(context.credentials, uid=uid) + if job_or_err.is_err(): + return SyftError(message=job_or_err.err()) + if job_or_err.ok() is None: + return SyftError(message="Job not found") + + job = job_or_err.ok() + if job.parent_job_id is not None: + return SyftError( + message="Not possible to restart subjobs. Please restart the parent job." + ) + if job.status == JobStatus.PROCESSING: + return SyftError( + message="Jobs in progress cannot be restarted. " + "Please wait for completion or cancel the job via .cancel() to proceed." + ) - job = res.ok() job.status = JobStatus.CREATED self.update(context=context, job=job) task_uid = UID() worker_settings = WorkerSettings.from_node(context.node) + worker_pool_ref = context.node.get_worker_pool_ref_by_name(context.credentials) + if isinstance(worker_pool_ref, SyftError): + return worker_pool_ref queue_item = ActionQueueItem( id=task_uid, @@ -132,6 +162,7 @@ def restart( worker_settings=worker_settings, args=[], kwargs={"action": job.action}, + worker_pool=worker_pool_ref, ) context.node.queue_stash.set_placeholder(context.credentials, queue_item) @@ -139,8 +170,8 @@ def restart( log_service = context.node.get_service("logservice") result = log_service.restart(context, job.log_id) - if result.is_err(): - return SyftError(message=str(result.err())) + if isinstance(result, SyftError): + return result return SyftSuccess(message="Great Success!") @@ -158,28 +189,62 @@ def update( res = res.ok() return SyftSuccess(message="Great Success!") + def _kill(self, context: AuthedServiceContext, job: Job) -> SyftSuccess | SyftError: + # set job and subjobs status to TERMINATING + # so that MonitorThread can kill them + job.status = JobStatus.TERMINATING + res = self.stash.update(context.credentials, obj=job) + results = [res] + + # attempt to kill all subjobs + subjobs_or_err = self.stash.get_by_parent_id(context.credentials, uid=job.id) + if subjobs_or_err.is_ok() and subjobs_or_err.ok() is not None: + subjobs = subjobs_or_err.ok() + for subjob in subjobs: + subjob.status = JobStatus.TERMINATING + res = self.stash.update(context.credentials, obj=subjob) + results.append(res) + + errors = [res.err() for res in results if res.is_err()] + if errors: + return SyftError(message=f"Failed to kill job: {errors}") + + # wait for job and subjobs to be killed by MonitorThread + wait_until(lambda: job.fetched_status == JobStatus.INTERRUPTED) + wait_until( + lambda: all( + subjob.fetched_status == JobStatus.INTERRUPTED for subjob in job.subjobs + ) + ) + + return SyftSuccess(message="Job killed successfully!") + @service_method( path="job.kill", name="kill", roles=DATA_SCIENTIST_ROLE_LEVEL, ) def kill(self, context: AuthedServiceContext, id: UID) -> SyftSuccess | SyftError: - res = self.stash.get_by_uid(context.credentials, uid=id) - if res.is_err(): - return SyftError(message=res.err()) + job_or_err = self.stash.get_by_uid(context.credentials, uid=id) + if job_or_err.is_err(): + return SyftError(message=job_or_err.err()) + if job_or_err.ok() is None: + return SyftError(message="Job not found") - job = res.ok() - if job.job_pid is not None and job.status == JobStatus.PROCESSING: - job.status = JobStatus.INTERRUPTED - res = self.stash.update(context.credentials, obj=job) - if res.is_err(): - return SyftError(message=res.err()) - return SyftSuccess(message="Job killed successfully!") - else: + job = job_or_err.ok() + if job.parent_job_id is not None: return SyftError( - message="Job is not running or isn't running in multiprocessing mode." - "Killing threads is currently not supported" + message="Not possible to cancel subjobs. To stop execution, please cancel the parent job." ) + if job.status != JobStatus.PROCESSING: + return SyftError(message="Job is not running") + if job.job_pid is None: + return SyftError( + message="Job termination disabled in dev mode. " + "Set 'dev_mode=False' or 'thread_workers=False' to enable." + ) + + return self._kill(context, job) @service_method( path="job.get_subjobs", diff --git a/packages/syft/src/syft/service/job/job_stash.py b/packages/syft/src/syft/service/job/job_stash.py index 0171af347aa..d7aa3aca00b 100644 --- a/packages/syft/src/syft/service/job/job_stash.py +++ b/packages/syft/src/syft/service/job/job_stash.py @@ -54,6 +54,7 @@ class JobStatus(str, Enum): PROCESSING = "processing" ERRORED = "errored" COMPLETED = "completed" + TERMINATING = "terminating" INTERRUPTED = "interrupted" @@ -254,47 +255,26 @@ def apply_info(self, info: "JobInfo") -> None: self.result = info.result def restart(self, kill: bool = False) -> None: - if kill: - self.kill() - self.fetch() - if not self.has_parent: - # this is currently the limitation, we will need to implement - # killing toplevel jobs later - print("Can only kill nested jobs") - elif kill or ( - self.status != JobStatus.PROCESSING and self.status != JobStatus.CREATED - ): - api = APIRegistry.api_for( - node_uid=self.syft_node_location, - user_verify_key=self.syft_client_verify_key, - ) - if api is None: - raise ValueError( - f"Can't access Syft API. You must login to {self.syft_node_location}" - ) - call = SyftAPICall( - node_uid=self.node_uid, - path="job.restart", - args=[], - kwargs={"uid": self.id}, - blocking=True, - ) - - api.make_call(call) - else: - print( - "Job is running or scheduled, if you want to kill it use job.kill() first" + api = APIRegistry.api_for( + node_uid=self.syft_node_location, + user_verify_key=self.syft_client_verify_key, + ) + if api is None: + raise ValueError( + f"Can't access Syft API. You must login to {self.syft_node_location}" ) - return None + call = SyftAPICall( + node_uid=self.node_uid, + path="job.restart", + args=[], + kwargs={"uid": self.id}, + blocking=True, + ) + res = api.make_call(call) + self.fetch() + return res def kill(self) -> SyftError | SyftSuccess: - if self.status != JobStatus.PROCESSING: - return SyftError(message="Job is not running") - if self.job_pid is None: - return SyftError( - message="Job termination disabled in dev mode. " - "Set 'dev_mode=False' or 'thread_workers=False' to enable." - ) api = APIRegistry.api_for( node_uid=self.syft_node_location, user_verify_key=self.syft_client_verify_key, @@ -310,8 +290,9 @@ def kill(self) -> SyftError | SyftSuccess: kwargs={"id": self.id}, blocking=True, ) - api.make_call(call) - return SyftSuccess(message="Job is killed successfully!") + res = api.make_call(call) + self.fetch() + return res def fetch(self) -> None: api = APIRegistry.api_for( @@ -329,7 +310,9 @@ def fetch(self) -> None: kwargs={"uid": self.id}, blocking=True, ) - job: Job = api.make_call(call) + job: Job | None = api.make_call(call) + if job is None: + return self.resolved = job.resolved if job.resolved: self.result = job.result @@ -532,6 +515,11 @@ def _repr_markdown_(self, wrap_as_python: bool = True, indent: int = 0) -> str: """ return as_markdown_code(md) + @property + def fetched_status(self) -> JobStatus: + self.fetch() + return self.status + @property def requesting_user(self) -> UserView | SyftError: api = APIRegistry.api_for( diff --git a/packages/syft/src/syft/service/queue/queue.py b/packages/syft/src/syft/service/queue/queue.py index fcf5cd2b397..968e4b7c975 100644 --- a/packages/syft/src/syft/service/queue/queue.py +++ b/packages/syft/src/syft/service/queue/queue.py @@ -1,14 +1,16 @@ # stdlib +from multiprocessing import Process import threading +from threading import Thread import time from typing import Any from typing import cast # third party +from loguru import logger import psutil from result import Err from result import Ok -from result import Result # relative from ...node.credentials import SyftVerifyKey @@ -20,7 +22,6 @@ from ...types.datetime import DateTime from ...types.uid import UID from ..job.job_stash import Job -from ..job.job_stash import JobStash from ..job.job_stash import JobStatus from ..response import SyftError from ..response import SyftSuccess @@ -59,20 +60,28 @@ def monitor(self) -> None: job = self.worker.job_stash.get_by_uid( self.credentials, self.queue_item.job_id ).ok() - if job is None or job.status != JobStatus.INTERRUPTED: - return - else: - job.resolved = True + if job and job.status == JobStatus.TERMINATING: + self.terminate(job) + for subjob in job.subjobs: + self.terminate(subjob) + self.queue_item.status = Status.INTERRUPTED self.queue_item.resolved = True self.worker.queue_stash.set_result(self.credentials, self.queue_item) - self.worker.job_stash.set_result(self.credentials, job) - process = psutil.Process(job.job_pid) - process.terminate() + # How about subjobs of subjobs? def stop(self) -> None: self.stop_requested.set() + def terminate(self, job: Job) -> None: + job.resolved = True + job.status = JobStatus.INTERRUPTED + self.worker.job_stash.set_result(self.credentials, job) + try: + psutil.Process(job.job_pid).terminate() + except psutil.Error as e: + logger.warning(f"Failed to terminate job {job.id}: {e}") + @serializable() class QueueManager(BaseQueueManager): @@ -245,32 +254,6 @@ def handle_message_multiprocessing( monitor_thread.stop() -def evaluate_can_run_job( - job_id: UID, job_stash: JobStash, credentials: SyftVerifyKey -) -> Result[Job, str]: - """Evaluate if a Job can be executed by the user. - - A Job cannot be executed if any of the following are met: - - User doesn't have permission to the job. - - Job is either marked Completed or result is available. - - Job is Cancelled or Interrupted. - """ - res = job_stash.get_by_uid(credentials, job_id) - - # User doesn't have access to job - if res.is_err(): - return res - - job_item = res.ok() - - if job_item.status == JobStatus.COMPLETED or job_item.resolved: - return Err(f"Job: {job_id} already Completed.") - elif job_item.status == JobStatus.INTERRUPTED: - return Err(f"Job interrupted. Job Id: {job_id}") - - return Ok(job_item) - - @serializable() class APICallMessageHandler(AbstractMessageHandler): queue_name = "api_call" @@ -304,9 +287,9 @@ def handle_message(message: bytes, syft_worker_id: UID) -> None: worker.signing_key = worker_settings.signing_key credentials = queue_item.syft_client_verify_key - - res = evaluate_can_run_job(queue_item.job_id, worker.job_stash, credentials) + res = worker.job_stash.get_by_uid(credentials, queue_item.job_id) if res.is_err(): + logger.warning(res.err()) raise Exception(res.value) job_item: Job = res.ok() @@ -317,14 +300,6 @@ def handle_message(message: bytes, syft_worker_id: UID) -> None: job_item.node_uid = cast(UID, worker.id) job_item.updated_at = DateTime.now() - # try: - # worker_name = os.getenv("DOCKER_WORKER_NAME", None) - # docker_worker = worker.worker_stash.get_worker_by_name( - # credentials, worker_name - # ).ok() - # job_item.job_worker_id = str(docker_worker.container_id) - # except Exception: - # job_item.job_worker_id = str(worker.id) if syft_worker_id is not None: job_item.job_worker_id = syft_worker_id @@ -337,9 +312,6 @@ def handle_message(message: bytes, syft_worker_id: UID) -> None: raise Exception(f"{job_result.err()}") if queue_config.thread_workers: - # stdlib - from threading import Thread - thread = Thread( target=handle_message_multiprocessing, args=(worker_settings, queue_item, credentials), @@ -347,8 +319,8 @@ def handle_message(message: bytes, syft_worker_id: UID) -> None: thread.start() thread.join() else: - # stdlib - from multiprocessing import Process + # if psutil.pid_exists(job_item.job_pid): + # psutil.Process(job_item.job_pid).terminate() process = Process( target=handle_message_multiprocessing, diff --git a/packages/syft/src/syft/service/response.py b/packages/syft/src/syft/service/response.py index d30c1dbac2b..37227046c5c 100644 --- a/packages/syft/src/syft/service/response.py +++ b/packages/syft/src/syft/service/response.py @@ -57,6 +57,9 @@ def _repr_html_class_(self) -> str: def to_result(self) -> Err: return Err(value=self.message) + def __bool__(self) -> bool: + return False + @serializable() class SyftSuccess(SyftResponseMessage): diff --git a/packages/syft/tests/conftest.py b/packages/syft/tests/conftest.py index 58f269f15a1..9623ea3d6dd 100644 --- a/packages/syft/tests/conftest.py +++ b/packages/syft/tests/conftest.py @@ -157,48 +157,6 @@ def low_worker() -> Worker: del worker -@pytest.fixture(scope="function") -def full_high_worker(n_consumers: int = 3, create_producer: bool = True) -> Worker: - _node = sy.orchestra.launch( - node_side_type=NodeSideType.HIGH_SIDE, - name=token_hex(8), - # dev_mode=True, - reset=True, - n_consumers=n_consumers, - create_producer=create_producer, - queue_port=None, - in_memory_workers=True, - local_db=False, - thread_workers=False, - ) - # startup code here - yield _node - # Cleanup code - _node.python_node.cleanup() - _node.land() - - -@pytest.fixture(scope="function") -def full_low_worker(n_consumers: int = 3, create_producer: bool = True) -> Worker: - _node = sy.orchestra.launch( - node_side_type=NodeSideType.LOW_SIDE, - name=token_hex(8), - # dev_mode=True, - reset=True, - n_consumers=n_consumers, - create_producer=create_producer, - queue_port=None, - in_memory_workers=True, - local_db=False, - thread_workers=False, - ) - # startup code here - yield _node - # # Cleanup code - _node.python_node.cleanup() - _node.land() - - @pytest.fixture def root_domain_client(worker) -> DomainClient: yield worker.root_client diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1c8a4fc8b27..f6ccf94f32c 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,8 +1,16 @@ +# stdlib +from secrets import token_hex + # third party import _pytest from faker import Faker import pytest +# syft absolute +import syft as sy +from syft.abstract_node import NodeSideType +from syft.node.worker import Worker + def pytest_configure(config: _pytest.config.Config) -> None: config.addinivalue_line("markers", "frontend: frontend integration tests") @@ -31,3 +39,45 @@ def domain_2_port() -> int: @pytest.fixture def faker(): return Faker() + + +@pytest.fixture(scope="function") +def full_low_worker(n_consumers: int = 3, create_producer: bool = True) -> Worker: + _node = sy.orchestra.launch( + node_side_type=NodeSideType.LOW_SIDE, + name=token_hex(8), + # dev_mode=True, + reset=True, + n_consumers=n_consumers, + create_producer=create_producer, + queue_port=None, + in_memory_workers=True, + local_db=False, + thread_workers=False, + ) + # startup code here + yield _node + # # Cleanup code + _node.python_node.cleanup() + _node.land() + + +@pytest.fixture(scope="function") +def full_high_worker(n_consumers: int = 3, create_producer: bool = True) -> Worker: + _node = sy.orchestra.launch( + node_side_type=NodeSideType.HIGH_SIDE, + name=token_hex(8), + # dev_mode=True, + reset=True, + n_consumers=n_consumers, + create_producer=create_producer, + queue_port=None, + in_memory_workers=True, + local_db=False, + thread_workers=False, + ) + # startup code here + yield _node + # Cleanup code + _node.python_node.cleanup() + _node.land() diff --git a/tests/integration/local/job_test.py b/tests/integration/local/job_test.py new file mode 100644 index 00000000000..e713da731df --- /dev/null +++ b/tests/integration/local/job_test.py @@ -0,0 +1,133 @@ +# stdlib + +# stdlib +from secrets import token_hex +import time + +# third party +import pytest + +# syft absolute +import syft as sy +from syft import syft_function +from syft import syft_function_single_use +from syft.service.job.job_service import wait_until +from syft.service.job.job_stash import JobStatus +from syft.service.response import SyftError +from syft.service.response import SyftSuccess + + +@pytest.mark.local_node +def test_job_restart(job) -> None: + job.wait(timeout=2) + + assert wait_until( + lambda: job.fetched_status == JobStatus.PROCESSING + ), "Job not started" + assert wait_until( + lambda: all( + subjob.fetched_status == JobStatus.PROCESSING for subjob in job.subjobs + ) + ), "Subjobs not started" + + result = job.subjobs[0].restart() + assert isinstance(result, SyftError), "Should not restart subjob" + + result = job.restart() + assert isinstance(result, SyftError), "Should not restart running job" + + result = job.kill() + assert isinstance(result, SyftSuccess), "Should kill job" + assert job.fetched_status == JobStatus.INTERRUPTED + + result = job.restart() + assert isinstance(result, SyftSuccess), "Should restart idle job" + + job.wait(timeout=10) + + assert wait_until( + lambda: job.fetched_status == JobStatus.PROCESSING + ), "Job not restarted" + assert wait_until( + lambda: len( + [ + subjob.fetched_status == JobStatus.PROCESSING + for subjob in job.subjobs + if subjob.fetched_status != JobStatus.INTERRUPTED + ] + ) + == 2 + ), "Subjobs not restarted" + + +@pytest.fixture +def node(): + node = sy.orchestra.launch( + name=token_hex(8), + dev_mode=False, + thread_workers=False, + reset=True, + n_consumers=4, + create_producer=True, + node_side_type=sy.NodeSideType.LOW_SIDE, + ) + try: + yield node + finally: + node.python_node.cleanup() + node.land() + + +@pytest.fixture +def job(node): + client = node.login(email="info@openmined.org", password="changethis") + _ = client.register(name="a", email="aa@b.org", password="c", password_verify="c") + ds_client = node.login(email="aa@b.org", password="c") + + @syft_function() + def process_batch(): + # stdlib + + while time.sleep(1) is None: + ... + + ds_client.code.submit(process_batch) + + @syft_function_single_use() + def process_all(domain): + # stdlib + + _ = domain.launch_job(process_batch) + _ = domain.launch_job(process_batch) + + while time.sleep(1) is None: + ... + + _ = ds_client.code.request_code_execution(process_all) + client.requests[-1].approve(approve_nested=True) + client = node.login(email="info@openmined.org", password="changethis") + job = client.code.process_all(blocking=False) + try: + yield job + finally: + job.kill() + + +@pytest.mark.local_node +def test_job_kill(job) -> None: + job.wait(timeout=2) + assert wait_until( + lambda: job.fetched_status == JobStatus.PROCESSING + ), "Job not started" + assert wait_until( + lambda: all( + subjob.fetched_status == JobStatus.PROCESSING for subjob in job.subjobs + ) + ), "Subjobs not started" + + result = job.subjobs[0].kill() + assert isinstance(result, SyftError), "Should not kill subjob" + + result = job.kill() + assert isinstance(result, SyftSuccess), "Should kill job" + assert job.fetched_status == JobStatus.INTERRUPTED diff --git a/tests/integration/local/twin_api_sync_test.py b/tests/integration/local/twin_api_sync_test.py index 27bf3ab4ced..fc2c9f59811 100644 --- a/tests/integration/local/twin_api_sync_test.py +++ b/tests/integration/local/twin_api_sync_test.py @@ -1,5 +1,4 @@ # stdlib -from secrets import token_hex import sys # third party @@ -9,11 +8,9 @@ # syft absolute import syft import syft as sy -from syft.abstract_node import NodeSideType from syft.client.domain_client import DomainClient from syft.client.syncing import compare_clients from syft.client.syncing import resolve_single -from syft.node.worker import Worker from syft.service.job.job_stash import JobStatus from syft.service.response import SyftError from syft.service.response import SyftSuccess @@ -48,48 +45,6 @@ def get_ds_client(client: DomainClient) -> DomainClient: return client.login(email="a@a.com", password="asdf") -@pytest.fixture(scope="function") -def full_high_worker(n_consumers: int = 3, create_producer: bool = True) -> Worker: - _node = sy.orchestra.launch( - node_side_type=NodeSideType.HIGH_SIDE, - name=token_hex(8), - # dev_mode=True, - reset=True, - n_consumers=n_consumers, - create_producer=create_producer, - queue_port=None, - in_memory_workers=True, - local_db=False, - thread_workers=False, - ) - # startup code here - yield _node - # Cleanup code - _node.python_node.cleanup() - _node.land() - - -@pytest.fixture(scope="function") -def full_low_worker(n_consumers: int = 3, create_producer: bool = True) -> Worker: - _node = sy.orchestra.launch( - node_side_type=NodeSideType.LOW_SIDE, - name=token_hex(8), - # dev_mode=True, - reset=True, - n_consumers=n_consumers, - create_producer=create_producer, - queue_port=None, - in_memory_workers=True, - local_db=False, - thread_workers=False, - ) - # startup code here - yield _node - # Cleanup code - _node.python_node.cleanup() - _node.land() - - @sy.api_endpoint_method() def mock_function(context) -> str: return -42