Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ckanext/xloader/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def is_resource_supported_by_xloader(res_dict, check_access=True):
try:
is_supported_url_type = url_type not in toolkit.h.datastore_rw_resource_url_types()
except AttributeError:
is_supported_url_type = (url_type == 'upload')
is_supported_url_type = (url_type in ['upload', 'None'])
else:
is_supported_url_type = True
return (is_supported_format or is_datastore_active) and user_has_access and is_supported_url_type
Expand Down
3 changes: 2 additions & 1 deletion ckanext/xloader/job_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class LoaderError(JobError):
'''Exception that's raised if a load fails'''
pass


class XLoaderTimeoutError(JobError):
"""Custom timeout exception that can be retried"""
pass
pass
27 changes: 13 additions & 14 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
from rq.timeouts import JobTimeoutException
import sqlalchemy as sa

from ckan import model
from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config, h

from . import db, loader
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError, XLoaderTimeoutError
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError, LoaderError, XLoaderTimeoutError
from .utils import datastore_resource_exists, set_resource_metadata, modify_input_url


Expand Down Expand Up @@ -258,11 +257,12 @@ def xloader_data_into_datastore_(input, job_dict, logger):
logger.info('File hash: %s', file_hash)
resource['hash'] = file_hash

def direct_load():
def direct_load(allow_type_guessing=False):
fields = loader.load_csv(
tmp_file.name,
resource_id=resource['id'],
mimetype=resource.get('format'),
allow_type_guessing=allow_type_guessing,
logger=logger)
loader.calculate_record_count(
resource_id=resource['id'], logger=logger)
Expand Down Expand Up @@ -318,24 +318,24 @@ def tabulator_load():
direct_load()
else:
try:
direct_load()
except JobError as e:
direct_load(allow_type_guessing=True)
except (JobError, LoaderError) as e:
logger.warning('Load using COPY failed: %s', e)
logger.info('Trying again with tabulator')
tabulator_load()
except JobTimeoutException:
try:
tmp_file.close()
except FileNotFoundError:
pass
logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT)
raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT))
except FileCouldNotBeLoadedError as e:
logger.warning('Loading excerpt for this format not supported.')
logger.error('Loading file raised an error: %s', e)
raise JobError('Loading file raised an error: {}'.format(e))

tmp_file.close()
finally:
try:
tmp_file.close()
os.remove(tmp_file.name)
except FileNotFoundError:
pass

logger.info('Express Load completed')

Expand Down Expand Up @@ -439,7 +439,7 @@ def _download_resource_data(resource, data, api_key, logger):
except requests.exceptions.Timeout:
logger.warning('URL time out after %ss', DOWNLOAD_TIMEOUT)
raise XLoaderTimeoutError('Connection timed out after {}s'.format(
DOWNLOAD_TIMEOUT))
DOWNLOAD_TIMEOUT))
except requests.exceptions.RequestException as e:
tmp_file.close()
try:
Expand Down Expand Up @@ -525,7 +525,7 @@ def callback_xloader_hook(result_url, api_key, job_dict):

try:
result = requests.post(
modify_input_url(result_url), # modify with local config
modify_input_url(result_url), # modify with local config
data=json.dumps(job_dict, cls=DatetimeJsonEncoder),
verify=SSL_VERIFY,
headers=headers)
Expand Down Expand Up @@ -572,7 +572,6 @@ def _get_user_from_key(api_key_or_token):
return get_user_from_token(api_key_or_token)



def get_resource_and_dataset(resource_id, api_key):
"""
Gets available information about the resource and its dataset from CKAN
Expand Down
Loading
Loading