Skip to content

Commit 3ea1d14

Browse files
authored
Merge pull request #6 from kbase/dev-client
Add SDK method support
2 parents 8de6e2b + df751df commit 3ea1d14

File tree

4 files changed

+220
-27
lines changed

4 files changed

+220
-27
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ dev = [
2727
"ipython==9.6.0",
2828
"pytest==8.4.2",
2929
"pytest-cov==7.0.0",
30+
"requests-mock==1.12.1",
3031
"semver>=3.0.4",
3132
]
3233

src/kbase/sdk_baseclient.py

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
import requests as _requests
88
import os as _os
99
from urllib.parse import urlparse as _urlparse
10+
import time as _time
11+
import traceback as _traceback
1012
from typing import Any
13+
from urllib3.exceptions import ProtocolError as _ProtocolError
1114

1215

1316
# The first version is a pretty basic port from the old baseclient, removing some no longer
@@ -17,12 +20,20 @@
1720
__version__ = "0.1.0"
1821

1922

23+
_EXP_BACKOFF_SEC = [0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 60, 120, 300]
2024
_CT = "content-type"
2125
_AJ = "application/json"
2226
_URL_SCHEME = frozenset(["http", "https"])
2327
_CHECK_JOB_RETRIES = 3
2428

2529

30+
# tested this manually by shortening _EXP_BACKOFF_MS and adding printouts below
31+
def _get_next_backoff(backoff_index: int = 1) -> tuple[int, float]:
32+
if backoff_index < len(_EXP_BACKOFF_SEC) - 1:
33+
backoff_index += 1
34+
return backoff_index, _EXP_BACKOFF_SEC[backoff_index]
35+
36+
2637
class ServerError(Exception):
2738

2839
def __init__(self, name, code, message, data=None, error=None):
@@ -58,18 +69,12 @@ class SDKBaseClient:
5869
For SDK methods: the url of the callback service.
5970
For SDK dynamic services: the url of the Service Wizard.
6071
For other services: the url of the service.
61-
timeout - methods will fail if they take longer than this value in seconds.
72+
timeout - http requests will fail if they take longer than this value in seconds.
6273
Default 1800.
6374
token - a KBase authentication token.
6475
trust_all_ssl_certificates - set to True to trust self-signed certificates.
6576
If you don't understand the implications, leave as the default, False.
6677
lookup_url - set to true when contacting KBase dynamic services.
67-
async_job_check_time_ms - the wait time between checking job state for
68-
asynchronous jobs run with the run_job method.
69-
async_job_check_time_scale_percent - the percentage increase in wait time between async job
70-
check attempts.
71-
async_job_check_max_time_ms - the maximum time to wait for a job check attempt before
72-
failing.
7378
"""
7479
def __init__(
7580
self,
@@ -79,9 +84,6 @@ def __init__(
7984
token: str = None,
8085
trust_all_ssl_certificates: bool = False, # Too much of a pain to test
8186
lookup_url: bool = False,
82-
async_job_check_time_ms: int = 100,
83-
async_job_check_time_scale_percent: int = 150,
84-
async_job_check_max_time_ms: int = 300000
8587
):
8688
if url is None:
8789
raise ValueError("A url is required")
@@ -93,9 +95,6 @@ def __init__(
9395
self._headers = {}
9496
self.trust_all_ssl_certificates = trust_all_ssl_certificates
9597
self.lookup_url = lookup_url
96-
self.async_job_check_time = async_job_check_time_ms / 1000.0
97-
self.async_job_check_time_scale_percent = async_job_check_time_scale_percent
98-
self.async_job_check_max_time = async_job_check_max_time_ms / 1000.0
9998
self.token = None
10099
if token is not None:
101100
self.token = token
@@ -166,7 +165,48 @@ def _set_up_context(self, service_ver: str = None):
166165
return {"service_ver": service_ver}
167166
return None
168167

169-
def call_method(self, service_method: str, args: list[Any], *, service_ver: str | None = None):
168+
def _check_job(self, service: str, job_id: str):
169+
return self._call(self.url, service + "._check_job", [job_id])
170+
171+
def _submit_job(self, service_method: str, args: list[Any], service_ver: str = None):
172+
context = self._set_up_context(service_ver)
173+
mod, meth = service_method.split(".")
174+
return self._call(self.url, mod + "._" + meth + "_submit", args, context)
175+
176+
def run_job(self, service_method: str, args: list[Any], service_ver: str = None):
177+
"""
178+
Run a SDK method asynchronously.
179+
Required arguments:
180+
service_method - the service and method to run, e.g. myserv.mymeth.
181+
args - a list of arguments to the method.
182+
Optional arguments:
183+
service_ver - the version of the service to run, e.g. a git hash
184+
or dev/beta/release.
185+
"""
186+
mod = service_method.split(".")[0]
187+
job_id = self._submit_job(service_method, args, service_ver)
188+
backoff_index = -1
189+
check_job_failures = 0
190+
while check_job_failures < _CHECK_JOB_RETRIES:
191+
backoff_index, backoff = _get_next_backoff(backoff_index)
192+
_time.sleep(backoff)
193+
try:
194+
job_state = self._check_job(mod, job_id)
195+
except (ConnectionError, _ProtocolError):
196+
_traceback.print_exc()
197+
check_job_failures += 1
198+
else:
199+
if job_state["finished"]:
200+
if not job_state["result"]:
201+
return None
202+
if len(job_state["result"]) == 1:
203+
return job_state["result"][0]
204+
return job_state["result"]
205+
raise RuntimeError(f"_check_job failed {check_job_failures} times and exceeded limit")
206+
207+
def call_method(
208+
self, service_method: str, args: list[Any], *, service_ver: str | None = None
209+
):
170210
"""
171211
Call a standard or dynamic service synchronously.
172212
Required arguments:

test/test_sdk_baseclient.py

Lines changed: 151 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,24 @@
44
import os
55
import pytest
66
import re
7+
import requests
78
from requests.exceptions import HTTPError, ReadTimeout
89
import semver
10+
import shutil
11+
import subprocess
12+
import tempfile
913
import threading
1014
import time
15+
from urllib3.exceptions import ProtocolError
1116

1217
from kbase import sdk_baseclient
1318

1419

1520
_VERSION = "0.1.0"
16-
_MOCKSERVER_PORT = 31590 # should be fine, find an empty port otherwise
21+
# should be fine, find an empty ports otherwise
22+
_MOCKSERVER_PORT = 31590
23+
_CALLBACK_SERVER_PORT = 31591
24+
_CALLBACK_SERVER_IMAGE = "ghcr.io/kbase/jobrunner:pr-116"
1725

1826

1927
@pytest.fixture(scope="module")
@@ -54,6 +62,72 @@ def mockserver():
5462
server.shutdown()
5563

5664

65+
def _wait_for_callback(callback_url: str):
66+
interval = 1
67+
limit = 120
68+
start = time.monotonic()
69+
err = None
70+
print("waiting for callback server to start")
71+
while time.monotonic() - start < limit:
72+
try:
73+
res = requests.get(callback_url)
74+
restext = res.text
75+
if res.status_code == 200 and res.text == "[{}]":
76+
print(f"Callback server is up at {callback_url}")
77+
return
78+
except Exception as e:
79+
err = e
80+
print("waiting for CBS")
81+
time.sleep(interval)
82+
raise IOError(f"Callback server did not start, last response: {restext}") from err
83+
84+
85+
@pytest.fixture(scope="module")
86+
def callback(url_and_token):
87+
# Tried using the temp path pytest fixture but kept getting lots of warnings
88+
tmpdir = tempfile.mkdtemp(prefix="callback_server_data_")
89+
container_name = f"sdk_baseclient_test_{str(time.time()).replace('.', '_')}"
90+
dockercmd = [
91+
"docker", "run",
92+
"--platform=linux/amd64", # until we have multiarch images
93+
"--name", container_name,
94+
"--rm",
95+
# TODO SECURITY when CBS allows, use a file instead
96+
# https://github.com/kbase/JobRunner/issues/90
97+
"-e", f"KB_AUTH_TOKEN={url_and_token[1]}",
98+
"-e", f"KB_BASE_URL={url_and_token[0]}/services/",
99+
"-e", f"JOB_DIR={tmpdir}",
100+
"-e", "CALLBACK_IP=localhost",
101+
"-e", f"CALLBACK_PORT={_CALLBACK_SERVER_PORT}",
102+
"-e", "DEBUG_RUNNER=true", # prints logs from containers
103+
"-v", "/var/run/docker.sock:/run/docker.sock",
104+
"-v", f"{tmpdir}:{tmpdir}",
105+
"-p", f"{_CALLBACK_SERVER_PORT}:{_CALLBACK_SERVER_PORT}",
106+
_CALLBACK_SERVER_IMAGE
107+
]
108+
proc = subprocess.Popen(dockercmd)
109+
callback_url = f"http://localhost:{_CALLBACK_SERVER_PORT}"
110+
_wait_for_callback(callback_url)
111+
112+
try:
113+
yield callback_url
114+
finally:
115+
subprocess.check_call(["docker", "stop", container_name])
116+
proc.wait(timeout=10)
117+
dockercmd = [
118+
"docker", "run",
119+
"--platform=linux/amd64", # until we have multiarch images
120+
"--name", container_name,
121+
"--rm",
122+
"-v", f"{tmpdir}:{tmpdir}",
123+
"--entrypoint", "bash",
124+
_CALLBACK_SERVER_IMAGE,
125+
"-c", f"rm -rf {tmpdir}/*", # need to use bash for the globbing
126+
]
127+
subprocess.check_call(dockercmd)
128+
shutil.rmtree(tmpdir)
129+
130+
57131
def test_version():
58132
assert sdk_baseclient.__version__ == _VERSION
59133

@@ -82,6 +156,7 @@ def test_call_method_basic_passed_token(url_and_token):
82156

83157

84158
def test_call_method_basic_env_token(url_and_token):
159+
# Tests returning a single value
85160
os.environ["KB_AUTH_TOKEN"] = url_and_token[1]
86161
try:
87162
_test_call_method_basic(url_and_token[0] + "/services/ws", None)
@@ -103,12 +178,11 @@ def _test_call_method_basic(url: str, token: str | None):
103178
assert res is None
104179

105180

106-
def test_serialize_sets_and_list_return(url_and_token):
107-
"""
108-
Tests
109-
* Serializing set and frozenset
110-
* Methods that return a list vs. a single value (save_objects).
111-
"""
181+
# TODO add test for service that returns > 1 value. Not sure if any services do this
182+
183+
184+
def test_serialize_sets(url_and_token):
185+
# Tests serializing set and frozenset
112186
bc = sdk_baseclient.SDKBaseClient(url_and_token[0] + "/services/ws", token=url_and_token[1])
113187
ws_name = f"sdk_baseclient_test_{time.time()}"
114188
try:
@@ -231,9 +305,9 @@ def test_dynamic_service(url_and_token):
231305
ver = res["version"]
232306
del res["version"]
233307
assert res == {
234-
'git_url': 'https://github.com/kbaseapps/HTMLFileSetServ',
235-
'message': '',
236-
'state': 'OK',
308+
"git_url": "https://github.com/kbaseapps/HTMLFileSetServ",
309+
"message": "",
310+
"state": "OK",
237311
}
238312
assert semver.Version.parse(ver) > semver.Version.parse("0.0.8")
239313

@@ -246,8 +320,72 @@ def test_dynamic_service_with_service_version(url_and_token):
246320
res = bc.call_method("HTMLFileSetServ.status", [], service_ver="0.0.8")
247321
del res["git_commit_hash"]
248322
assert res == {
249-
'git_url': 'https://github.com/kbaseapps/HTMLFileSetServ',
250-
'message': '',
251-
'state': 'OK',
323+
"git_url": "https://github.com/kbaseapps/HTMLFileSetServ",
324+
"message": "",
325+
"state": "OK",
252326
"version": "0.0.8"
253327
}
328+
329+
330+
###
331+
# Async job tests
332+
#
333+
# All of the 3 ways of calling services use the same underlying _call method, so we don't
334+
# reiterate those tests every time.
335+
###
336+
337+
338+
def test_run_job_with_service_ver(url_and_token, callback):
339+
bc = sdk_baseclient.SDKBaseClient(callback, token=url_and_token[1], timeout=10)
340+
res = bc.run_job(
341+
"njs_sdk_test_2.run",
342+
# force backoff with a wait
343+
[{"id": "simplejob2", "wait": 1}],
344+
# it seems semvers don't work for unreleased modules
345+
service_ver="9d6b868bc0bfdb61c79cf2569ff7b9abffd4c67f"
346+
)
347+
assert res == {
348+
"id": "simplejob2",
349+
"name": "njs_sdk_test_2",
350+
"hash": "9d6b868bc0bfdb61c79cf2569ff7b9abffd4c67f",
351+
"wait": 1,
352+
}
353+
354+
355+
def test_run_job_no_return(url_and_token, callback):
356+
bc = sdk_baseclient.SDKBaseClient(callback, token=url_and_token[1], timeout=10)
357+
res = bc.run_job("HelloServiceDeluxe.how_rude", ["Georgette"])
358+
assert res is None
359+
360+
361+
def test_run_job_list_return(url_and_token, callback):
362+
bc = sdk_baseclient.SDKBaseClient(callback, token=url_and_token[1], timeout=10)
363+
res = bc.run_job("HelloServiceDeluxe.say_hellos", ["JimBob", "Gengulphus"])
364+
assert res == [
365+
'Hi JimBob, you santimonious lickspittle', # the dork that wrote this module can't spell
366+
'Hi Gengulphus, what a lovely and scintillating person you are',
367+
]
368+
369+
370+
def test_run_job_failure(url_and_token, callback, requests_mock):
371+
requests_mock.post(callback, [
372+
{"json": {"result": ["job_id"]}},
373+
{"exc": ConnectionError("oopsie")},
374+
{"exc": ProtocolError("oh dang")},
375+
{"exc": ConnectionError("so unreliable omg")},
376+
])
377+
bc = sdk_baseclient.SDKBaseClient(callback, token=url_and_token[1], timeout=10)
378+
with pytest.raises(RuntimeError, match="_check_job failed 3 times and exceeded limit"):
379+
bc.run_job("HelloServiceDeluxe.say_hellos", ["JimBob"])
380+
381+
382+
def test_run_job_failure_recovery(url_and_token, callback, requests_mock):
383+
requests_mock.post(callback, [
384+
{"json": {"result": ["job_id"]}},
385+
{"exc": ConnectionError("oopsie")},
386+
{"exc": ProtocolError("oh dang")},
387+
{"json": {"result": [{"finished": 1, "result": ["meh"]}]}},
388+
])
389+
bc = sdk_baseclient.SDKBaseClient(callback, token=url_and_token[1], timeout=10)
390+
res = bc.run_job("HelloServiceDeluxe.say_hellos", ["JimBob"])
391+
assert res == "meh"

uv.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)