diff --git a/wbia/web/job_engine.py b/wbia/web/job_engine.py index 903d34526..cd5c02a2e 100644 --- a/wbia/web/job_engine.py +++ b/wbia/web/job_engine.py @@ -784,9 +784,30 @@ def __init__(jobiface, id_, port_dict, ibs=None): jobiface._init_lock = threading.Lock() jobiface.engine_recieve_socket = None jobiface.collect_recieve_socket = None + # Per-thread SQLite readers for job status/result queries. + # Bypasses the collector ZMQ bottleneck for read-only operations. + # Each thread gets its own connection via threading.local() to + # avoid interleaved execute()/fetchone() on a shared connection. + jobiface._reader_local = threading.local() print('JobInterface ports:') ut.print_dict(jobiface.port_dict) + def _get_reader_store(jobiface): + """Return a per-thread JobStore for direct SQLite access. + + Each Gunicorn thread gets its own connection, so reads are fully + concurrent. SQLite WAL mode allows concurrent reads while the + collector writes. + """ + store = getattr(jobiface._reader_local, 'store', None) + if store is not None: + return store + from wbia.web.job_store import JobStore + shelve_path = jobiface.ibs.get_shelves_path() + store = JobStore(join(shelve_path, 'jobs.db')) + jobiface._reader_local.store = store + return store + def _ensure_sockets(jobiface): """Lazily create ZMQ sockets on first use (post-fork), or recreate after a timeout reset destroyed one or both sockets. @@ -1258,39 +1279,55 @@ def queue_job( return jobid def get_job_id_list(jobiface): - if False: # jobiface.verbose >= 1: - print('----') - print('Request list of job ids') - pair_msg = dict(action='job_id_list') - return jobiface._collect_request(pair_msg) + """Read job ID list directly from SQLite (no ZMQ round-trip).""" + store = jobiface._get_reader_store() + return {'status': 'ok', 'jobid_list': store.get_job_ids()} def get_job_status(jobiface, jobid): - if jobiface.verbose >= 1: - print('----') - print('Request status of jobid={!r}'.format(jobid)) - pair_msg = dict(action='job_status', jobid=jobid) - return jobiface._collect_request(pair_msg) + """Read job status directly from SQLite (no ZMQ round-trip).""" + store = jobiface._get_reader_store() + status = store.get_status(jobid) or 'unknown' + return {'status': 'ok', 'jobid': jobid, 'jobstatus': status} def get_job_status_dict(jobiface, limit=0): - if False: # jobiface.verbose >= 1: - print('----') - print('Request list of job ids') - pair_msg = dict(action='job_status_dict', limit=limit) - return jobiface._collect_request(pair_msg) + """Read all job statuses directly from SQLite (no ZMQ round-trip).""" + store = jobiface._get_reader_store() + return { + 'status': 'ok', + 'json_result': store.get_job_status_dict(limit=limit), + } def get_job_metadata(jobiface, jobid): - if jobiface.verbose >= 1: - print('----') - print('Request metadata of jobid={!r}'.format(jobid)) - pair_msg = dict(action='job_input', jobid=jobid) - return jobiface._collect_request(pair_msg) + """Read job metadata directly from SQLite (no ZMQ round-trip).""" + store = jobiface._get_reader_store() + if not store.job_exists(jobid): + return {'status': 'invalid', 'jobid': jobid, 'json_result': None} + metadata = store.get_metadata(jobid) + reply = {'status': 'ok', 'jobid': jobid, 'json_result': metadata} + if metadata is None: + reply['status'] = 'corrupted' + return reply def get_job_result(jobiface, jobid): - if jobiface.verbose >= 1: - print('----') - print('Request result of jobid={!r}'.format(jobid)) - pair_msg = dict(action='job_result', jobid=jobid) - return jobiface._collect_request(pair_msg) + """Read job result directly from SQLite (no ZMQ round-trip).""" + store = jobiface._get_reader_store() + if not store.job_exists(jobid): + return {'status': 'invalid', 'jobid': jobid, 'json_result': None} + status = store.get_status(jobid) + result_data = store.get_result(jobid) + if result_data is None: + if status in ('corrupted', 'completed'): + reply_status = 'corrupted' + elif status == 'suppressed': + reply_status = 'suppressed' + else: + reply_status = status + return {'status': reply_status, 'jobid': jobid, 'json_result': None} + return { + 'status': result_data['exec_status'], + 'jobid': jobid, + 'json_result': ut.from_json(result_data['json_result']), + } def get_unpacked_result(jobiface, jobid): reply = jobiface.get_job_result(jobid)