diff --git a/ckanext/xloader/helpers.py b/ckanext/xloader/helpers.py index 25e6ba83..c7bdd018 100644 --- a/ckanext/xloader/helpers.py +++ b/ckanext/xloader/helpers.py @@ -40,7 +40,7 @@ def is_resource_supported_by_xloader(res_dict, check_access=True): try: is_supported_url_type = url_type not in toolkit.h.datastore_rw_resource_url_types() except AttributeError: - is_supported_url_type = (url_type == 'upload') + is_supported_url_type = (url_type in ['upload', 'None']) else: is_supported_url_type = True return (is_supported_format or is_datastore_active) and user_has_access and is_supported_url_type diff --git a/ckanext/xloader/job_exceptions.py b/ckanext/xloader/job_exceptions.py index 88f8f2e3..2303f3ed 100644 --- a/ckanext/xloader/job_exceptions.py +++ b/ckanext/xloader/job_exceptions.py @@ -53,6 +53,7 @@ class LoaderError(JobError): '''Exception that's raised if a load fails''' pass + class XLoaderTimeoutError(JobError): """Custom timeout exception that can be retried""" - pass \ No newline at end of file + pass diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index e3f48a68..cd550a32 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -18,11 +18,10 @@ from rq.timeouts import JobTimeoutException import sqlalchemy as sa -from ckan import model from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config, h from . import db, loader -from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError, XLoaderTimeoutError +from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError, LoaderError, XLoaderTimeoutError from .utils import datastore_resource_exists, set_resource_metadata, modify_input_url @@ -258,11 +257,12 @@ def xloader_data_into_datastore_(input, job_dict, logger): logger.info('File hash: %s', file_hash) resource['hash'] = file_hash - def direct_load(): + def direct_load(allow_type_guessing=False): fields = loader.load_csv( tmp_file.name, resource_id=resource['id'], mimetype=resource.get('format'), + allow_type_guessing=allow_type_guessing, logger=logger) loader.calculate_record_count( resource_id=resource['id'], logger=logger) @@ -318,24 +318,24 @@ def tabulator_load(): direct_load() else: try: - direct_load() - except JobError as e: + direct_load(allow_type_guessing=True) + except (JobError, LoaderError) as e: logger.warning('Load using COPY failed: %s', e) logger.info('Trying again with tabulator') tabulator_load() except JobTimeoutException: - try: - tmp_file.close() - except FileNotFoundError: - pass logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT) raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT)) except FileCouldNotBeLoadedError as e: logger.warning('Loading excerpt for this format not supported.') logger.error('Loading file raised an error: %s', e) raise JobError('Loading file raised an error: {}'.format(e)) - - tmp_file.close() + finally: + try: + tmp_file.close() + os.remove(tmp_file.name) + except FileNotFoundError: + pass logger.info('Express Load completed') @@ -439,7 +439,7 @@ def _download_resource_data(resource, data, api_key, logger): except requests.exceptions.Timeout: logger.warning('URL time out after %ss', DOWNLOAD_TIMEOUT) raise XLoaderTimeoutError('Connection timed out after {}s'.format( - DOWNLOAD_TIMEOUT)) + DOWNLOAD_TIMEOUT)) except requests.exceptions.RequestException as e: tmp_file.close() try: @@ -525,7 +525,7 @@ def callback_xloader_hook(result_url, api_key, job_dict): try: result = requests.post( - 
modify_input_url(result_url), # modify with local config + modify_input_url(result_url), # modify with local config data=json.dumps(job_dict, cls=DatetimeJsonEncoder), verify=SSL_VERIFY, headers=headers) @@ -572,7 +572,6 @@ def _get_user_from_key(api_key_or_token): return get_user_from_token(api_key_or_token) - def get_resource_and_dataset(resource_id, api_key): """ Gets available information about the resource and its dataset from CKAN diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index 23f9ea09..241ff9de 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -2,10 +2,10 @@ from __future__ import absolute_import import datetime +from enum import Enum import itertools from six import text_type as str, binary_type import os -import os.path import tempfile from decimal import Decimal @@ -36,6 +36,24 @@ SINGLE_BYTE_ENCODING = 'cp1252' +class FieldMatch(Enum): + """ Enumerates the possible match results between existing and new fields. + + EXACT_MATCH indicates that the field count, names and types are identical, + allowing a datastore table to be truncated and reused. + + NAME_MATCH indicates that the field count and names are the same, + but one or more field types have changed, so the table must be dropped + and recreated, but the Data Dictionary can be preserved if applicable. + + MISMATCH indicates that the field count or names have changed, + so the table must be dropped and recreated by guessing the types. + """ + EXACT_MATCH = 1, + NAME_MATCH = 2, + MISMATCH = 3 + + class UnknownEncodingStream(object): """ Provides a context manager that wraps a Tabulator stream and tries multiple encodings if one fails. @@ -83,6 +101,8 @@ def detect_encoding(file_path): def _fields_match(fields, existing_fields, logger): ''' Check whether all columns have the same names and types as previously, independent of ordering. + + Returns one of the values of FieldMatch. 
''' # drop the generated '_id' field for index in range(len(existing_fields)): @@ -94,24 +114,24 @@ def _fields_match(fields, existing_fields, logger): field_count = len(fields) if field_count != len(existing_fields): logger.info("Fields do not match; there are now %s fields but previously %s", field_count, len(existing_fields)) - return False + return FieldMatch.MISMATCH - # ensure each field is present in both collections with the same type + # ensure each field is present in both collections and check for type changes + type_changed = False for index in range(field_count): field_id = fields[index]['id'] for existing_index in range(field_count): existing_field_id = existing_fields[existing_index]['id'] if field_id == existing_field_id: - if fields[index]['type'] == existing_fields[existing_index]['type']: - break - else: - logger.info("Fields do not match; new type for %s field is %s but existing type is %s", + if fields[index]['type'] != existing_fields[existing_index]['type']: + logger.info("Field has changed; new type for %s field is %s but existing type is %s", field_id, fields[index]["type"], existing_fields[existing_index]['type']) - return False + type_changed = True + break else: logger.info("Fields do not match; no existing entry found for %s", field_id) - return False - return True + return FieldMatch.MISMATCH + return FieldMatch.NAME_MATCH if type_changed else FieldMatch.EXACT_MATCH def _clear_datastore_resource(resource_id): @@ -123,20 +143,23 @@ def _clear_datastore_resource(resource_id): conn.execute(sa.text('TRUNCATE TABLE "{}" RESTART IDENTITY'.format(resource_id))) -def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): - '''Loads a CSV into DataStore. Does not create the indexes.''' - - decoding_result = detect_encoding(csv_filepath) - logger.info("load_csv: Decoded encoding: %s", decoding_result) +def _read_metadata(table_filepath, mimetype, logger): # Determine the header row + logger.info('Determining column names and types') + decoding_result = detect_encoding(table_filepath) + logger.debug("Decoded encoding: %s", decoding_result) try: - file_format = os.path.splitext(csv_filepath)[1].strip('.') - with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream: + file_format = os.path.splitext(table_filepath)[1].strip('.') + with UnknownEncodingStream(table_filepath, file_format, decoding_result, + skip_rows=[{'type': 'preset', 'value': 'blank'}], + post_parse=[TypeConverter().convert_types]) as stream: header_offset, headers = headers_guess(stream.sample) except TabulatorException: try: file_format = mimetype.lower().split('/')[-1] - with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream: + with UnknownEncodingStream(table_filepath, file_format, decoding_result, + skip_rows=[{'type': 'preset', 'value': 'blank'}], + post_parse=[TypeConverter().convert_types]) as stream: header_offset, headers = headers_guess(stream.sample) except TabulatorException as e: raise LoaderError('Tabulator error: {}'.format(e)) @@ -144,7 +167,43 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): raise FileCouldNotBeLoadedError(e) # Some headers might have been converted from strings to floats and such. 
- headers = encode_headers(headers) + return ( + file_format, + decoding_result, + header_offset, + encode_headers(headers), + stream, + ) + + +def _read_existing_fields(resource_id): + # get column info from existing table + existing = datastore_resource_exists(resource_id) + if existing: + if p.toolkit.check_ckan_version(min_version='2.11'): + ds_info = p.toolkit.get_action('datastore_info')({'ignore_auth': True}, {'id': resource_id}) + existing_fields = ds_info.get('fields', []) + else: + existing_fields = existing.get('fields', []) + existing_info = dict((f['id'], f['info']) + for f in existing_fields + if 'info' in f) + existing_fields_by_headers = dict((f['id'], f) + for f in existing_fields) + return (True, existing_info, existing_fields, existing_fields_by_headers) + else: + return (False, None, None, None) + + +def load_csv(csv_filepath, resource_id, mimetype='text/csv', allow_type_guessing=False, logger=None): + '''Loads a CSV into DataStore. Does not create the indexes. + + allow_type_guessing: Whether to fall back to Tabulator type-guessing + in the event that the resource already existed but its structure has + changed. + ''' + + file_format, decoding_result, header_offset, headers, stream = _read_metadata(csv_filepath, mimetype, logger) # Get the list of rows to skip. The rows in the tabulator stream are # numbered starting with 1. @@ -157,10 +216,12 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): logger.warning('Could not determine delimiter from file, use default ","') delimiter = ',' + # Strip leading and trailing whitespace, then truncate to maximum length, + # then strip again in case the truncation exposed a space. headers = [ header.strip()[:MAX_COLUMN_LENGTH].strip() for header in headers - if header.strip() + if header and header.strip() ] # TODO worry about csv header name problems @@ -171,171 +232,161 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): # to one that pgloader will understand. logger.info('Ensuring character coding is UTF8') f_write = tempfile.NamedTemporaryFile(suffix=file_format, delete=False) - try: - # datastore db connection - engine = get_write_engine() - # get column info from existing table - existing = datastore_resource_exists(resource_id) - existing_info = {} - if existing: - if p.toolkit.check_ckan_version(min_version='2.11'): - ds_info = p.toolkit.get_action('datastore_info')({'ignore_auth': True}, {'id': resource_id}) - existing_fields = ds_info.get('fields', []) - else: - existing_fields = existing.get('fields', []) - existing_info = dict((f['id'], f['info']) - for f in existing_fields - if 'info' in f) - existing_fields_by_headers = dict((f['id'], f) - for f in existing_fields) - - # Column types are either set (overridden) in the Data Dictionary page - # or default to text type (which is robust) - fields = [ - {'id': header_name, - 'type': existing_info.get(header_name, {}) - .get('type_override') or 'text', - } - for header_name in headers] - - # Maintain data dictionaries from matching column names - for f in fields: - if f['id'] in existing_info: - f['info'] = existing_info[f['id']] - f['strip_extra_white'] = existing_info[f['id']].get('strip_extra_white') if 'strip_extra_white' in existing_info[f['id']] \ - else existing_fields_by_headers[f['id']].get('strip_extra_white', True) - - ''' - Delete or truncate existing datastore table before proceeding, - depending on whether any fields have changed. - Otherwise the COPY will append to the existing table. 
- And if the fields have significantly changed, it may also fail. - ''' - if _fields_match(fields, existing_fields, logger): - logger.info('Clearing records for "%s" from DataStore.', resource_id) - _clear_datastore_resource(resource_id) - else: - logger.info('Deleting "%s" from DataStore.', resource_id) - delete_datastore_resource(resource_id) - else: - fields = [ - {'id': header_name, - 'type': 'text', - 'strip_extra_white': True} - for header_name in headers] - - logger.info('Fields: %s', fields) - - def _make_whitespace_stripping_iter(super_iter): - def strip_white_space_iter(): - for row in super_iter(): - if len(row) == len(fields): - for _index, _cell in enumerate(row): - # only strip white space if strip_extra_white is True - if fields[_index].get('strip_extra_white', True) and isinstance(_cell, str): - row[_index] = _cell.strip() - yield row - return strip_white_space_iter - - save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter} - try: - with UnknownEncodingStream(csv_filepath, file_format, decoding_result, - skip_rows=skip_rows) as stream: - stream.iter = _make_whitespace_stripping_iter(stream.iter) - stream.save(**save_args) - except (EncodingError, UnicodeDecodeError): - with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING, - skip_rows=skip_rows) as stream: - stream.iter = _make_whitespace_stripping_iter(stream.iter) - stream.save(**save_args) - csv_filepath = f_write.name - - # Create table - from ckan import model - context = {'model': model, 'ignore_auth': True} - data_dict = dict( - resource_id=resource_id, - fields=fields, - ) - data_dict['records'] = None # just create an empty table - data_dict['force'] = True # TODO check this - I don't fully - # understand read-only/datastore resources - try: - p.toolkit.get_action('datastore_create')(context, data_dict) - except p.toolkit.ValidationError as e: - if 'fields' in e.error_dict: - # e.g. {'message': None, 'error_dict': {'fields': [u'"***" is not a valid field name']}, '_error_summary': None} # noqa - error_message = e.error_dict['fields'][0] - raise LoaderError('Error with field definition: {}' - .format(error_message)) - else: - raise LoaderError( - 'Validation error when creating the database table: {}' - .format(str(e))) - except Exception as e: - raise LoaderError('Could not create the database table: {}' - .format(e)) + # get column info from existing table + existing, existing_info, existing_fields, existing_fields_by_headers = _read_existing_fields(resource_id) + if existing: + # Column types are either set (overridden) in the Data Dictionary page + # or default to text type (which is robust) + fields = [ + {'id': header_name, + 'type': existing_info.get(header_name, {}) + .get('type_override') or 'text', + } + for header_name in headers] - # datastore_active is switched on by datastore_create - # TODO temporarily disable it until the load is complete + # Maintain data dictionaries from matching column names + for f in fields: + if f['id'] in existing_info: + f['info'] = existing_info[f['id']] + f['strip_extra_white'] = existing_info[f['id']].get('strip_extra_white') if 'strip_extra_white' in existing_info[f['id']] \ + else existing_fields_by_headers[f['id']].get('strip_extra_white', True) - with engine.begin() as conn: - _disable_fulltext_trigger(conn, resource_id) + ''' + Delete or truncate existing datastore table before proceeding, + depending on whether any fields have changed. + Otherwise the COPY will append to the existing table. 
+ And if the fields have significantly changed, it may also fail. + ''' + fields_match = _fields_match(fields, existing_fields, logger) + if fields_match == FieldMatch.EXACT_MATCH: + logger.info('Clearing records for "%s" from DataStore.', resource_id) + _clear_datastore_resource(resource_id) + else: + logger.info('Deleting "%s" from DataStore.', resource_id) + delete_datastore_resource(resource_id) + # if file structure has changed, + # and it wasn't just from a Data Dictionary override, + # then we need to re-guess types + if allow_type_guessing and fields_match == FieldMatch.MISMATCH: + raise LoaderError("File structure has changed, reverting to Tabulator") + else: + fields = [ + {'id': header_name, + 'type': 'text', + 'strip_extra_white': True} + for header_name in headers] + + logger.info('Fields: %s', fields) + + def _make_whitespace_stripping_iter(super_iter): + def strip_white_space_iter(): + for row in super_iter(): + if len(row) == len(fields): + for _index, _cell in enumerate(row): + # only strip white space if strip_extra_white is True + if fields[_index].get('strip_extra_white', True) and isinstance(_cell, str): + row[_index] = _cell.strip() + yield row + return strip_white_space_iter + + save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter} + try: + with UnknownEncodingStream(csv_filepath, file_format, decoding_result, + skip_rows=skip_rows) as stream: + stream.iter = _make_whitespace_stripping_iter(stream.iter) + stream.save(**save_args) + except (EncodingError, UnicodeDecodeError): + with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING, + skip_rows=skip_rows) as stream: + stream.iter = _make_whitespace_stripping_iter(stream.iter) + stream.save(**save_args) + csv_filepath = f_write.name + + # Create table + from ckan import model + context = {'model': model, 'ignore_auth': True} + data_dict = dict( + resource_id=resource_id, + fields=fields, + ) + data_dict['records'] = None # just create an empty table + data_dict['force'] = True # TODO check this - I don't fully + # understand read-only/datastore resources + try: + p.toolkit.get_action('datastore_create')(context, data_dict) + except p.toolkit.ValidationError as e: + if 'fields' in e.error_dict: + # e.g. {'message': None, 'error_dict': {'fields': [u'"***" is not a valid field name']}, '_error_summary': None} # noqa + error_message = e.error_dict['fields'][0] + raise LoaderError('Error with field definition: {}' + .format(error_message)) + else: + raise LoaderError( + 'Validation error when creating the database table: {}' + .format(str(e))) + except Exception as e: + raise LoaderError('Could not create the database table: {}' + .format(e)) - with engine.begin() as conn: - context['connection'] = conn - _drop_indexes(context, data_dict, False) + # datastore_active is switched on by datastore_create + # TODO temporarily disable it until the load is complete - logger.info('Copying to database...') + engine = get_write_engine() + with engine.begin() as conn: + _disable_fulltext_trigger(conn, resource_id) - # Options for loading into postgres: - # 1. \copy - can't use as that is a psql meta-command and not accessible - # via psycopg2 - # 2. COPY - requires the db user to have superuser privileges. This is - # dangerous. It is also not available on AWS, for example. - # 3. pgloader method? 
- as described in its docs: - # Note that while the COPY command is restricted to read either from - # its standard input or from a local file on the server's file system, - # the command line tool psql implements a \copy command that knows - # how to stream a file local to the client over the network and into - # the PostgreSQL server, using the same protocol as pgloader uses. - # 4. COPY FROM STDIN - not quite as fast as COPY from a file, but avoids - # the superuser issue. <-- picked - - with engine.begin() as conn: - cur = conn.connection.cursor() - try: - with open(csv_filepath, 'rb') as f: - # can't use :param for table name because params are only - # for filter values that are single quoted. - try: - cur.copy_expert( - "COPY \"{resource_id}\" ({column_names}) " - "FROM STDIN " - "WITH (DELIMITER '{delimiter}', FORMAT csv, HEADER 1, " - " ENCODING '{encoding}');" - .format( - resource_id=resource_id, - column_names=', '.join(['"{}"'.format(h) - for h in headers]), - delimiter=delimiter, - encoding='UTF8', - ), - f) - except psycopg2.DataError as e: - # e is a str but with foreign chars e.g. - # 'extra data: "paul,pa\xc3\xbcl"\n' - # but logging and exceptions need a normal (7 bit) str - error_str = str(e) - logger.warning(error_str) - raise LoaderError('Error during the load into PostgreSQL:' - ' {}'.format(error_str)) - - finally: - cur.close() - finally: - os.remove(csv_filepath) # i.e. the tempfile + with engine.begin() as conn: + context['connection'] = conn + _drop_indexes(context, data_dict, False) + + logger.info('Copying to database...') + + # Options for loading into postgres: + # 1. \copy - can't use as that is a psql meta-command and not accessible + # via psycopg2 + # 2. COPY - requires the db user to have superuser privileges. This is + # dangerous. It is also not available on AWS, for example. + # 3. pgloader method? - as described in its docs: + # Note that while the COPY command is restricted to read either from + # its standard input or from a local file on the server's file system, + # the command line tool psql implements a \copy command that knows + # how to stream a file local to the client over the network and into + # the PostgreSQL server, using the same protocol as pgloader uses. + # 4. COPY FROM STDIN - not quite as fast as COPY from a file, but avoids + # the superuser issue. <-- picked + + with engine.begin() as conn: + cur = conn.connection.cursor() + try: + with open(csv_filepath, 'rb') as f: + # can't use :param for table name because params are only + # for filter values that are single quoted. + try: + cur.copy_expert( + "COPY \"{resource_id}\" ({column_names}) " + "FROM STDIN " + "WITH (DELIMITER '{delimiter}', FORMAT csv, HEADER 1, " + " ENCODING '{encoding}');" + .format( + resource_id=resource_id, + column_names=', '.join(['"{}"'.format(h) + for h in headers]), + delimiter=delimiter, + encoding='UTF8', + ), + f) + except psycopg2.DataError as e: + # e is a str but with foreign chars e.g. + # 'extra data: "paul,pa\xc3\xbcl"\n' + # but logging and exceptions need a normal (7 bit) str + error_str = str(e) + logger.warning(error_str) + raise LoaderError('Error during the load into PostgreSQL:' + ' {}'.format(error_str)) + + finally: + cur.close() logger.info('...copying done') @@ -383,44 +434,9 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None): Largely copied from datapusher - see below. Is slower than load_csv. 
''' - # Determine the header row - logger.info('Determining column names and types') - decoding_result = detect_encoding(table_filepath) - logger.info("load_table: Decoded encoding: %s", decoding_result) - try: - file_format = os.path.splitext(table_filepath)[1].strip('.') - with UnknownEncodingStream(table_filepath, file_format, decoding_result, - skip_rows=[{'type': 'preset', 'value': 'blank'}], - post_parse=[TypeConverter().convert_types]) as stream: - header_offset, headers = headers_guess(stream.sample) - except TabulatorException: - try: - file_format = mimetype.lower().split('/')[-1] - with UnknownEncodingStream(table_filepath, file_format, decoding_result, - skip_rows=[{'type': 'preset', 'value': 'blank'}], - post_parse=[TypeConverter().convert_types]) as stream: - header_offset, headers = headers_guess(stream.sample) - except TabulatorException as e: - raise LoaderError('Tabulator error: {}'.format(e)) - except Exception as e: - raise FileCouldNotBeLoadedError(e) - - existing = datastore_resource_exists(resource_id) - existing_info = None - if existing: - if p.toolkit.check_ckan_version(min_version='2.11'): - ds_info = p.toolkit.get_action('datastore_info')({'ignore_auth': True}, {'id': resource_id}) - existing_fields = ds_info.get('fields', []) - else: - existing_fields = existing.get('fields', []) - existing_info = dict( - (f['id'], f['info']) - for f in existing_fields if 'info' in f) - existing_fields_by_headers = dict((f['id'], f) - for f in existing_fields) + file_format, decoding_result, header_offset, headers, stream = _read_metadata(table_filepath, mimetype, logger) - # Some headers might have been converted from strings to floats and such. - headers = encode_headers(headers) + existing, existing_info, existing_fields, existing_fields_by_headers = _read_existing_fields(resource_id) # Get the list of rows to skip. The rows in the tabulator stream are # numbered starting with 1. We also want to skip the header row. @@ -434,7 +450,7 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None): fields = [] # override with types user requested - if existing_info: + if existing: types = [ { 'text': str, @@ -485,7 +501,7 @@ def row_iterator(): for field in zip(headers, types)] # Maintain data dictionaries from matching column names - if existing_info: + if existing: for h in headers_dicts: if h['id'] in existing_info: h['info'] = existing_info[h['id']] @@ -512,7 +528,7 @@ def row_iterator(): And if the fields have significantly changed, it may also fail. 
''' if existing: - if _fields_match(headers_dicts, existing_fields, logger): + if _fields_match(headers_dicts, existing_fields, logger) == FieldMatch.EXACT_MATCH: logger.info('Clearing records for "%s" from DataStore.', resource_id) _clear_datastore_resource(resource_id) else: @@ -688,7 +704,7 @@ def _populate_fulltext(connection, resource_id, fields, logger): connection: Database connection object resource_id (str): The datastore table identifier fields (list): List of dicts with column 'id' (name) and 'type' - (text/numeric/timestamp) + (text/numeric/timestamp) logger: Logger instance for progress tracking Note: @@ -726,8 +742,8 @@ def _populate_fulltext(connection, resource_id, fields, logger): identifier(field['id']) + ('::text' if field['type'] != 'text' else '') # Cast non-text types ) - for field in fields - if not field['id'].startswith('_') # Skip system columns like _id, _full_text + # Skip system columns like _id, _full_text + for field in fields if not field['id'].startswith('_') ), first=start, end=start + chunks diff --git a/ckanext/xloader/schema.py b/ckanext/xloader/schema.py index dcedfd5f..9a210367 100644 --- a/ckanext/xloader/schema.py +++ b/ckanext/xloader/schema.py @@ -1,7 +1,5 @@ # encoding: utf-8 -from six import text_type as str - import ckan.plugins as p import ckanext.datastore.logic.schema as dsschema diff --git a/ckanext/xloader/tests/samples/simple-with-extra-column.csv b/ckanext/xloader/tests/samples/simple-with-extra-column.csv new file mode 100644 index 00000000..1fade95c --- /dev/null +++ b/ckanext/xloader/tests/samples/simple-with-extra-column.csv @@ -0,0 +1,7 @@ +date,temperature,place,foo +2011-01-01,1,Galway,1 +2011-01-02,-1,Galway,2 +2011-01-03,0,Galway,3 +2011-01-01,6,Berkeley,4 +,,Berkeley,5 +2011-01-03,5,,6 diff --git a/ckanext/xloader/tests/test_action.py b/ckanext/xloader/tests/test_action.py index 4497ce98..df0cb875 100644 --- a/ckanext/xloader/tests/test_action.py +++ b/ckanext/xloader/tests/test_action.py @@ -14,6 +14,7 @@ @pytest.mark.usefixtures("clean_db", "with_plugins") @pytest.mark.ckan_config("ckan.plugins", "datastore xloader") class TestAction(object): + def test_submit(self): # checks that xloader_submit enqueues the resource (to be xloadered) user = factories.User() @@ -118,7 +119,6 @@ def test_status(self): assert status["status"] == "pending" - def test_xloader_user_api_token_from_config(self): sysadmin = factories.SysadminWithToken() apikey = sysadmin["token"] diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index a5866f3c..c9fbaee7 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -13,7 +13,6 @@ from unittest import mock from ckanext.xloader import jobs -from ckanext.xloader.utils import get_xloader_user_apitoken _TEST_FILE_CONTENT = "x, y\n1,2\n2,4\n3,6\n4,8\n5,10" @@ -247,7 +246,6 @@ def create_mock_error(error_type): return ValueError("Test error") elif error_type == "TypeError": return TypeError("Test error") - def mock_download_with_error(*args, **kwargs): if not hasattr(mock_download_with_error, 'call_count'): diff --git a/ckanext/xloader/tests/test_loader.py b/ckanext/xloader/tests/test_loader.py index 09f87887..c1ddacfb 100644 --- a/ckanext/xloader/tests/test_loader.py +++ b/ckanext/xloader/tests/test_loader.py @@ -716,6 +716,28 @@ def test_reload(self, Session): ] assert len(self._get_records(Session, resource_id)) == 6 + def test_reload_fallback(self, Session): + csv_filepath = get_sample_filepath("simple.csv") + resource = 
factories.Resource() + resource_id = resource['id'] + loader.load_csv( + csv_filepath, + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + + with pytest.raises(LoaderError): + # Loading a file with different structure should trigger an error + # so we fall back to Tabulator + loader.load_csv( + get_sample_filepath("simple-with-extra-column.csv"), + resource_id=resource_id, + mimetype="text/csv", + allow_type_guessing=True, + logger=logger, + ) + @pytest.mark.skipif( not p.toolkit.check_ckan_version(min_version="2.7"), reason="Requires CKAN 2.7 - see https://github.com/ckan/ckan/pull/3557", diff --git a/ckanext/xloader/tests/test_utils.py b/ckanext/xloader/tests/test_utils.py index 52886851..2dfcafd1 100644 --- a/ckanext/xloader/tests/test_utils.py +++ b/ckanext/xloader/tests/test_utils.py @@ -3,6 +3,7 @@ from ckan.plugins import toolkit from ckanext.xloader import utils + def test_private_modify_url_no_change(): url = "https://ckan.example.com/dataset" assert utils._modify_url(url, "https://ckan.example.com") == url @@ -15,7 +16,7 @@ def test_private_modify_url_no_change(): ("https://ckan.example.org/resource/123", "https://ckan.example.org", "https://ckan.example.org/resource/123"), ("http://old-ckan.com/resource/456", "http://new-ckan.com", "http://new-ckan.com/resource/456"), ("https://sub.example.com/path", "https://ckan.example.com", "https://ckan.example.com/path"), - ("ftp://fileserver.com/file", "https://ckan.example.com", "ftp://fileserver.com/file"), ##should never happen + ("ftp://fileserver.com/file", "https://ckan.example.com", "ftp://fileserver.com/file"), # should never happen ("https://ckan.example.org/resource/789", "https://xloader.example.org", "https://xloader.example.org/resource/789"), ("https://ckan.example.org/dataset/data", "https://xloader.example.org", "https://xloader.example.org/dataset/data"), ("https://ckan.example.org/resource/123?foo=bar", "https://xloader.example.org", "https://xloader.example.org/resource/123?foo=bar"), @@ -63,7 +64,6 @@ def test_modify_input_url(input_url, ckan_site_url, xloader_site_url, is_altered assert response == input_url - def test_modify_input_url_no_xloader_site(): url = "https://ckan.example.org/dataset" with patch.dict(toolkit.config, {"ckan.site_url": "https://ckan.example.org", "ckanext.xloader.site_url": None}): diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index 0089052f..9e022091 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -1,27 +1,22 @@ # encoding: utf-8 +from collections import defaultdict +from decimal import Decimal import json import datetime +import logging import re - from six import text_type as str, binary_type +from urllib.parse import urlunparse, urlparse from ckan import model from ckan.lib import search -from collections import defaultdict -from decimal import Decimal - -import ckan.plugins as p -from ckan.plugins.toolkit import config, h, _ +import ckan.plugins.toolkit as tk from .job_exceptions import JobError -from logging import getLogger - +log = logging.getLogger(__name__) -log = getLogger(__name__) - -from urllib.parse import urlunparse, urlparse # resource.formats accepted by ckanext-xloader. Must be lowercase here. DEFAULT_FORMATS = [ @@ -43,7 +38,7 @@ class XLoaderFormats(object): @classmethod def is_it_an_xloader_format(cls, format_): if cls.formats is None: - cls._formats = config.get("ckanext.xloader.formats") + cls._formats = tk.config.get("ckanext.xloader.formats") if cls._formats is not None: # use config value. 
preserves empty list as well. cls._formats = cls._formats.lower().split() @@ -55,7 +50,7 @@ def is_it_an_xloader_format(cls, format_): def requires_successful_validation_report(): - return p.toolkit.asbool(config.get('ckanext.xloader.validation.requires_successful_report', False)) + return tk.asbool(tk.config.get('ckanext.xloader.validation.requires_successful_report', False)) def awaiting_validation(res_dict): @@ -77,17 +72,18 @@ def awaiting_validation(res_dict): # check for one of the main actions from ckanext-validation # in the case that users extend the Validation plugin class # and rename the plugin entry-point. - p.toolkit.get_action('resource_validation_show') + tk.get_action('resource_validation_show') is_validation_plugin_loaded = True except KeyError: is_validation_plugin_loaded = False if not is_validation_plugin_loaded: # the validation plugin is not loaded but required, log a warning - log.warning('ckanext.xloader.validation.requires_successful_report requires the ckanext-validation plugin to be activated.') + log.warning('ckanext.xloader.validation.requires_successful_report ' + 'requires the ckanext-validation plugin to be activated.') return False - if (p.toolkit.asbool(config.get('ckanext.xloader.validation.enforce_schema', True)) + if (tk.asbool(tk.config.get('ckanext.xloader.validation.enforce_schema', True)) or res_dict.get('schema', None)) and res_dict.get('validation_status', None) != 'success': # either validation.enforce_schema is turned on or it is off and there is a schema, @@ -101,12 +97,12 @@ def awaiting_validation(res_dict): def resource_data(id, resource_id, rows=None): - if p.toolkit.request.method == "POST": + if tk.request.method == "POST": context = { "ignore_auth": True, } - resource_dict = p.toolkit.get_action("resource_show")( + resource_dict = tk.get_action("resource_show")( context, { "id": resource_id, @@ -114,41 +110,41 @@ def resource_data(id, resource_id, rows=None): ) if awaiting_validation(resource_dict): - h.flash_error(_("Cannot upload resource %s to the DataStore " - "because the resource did not pass validation yet.") % resource_id) - return p.toolkit.redirect_to( + tk.h.flash_error(tk._("Cannot upload resource %s to the DataStore " + "because the resource did not pass validation yet.") % resource_id) + return tk.redirect_to( "xloader.resource_data", id=id, resource_id=resource_id ) try: - p.toolkit.get_action("xloader_submit")( + tk.get_action("xloader_submit")( None, { "resource_id": resource_id, "ignore_hash": True, # user clicked the reload button }, ) - except p.toolkit.ValidationError: + except tk.ValidationError: pass - return p.toolkit.redirect_to( + return tk.redirect_to( "xloader.resource_data", id=id, resource_id=resource_id ) try: - pkg_dict = p.toolkit.get_action("package_show")(None, {"id": id}) - resource = p.toolkit.get_action("resource_show")(None, {"id": resource_id}) - except (p.toolkit.ObjectNotFound, p.toolkit.NotAuthorized): - return p.toolkit.abort(404, p.toolkit._("Resource not found")) + pkg_dict = tk.get_action("package_show")(None, {"id": id}) + resource = tk.get_action("resource_show")(None, {"id": resource_id}) + except (tk.ObjectNotFound, tk.NotAuthorized): + return tk.abort(404, tk._("Resource not found")) try: - xloader_status = p.toolkit.get_action("xloader_status")( + xloader_status = tk.get_action("xloader_status")( None, {"resource_id": resource_id} ) - except p.toolkit.ObjectNotFound: + except tk.ObjectNotFound: xloader_status = {} - except p.toolkit.NotAuthorized: - return p.toolkit.abort(403, 
p.toolkit._("Not authorized to see this page")) + except tk.NotAuthorized: + return tk.abort(403, tk._("Not authorized to see this page")) extra_vars = { "status": xloader_status, @@ -157,7 +153,7 @@ def resource_data(id, resource_id, rows=None): } if rows: extra_vars["rows"] = rows - return p.toolkit.render( + return tk.render( "xloader/resource_data.html", extra_vars=extra_vars, ) @@ -170,11 +166,10 @@ def get_xloader_user_apitoken(): method returns the api_token set in the config file and defaults to the site_user. """ - api_token = p.toolkit.config.get('ckanext.xloader.api_token') + api_token = tk.config.get('ckanext.xloader.api_token') if api_token and api_token != 'NOT_SET': return api_token - raise p.toolkit.ValidationError({u'ckanext.xloader.api_token': u'NOT_SET, please provide valid api token'}) - + raise tk.ValidationError({u'ckanext.xloader.api_token': u'NOT_SET, please provide valid api token'}) def _modify_url(input_url: str, base_url: str) -> str: @@ -193,12 +188,12 @@ def _modify_url(input_url: str, base_url: str) -> str: return input_url # replace scheme: "http/https" and netloc:"//:@:/" new_url = urlunparse( - (parsed_base_url.scheme, - parsed_base_url.netloc, - parsed_input_url.path, - parsed_input_url.params, - parsed_input_url.query, - parsed_input_url.fragment)) + (parsed_base_url.scheme, + parsed_base_url.netloc, + parsed_input_url.path, + parsed_input_url.params, + parsed_input_url.query, + parsed_input_url.fragment)) return new_url @@ -218,21 +213,21 @@ def modify_input_url(input_url: str) -> str: or the original URL if conditions aren't met """ - xloader_site_url = config.get('ckanext.xloader.site_url') + xloader_site_url = tk.config.get('ckanext.xloader.site_url') if not xloader_site_url: return input_url parsed_input_url = urlparse(input_url) input_base_url = f"{parsed_input_url.scheme}://{parsed_input_url.netloc}" - parsed_ckan_site_url = urlparse(config.get('ckan.site_url')) + parsed_ckan_site_url = urlparse(tk.config.get('ckan.site_url')) ckan_base_url = f"{parsed_ckan_site_url.scheme}://{parsed_ckan_site_url.netloc}" - xloader_ignore_regex = config.get('ckanext.xloader.site_url_ignore_path_regex') + xloader_ignore_regex = tk.config.get('ckanext.xloader.site_url_ignore_path_regex') - #Don't alter non-matching base url's. + # Don't alter non-matching base URLs. if input_base_url != ckan_base_url: return input_url - #And not any urls on the ignore regex + # And not any URLs on the ignore regex elif xloader_ignore_regex and re.search(xloader_ignore_regex, input_url): return input_url @@ -269,7 +264,7 @@ def set_resource_metadata(update_dict): 'q': 'id:"{0}"'.format(resource.package_id), 'fl': 'data_dict', 'wt': 'json', - 'fq': 'site_id:"%s"' % p.toolkit.config.get('ckan.site_id'), + 'fq': 'site_id:"%s"' % tk.config.get('ckan.site_id'), 'rows': 1 } for record in solr_query.run(q)['results']: @@ -390,8 +385,8 @@ def type_guess(rows, types=TYPES, strict=False): def datastore_resource_exists(resource_id): context = {'model': model, 'ignore_auth': True} try: - response = p.toolkit.get_action('datastore_search')(context, dict( + response = tk.get_action('datastore_search')(context, dict( id=resource_id, limit=0)) - except p.toolkit.ObjectNotFound: + except tk.ObjectNotFound: return False return response or {'fields': []}