Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion ckanext/xloader/config_declaration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ groups:
required: false
- key: ckanext.xloader.search_update_chunks
default: 100000
        example: 1000
description: |
The number of rows to process in each batch when populating the full-text
search index. Chunked processing prevents database timeouts and memory
Expand All @@ -198,3 +198,11 @@ groups:
improve performance but may cause timeouts on very large tables.
type: int
required: false
- key: ckanext.xloader.max_retries
default: 1
example: 3
description: |
Maximum number of retry attempts for failed jobs due to temporary errors
like database deadlocks or network timeouts. Set to 0 to disable retries.
type: int
required: false
4 changes: 4 additions & 0 deletions ckanext/xloader/job_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,7 @@ def __str__(self):
class LoaderError(JobError):
'''Exception that's raised if a load fails'''
pass

class XLoaderTimeoutError(JobError):
    """Timeout error (e.g. a download timing out) that is considered
    temporary and therefore eligible for a retry attempt."""
122 changes: 90 additions & 32 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config, h

from . import db, loader
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError, XLoaderTimeoutError
from .utils import datastore_resource_exists, set_resource_metadata, modify_input_url


Expand All @@ -41,18 +41,52 @@
CHUNK_SIZE = 16 * 1024 # 16kb
DOWNLOAD_TIMEOUT = 30

# Maximum number of retry attempts for jobs that fail with a temporary error.
MAX_RETRIES = int(config.get('ckanext.xloader.max_retries', 1))
# Exception types regarded as temporary/transient; a job failing with one of
# these may be re-enqueued (see handle_retryable_error).
RETRYABLE_ERRORS = (
    errors.DeadlockDetected,
    errors.LockNotAvailable,
    errors.ObjectInUse,
    HTTPError,
    XLoaderTimeoutError,
)
# Retries can only occur in cases where the datastore entry exists,
# so use the standard timeout
RETRIED_JOB_TIMEOUT = config.get('ckanext.xloader.job_timeout', '3600')
APITOKEN_HEADER_NAME = config.get('apitoken_header_name', 'Authorization')


def is_retryable_error(error):
    """
    Determine if an error should trigger a retry attempt.

    Checks if the error is a temporary/transient condition that might
    succeed on retry. Returns True for retryable HTTP status codes and
    other temporary errors.

    Retryable HTTP status codes:
    - 408 Request Timeout
    - 429 Too Many Requests
    - 500 Internal Server Error
    - 502 Bad Gateway
    - 503 Service Unavailable
    - 504 Gateway Timeout
    - 507 Insufficient Storage
    - 522 Connection Timed Out (Cloudflare)
    - 524 A Timeout Occurred (Cloudflare)

    :param error: Exception object to check
    :type error: Exception
    :return: True if error should be retried, False otherwise
    :rtype: bool
    """
    if isinstance(error, HTTPError):
        # Only transient HTTP failures are worth retrying; client errors
        # like 400/403/404 would fail again identically.
        retryable_status_codes = {408, 429, 500, 502, 503, 504, 507, 522, 524}
        return error.status_code in retryable_status_codes
    # Non-HTTP errors are treated as temporary here; callers additionally
    # gate on isinstance(error, RETRYABLE_ERRORS), so arbitrary exceptions
    # still do not trigger retries.
    return True


# input = {
# 'api_key': user['apikey'],
# 'job_type': 'xloader_to_datastore',
Expand Down Expand Up @@ -112,36 +146,12 @@ def xloader_data_into_datastore(input):
job_dict['error'] = str(e)
log.error('xloader error: job_id %s already exists', job_id)
errored = True
except JobError as e:
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
except Exception as e:
if isinstance(e, RETRYABLE_ERRORS):
tries = job_dict['metadata'].get('tries', 0)
if tries < MAX_RETRIES:
tries = tries + 1
log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
logger.info("Job failed due to temporary error [%s], retrying", e)
job_dict['status'] = 'pending'
job_dict['metadata']['tries'] = tries
enqueue_job(
xloader_data_into_datastore,
[input],
title="retry xloader_data_into_datastore: resource: {} attempt {}".format(
job_dict['metadata']['resource_id'], tries),
rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
)
return None

db.mark_job_as_errored(
job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
error_state = {'errored': errored}
retry = handle_retryable_error(e, input, job_id, job_dict, logger, error_state)
if retry:
return None
errored = error_state['errored']
finally:
# job_dict is defined in xloader_hook's docstring
is_saved_ok = callback_xloader_hook(result_url=input['result_url'],
Expand All @@ -151,6 +161,54 @@ def xloader_data_into_datastore(input):
return 'error' if errored else None


def handle_retryable_error(e, input, job_id, job_dict, logger, error_state):
    """
    Handle retryable errors by attempting to retry the job or marking it as failed.

    Checks if the error is retryable (database deadlocks, HTTP timeouts, etc.) and
    within the retry limit. If so, enqueues a new job attempt. Otherwise, marks
    the job as errored.

    NOTE(review): this helper is internal to the module; a leading-underscore
    name would signal that, but the current name is kept for compatibility
    with existing callers.

    :param e: The exception that occurred
    :type e: Exception
    :param input: Job input data containing metadata and API key
    :type input: dict
    :param job_id: Unique identifier for the current job
    :type job_id: str
    :param job_dict: Job status dictionary with metadata and status
    :type job_dict: dict
    :param logger: Logger instance for the current job
    :type logger: logging.Logger
    :param error_state: Mutable dict to track error state {'errored': bool}
    :type error_state: dict

    :returns: True if job was retried, None otherwise
    :rtype: bool or None
    """
    if isinstance(e, RETRYABLE_ERRORS) and is_retryable_error(e):
        tries = job_dict['metadata'].get('tries', 0)
        if tries < MAX_RETRIES:
            tries += 1
            log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
            logger.info("Job failed due to temporary error [%s], retrying", e)
            # Re-enqueue the same job with an incremented attempt counter so
            # the retry budget is tracked across attempts.
            job_dict['status'] = 'pending'
            job_dict['metadata']['tries'] = tries
            enqueue_job(
                xloader_data_into_datastore,
                [input],
                title="retry xloader_data_into_datastore: resource: {} attempt {}".format(
                    job_dict['metadata']['resource_id'], tries),
                rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
            )
            return True
    # Not retryable, or the retry budget is exhausted: record the failure.
    db.mark_job_as_errored(
        job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
    job_dict['status'] = 'error'
    job_dict['error'] = str(e)
    log.error('xloader error: %s, %s', e, traceback.format_exc())
    error_state['errored'] = True


def xloader_data_into_datastore_(input, job_dict, logger):
'''This function:
* downloads the resource (metadata) from CKAN
Expand Down Expand Up @@ -380,7 +438,7 @@ def _download_resource_data(resource, data, api_key, logger):
request_url=url, response=error)
except requests.exceptions.Timeout:
logger.warning('URL time out after %ss', DOWNLOAD_TIMEOUT)
raise JobError('Connection timed out after {}s'.format(
raise XLoaderTimeoutError('Connection timed out after {}s'.format(
DOWNLOAD_TIMEOUT))
except requests.exceptions.RequestException as e:
tmp_file.close()
Expand Down
85 changes: 85 additions & 0 deletions ckanext/xloader/tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,91 @@ def test_data_with_rq_job_timeout(self, cli, data):
# make sure that the tmp file has been closed/deleted in job timeout exception handling
assert file_suffix not in f

    @pytest.mark.parametrize("error_type,should_retry", [
        # Retryable errors from RETRYABLE_ERRORS
        ("DeadlockDetected", True),
        ("LockNotAvailable", True),
        ("ObjectInUse", True),
        ("XLoaderTimeoutError", True),
        # Retryable HTTP errors (status codes from is_retryable_error)
        ("HTTPError_408", True),
        ("HTTPError_429", True),
        ("HTTPError_500", True),
        ("HTTPError_502", True),
        ("HTTPError_503", True),
        ("HTTPError_504", True),
        ("HTTPError_507", True),
        ("HTTPError_522", True),
        ("HTTPError_524", True),
        # Non-retryable HTTP errors
        ("HTTPError_400", False),
        ("HTTPError_404", False),
        ("HTTPError_403", False),
        # Other non-retryable errors (not in RETRYABLE_ERRORS)
        ("ValueError", False),
        ("TypeError", False),
    ])
    def test_retry_behavior(self, cli, data, error_type, should_retry):
        """Test retry behavior for different error types.

        The mocked download raises the parametrized error on the first call.
        Retryable errors should cause the job to be re-enqueued and then
        succeed on the second attempt; non-retryable errors should fail the
        job without a second successful attempt.
        """

        def create_mock_error(error_type):
            # Build a concrete exception instance matching the parametrized name.
            if error_type == "DeadlockDetected":
                from psycopg2 import errors
                return errors.DeadlockDetected()
            elif error_type == "LockNotAvailable":
                from psycopg2 import errors
                return errors.LockNotAvailable()
            elif error_type == "ObjectInUse":
                from psycopg2 import errors
                return errors.ObjectInUse()
            elif error_type == "XLoaderTimeoutError":
                return jobs.XLoaderTimeoutError('Connection timed out after 30s')
            elif error_type.startswith("HTTPError_"):
                # e.g. "HTTPError_503" -> HTTPError with status_code=503
                status_code = int(error_type.split("_")[1])
                return jobs.HTTPError("HTTP Error", status_code=status_code, request_url="test", response=None)
            elif error_type == "ValueError":
                return ValueError("Test error")
            elif error_type == "TypeError":
                return TypeError("Test error")

        def mock_download_with_error(*args, **kwargs):
            # Track invocations on the function object itself so the mock can
            # behave differently on the first vs. subsequent calls.
            if not hasattr(mock_download_with_error, 'call_count'):
                mock_download_with_error.call_count = 0
            mock_download_with_error.call_count += 1

            if mock_download_with_error.call_count == 1:
                # First call - raise the test error
                raise create_mock_error(error_type)
            elif should_retry:
                # Second call - return successful response only if retryable
                import tempfile
                tmp_file = tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.csv')
                tmp_file.write(_TEST_FILE_CONTENT)
                tmp_file.flush()
                return (tmp_file, 'd44fa65eda3675e11710682fdb5f1648')
            else:
                # Non-retryable errors should not get a second chance
                raise create_mock_error(error_type)

        self.enqueue(jobs.xloader_data_into_datastore, [data])

        # Run the queued job(s) through the worker with the download mocked out.
        with mock.patch("ckanext.xloader.jobs._download_resource_data", mock_download_with_error):
            stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output

        if should_retry:
            # Check that retry was attempted
            assert "Job failed due to temporary error" in stdout
            assert "retrying" in stdout
            assert "Express Load completed" in stdout
            # Verify resource was successfully loaded after retry
            resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"])
            assert resource["datastore_contains_all_records_of_source_file"]
        else:
            # Check that job failed without retry - should have error messages
            assert "xloader error:" in stdout or "error" in stdout.lower()
            assert "Express Load completed" not in stdout


@pytest.mark.usefixtures("clean_db")
class TestSetResourceMetadata(object):
Expand Down
Loading