Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 62 additions & 25 deletions wbia/web/job_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,9 +784,30 @@ def __init__(jobiface, id_, port_dict, ibs=None):
jobiface._init_lock = threading.Lock()
jobiface.engine_recieve_socket = None
jobiface.collect_recieve_socket = None
# Per-thread SQLite readers for job status/result queries.
# Bypasses the collector ZMQ bottleneck for read-only operations.
# Each thread gets its own connection via threading.local() to
# avoid interleaved execute()/fetchone() on a shared connection.
jobiface._reader_local = threading.local()
print('JobInterface ports:')
ut.print_dict(jobiface.port_dict)

def _get_reader_store(jobiface):
"""Return a per-thread JobStore for direct SQLite access.

Each Gunicorn thread gets its own connection, so reads are fully
concurrent. SQLite WAL mode allows concurrent reads while the
collector writes.
"""
store = getattr(jobiface._reader_local, 'store', None)
if store is not None:
return store
from wbia.web.job_store import JobStore
shelve_path = jobiface.ibs.get_shelves_path()
store = JobStore(join(shelve_path, 'jobs.db'))
jobiface._reader_local.store = store
return store

def _ensure_sockets(jobiface):
"""Lazily create ZMQ sockets on first use (post-fork), or recreate
after a timeout reset destroyed one or both sockets.
Expand Down Expand Up @@ -1258,39 +1279,55 @@ def queue_job(
return jobid

def get_job_id_list(jobiface):
if False: # jobiface.verbose >= 1:
print('----')
print('Request list of job ids')
pair_msg = dict(action='job_id_list')
return jobiface._collect_request(pair_msg)
"""Read job ID list directly from SQLite (no ZMQ round-trip)."""
store = jobiface._get_reader_store()
return {'status': 'ok', 'jobid_list': store.get_job_ids()}

def get_job_status(jobiface, jobid):
if jobiface.verbose >= 1:
print('----')
print('Request status of jobid={!r}'.format(jobid))
pair_msg = dict(action='job_status', jobid=jobid)
return jobiface._collect_request(pair_msg)
"""Read job status directly from SQLite (no ZMQ round-trip)."""
store = jobiface._get_reader_store()
status = store.get_status(jobid) or 'unknown'
return {'status': 'ok', 'jobid': jobid, 'jobstatus': status}

def get_job_status_dict(jobiface, limit=0):
if False: # jobiface.verbose >= 1:
print('----')
print('Request list of job ids')
pair_msg = dict(action='job_status_dict', limit=limit)
return jobiface._collect_request(pair_msg)
"""Read all job statuses directly from SQLite (no ZMQ round-trip)."""
store = jobiface._get_reader_store()
return {
'status': 'ok',
'json_result': store.get_job_status_dict(limit=limit),
}

def get_job_metadata(jobiface, jobid):
if jobiface.verbose >= 1:
print('----')
print('Request metadata of jobid={!r}'.format(jobid))
pair_msg = dict(action='job_input', jobid=jobid)
return jobiface._collect_request(pair_msg)
"""Read job metadata directly from SQLite (no ZMQ round-trip)."""
store = jobiface._get_reader_store()
if not store.job_exists(jobid):
return {'status': 'invalid', 'jobid': jobid, 'json_result': None}
metadata = store.get_metadata(jobid)
reply = {'status': 'ok', 'jobid': jobid, 'json_result': metadata}
if metadata is None:
reply['status'] = 'corrupted'
return reply

def get_job_result(jobiface, jobid):
if jobiface.verbose >= 1:
print('----')
print('Request result of jobid={!r}'.format(jobid))
pair_msg = dict(action='job_result', jobid=jobid)
return jobiface._collect_request(pair_msg)
"""Read job result directly from SQLite (no ZMQ round-trip)."""
store = jobiface._get_reader_store()
if not store.job_exists(jobid):
return {'status': 'invalid', 'jobid': jobid, 'json_result': None}
status = store.get_status(jobid)
result_data = store.get_result(jobid)
if result_data is None:
if status in ('corrupted', 'completed'):
reply_status = 'corrupted'
elif status == 'suppressed':
reply_status = 'suppressed'
else:
reply_status = status
return {'status': reply_status, 'jobid': jobid, 'json_result': None}
return {
'status': result_data['exec_status'],
'jobid': jobid,
'json_result': ut.from_json(result_data['json_result']),
}

def get_unpacked_result(jobiface, jobid):
reply = jobiface.get_job_result(jobid)
Expand Down
Loading