Commit 594891a

Merge pull request #256 from cgoldshtein/master

perf: implement chunked fulltext indexing for large datasets
2 parents 2abf0aa + 6a9fecf commit 594891a

File tree

2 files changed: +91 -24 lines changed

ckanext/xloader/config_declaration.yaml

Lines changed: 11 additions & 0 deletions
@@ -187,3 +187,14 @@ groups:
           they will also display "complete", "active", "inactive", and "unknown".
         type: bool
         required: false
+      - key: ckanext.xloader.search_update_chunks
+        default: 100000
+        example: True
+        description: |
+          The number of rows to process in each batch when populating the full-text
+          search index. Chunked processing prevents database timeouts and memory
+          exhaustion when indexing very large datasets (4GB+ files with millions of rows).
+          Smaller values reduce memory usage but increase processing time. Larger values
+          improve performance but may cause timeouts on very large tables.
+        type: int
+        required: false
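
For reference, the new option is an ordinary CKAN setting, so a site that wants a different batch size would add something like the following to its CKAN configuration file (a minimal sketch; the 50000 value is purely illustrative, not a recommendation from this patch):

    # ckan.ini -- tune how many rows each fulltext UPDATE batch covers
    ckanext.xloader.search_update_chunks = 50000

As the description above notes, smaller values lower memory use per statement at the cost of more round trips.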

ckanext/xloader/loader.py

Lines changed: 80 additions & 24 deletions
@@ -341,7 +341,7 @@ def strip_white_space_iter():
 
     logger.info('Creating search index...')
     with engine.begin() as conn:
-        _populate_fulltext(conn, resource_id, fields=fields)
+        _populate_fulltext(conn, resource_id, fields=fields, logger=logger)
     logger.info('...search index created')
 
     return fields
@@ -659,30 +659,86 @@ def _enable_fulltext_trigger(connection, resource_id):
         .format(table=identifier(resource_id, True))))
 
 
-def _populate_fulltext(connection, resource_id, fields):
-    '''Populates the _full_text column. i.e. the same as datastore_run_triggers
-    but it runs in 1/9 of the time.
-
-    The downside is that it reimplements the code that calculates the text to
-    index, breaking DRY. And its annoying to pass in the column names.
-
-    fields: list of dicts giving the each column's 'id' (name) and 'type'
-    (text/numeric/timestamp)
+def _get_rows_count_of_resource(connection, table):
+    count_query = ''' SELECT count(_id) from {table} '''.format(table=table)
+    results = connection.execute(count_query)
+    rows_count = int(results.first()[0])
+    return rows_count
+
+
+def _populate_fulltext(connection, resource_id, fields, logger):
+    '''Populates the _full_text column for full-text search functionality.
+
+    This function creates a PostgreSQL tsvector (text search vector) for each row
+    by concatenating all non-system columns. It's equivalent to datastore_run_triggers
+    but runs approximately 9x faster by using direct SQL updates.
+
+    To handle very large datasets (e.g., 4GB+ files with millions of rows), the update
+    operation is partitioned into chunks to prevent:
+    - Database statement timeouts
+    - Memory exhaustion
+    - Lock contention that could block other operations
+    - Transaction log overflow
+
+    The chunking mechanism processes rows in batches based on their _id values,
+    with chunk size configurable via 'ckanext.xloader.search_update_chunks'
+    (default: 100,000 rows per chunk).
+
+    Args:
+        connection: Database connection object
+        resource_id (str): The datastore table identifier
+        fields (list): List of dicts with column 'id' (name) and 'type'
+            (text/numeric/timestamp)
+        logger: Logger instance for progress tracking
+
+    Note:
+        This reimplements CKAN's text indexing logic for performance,
+        breaking the DRY principle but providing significant speed improvements.
     '''
-    stmt = sa.update(sa.table(resource_id, sa.column("_full_text"))).values(
-        _full_text=sa.text("to_tsvector({})".format(
-            " || ' ' || ".join(
-                'coalesce({}, \'\')'.format(
-                    identifier(field['id'])
-                    + ('::text' if field['type'] != 'text' else '')
-                )
-                for field in fields
-                if not field['id'].startswith('_')
-            )
-        ))
-    )
-
-    connection.execute(stmt)
+    try:
+        # Get total row count to determine chunking strategy
+        rows_count = _get_rows_count_of_resource(connection, identifier(resource_id))
+    except Exception as e:
+        rows_count = ''
+        logger.info("Failed to get resource rows count: {} ".format(str(e)))
+        raise
+
+    if rows_count:
+        # Configure chunk size - prevents timeouts and memory issues on large datasets
+        # Default 100,000 rows per chunk balances performance vs. resource usage
+        chunks = int(config.get('ckanext.xloader.search_update_chunks', 100000))
+
+        # Process table in chunks using _id range queries
+        # This approach ensures consistent chunk sizes and allows resuming if interrupted
+        for start in range(0, rows_count, chunks):
+            try:
+                # Build SQL to update _full_text column with concatenated searchable content
+                sql = \
+                    '''
+                    UPDATE {table}
+                    SET _full_text = to_tsvector({cols}) WHERE _id BETWEEN {first} and {end};
+                    '''.format(
+                        table=identifier(resource_id),
+                        # Concatenate all user columns (excluding system columns starting with '_')
+                        # coalesce() handles NULL values by converting them to empty strings
+                        cols=" || ' ' || ".join(
+                            'coalesce({}, \'\')'.format(
+                                identifier(field['id'])
+                                + ('::text' if field['type'] != 'text' else '')  # Cast non-text types
+                            )
+                            for field in fields
+                            if not field['id'].startswith('_')  # Skip system columns like _id, _full_text
+                        ),
+                        first=start,
+                        end=start + chunks
+                    )
+                connection.execute(sql)
+                logger.info("Indexed rows {first} to {end} of {total}".format(
+                    first=start, end=min(start + chunks, rows_count), total=rows_count))
+            except Exception as e:
+                # Log chunk-specific errors but continue processing remaining chunks
+                logger.error("Failed to index rows {first}-{end}: {error}".format(
+                    first=start, end=start + chunks, error=str(e)))
 
 
 def calculate_record_count(resource_id, logger):
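
To make the new control flow concrete, here is a small standalone sketch (not part of the patch; the identifier() stand-in and the sample fields and row count are hypothetical) showing how the column expression and the chunked UPDATE ranges above are built:

    # Illustration only: mimics the cols expression and _id ranges used in the patch.
    fields = [
        {'id': '_id', 'type': 'int'},         # system column, skipped by the filter
        {'id': 'name', 'type': 'text'},
        {'id': 'amount', 'type': 'numeric'},  # non-text, so it gets a ::text cast
    ]

    def identifier(name):
        # Simplified stand-in for the real identifier() helper: double-quote the name.
        return '"{}"'.format(name.replace('"', '""'))

    cols = " || ' ' || ".join(
        'coalesce({}, \'\')'.format(
            identifier(field['id']) + ('::text' if field['type'] != 'text' else '')
        )
        for field in fields
        if not field['id'].startswith('_')
    )
    print(cols)
    # coalesce("name", '') || ' ' || coalesce("amount"::text, '')

    rows_count = 250000   # pretend total row count
    chunks = 100000       # default for ckanext.xloader.search_update_chunks
    for start in range(0, rows_count, chunks):
        print("UPDATE t SET _full_text = to_tsvector(...) "
              "WHERE _id BETWEEN {} AND {};".format(start, start + chunks))
    # Three statements covering _id 0-100000, 100000-200000 and 200000-300000.
    # BETWEEN is inclusive, so a row whose _id sits exactly on a boundary is simply
    # re-indexed by the next chunk, and the last range may run past the final _id;
    # both are harmless for this UPDATE.

Because each chunk runs inside its own try/except, a failing batch is logged with logger.error and the loop moves on to the remaining chunks instead of aborting the whole index rebuild.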
