Skip to content

Commit 031de33

Browse files
authored
Merge pull request ckan#42 from qld-gov-au/develop
Develop to master - make datastore more robust
2 parents e336c83 + feaca86 commit 031de33

File tree

4 files changed

+120
-83
lines changed

4 files changed

+120
-83
lines changed

.github/workflows/test.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
---
22
name: Tests
3-
on: [push, pull_request]
3+
on:
4+
push:
5+
pull_request:
6+
branches:
7+
- master
8+
49
jobs:
510
lint:
611
runs-on: ubuntu-latest

ckanext/xloader/jobs.py

Lines changed: 20 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616
from rq import get_current_job
1717
import sqlalchemy as sa
1818

19-
import ckan.model as model
19+
from ckan import model
2020
from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config, check_ckan_version
21-
import ckan.lib.search as search
2221

2322
from . import loader
2423
from . import db
2524
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError
25+
from .utils import set_resource_metadata
2626

2727
try:
2828
from ckan.lib.api_token import get_user_from_token
@@ -244,7 +244,8 @@ def _download_resource_data(resource, data, api_key, logger):
244244
'''
245245
# check scheme
246246
url = resource.get('url')
247-
scheme = urlsplit(url).scheme
247+
url_parts = urlsplit(url)
248+
scheme = url_parts.scheme
248249
if scheme not in ('http', 'https', 'ftp'):
249250
raise JobError(
250251
'Only http, https, and ftp resources may be fetched.'
@@ -263,7 +264,17 @@ def _download_resource_data(resource, data, api_key, logger):
263264
# otherwise we won't get file from private resources
264265
headers['Authorization'] = api_key
265266

266-
response = get_response(url, headers)
267+
# Add a constantly changing parameter to bypass URL caching.
268+
# If we're running XLoader, then either the resource has
269+
# changed, or something went wrong and we want a clean start.
270+
# Either way, we don't want a cached file.
271+
download_url = url_parts._replace(
272+
query='{}&nonce={}'.format(url_parts.query, time.time())
273+
).geturl()
274+
else:
275+
download_url = url
276+
277+
response = get_response(download_url, headers)
267278

268279
cl = response.headers.get('content-length')
269280
if cl and int(cl) > MAX_CONTENT_LENGTH:
@@ -369,9 +380,12 @@ def set_datastore_active(data, resource, logger):
369380

370381
data['datastore_active'] = True
371382
logger.info('Setting resource.datastore_active = True')
383+
contains_all_records = data.get(
384+
'datastore_contains_all_records_of_source_file', True)
385+
data['datastore_contains_all_records_of_source_file'] = contains_all_records
372386
logger.info(
373-
'Setting resource.datastore_contains_all_records_of_source_file = {}'
374-
.format(data.get('datastore_contains_all_records_of_source_file')))
387+
'Setting resource.datastore_contains_all_records_of_source_file = %s',
388+
contains_all_records)
375389
set_resource_metadata(update_dict=data)
376390

377391

@@ -403,59 +417,6 @@ def callback_xloader_hook(result_url, api_key, job_dict):
403417
return result.status_code == requests.codes.ok
404418

405419

406-
def set_resource_metadata(update_dict):
407-
'''
408-
Set appropriate datastore_active flag on CKAN resource.
409-
410-
Called after creation or deletion of DataStore table.
411-
'''
412-
from ckan import model
413-
# We're modifying the resource extra directly here to avoid a
414-
# race condition, see issue #3245 for details and plan for a
415-
# better fix
416-
update_dict.update({
417-
'datastore_active': update_dict.get('datastore_active', True),
418-
'datastore_contains_all_records_of_source_file':
419-
update_dict.get('datastore_contains_all_records_of_source_file', True)
420-
})
421-
422-
q = model.Session.query(model.Resource). \
423-
filter(model.Resource.id == update_dict['resource_id'])
424-
resource = q.one()
425-
426-
# update extras in database for record
427-
extras = resource.extras
428-
extras.update(update_dict)
429-
q.update({'extras': extras}, synchronize_session=False)
430-
431-
# TODO: Remove resource_revision_table when dropping support for 2.8
432-
if hasattr(model, 'resource_revision_table'):
433-
model.Session.query(model.resource_revision_table).filter(
434-
model.ResourceRevision.id == update_dict['resource_id'],
435-
model.ResourceRevision.current is True
436-
).update({'extras': extras}, synchronize_session=False)
437-
model.Session.commit()
438-
439-
# get package with updated resource from solr
440-
# find changed resource, patch it and reindex package
441-
psi = search.PackageSearchIndex()
442-
solr_query = search.PackageSearchQuery()
443-
q = {
444-
'q': 'id:"{0}"'.format(resource.package_id),
445-
'fl': 'data_dict',
446-
'wt': 'json',
447-
'fq': 'site_id:"%s"' % config.get('ckan.site_id'),
448-
'rows': 1
449-
}
450-
for record in solr_query.run(q)['results']:
451-
solr_data_dict = json.loads(record['data_dict'])
452-
for resource in solr_data_dict['resources']:
453-
if resource['id'] == update_dict['resource_id']:
454-
resource.update(update_dict)
455-
psi.index_package(solr_data_dict)
456-
break
457-
458-
459420
def validate_input(input):
460421
# Especially validate metadata which is provided by the user
461422
if 'metadata' not in input:

ckanext/xloader/plugin.py

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@
22

33
import logging
44

5-
from ckan.plugins.toolkit import config
6-
import ckan.plugins as plugins
7-
import ckan.plugins.toolkit as toolkit
5+
from ckan import plugins
6+
from ckan.plugins import toolkit
87

9-
from ckanext.xloader import action, auth
10-
import ckanext.xloader.helpers as xloader_helpers
11-
from ckanext.xloader.loader import fulltext_function_exists, get_write_engine
8+
from . import action, auth, helpers as xloader_helpers, utils
9+
from .loader import fulltext_function_exists, get_write_engine
1210

1311
log = logging.getLogger(__name__)
1412

@@ -33,7 +31,7 @@ class XLoaderFormats(object):
3331
@classmethod
3432
def is_it_an_xloader_format(cls, format_):
3533
if cls.formats is None:
36-
cls._formats = config.get("ckanext.xloader.formats")
34+
cls._formats = toolkit.config.get("ckanext.xloader.formats")
3735
if cls._formats is not None:
3836
cls._formats = cls._formats.lower().split()
3937
else:
@@ -135,29 +133,49 @@ def notify(self, resource):
135133
self._submit_to_xloader(resource_dict)
136134

137135
# IResourceController
138-
if toolkit.check_ckan_version("2.10"):
139136

140-
def after_resource_create(self, context, resource_dict):
141-
self._submit_to_xloader(resource_dict)
137+
def after_resource_create(self, context, resource_dict):
138+
self._submit_to_xloader(resource_dict)
142139

143-
def before_resource_show(self, resource_dict):
144-
resource_dict[
145-
"datastore_contains_all_records_of_source_file"
146-
] = toolkit.asbool(
147-
resource_dict.get("datastore_contains_all_records_of_source_file")
148-
)
140+
def before_resource_show(self, resource_dict):
141+
resource_dict[
142+
"datastore_contains_all_records_of_source_file"
143+
] = toolkit.asbool(
144+
resource_dict.get("datastore_contains_all_records_of_source_file")
145+
)
149146

150-
else:
147+
def after_resource_update(self, context, resource_dict):
    """ Re-synchronise the 'datastore_active' flag with the DataStore.

    Compares the flag carried by ``resource_dict`` with whether
    ``datastore_info`` reports anything for the resource, and rewrites
    the flag (in either direction) when the two disagree. They can drift
    apart due to race conditions like
    https://github.com/ckan/ckan/issues/4663

    :param context: the caller's context; not used, the lookup runs with
        its own ``ignore_auth`` context
    :param resource_dict: the updated resource; must contain ``id``
    """
    flagged_active = resource_dict.get('datastore_active', False)
    try:
        datastore_info = toolkit.get_action('datastore_info')
        # A falsy result is treated the same as "not found".
        table_exists = bool(datastore_info(
            context={'ignore_auth': True},
            data_dict={'id': resource_dict['id']}))
    except toolkit.ObjectNotFound:
        table_exists = False

    if flagged_active == table_exists:
        # Flag already matches reality; nothing to do.
        return

    # Flag and DataStore disagree: make the flag reflect what exists.
    utils.set_resource_metadata({
        'resource_id': resource_dict['id'],
        'datastore_active': table_exists,
    })
169+
if not toolkit.check_ckan_version("2.10"):
151170

152171
def after_create(self, context, resource_dict):
153-
self._submit_to_xloader(resource_dict)
172+
self.after_resource_create(context, resource_dict)
154173

155174
def before_show(self, resource_dict):
156-
resource_dict[
157-
"datastore_contains_all_records_of_source_file"
158-
] = toolkit.asbool(
159-
resource_dict.get("datastore_contains_all_records_of_source_file")
160-
)
175+
self.before_resource_show(resource_dict)
176+
177+
def after_update(self, context, resource_dict):
178+
self.after_resource_update(context, resource_dict)
161179

162180
def _submit_to_xloader(self, resource_dict):
163181
context = {"ignore_auth": True, "defer_commit": True}

ckanext/xloader/utils.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
# encoding: utf-8
2+
3+
import json
4+
5+
from ckan import model
6+
from ckan.lib import search
17
import ckan.plugins as p
28

39

@@ -57,3 +63,50 @@ def get_xloader_user_apitoken():
5763

5864
site_user = p.toolkit.get_action('get_site_user')({'ignore_auth': True}, {})
5965
return site_user["apikey"]
66+
67+
68+
def set_resource_metadata(update_dict):
    '''
    Write DataStore-related values directly onto a CKAN resource.

    Merges ``update_dict`` into the resource's ``extras`` in the database,
    then patches the copy of that resource stored in the package's Solr
    document and re-indexes the package.

    Called after creation or deletion of DataStore table.

    :param update_dict: values to merge into the resource extras; must
        contain ``resource_id`` identifying the resource to update
    :type update_dict: dict
    :raises KeyError: if ``update_dict`` has no ``resource_id``
    '''
    resource_id = update_dict['resource_id']

    # We're modifying the resource extra directly here to avoid a
    # race condition, see issue #3245 for details and plan for a
    # better fix
    resource_query = model.Session.query(model.Resource). \
        filter(model.Resource.id == resource_id)
    # NOTE: Query.one() raises if no row (or more than one) matches.
    resource = resource_query.one()

    # update extras in database for record
    extras = resource.extras
    extras.update(update_dict)
    resource_query.update({'extras': extras}, synchronize_session=False)

    # TODO: Remove resource_revision_table when dropping support for 2.8
    if hasattr(model, 'resource_revision_table'):
        model.Session.query(model.resource_revision_table).filter(
            model.ResourceRevision.id == resource_id,
            # Must be a SQL expression: the Python test ``column is True``
            # is simply the constant False and would match no rows at all.
            model.ResourceRevision.current.is_(True)
        ).update({'extras': extras}, synchronize_session=False)
    model.Session.commit()

    # get package with updated resource from solr
    # find changed resource, patch it and reindex package
    psi = search.PackageSearchIndex()
    solr_query = search.PackageSearchQuery()
    solr_params = {
        'q': 'id:"{0}"'.format(resource.package_id),
        'fl': 'data_dict',
        'wt': 'json',
        'fq': 'site_id:"%s"' % p.toolkit.config.get('ckan.site_id'),
        'rows': 1
    }
    for record in solr_query.run(solr_params)['results']:
        solr_data_dict = json.loads(record['data_dict'])
        # Distinct loop name so the ORM ``resource`` above is not shadowed.
        for indexed_resource in solr_data_dict['resources']:
            if indexed_resource['id'] == resource_id:
                indexed_resource.update(update_dict)
                psi.index_package(solr_data_dict)
                break

0 commit comments

Comments
 (0)