From d1391f4007247158fe1e10357331e84344be30e0 Mon Sep 17 00:00:00 2001 From: Chava Goldshtein Date: Mon, 11 Aug 2025 14:45:15 +0300 Subject: [PATCH 1/2] feat: add selective HTTP error retries for network failures Add HTTPError to retryable errors for specific status codes (408, 429, 500, 502, 503, 504, 507, 522, 524) and make the retry limit configurable via ckanext.xloader.max_retries (default 1). Improves reliability for temporary network issues. Fixes #254 --- ckanext/xloader/config_declaration.yaml | 10 +- ckanext/xloader/job_exceptions.py | 4 + ckanext/xloader/jobs.py | 122 +++++++++++++++++------- ckanext/xloader/tests/test_jobs.py | 31 ++++++ 4 files changed, 134 insertions(+), 33 deletions(-) diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index 4b529839..fafd905c 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -189,7 +189,7 @@ groups: required: false - key: ckanext.xloader.search_update_chunks default: 100000 - example: True + example: 1000 description: | The number of rows to process in each batch when populating the full-text search index. Chunked processing prevents database timeouts and memory @@ -198,3 +198,11 @@ groups: improve performance but may cause timeouts on very large tables. type: int required: false + - key: ckanext.xloader.max_retries + default: 1 + example: 3 + description: | + Maximum number of retry attempts for failed jobs due to temporary errors + like database deadlocks or network timeouts. Set to 0 to disable retries. 
+ type: int + required: false diff --git a/ckanext/xloader/job_exceptions.py b/ckanext/xloader/job_exceptions.py index 587c94f5..88f8f2e3 100644 --- a/ckanext/xloader/job_exceptions.py +++ b/ckanext/xloader/job_exceptions.py @@ -52,3 +52,7 @@ def __str__(self): class LoaderError(JobError): '''Exception that's raised if a load fails''' pass + +class XLoaderTimeoutError(JobError): + """Custom timeout exception that can be retried""" + pass \ No newline at end of file diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 3a678991..e3f48a68 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -22,7 +22,7 @@ from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config, h from . import db, loader -from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError +from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError, XLoaderTimeoutError from .utils import datastore_resource_exists, set_resource_metadata, modify_input_url @@ -41,11 +41,13 @@ CHUNK_SIZE = 16 * 1024 # 16kb DOWNLOAD_TIMEOUT = 30 -MAX_RETRIES = 1 +MAX_RETRIES = int(config.get('ckanext.xloader.max_retries', 1)) RETRYABLE_ERRORS = ( errors.DeadlockDetected, errors.LockNotAvailable, errors.ObjectInUse, + HTTPError, + XLoaderTimeoutError ) # Retries can only occur in cases where the datastore entry exists, # so use the standard timeout @@ -53,6 +55,38 @@ APITOKEN_HEADER_NAME = config.get('apitoken_header_name', 'Authorization') +def is_retryable_error(error): + """ + Determine if an error should trigger a retry attempt. + + Checks if the error is a temporary/transient condition that might + succeed on retry. Returns True for retryable HTTP status codes and + other temporary errors. 
+ + Retryable HTTP status codes: + - 408 Request Timeout + - 429 Too Many Requests + - 500 Internal Server Error + - 502 Bad Gateway + - 503 Service Unavailable + - 504 Gateway Timeout + - 507 Insufficient Storage + - 522 Connection Timed Out (Cloudflare) + - 524 A Timeout Occurred (Cloudflare) + + :param error: Exception object to check + :type error: Exception + :return: True if error should be retried, False otherwise + :rtype: bool + """ + if isinstance(error, HTTPError): + retryable_status_codes = {408, 429, 500, 502, 503, 504, 507, 522, 524} + return error.status_code in retryable_status_codes + else: + return True + return False + + # input = { # 'api_key': user['apikey'], # 'job_type': 'xloader_to_datastore', @@ -112,36 +146,12 @@ def xloader_data_into_datastore(input): job_dict['error'] = str(e) log.error('xloader error: job_id %s already exists', job_id) errored = True - except JobError as e: - db.mark_job_as_errored(job_id, str(e)) - job_dict['status'] = 'error' - job_dict['error'] = str(e) - log.error('xloader error: %s, %s', e, traceback.format_exc()) - errored = True except Exception as e: - if isinstance(e, RETRYABLE_ERRORS): - tries = job_dict['metadata'].get('tries', 0) - if tries < MAX_RETRIES: - tries = tries + 1 - log.info("Job %s failed due to temporary error [%s], retrying", job_id, e) - logger.info("Job failed due to temporary error [%s], retrying", e) - job_dict['status'] = 'pending' - job_dict['metadata']['tries'] = tries - enqueue_job( - xloader_data_into_datastore, - [input], - title="retry xloader_data_into_datastore: resource: {} attempt {}".format( - job_dict['metadata']['resource_id'], tries), - rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT) - ) - return None - - db.mark_job_as_errored( - job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e)) - job_dict['status'] = 'error' - job_dict['error'] = str(e) - log.error('xloader error: %s, %s', e, traceback.format_exc()) - errored = True + error_state = {'errored': errored} + retry = 
handle_retryable_error(e, input, job_id, job_dict, logger, error_state) + if retry: + return None + errored = error_state['errored'] finally: # job_dict is defined in xloader_hook's docstring is_saved_ok = callback_xloader_hook(result_url=input['result_url'], @@ -151,6 +161,54 @@ def xloader_data_into_datastore(input): return 'error' if errored else None +def handle_retryable_error(e, input, job_id, job_dict, logger, error_state): + """ + Handle retryable errors by attempting to retry the job or marking it as failed. + + Checks if the error is retryable (database deadlocks, HTTP timeouts, etc.) and + within the retry limit. If so, enqueues a new job attempt. Otherwise, marks + the job as errored. + + :param e: The exception that occurred + :type e: Exception + :param input: Job input data containing metadata and API key + :type input: dict + :param job_id: Unique identifier for the current job + :type job_id: str + :param job_dict: Job status dictionary with metadata and status + :type job_dict: dict + :param logger: Logger instance for the current job + :type logger: logging.Logger + :param error_state: Mutable dict to track error state {'errored': bool} + :type error_state: dict + + :returns: True if job was retried, None otherwise + :rtype: bool or None + """ + if isinstance(e, RETRYABLE_ERRORS) and is_retryable_error(e): + tries = job_dict['metadata'].get('tries', 0) + if tries < MAX_RETRIES: + tries = tries + 1 + log.info("Job %s failed due to temporary error [%s], retrying", job_id, e) + logger.info("Job failed due to temporary error [%s], retrying", e) + job_dict['status'] = 'pending' + job_dict['metadata']['tries'] = tries + enqueue_job( + xloader_data_into_datastore, + [input], + title="retry xloader_data_into_datastore: resource: {} attempt {}".format( + job_dict['metadata']['resource_id'], tries), + rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT) + ) + return True + db.mark_job_as_errored( + job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e)) + 
job_dict['status'] = 'error' + job_dict['error'] = str(e) + log.error('xloader error: %s, %s', e, traceback.format_exc()) + error_state['errored'] = True + + def xloader_data_into_datastore_(input, job_dict, logger): '''This function: * downloads the resource (metadata) from CKAN @@ -380,7 +438,7 @@ def _download_resource_data(resource, data, api_key, logger): request_url=url, response=error) except requests.exceptions.Timeout: logger.warning('URL time out after %ss', DOWNLOAD_TIMEOUT) - raise JobError('Connection timed out after {}s'.format( + raise XLoaderTimeoutError('Connection timed out after {}s'.format( DOWNLOAD_TIMEOUT)) except requests.exceptions.RequestException as e: tmp_file.close() diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index f23e7821..67012cb3 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -201,6 +201,37 @@ def test_data_with_rq_job_timeout(self, cli, data): # make sure that the tmp file has been closed/deleted in job timeout exception handling assert file_suffix not in f + @pytest.mark.ckan_config("ckanext.xloader.max_retries", "2") + def test_retry_on_timeout_error(self, cli, data): + """Test that timeout errors trigger retry attempts.""" + + def mock_download_with_timeout(*args, **kwargs): + # Simulate timeout on first call, success on retry + if not hasattr(mock_download_with_timeout, 'call_count'): + mock_download_with_timeout.call_count = 0 + mock_download_with_timeout.call_count += 1 + + if mock_download_with_timeout.call_count == 1: + # First call - raise timeout error + raise jobs.XLoaderTimeoutError('Connection timed out after 30s') + else: + # Second call - return successful response + return get_response() + + self.enqueue(jobs.xloader_data_into_datastore, [data]) + + with mock.patch("ckanext.xloader.jobs._download_resource_data", mock_download_with_timeout): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + + # Check that retry was 
attempted + assert "Job failed due to temporary error" in stdout + assert "retrying" in stdout + assert "Express Load completed" in stdout + + # Verify resource was successfully loaded after retry + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + @pytest.mark.usefixtures("clean_db") class TestSetResourceMetadata(object): From 62f4aca9a27799f141f87ece089f61b693e0ba54 Mon Sep 17 00:00:00 2001 From: Chava Goldshtein Date: Tue, 16 Sep 2025 15:09:56 +0300 Subject: [PATCH 2/2] test: parametrize error handling tests Test all retryable errors (DB, HTTP timeouts) and non-retryable errors (4xx HTTP codes) with single parametrized test method. --- ckanext/xloader/tests/test_jobs.py | 98 +++++++++++++++++++++++------- 1 file changed, 76 insertions(+), 22 deletions(-) diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index 67012cb3..a5866f3c 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -201,36 +201,90 @@ def test_data_with_rq_job_timeout(self, cli, data): # make sure that the tmp file has been closed/deleted in job timeout exception handling assert file_suffix not in f - @pytest.mark.ckan_config("ckanext.xloader.max_retries", "2") - def test_retry_on_timeout_error(self, cli, data): - """Test that timeout errors trigger retry attempts.""" + @pytest.mark.parametrize("error_type,should_retry", [ + # Retryable errors from RETRYABLE_ERRORS + ("DeadlockDetected", True), + ("LockNotAvailable", True), + ("ObjectInUse", True), + ("XLoaderTimeoutError", True), + # Retryable HTTP errors (status codes from is_retryable_error) + ("HTTPError_408", True), + ("HTTPError_429", True), + ("HTTPError_500", True), + ("HTTPError_502", True), + ("HTTPError_503", True), + ("HTTPError_504", True), + ("HTTPError_507", True), + ("HTTPError_522", True), + ("HTTPError_524", True), + # Non-retryable HTTP errors + 
("HTTPError_400", False), + ("HTTPError_404", False), + ("HTTPError_403", False), + # Other non-retryable errors (not in RETRYABLE_ERRORS) + ("ValueError", False), + ("TypeError", False), + ]) + def test_retry_behavior(self, cli, data, error_type, should_retry): + """Test retry behavior for different error types.""" - def mock_download_with_timeout(*args, **kwargs): - # Simulate timeout on first call, success on retry - if not hasattr(mock_download_with_timeout, 'call_count'): - mock_download_with_timeout.call_count = 0 - mock_download_with_timeout.call_count += 1 + def create_mock_error(error_type): + if error_type == "DeadlockDetected": + from psycopg2 import errors + return errors.DeadlockDetected() + elif error_type == "LockNotAvailable": + from psycopg2 import errors + return errors.LockNotAvailable() + elif error_type == "ObjectInUse": + from psycopg2 import errors + return errors.ObjectInUse() + elif error_type == "XLoaderTimeoutError": + return jobs.XLoaderTimeoutError('Connection timed out after 30s') + elif error_type.startswith("HTTPError_"): + status_code = int(error_type.split("_")[1]) + return jobs.HTTPError("HTTP Error", status_code=status_code, request_url="test", response=None) + elif error_type == "ValueError": + return ValueError("Test error") + elif error_type == "TypeError": + return TypeError("Test error") + + + def mock_download_with_error(*args, **kwargs): + if not hasattr(mock_download_with_error, 'call_count'): + mock_download_with_error.call_count = 0 + mock_download_with_error.call_count += 1 - if mock_download_with_timeout.call_count == 1: - # First call - raise timeout error - raise jobs.XLoaderTimeoutError('Connection timed out after 30s') + if mock_download_with_error.call_count == 1: + # First call - raise the test error + raise create_mock_error(error_type) + elif should_retry: + # Second call - return successful response only if retryable + import tempfile + tmp_file = tempfile.NamedTemporaryFile(mode='w+', delete=False, 
suffix='.csv') + tmp_file.write(_TEST_FILE_CONTENT) + tmp_file.flush() + return (tmp_file, 'd44fa65eda3675e11710682fdb5f1648') else: - # Second call - return successful response - return get_response() + # Non-retryable errors should not get a second chance + raise create_mock_error(error_type) self.enqueue(jobs.xloader_data_into_datastore, [data]) - with mock.patch("ckanext.xloader.jobs._download_resource_data", mock_download_with_timeout): + with mock.patch("ckanext.xloader.jobs._download_resource_data", mock_download_with_error): stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output - # Check that retry was attempted - assert "Job failed due to temporary error" in stdout - assert "retrying" in stdout - assert "Express Load completed" in stdout - - # Verify resource was successfully loaded after retry - resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) - assert resource["datastore_contains_all_records_of_source_file"] + if should_retry: + # Check that retry was attempted + assert "Job failed due to temporary error" in stdout + assert "retrying" in stdout + assert "Express Load completed" in stdout + # Verify resource was successfully loaded after retry + resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + else: + # Check that job failed without retry - should have error messages + assert "xloader error:" in stdout or "error" in stdout.lower() + assert "Express Load completed" not in stdout @pytest.mark.usefixtures("clean_db")