|
18 | 18 | from rq.timeouts import JobTimeoutException
|
19 | 19 | import sqlalchemy as sa
|
20 | 20 |
|
21 |
| -from ckan import model |
22 | 21 | from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config, h
|
23 | 22 |
|
24 | 23 | from . import db, loader
|
25 |
| -from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError, XLoaderTimeoutError |
| 24 | +from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError, LoaderError, XLoaderTimeoutError |
26 | 25 | from .utils import datastore_resource_exists, set_resource_metadata, modify_input_url
|
27 | 26 |
|
28 | 27 |
|
@@ -258,11 +257,12 @@ def xloader_data_into_datastore_(input, job_dict, logger):
|
258 | 257 | logger.info('File hash: %s', file_hash)
|
259 | 258 | resource['hash'] = file_hash
|
260 | 259 |
|
261 |
| - def direct_load(): |
| 260 | + def direct_load(allow_type_guessing=False): |
262 | 261 | fields = loader.load_csv(
|
263 | 262 | tmp_file.name,
|
264 | 263 | resource_id=resource['id'],
|
265 | 264 | mimetype=resource.get('format'),
|
| 265 | + allow_type_guessing=allow_type_guessing, |
266 | 266 | logger=logger)
|
267 | 267 | loader.calculate_record_count(
|
268 | 268 | resource_id=resource['id'], logger=logger)
|
@@ -318,24 +318,24 @@ def tabulator_load():
|
318 | 318 | direct_load()
|
319 | 319 | else:
|
320 | 320 | try:
|
321 |
| - direct_load() |
322 |
| - except JobError as e: |
| 321 | + direct_load(allow_type_guessing=True) |
| 322 | + except (JobError, LoaderError) as e: |
323 | 323 | logger.warning('Load using COPY failed: %s', e)
|
324 | 324 | logger.info('Trying again with tabulator')
|
325 | 325 | tabulator_load()
|
326 | 326 | except JobTimeoutException:
|
327 |
| - try: |
328 |
| - tmp_file.close() |
329 |
| - except FileNotFoundError: |
330 |
| - pass |
331 | 327 | logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT)
|
332 | 328 | raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT))
|
333 | 329 | except FileCouldNotBeLoadedError as e:
|
334 | 330 | logger.warning('Loading excerpt for this format not supported.')
|
335 | 331 | logger.error('Loading file raised an error: %s', e)
|
336 | 332 | raise JobError('Loading file raised an error: {}'.format(e))
|
337 |
| - |
338 |
| - tmp_file.close() |
| 333 | + finally: |
| 334 | + try: |
| 335 | + tmp_file.close() |
| 336 | + os.remove(tmp_file.name) |
| 337 | + except FileNotFoundError: |
| 338 | + pass |
339 | 339 |
|
340 | 340 | logger.info('Express Load completed')
|
341 | 341 |
|
@@ -439,7 +439,7 @@ def _download_resource_data(resource, data, api_key, logger):
|
439 | 439 | except requests.exceptions.Timeout:
|
440 | 440 | logger.warning('URL time out after %ss', DOWNLOAD_TIMEOUT)
|
441 | 441 | raise XLoaderTimeoutError('Connection timed out after {}s'.format(
|
442 |
| - DOWNLOAD_TIMEOUT)) |
| 442 | + DOWNLOAD_TIMEOUT)) |
443 | 443 | except requests.exceptions.RequestException as e:
|
444 | 444 | tmp_file.close()
|
445 | 445 | try:
|
@@ -525,7 +525,7 @@ def callback_xloader_hook(result_url, api_key, job_dict):
|
525 | 525 |
|
526 | 526 | try:
|
527 | 527 | result = requests.post(
|
528 |
| - modify_input_url(result_url), # modify with local config |
| 528 | + modify_input_url(result_url), # modify with local config |
529 | 529 | data=json.dumps(job_dict, cls=DatetimeJsonEncoder),
|
530 | 530 | verify=SSL_VERIFY,
|
531 | 531 | headers=headers)
|
@@ -572,7 +572,6 @@ def _get_user_from_key(api_key_or_token):
|
572 | 572 | return get_user_from_token(api_key_or_token)
|
573 | 573 |
|
574 | 574 |
|
575 |
| - |
576 | 575 | def get_resource_and_dataset(resource_id, api_key):
|
577 | 576 | """
|
578 | 577 | Gets available information about the resource and its dataset from CKAN
|
|
0 commit comments