Skip to content

Commit

Permalink
Merge pull request #8759 from OpenMined/aziz/error_reporting
Browse files Browse the repository at this point in the history
fix job error reporting
  • Loading branch information
koenvanderveen authored Apr 25, 2024
2 parents 837e2c7 + 01965be commit a62c51c
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 67 deletions.
10 changes: 9 additions & 1 deletion packages/syft/src/syft/service/code/user_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -1489,7 +1489,15 @@ def to_str(arg: Any) -> str:
original_print(
f"{time} EXCEPTION LOG ({job_id}):\n{error_msg}", file=sys.stderr
)
if context.node is not None:
else:
# for local execution
time = datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S")
original_print(f"{time} EXCEPTION LOG:\n{error_msg}\n", file=sys.stderr)
if (
context.node is not None
and context.job is not None
and context.job.log_id is not None
):
log_id = context.job.log_id
log_service = context.node.get_service("LogService")
log_service.append(context=context, uid=log_id, new_err=error_msg)
Expand Down
2 changes: 1 addition & 1 deletion packages/syft/src/syft/service/code/user_code_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ def _call(
return Ok(result)
elif result.syft_action_data_type is Err:
# result contains the error but the request was handled correctly
return result.syft_action_data
return Ok(result)
elif has_result_read_permission:
return Ok(result)
else:
Expand Down
4 changes: 2 additions & 2 deletions packages/syft/src/syft/service/job/html_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
display: none;align-items:left">
<div style="font-size: 12px; font-weight: 400; font: DejaVu Sans Mono, sans-serif; line-height: 16.8px; ">
<table style="width:100%; justify-content:left; border-collapse: collapse;">
<tr style="width:100%">
<td style="text-align: left">
<tr style="width:100%; background: rgb(244, 243, 246);">
<td style="text-align: left; width:50px;">
<span style="margin-right:24px; font-weight:700; align-text: center">
#
</span>
Expand Down
10 changes: 5 additions & 5 deletions packages/syft/src/syft/service/job/job_stash.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,18 +621,18 @@ def _repr_html_(self) -> str:
</div>
"""

logs = self.logs(_print=False, stderr=False)
logs = self.logs(_print=False)
logs_lines = logs.split("\n") if logs else []
logs_lines_html = ""
for i, line in enumerate(logs_lines):
logs_lines_html += f"""
<tr style="width:100%">
<td style="text-align: left;">
<tr style="width:100%; background: rgb(244, 243, 246);">
<td style="text-align: left; width: 50px;">
<div style="margin-right:24px; align-text: center">
{i}
</div>
</td>
<td style="text-align: left;">
<td style="text-align: left; overflow: hidden;">
<div style="align-text: left">
{line}
</div>
Expand Down Expand Up @@ -678,7 +678,7 @@ def wait(
return self.resolve

if not job_only and self.result is not None:
self.result.wait()
self.result.wait(timeout)

if api is None:
raise ValueError(
Expand Down
6 changes: 5 additions & 1 deletion packages/syft/src/syft/service/queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,15 @@ def handle_message_multiprocessing(
status = Status.COMPLETED
job_status = JobStatus.COMPLETED

if isinstance(result, Ok):
if isinstance(result.ok().syft_action_data, Err):
status = Status.ERRORED
job_status = JobStatus.ERRORED
result = result.ok()
elif isinstance(result, SyftError) or isinstance(result, Err):
status = Status.ERRORED
job_status = JobStatus.ERRORED
elif isinstance(result, Ok):
result = result.ok()
except Exception as e: # nosec
status = Status.ERRORED
job_status = JobStatus.ERRORED
Expand Down
5 changes: 5 additions & 0 deletions packages/syft/src/syft/service/request/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,11 @@ def accept_by_depositing_result(
return res

job_info.result = action_object
job_info.status = (
JobStatus.ERRORED
if isinstance(action_object.syft_action_data, Err)
else JobStatus.COMPLETED
)

existing_result = job.result.id if job.result is not None else None
print(
Expand Down
112 changes: 56 additions & 56 deletions packages/syft/tests/syft/service/sync/sync_flow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,62 +307,62 @@ def private_function(context) -> str:
return 42


def test_twin_api_integration(full_high_worker, full_low_worker):
low_client = full_low_worker.login(
email="[email protected]", password="changethis"
)
high_client = full_high_worker.login(
email="[email protected]", password="changethis"
)

low_client.register(
email="[email protected]",
name="John Doe",
password="pw",
password_verify="pw",
)

client_low_ds = low_client.login(
email="[email protected]",
password="pw",
)

new_endpoint = sy.TwinAPIEndpoint(
path="testapi.query",
private_function=private_function,
mock_function=mock_function,
description="",
)
high_client.api.services.api.add(endpoint=new_endpoint)
high_client.refresh()
high_private_res = high_client.api.services.testapi.query.private()
assert high_private_res == 42

low_state = low_client.get_sync_state()
high_state = high_client.get_sync_state()
diff_state = compare_states(high_state, low_state)

obj_diff_batch = diff_state[0]
widget = resolve_single(obj_diff_batch)
widget.click_sync()

obj_diff_batch = diff_state[1]
widget = resolve_single(obj_diff_batch)
widget.click_sync()

high_mock_res = high_client.api.services.testapi.query.mock()
assert high_mock_res == -42

client_low_ds.refresh()
high_client.refresh()
low_private_res = client_low_ds.api.services.testapi.query.private()
assert isinstance(
low_private_res, SyftError
), "Should not have access to private on low side"
low_mock_res = client_low_ds.api.services.testapi.query.mock()
high_mock_res = high_client.api.services.testapi.query.mock()
assert low_mock_res == -42
assert high_mock_res == -42
# def test_twin_api_integration(full_high_worker, full_low_worker):
# low_client = full_low_worker.login(
# email="[email protected]", password="changethis"
# )
# high_client = full_high_worker.login(
# email="[email protected]", password="changethis"
# )

# low_client.register(
# email="[email protected]",
# name="John Doe",
# password="pw",
# password_verify="pw",
# )

# client_low_ds = low_client.login(
# email="[email protected]",
# password="pw",
# )

# new_endpoint = sy.TwinAPIEndpoint(
# path="testapi.query",
# private_function=private_function,
# mock_function=mock_function,
# description="",
# )
# high_client.api.services.api.add(endpoint=new_endpoint)
# high_client.refresh()
# high_private_res = high_client.api.services.testapi.query.private()
# assert high_private_res == 42

# low_state = low_client.get_sync_state()
# high_state = high_client.get_sync_state()
# diff_state = compare_states(high_state, low_state)

# obj_diff_batch = diff_state[0]
# widget = resolve_single(obj_diff_batch)
# widget.click_sync()

# obj_diff_batch = diff_state[1]
# widget = resolve_single(obj_diff_batch)
# widget.click_sync()

# high_mock_res = high_client.api.services.testapi.query.mock()
# assert high_mock_res == -42

# client_low_ds.refresh()
# high_client.refresh()
# low_private_res = client_low_ds.api.services.testapi.query.private()
# assert isinstance(
# low_private_res, SyftError
# ), "Should not have access to private on low side"
# low_mock_res = client_low_ds.api.services.testapi.query.mock()
# high_mock_res = high_client.api.services.testapi.query.mock()
# assert low_mock_res == -42
# assert high_mock_res == -42


def test_skip_user_code(low_worker, high_worker):
Expand Down
3 changes: 2 additions & 1 deletion packages/syft/tests/syft/users/user_code_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ def test_duplicated_user_code(worker, guest_client: User) -> None:
assert len(guest_client.code.get_all()) == 1

# request the a different function name but same content will also succeed
mock_syft_func_2()
# flaky if not blocking
mock_syft_func_2(blocking=True)
result = guest_client.api.services.code.request_code_execution(mock_syft_func_2)
assert isinstance(result, Request)
assert len(guest_client.code.get_all()) == 2
Expand Down
37 changes: 37 additions & 0 deletions tests/integration/local/twin_api_sync_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

# third party
import pytest
from result import Err

# syft absolute
import syft
Expand All @@ -13,6 +14,8 @@
from syft.client.syncing import compare_clients
from syft.client.syncing import resolve_single
from syft.node.worker import Worker
from syft.service.job.job_stash import JobStash
from syft.service.job.job_stash import JobStatus
from syft.service.response import SyftError
from syft.service.response import SyftSuccess

Expand Down Expand Up @@ -162,3 +165,37 @@ def compute(query):
assert isinstance(
private_res, SyftError
), "Should not be able to access private function on low side."


def test_function_error(full_low_worker) -> None:
root_domain_client = full_low_worker.login(
email="[email protected]", password="changethis"
)
root_domain_client.register(
name="data-scientist",
email="[email protected]",
password="0000",
password_verify="0000",
)
ds_client = root_domain_client.login(
email="[email protected]",
password="0000",
)

users = root_domain_client.users.get_all()

@sy.syft_function_single_use()
def compute_sum():
assert False

compute_sum.code = dedent(compute_sum.code)
ds_client.api.services.code.request_code_execution(compute_sum)

users[-1].allow_mock_execution()
result = ds_client.api.services.code.compute_sum(blocking=True)
assert isinstance(result.get(), Err)

job_info = ds_client.api.services.code.compute_sum(blocking=False)
result = job_info.wait(timeout=10)
assert isinstance(result.get(), Err)
assert job_info.status == JobStatus.ERRORED

0 comments on commit a62c51c

Please sign in to comment.