From bdc9107d0771ff1c782765d095e5e35cc94672e4 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Tue, 12 Aug 2025 18:21:54 -0700 Subject: [PATCH 01/19] Add high-performance bulk loading optimizations for large files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add memory configuration options (--memory, --heap-size) for Docker/JVM - Implement chunked parallel loading with configurable chunk size and workers - Add CBOR format support via /update/cbor endpoint for faster binary loading - Auto-configure Solr performance settings (RAM buffer, disable autocommits) - Single commit at end of all uploads instead of per-file commits - Add configure-performance command for manual Solr tuning These optimizations should significantly improve loading performance for large files (25GB+) through: - Parallel processing of file chunks - Optimized memory allocation - Reduced commit overhead - Binary format support 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/cli.py | 149 ++++++++++++++++++++++++++-- linkml_solr/utils/solr_bulkload.py | 152 ++++++++++++++++++++++++++++- 2 files changed, 292 insertions(+), 9 deletions(-) diff --git a/linkml_solr/cli.py b/linkml_solr/cli.py index afff7d1..48a9af2 100644 --- a/linkml_solr/cli.py +++ b/linkml_solr/cli.py @@ -8,7 +8,8 @@ from linkml_runtime.linkml_model import SchemaDefinition from linkml_runtime.loaders import yaml_loader from linkml_solr import SolrQueryEngine, SolrEndpoint, DEFAULT_CORE, DEFAULT_SOLR_URL -from linkml_solr.utils.solr_bulkload import bulkload_file +from linkml_solr.utils.solr_bulkload import bulkload_file, bulkload_chunked +import requests @click.group() @@ -44,6 +45,7 @@ def main(verbose: int, quiet: bool): show_default=True, help='solr core.') @click.option('--format', '-f', + type=click.Choice(['csv', 'json', 'cbor']), default='csv', show_default=True, help='input format.') @@ -52,17 +54,79 @@ def main(verbose: int, quiet: bool): help='solr url.') @click.option('--processor', '-p', help='Processor argument to pass when bulk loading to Solr') +@click.option('--chunk-size', '-c', + default=100000, + show_default=True, + help='Number of rows per chunk for large files') +@click.option('--parallel-workers', '-w', + default=4, + show_default=True, + help='Number of parallel workers for chunked loading') +@click.option('--chunked/--no-chunked', + default=False, + show_default=True, + help='Use chunked parallel loading for large files') +@click.option('--auto-configure/--no-auto-configure', + default=True, + show_default=True, + help='Automatically configure Solr for optimal performance') +@click.option('--ram-buffer', + default=2048, + show_default=True, + help='RAM buffer size in MB (used with auto-configure)') @click.argument('files', nargs=-1) -def bulkload(files, format, schema, url, core, processor=None): +def bulkload(files, format, schema, url, core, processor, chunk_size, parallel_workers, chunked, auto_configure, ram_buffer): """ - Convert multiple golr yaml schemas to linkml + Bulk load files into Solr with optional chunking and performance optimization """ - inputs = {} if schema is not None: with open(schema) as stream: schema_obj = yaml_loader.load(stream, target_class=SchemaDefinition) - for f in files: - bulkload_file(f, format=format, schema=schema_obj, core=core, base_url=url, processor=processor) + else: + schema_obj = None + + # Auto-configure Solr for performance if requested + if auto_configure: + print("Configuring Solr for optimal 
bulk loading performance...") + configure_solr_performance(url, core, ram_buffer, disable_autocommit=True) + + total_loaded = 0 + + try: + for f in files: + print(f"Processing file: {f}") + + if chunked and format in ['csv', 'cbor']: + # Use chunked loading for large files + docs_loaded = bulkload_chunked( + csv_file=f, + base_url=url, + core=core, + schema=schema_obj, + chunk_size=chunk_size, + max_workers=parallel_workers, + format=format, + processor=processor + ) + total_loaded += docs_loaded + print(f"Loaded {docs_loaded} documents from {f}") + else: + # Use standard bulkload + bulkload_file(f, format=format, schema=schema_obj, core=core, base_url=url, processor=processor, commit=False) + print(f"Loaded file {f}") + + # Commit all changes at the end + print("Committing all changes...") + if commit_solr(url, core): + print(f"Successfully committed {total_loaded} documents to Solr") + else: + print("Warning: Commit may have failed") + + except Exception as e: + print(f"Error during bulk loading: {e}") + # Try to commit what we have + commit_solr(url, core) + raise @main.command() @click.option('--schema', '-s', @@ -95,7 +159,15 @@ def bulkload(files, format, schema, url, core, processor=None): default=DEFAULT_SOLR_URL, show_default=True, help='solr url.') -def start_server(schema, kill, container, url, core, port, sleep: int, create_schema): +@click.option('--memory', '-m', + default='4g', + show_default=True, + help='Docker memory limit (e.g., 4g, 8g)') +@click.option('--heap-size', '-j', + default='3g', + show_default=True, + help='JVM heap size (e.g., 2g, 4g)') +def start_server(schema, kill, container, url, core, port, sleep: int, create_schema, memory, heap_size): """ Starts a solr server (via Docker) """ @@ -109,6 +181,10 @@ def start_server(schema, kill, container, url, core, port, sleep: int, create_sc container, '-p', f'{port}:{port}', + '-m', + memory, + '-e', + f'SOLR_JAVA_MEM=-Xms{heap_size} -Xmx{heap_size}', 'solr:8', 'solr-precreate', core] @@ -193,5 +269,64 @@ def create_schema(schema, url, core, debug, dry_run, top_class): print(gen.serialize()) +def configure_solr_performance(url, core, ram_buffer_mb=2048, disable_autocommit=True): + """ + Configure Solr for optimal bulk loading performance + """ + config_url = f"{url}/{core}/config" + + if disable_autocommit: + # Disable autocommit + response = requests.post(config_url, + headers={'Content-type': 'application/json'}, + json={ + "set-property": { + "updateHandler.autoCommit.maxTime": -1, + "updateHandler.autoSoftCommit.maxTime": -1 + } + }) + print(f"Disabled autocommit: {response.status_code}") + + # Set RAM buffer size + response = requests.post(config_url, + headers={'Content-type': 'application/json'}, + json={ + "set-property": { + "updateHandler.indexConfig.ramBufferSizeMB": ram_buffer_mb + } + }) + print(f"Set RAM buffer to {ram_buffer_mb}MB: {response.status_code}") + + +def commit_solr(url, core): + """ + Commit changes to Solr + """ + commit_url = f"{url}/{core}/update?commit=true" + response = requests.post(commit_url) + print(f"Committed changes: {response.status_code}") + return response.status_code == 200 + + +@main.command() +@click.option('--url', '-u', + default=DEFAULT_SOLR_URL, + help='solr url.') +@click.option('--core', '-C', + default=DEFAULT_CORE, + help='solr core.') +@click.option('--ram-buffer', + default=2048, + help='RAM buffer size in MB.') +@click.option('--enable/--disable', + default=False, + help='Enable or disable autocommit.') +def configure_performance(url, core, ram_buffer, enable): 
+ """ + Configure Solr performance settings for bulk loading + """ + configure_solr_performance(url, core, ram_buffer, not enable) + + if __name__ == '__main__': main() diff --git a/linkml_solr/utils/solr_bulkload.py b/linkml_solr/utils/solr_bulkload.py index 8a087ff..9af86af 100644 --- a/linkml_solr/utils/solr_bulkload.py +++ b/linkml_solr/utils/solr_bulkload.py @@ -1,7 +1,13 @@ -from typing import List +from typing import List, Optional, Iterator import logging import subprocess +import pandas as pd +import cbor2 +import tempfile +import os +from concurrent.futures import ThreadPoolExecutor, as_completed from linkml_runtime.linkml_model.meta import SchemaDefinition, SlotDefinitionName +import requests def _get_multivalued_slots(schema: SchemaDefinition) -> List[SlotDefinitionName]: return [s.name for s in schema.slots.values() if s.multivalued] @@ -13,6 +19,7 @@ def bulkload_file(f, core=None, schema: SchemaDefinition = None, processor: str = None, + commit: bool = False, ): """ Bulkload a file using solr bulkload API @@ -30,16 +37,157 @@ def bulkload_file(f, separator = '%09' internal_separator = '%7C' parts = [f'f.{s}.split=true&f.{s}.separator={internal_separator}' for s in mvslots] - url = f'{base_url}/{core}/update?{"&".join(parts)}&commit=true&separator={separator}' + commit_param = 'true' if commit else 'false' + url = f'{base_url}/{core}/update?{"&".join(parts)}&commit={commit_param}&separator={separator}' if (processor is not None): url = f'{url}&processor={processor}' if format == 'csv': ct = 'application/csv' elif format == 'json': ct = 'application/json' + elif format == 'cbor': + ct = 'application/cbor' + url = f'{base_url}/{core}/update/cbor?commit={commit_param}' else: raise Exception(f'Unknown format {format}') command = ['curl', url, '-T', f'{f}', '-X', 'POST', '-H', f'Content-type:{ct}'] print(command) subprocess.run(command) + +def csv_to_cbor_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_file: str) -> int: + """ + Convert a chunk of CSV data to CBOR format + + :param csv_file: Input CSV file path + :param chunk_start: Starting row (0-based) + :param chunk_size: Number of rows to process + :param output_file: Output CBOR file path + :return: Number of documents processed + """ + try: + df = pd.read_csv(csv_file, skiprows=range(1, chunk_start + 1), nrows=chunk_size) + docs = df.to_dict('records') + + with open(output_file, 'wb') as f: + cbor2.dump(docs, f) + + return len(docs) + except Exception as e: + print(f"Error processing chunk starting at {chunk_start}: {e}") + return 0 + + +def bulkload_chunked(csv_file: str, + base_url: str, + core: str, + schema: SchemaDefinition, + chunk_size: int = 100000, + max_workers: int = 4, + format: str = 'csv', + processor: str = None) -> int: + """ + Load a large CSV file in chunks with parallel processing + + :param csv_file: Path to the CSV file + :param base_url: Solr base URL + :param core: Solr core name + :param schema: LinkML schema definition + :param chunk_size: Number of rows per chunk + :param max_workers: Number of parallel workers + :param format: Output format ('csv' or 'cbor') + :param processor: Solr processor to use + :return: Total number of documents loaded + """ + # Get total row count + total_rows = sum(1 for _ in open(csv_file)) - 1 # Subtract header + print(f"Processing {total_rows} rows in chunks of {chunk_size}") + + total_loaded = 0 + temp_files = [] + + try: + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + + for chunk_start in range(0, total_rows, 
chunk_size): + actual_chunk_size = min(chunk_size, total_rows - chunk_start) + + if format == 'cbor': + # Convert chunk to CBOR + temp_file = tempfile.NamedTemporaryFile(suffix='.cbor', delete=False) + temp_file.close() + temp_files.append(temp_file.name) + + future = executor.submit( + csv_to_cbor_chunk, + csv_file, + chunk_start, + actual_chunk_size, + temp_file.name + ) + futures.append((future, temp_file.name, 'cbor')) + else: + # For CSV, create chunk file + temp_file = tempfile.NamedTemporaryFile(suffix='.csv', delete=False) + temp_file.close() + temp_files.append(temp_file.name) + + future = executor.submit( + _create_csv_chunk, + csv_file, + chunk_start, + actual_chunk_size, + temp_file.name + ) + futures.append((future, temp_file.name, 'csv')) + + # Process chunks as they complete + for future, temp_file, file_format in futures: + try: + docs_processed = future.result() + if docs_processed > 0: + # Upload the chunk + bulkload_file( + temp_file, + format=file_format, + base_url=base_url, + core=core, + schema=schema, + processor=processor, + commit=False + ) + total_loaded += docs_processed + print(f"Loaded chunk: {docs_processed} docs (Total: {total_loaded})") + except Exception as e: + print(f"Error processing chunk {temp_file}: {e}") + + finally: + # Clean up temporary files + for temp_file in temp_files: + try: + os.unlink(temp_file) + except: + pass + + return total_loaded + + +def _create_csv_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_file: str) -> int: + """ + Create a CSV chunk file with header + + :param csv_file: Input CSV file path + :param chunk_start: Starting row (0-based) + :param chunk_size: Number of rows to process + :param output_file: Output CSV file path + :return: Number of rows processed + """ + try: + df = pd.read_csv(csv_file, skiprows=range(1, chunk_start + 1), nrows=chunk_size) + df.to_csv(output_file, index=False) + return len(df) + except Exception as e: + print(f"Error creating CSV chunk starting at {chunk_start}: {e}") + return 0 + From 3f2be33aa583deb8bfcf05398afac0e29ef3399b Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Tue, 12 Aug 2025 19:09:36 -0700 Subject: [PATCH 02/19] Add required dependencies for performance optimizations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add cbor2 for binary format support - Add pandas for CSV chunk processing - Add requests for HTTP API configuration 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index aee85ed..004a0d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,9 @@ linkml-runtime = ">=1.7.4" pysolr = "^3.9.0" linkml-dataops = "^0.1.0" jsonasobj = "^1.3.1" +cbor2 = "^5.4.0" +pandas = "^1.3.0" +requests = "^2.25.0" [tool.poetry.dev-dependencies] tox = "^3.24.5" From 003860fb280c2d8a4168848d7485166a3a5feb7f Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Tue, 12 Aug 2025 19:10:23 -0700 Subject: [PATCH 03/19] Bump pandas dependency to modern version MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Update pandas from ^1.3.0 to ^2.0.0 for better performance and compatibility 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 004a0d0..e3b941a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 
@@ pysolr = "^3.9.0" linkml-dataops = "^0.1.0" jsonasobj = "^1.3.1" cbor2 = "^5.4.0" -pandas = "^1.3.0" +pandas = "^2.0.0" requests = "^2.25.0" [tool.poetry.dev-dependencies] From d7f1ec166e4b5b18d1678e37c73eafff5483aa91 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Tue, 12 Aug 2025 19:35:08 -0700 Subject: [PATCH 04/19] Replace pandas with DuckDB for robust TSV/CSV processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace pandas with DuckDB for better handling of malformed data - Add ignore_errors=true to skip bad rows instead of failing - Auto-detect TSV vs CSV based on file extension - More efficient row counting and chunk processing for large files - Set duckdb dependency to '*' for maximum compatibility This should resolve issues with inconsistent field counts in TSV files. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/utils/solr_bulkload.py | 72 +++++++++++++++++++++++++----- pyproject.toml | 2 +- 2 files changed, 61 insertions(+), 13 deletions(-) diff --git a/linkml_solr/utils/solr_bulkload.py b/linkml_solr/utils/solr_bulkload.py index 9af86af..9a35962 100644 --- a/linkml_solr/utils/solr_bulkload.py +++ b/linkml_solr/utils/solr_bulkload.py @@ -1,7 +1,7 @@ from typing import List, Optional, Iterator import logging import subprocess -import pandas as pd +import duckdb import cbor2 import tempfile import os @@ -57,21 +57,37 @@ def bulkload_file(f, def csv_to_cbor_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_file: str) -> int: """ - Convert a chunk of CSV data to CBOR format + Convert a chunk of CSV/TSV data to CBOR format using DuckDB - :param csv_file: Input CSV file path + :param csv_file: Input CSV/TSV file path :param chunk_start: Starting row (0-based) :param chunk_size: Number of rows to process :param output_file: Output CBOR file path :return: Number of documents processed """ try: - df = pd.read_csv(csv_file, skiprows=range(1, chunk_start + 1), nrows=chunk_size) - docs = df.to_dict('records') + conn = duckdb.connect() + + # Auto-detect separator and read chunk with DuckDB + sep = '\t' if csv_file.endswith('.tsv') else ',' + query = f""" + SELECT * FROM read_csv_auto('{csv_file}', + delim='{sep}', + ignore_errors=true, + header=true) + LIMIT {chunk_size} OFFSET {chunk_start} + """ + + result = conn.execute(query).fetchall() + columns = [desc[0] for desc in conn.description] + + # Convert to list of dicts + docs = [dict(zip(columns, row)) for row in result] with open(output_file, 'wb') as f: cbor2.dump(docs, f) + conn.close() return len(docs) except Exception as e: print(f"Error processing chunk starting at {chunk_start}: {e}") @@ -99,8 +115,17 @@ def bulkload_chunked(csv_file: str, :param processor: Solr processor to use :return: Total number of documents loaded """ - # Get total row count - total_rows = sum(1 for _ in open(csv_file)) - 1 # Subtract header + # Get total row count using DuckDB (more reliable for large files) + conn = duckdb.connect() + sep = '\t' if csv_file.endswith('.tsv') else ',' + count_query = f""" + SELECT COUNT(*) FROM read_csv_auto('{csv_file}', + delim='{sep}', + ignore_errors=true, + header=true) + """ + total_rows = conn.execute(count_query).fetchone()[0] + conn.close() print(f"Processing {total_rows} rows in chunks of {chunk_size}") total_loaded = 0 @@ -175,18 +200,41 @@ def bulkload_chunked(csv_file: str, def _create_csv_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_file: str) -> int: """ - Create a CSV chunk 
file with header + Create a CSV/TSV chunk file with header using DuckDB - :param csv_file: Input CSV file path + :param csv_file: Input CSV/TSV file path :param chunk_start: Starting row (0-based) :param chunk_size: Number of rows to process :param output_file: Output CSV file path :return: Number of rows processed """ try: - df = pd.read_csv(csv_file, skiprows=range(1, chunk_start + 1), nrows=chunk_size) - df.to_csv(output_file, index=False) - return len(df) + conn = duckdb.connect() + + # Auto-detect separator + sep = '\t' if csv_file.endswith('.tsv') else ',' + + # Export chunk directly to CSV + query = f""" + COPY ( + SELECT * FROM read_csv_auto('{csv_file}', + delim='{sep}', + ignore_errors=true, + header=true) + LIMIT {chunk_size} OFFSET {chunk_start} + ) TO '{output_file}' (FORMAT CSV, HEADER true) + """ + + conn.execute(query) + + # Count rows to return + count_query = f""" + SELECT COUNT(*) FROM read_csv_auto('{output_file}', header=true) + """ + count = conn.execute(count_query).fetchone()[0] + + conn.close() + return count except Exception as e: print(f"Error creating CSV chunk starting at {chunk_start}: {e}") return 0 diff --git a/pyproject.toml b/pyproject.toml index e3b941a..e462e67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ pysolr = "^3.9.0" linkml-dataops = "^0.1.0" jsonasobj = "^1.3.1" cbor2 = "^5.4.0" -pandas = "^2.0.0" +duckdb = "*" requests = "^2.25.0" [tool.poetry.dev-dependencies] From 892f60bd4abe52e8e6025a16d15f4c7bfaa26a00 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Tue, 12 Aug 2025 19:42:54 -0700 Subject: [PATCH 05/19] Fix commit function missing Content-Type header MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add proper Content-Type header to Solr commit requests to avoid 'Missing ContentType' error. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkml_solr/cli.py b/linkml_solr/cli.py index 48a9af2..1a6fd4d 100644 --- a/linkml_solr/cli.py +++ b/linkml_solr/cli.py @@ -303,7 +303,7 @@ def commit_solr(url, core): Commit changes to Solr """ commit_url = f"{url}/{core}/update?commit=true" - response = requests.post(commit_url) + response = requests.post(commit_url, headers={'Content-Type': 'application/json'}, json={}) print(f"Committed changes: {response.status_code}") return response.status_code == 200 From 9dfb461bd883e6268c324eaa63572acb5f32de62 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Wed, 13 Aug 2025 10:24:20 -0700 Subject: [PATCH 06/19] Implement true parallel upload processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move upload operations inside ThreadPoolExecutor for genuine parallelism - Add HTTP connection pooling with 20 concurrent connections - Use as_completed() to process uploads as they finish - Combine chunk creation and upload into single parallel tasks - Add automatic temp file cleanup per chunk This should dramatically improve performance as workers now upload simultaneously instead of sequentially. 8 workers will actually process 8 uploads concurrently to Solr. 
🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/utils/solr_bulkload.py | 179 ++++++++++++++++++----------- 1 file changed, 111 insertions(+), 68 deletions(-) diff --git a/linkml_solr/utils/solr_bulkload.py b/linkml_solr/utils/solr_bulkload.py index 9a35962..0f54c56 100644 --- a/linkml_solr/utils/solr_bulkload.py +++ b/linkml_solr/utils/solr_bulkload.py @@ -6,9 +6,30 @@ import tempfile import os from concurrent.futures import ThreadPoolExecutor, as_completed +import threading from linkml_runtime.linkml_model.meta import SchemaDefinition, SlotDefinitionName import requests +# Global session for connection pooling +_session_lock = threading.Lock() +_global_session = None + +def get_http_session(): + """Get a shared HTTP session with connection pooling""" + global _global_session + with _session_lock: + if _global_session is None: + _global_session = requests.Session() + # Configure connection pooling for parallel uploads + adapter = requests.adapters.HTTPAdapter( + pool_connections=20, + pool_maxsize=20, + pool_block=True + ) + _global_session.mount('http://', adapter) + _global_session.mount('https://', adapter) + return _global_session + def _get_multivalued_slots(schema: SchemaDefinition) -> List[SlotDefinitionName]: return [s.name for s in schema.slots.values() if s.multivalued] @@ -50,9 +71,18 @@ def bulkload_file(f, url = f'{base_url}/{core}/update/cbor?commit={commit_param}' else: raise Exception(f'Unknown format {format}') - command = ['curl', url, '-T', f'{f}', '-X', 'POST', '-H', f'Content-type:{ct}'] - print(command) - subprocess.run(command) + # Use direct HTTP with connection pooling for better performance + try: + session = get_http_session() + with open(f, 'rb') as file_data: + response = session.post(url, data=file_data, headers={'Content-Type': ct}, timeout=300) + print(f"Uploaded {f}: {response.status_code}") + if response.status_code != 200: + print(f"Error response: {response.text}") + return response.status_code == 200 + except Exception as e: + print(f"Error uploading {f}: {e}") + return False def csv_to_cbor_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_file: str) -> int: @@ -94,12 +124,60 @@ def csv_to_cbor_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_f return 0 +def process_and_upload_chunk(csv_file: str, chunk_start: int, chunk_size: int, + base_url: str, core: str, schema: SchemaDefinition, + format: str, processor: str) -> int: + """ + Process a chunk (create temp file) and upload it to Solr in one parallel task + """ + temp_file = None + try: + if format == 'cbor': + temp_file = tempfile.NamedTemporaryFile(suffix='.cbor', delete=False) + temp_file.close() + docs_processed = csv_to_cbor_chunk(csv_file, chunk_start, chunk_size, temp_file.name) + else: + temp_file = tempfile.NamedTemporaryFile(suffix='.csv', delete=False) + temp_file.close() + docs_processed = _create_csv_chunk(csv_file, chunk_start, chunk_size, temp_file.name) + + if docs_processed > 0: + success = bulkload_file( + temp_file.name, + format=format, + base_url=base_url, + core=core, + schema=schema, + processor=processor, + commit=False + ) + if success: + print(f"Chunk {chunk_start}-{chunk_start+chunk_size}: {docs_processed} docs uploaded") + return docs_processed + else: + print(f"Failed to upload chunk {chunk_start}-{chunk_start+chunk_size}") + return 0 + else: + return 0 + + except Exception as e: + print(f"Error processing chunk {chunk_start}: {e}") + return 0 + finally: + # Clean up temp file + if temp_file and 
os.path.exists(temp_file.name): + try: + os.unlink(temp_file.name) + except: + pass + + def bulkload_chunked(csv_file: str, base_url: str, core: str, schema: SchemaDefinition, - chunk_size: int = 100000, - max_workers: int = 4, + chunk_size: int = 500000, + max_workers: int = 8, format: str = 'csv', processor: str = None) -> int: """ @@ -126,74 +204,39 @@ def bulkload_chunked(csv_file: str, """ total_rows = conn.execute(count_query).fetchone()[0] conn.close() - print(f"Processing {total_rows} rows in chunks of {chunk_size}") + print(f"Processing {total_rows} rows in chunks of {chunk_size} with {max_workers} parallel workers") total_loaded = 0 - temp_files = [] - try: - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [] - - for chunk_start in range(0, total_rows, chunk_size): - actual_chunk_size = min(chunk_size, total_rows - chunk_start) - - if format == 'cbor': - # Convert chunk to CBOR - temp_file = tempfile.NamedTemporaryFile(suffix='.cbor', delete=False) - temp_file.close() - temp_files.append(temp_file.name) - - future = executor.submit( - csv_to_cbor_chunk, - csv_file, - chunk_start, - actual_chunk_size, - temp_file.name - ) - futures.append((future, temp_file.name, 'cbor')) - else: - # For CSV, create chunk file - temp_file = tempfile.NamedTemporaryFile(suffix='.csv', delete=False) - temp_file.close() - temp_files.append(temp_file.name) - - future = executor.submit( - _create_csv_chunk, - csv_file, - chunk_start, - actual_chunk_size, - temp_file.name - ) - futures.append((future, temp_file.name, 'csv')) + # Submit all chunk processing and upload tasks in parallel + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + + for chunk_start in range(0, total_rows, chunk_size): + actual_chunk_size = min(chunk_size, total_rows - chunk_start) - # Process chunks as they complete - for future, temp_file, file_format in futures: - try: - docs_processed = future.result() - if docs_processed > 0: - # Upload the chunk - bulkload_file( - temp_file, - format=file_format, - base_url=base_url, - core=core, - schema=schema, - processor=processor, - commit=False - ) - total_loaded += docs_processed - print(f"Loaded chunk: {docs_processed} docs (Total: {total_loaded})") - except Exception as e: - print(f"Error processing chunk {temp_file}: {e}") - - finally: - # Clean up temporary files - for temp_file in temp_files: + # Submit the entire process+upload as one parallel task + future = executor.submit( + process_and_upload_chunk, + csv_file, + chunk_start, + actual_chunk_size, + base_url, + core, + schema, + format, + processor + ) + futures.append(future) + + # Process results as they complete (truly parallel!) + for future in as_completed(futures): try: - os.unlink(temp_file) - except: - pass + docs_loaded = future.result() + total_loaded += docs_loaded + print(f"Progress: {total_loaded}/{total_rows} documents loaded") + except Exception as e: + print(f"Error in parallel chunk processing: {e}") return total_loaded From df59eb2bb024b7838e7f7be564d1779b21998709 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Wed, 13 Aug 2025 10:44:28 -0700 Subject: [PATCH 07/19] Add detailed error logging for Solr configuration failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Show actual error response text when Solr configuration requests fail, instead of just HTTP status codes. This will help debug issues like the 400 error when setting RAM buffer size. 
🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/cli.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/linkml_solr/cli.py b/linkml_solr/cli.py index 1a6fd4d..90cf2ed 100644 --- a/linkml_solr/cli.py +++ b/linkml_solr/cli.py @@ -55,11 +55,11 @@ def main(verbose: int, quiet: bool): @click.option('--processor', '-p', help='Processor argument to pass when bulk loading to Solr') @click.option('--chunk-size', '-c', - default=100000, + default=500000, show_default=True, help='Number of rows per chunk for large files') @click.option('--parallel-workers', '-w', - default=4, + default=8, show_default=True, help='Number of parallel workers for chunked loading') @click.option('--chunked/--no-chunked', @@ -285,7 +285,11 @@ def configure_solr_performance(url, core, ram_buffer_mb=2048, disable_autocommit "updateHandler.autoSoftCommit.maxTime": -1 } }) - print(f"Disabled autocommit: {response.status_code}") + if response.status_code == 200: + print(f"Disabled autocommit: {response.status_code}") + else: + print(f"Failed to disable autocommit: {response.status_code}") + print(f"Error response: {response.text}") # Set RAM buffer size response = requests.post(config_url, @@ -295,7 +299,11 @@ def configure_solr_performance(url, core, ram_buffer_mb=2048, disable_autocommit "updateHandler.indexConfig.ramBufferSizeMB": ram_buffer_mb } }) - print(f"Set RAM buffer to {ram_buffer_mb}MB: {response.status_code}") + if response.status_code == 200: + print(f"Set RAM buffer to {ram_buffer_mb}MB: {response.status_code}") + else: + print(f"Failed to set RAM buffer to {ram_buffer_mb}MB: {response.status_code}") + print(f"Error response: {response.text}") def commit_solr(url, core): From 551043b95988c859e3a9e462f70439425ea90426 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Wed, 13 Aug 2025 11:01:25 -0700 Subject: [PATCH 08/19] Remove CBOR support and fix RAM buffer configuration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove CBOR format (not supported in Solr 8.x) - Replace with CSV to JSON conversion for performance comparison - Fix RAM buffer configuration to use Docker environment variables - Remove failing HTTP API call for RAM buffer setting - Add --ram-buffer-mb option to start-server command - Remove cbor2 dependency Now works with Solr 8.x and properly configures RAM buffer via SOLR_OPTS. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- index.md | 123 +++++ kitchen_sink.md | 18 + linkml_solr/cli.py | 27 +- linkml_solr/solrschemagen.py | 2 +- linkml_solr/utils/solr_bulkload.py | 23 +- model.yaml | 781 +++++++++++++++++++++++++++++ pyproject.toml | 1 - similarity.yaml | 217 ++++++++ test-docker-env | 0 9 files changed, 1161 insertions(+), 31 deletions(-) create mode 100644 index.md create mode 100644 kitchen_sink.md create mode 100644 model.yaml create mode 100644 similarity.yaml create mode 100644 test-docker-env diff --git a/index.md b/index.md new file mode 100644 index 0000000..2945620 --- /dev/null +++ b/index.md @@ -0,0 +1,123 @@ +# kitchen_sink + +Kitchen Sink Schema + +This schema does not do anything useful. It exists to test all features of linkml. 
+ +This particular text field exists to demonstrate markdown within a text field: + +Lists: + + * a + * b + * c + +And links, e.g to [Person](Person.md) + +URI: https://w3id.org/linkml/tests/kitchen_sink + +Name: kitchen_sink + + + +## Classes + +| Class | Description | +| --- | --- | +| [Activity](Activity.md) | a provence-generating activity | +| [Address](Address.md) | None | +| [Agent](Agent.md) | a provence-generating agent | +| [Dataset](Dataset.md) | None | +| [Event](Event.md) | None | +|         [BirthEvent](BirthEvent.md) | None | +|         [EmploymentEvent](EmploymentEvent.md) | None | +|         [MarriageEvent](MarriageEvent.md) | None | +|         [MedicalEvent](MedicalEvent.md) | None | +| [HasAliases](HasAliases.md) | None | +| [Organization](Organization.md) | None | +|         [Company](Company.md) | None | +| [Person](Person.md) | None | +| [Place](Place.md) | None | +| [Relationship](Relationship.md) | None | +|         [FamilialRelationship](FamilialRelationship.md) | None | +| [WithLocation](WithLocation.md) | None | + + + +## Slots + +| Slot | Description | +| --- | --- | +| [acted_on_behalf_of](acted_on_behalf_of.md) | | +| [activities](activities.md) | | +| [activity_set](activity_set.md) | | +| [addresses](addresses.md) | | +| [age_in_years](age_in_years.md) | | +| [agent_set](agent_set.md) | | +| [aliases](aliases.md) | | +| [ceo](ceo.md) | | +| [city](city.md) | | +| [companies](companies.md) | | +| [description](description.md) | | +| [employed_at](employed_at.md) | | +| [ended_at_time](ended_at_time.md) | | +| [has_birth_event](has_birth_event.md) | | +| [has_employment_history](has_employment_history.md) | | +| [has_familial_relationships](has_familial_relationships.md) | | +| [has_marriage_history](has_marriage_history.md) | | +| [has_medical_history](has_medical_history.md) | | +| [id](id.md) | | +| [in_location](in_location.md) | | +| [is_current](is_current.md) | | +| [married_to](married_to.md) | | +| [name](name.md) | | +| [persons](persons.md) | | +| [related_to](related_to.md) | | +| [started_at_time](started_at_time.md) | | +| [street](street.md) | | +| [type](type.md) | | +| [used](used.md) | | +| [was_associated_with](was_associated_with.md) | | +| [was_generated_by](was_generated_by.md) | | +| [was_informed_by](was_informed_by.md) | | + + +## Enumerations + +| Enumeration | Description | +| --- | --- | +| [DiagnosisType](DiagnosisType.md) | | +| [FamilialRelationshipType](FamilialRelationshipType.md) | | + + +## Types + +| Type | Description | +| --- | --- | +| [Boolean](Boolean.md) | A binary (true or false) value | +| [Curie](Curie.md) | a compact URI | +| [Date](Date.md) | a date (year, month and day) in an idealized calendar | +| [DateOrDatetime](DateOrDatetime.md) | Either a date or a datetime | +| [Datetime](Datetime.md) | The combination of a date and time | +| [Decimal](Decimal.md) | A real number with arbitrary precision that conforms to the xsd:decimal speci... 
| +| [Double](Double.md) | A real number that conforms to the xsd:double specification | +| [Float](Float.md) | A real number that conforms to the xsd:float specification | +| [Integer](Integer.md) | An integer | +| [Jsonpath](Jsonpath.md) | A string encoding a JSON Path | +| [Jsonpointer](Jsonpointer.md) | A string encoding a JSON Pointer | +| [Ncname](Ncname.md) | Prefix part of CURIE | +| [Nodeidentifier](Nodeidentifier.md) | A URI, CURIE or BNODE that represents a node in a model | +| [Objectidentifier](Objectidentifier.md) | A URI or CURIE that represents an object in the model | +| [Sparqlpath](Sparqlpath.md) | A string encoding a SPARQL Property Path | +| [String](String.md) | A character string | +| [Time](Time.md) | A time object represents a (local) time of day, independent of any particular... | +| [Uri](Uri.md) | a complete URI | +| [Uriorcurie](Uriorcurie.md) | a URI or a CURIE | + + +## Subsets + +| Subset | Description | +| --- | --- | +| [SubsetA](SubsetA.md) | test subset A | +| [SubsetB](SubsetB.md) | test subset B | diff --git a/kitchen_sink.md b/kitchen_sink.md new file mode 100644 index 0000000..be6229b --- /dev/null +++ b/kitchen_sink.md @@ -0,0 +1,18 @@ +# kitchen_sink + +Kitchen Sink Schema + +This schema does not do anything useful. It exists to test all features of linkml. + +This particular text field exists to demonstrate markdown within a text field: + +Lists: + + * a + * b + * c + +And links, e.g to [Person](Person.md) + +URI: https://w3id.org/linkml/tests/kitchen_sink + diff --git a/linkml_solr/cli.py b/linkml_solr/cli.py index 90cf2ed..ddffba1 100644 --- a/linkml_solr/cli.py +++ b/linkml_solr/cli.py @@ -45,7 +45,7 @@ def main(verbose: int, quiet: bool): show_default=True, help='solr core.') @click.option('--format', '-f', - type=click.Choice(['csv', 'json', 'cbor']), + type=click.Choice(['csv', 'json']), default='csv', show_default=True, help='input format.') @@ -96,7 +96,7 @@ def bulkload(files, format, schema, url, core, processor, chunk_size, parallel_w for f in files: print(f"Processing file: {f}") - if chunked and format in ['csv', 'cbor']: + if chunked and format in ['csv', 'json']: # Use chunked loading for large files docs_loaded = bulkload_chunked( csv_file=f, @@ -167,7 +167,11 @@ def bulkload(files, format, schema, url, core, processor, chunk_size, parallel_w default='3g', show_default=True, help='JVM heap size (e.g., 2g, 4g)') -def start_server(schema, kill, container, url, core, port, sleep: int, create_schema, memory, heap_size): +@click.option('--ram-buffer-mb', + default=512, + show_default=True, + help='Solr RAM buffer size in MB for bulk loading') +def start_server(schema, kill, container, url, core, port, sleep: int, create_schema, memory, heap_size, ram_buffer_mb): """ Starts a solr server (via Docker) """ @@ -185,6 +189,8 @@ def start_server(schema, kill, container, url, core, port, sleep: int, create_sc memory, '-e', f'SOLR_JAVA_MEM=-Xms{heap_size} -Xmx{heap_size}', + '-e', + f'SOLR_OPTS=-Dsolr.ramBufferSizeMB={ram_buffer_mb}', 'solr:8', 'solr-precreate', core] @@ -291,19 +297,8 @@ def configure_solr_performance(url, core, ram_buffer_mb=2048, disable_autocommit print(f"Failed to disable autocommit: {response.status_code}") print(f"Error response: {response.text}") - # Set RAM buffer size - response = requests.post(config_url, - headers={'Content-type': 'application/json'}, - json={ - "set-property": { - "updateHandler.indexConfig.ramBufferSizeMB": ram_buffer_mb - } - }) - if response.status_code == 200: - print(f"Set RAM buffer to 
{ram_buffer_mb}MB: {response.status_code}") - else: - print(f"Failed to set RAM buffer to {ram_buffer_mb}MB: {response.status_code}") - print(f"Error response: {response.text}") + # Note: RAM buffer size must be set via environment variables when starting Solr + print(f"Note: RAM buffer ({ram_buffer_mb}MB) should be set via SOLR_OPTS environment variable when starting Solr") def commit_solr(url, core): diff --git a/linkml_solr/solrschemagen.py b/linkml_solr/solrschemagen.py index f57f1b1..c342e1a 100644 --- a/linkml_solr/solrschemagen.py +++ b/linkml_solr/solrschemagen.py @@ -9,7 +9,7 @@ from linkml.utils.generator import Generator, shared_arguments -from linkml_solr.solrschema import * +from linkml_solr.solrschcema import * # Map from underlying python data type to solr equivalent # Note: The underlying types are a union of any built-in python datatype + any type defined in diff --git a/linkml_solr/utils/solr_bulkload.py b/linkml_solr/utils/solr_bulkload.py index 0f54c56..3a5b36d 100644 --- a/linkml_solr/utils/solr_bulkload.py +++ b/linkml_solr/utils/solr_bulkload.py @@ -2,8 +2,8 @@ import logging import subprocess import duckdb -import cbor2 import tempfile +import json import os from concurrent.futures import ThreadPoolExecutor, as_completed import threading @@ -66,9 +66,6 @@ def bulkload_file(f, ct = 'application/csv' elif format == 'json': ct = 'application/json' - elif format == 'cbor': - ct = 'application/cbor' - url = f'{base_url}/{core}/update/cbor?commit={commit_param}' else: raise Exception(f'Unknown format {format}') # Use direct HTTP with connection pooling for better performance @@ -85,14 +82,14 @@ def bulkload_file(f, return False -def csv_to_cbor_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_file: str) -> int: +def csv_to_json_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_file: str) -> int: """ - Convert a chunk of CSV/TSV data to CBOR format using DuckDB + Convert a chunk of CSV/TSV data to JSON format using DuckDB :param csv_file: Input CSV/TSV file path :param chunk_start: Starting row (0-based) :param chunk_size: Number of rows to process - :param output_file: Output CBOR file path + :param output_file: Output JSON file path :return: Number of documents processed """ try: @@ -111,11 +108,11 @@ def csv_to_cbor_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_f result = conn.execute(query).fetchall() columns = [desc[0] for desc in conn.description] - # Convert to list of dicts + # Convert to list of dicts for JSON docs = [dict(zip(columns, row)) for row in result] - with open(output_file, 'wb') as f: - cbor2.dump(docs, f) + with open(output_file, 'w') as f: + json.dump(docs, f) conn.close() return len(docs) @@ -132,10 +129,10 @@ def process_and_upload_chunk(csv_file: str, chunk_start: int, chunk_size: int, """ temp_file = None try: - if format == 'cbor': - temp_file = tempfile.NamedTemporaryFile(suffix='.cbor', delete=False) + if format == 'json': + temp_file = tempfile.NamedTemporaryFile(suffix='.json', delete=False) temp_file.close() - docs_processed = csv_to_cbor_chunk(csv_file, chunk_start, chunk_size, temp_file.name) + docs_processed = csv_to_json_chunk(csv_file, chunk_start, chunk_size, temp_file.name) else: temp_file = tempfile.NamedTemporaryFile(suffix='.csv', delete=False) temp_file.close() diff --git a/model.yaml b/model.yaml new file mode 100644 index 0000000..b66b48b --- /dev/null +++ b/model.yaml @@ -0,0 +1,781 @@ +id: https://w3id.org/monarch/monarch-py +name: monarch-py +description: Data models for the 
Monarch Initiative data access library +prefixes: + linkml: https://w3id.org/linkml/ + biolink: https://w3id.org/biolink/vocab/ +imports: + - linkml:types + - similarity +default_range: string + +enums: + AssociationDirectionEnum: + description: >- + The directionality of an association as it relates to a specified entity, with edges being categorized + as incoming or outgoing + permissible_values: + incoming: + description: >- + An association for which a specified entity is the object or part of the object closure + outgoing: + description: >- + An association for which a specified entity is the subject or part of the subject closure + +classes: + Association: + slots: + - id + - category + - subject + - original_subject + - subject_namespace + - subject_category + - subject_closure + - subject_label + - subject_closure_label + - subject_taxon + - subject_taxon_label + - predicate + - object + - original_object + - object_namespace + - object_category + - object_closure + - object_label + - object_closure_label + - object_taxon + - object_taxon_label + - primary_knowledge_source + - aggregator_knowledge_source + - negated + - pathway + - evidence_count + - knowledge_level + - agent_type + - has_evidence + - has_evidence_links + - has_count + - has_total + - has_percentage + - has_quotient + - grouping_key + - provided_by + - provided_by_link + - publications + - publications_links + - frequency_qualifier + - onset_qualifier + - sex_qualifier + - stage_qualifier + - qualifiers + - qualifiers_label + - qualifiers_namespace + - qualifiers_category + - qualifiers_closure + - qualifiers_closure_label + - qualifier + - qualifier_label + - qualifier_namespace + - qualifier_category + - qualifier_closure + - qualifier_closure_label + - frequency_qualifier_label + - frequency_qualifier_namespace + - frequency_qualifier_category + - frequency_qualifier_closure + - frequency_qualifier_closure_label + - onset_qualifier_label + - onset_qualifier_namespace + - onset_qualifier_category + - onset_qualifier_closure + - onset_qualifier_closure_label + - sex_qualifier_label + - sex_qualifier_namespace + - sex_qualifier_category + - sex_qualifier_closure + - sex_qualifier_closure_label + - stage_qualifier_label + - stage_qualifier_namespace + - stage_qualifier_category + - stage_qualifier_closure + - stage_qualifier_closure_label + AssociationCount: + is_a: FacetValue + slots: + - category + slot_usage: + category: + multivalued: false + AssociationCountList: + description: Container class for a list of association counts + slots: + - items + slot_usage: + items: + range: AssociationCount + AssociationResults: + is_a: Results + slots: + - items + slot_usage: + items: + range: Association + CompactAssociation: + slots: + - category + - subject + - subject_label + - predicate + - object + - object_label + - negated + CompactAssociationResults: + is_a: Results + slots: + - items + slot_usage: + items: + range: CompactAssociation + AssociationTableResults: + is_a: Results + slots: + - items + slot_usage: + items: + range: DirectionalAssociation + AssociationTypeMapping: + description: >- + A data class to hold the necessary information to produce association type counts for given + entities with appropriate directional labels + slots: + - subject_label + - object_label + - symmetric + - category + slot_usage: + subject_label: + description: A label to describe the subjects of the association type as a whole for use in the UI + object_label: + description: A label to describe the objects of the association type 
as a whole for use in the UI + symmetric: + description: >- + Whether the association type is symmetric, meaning that the subject and object labels should be + interchangeable + ifabsent: false + required: true + category: + description: The biolink category to use in queries for this association type + required: true + multivalued: false + CategoryGroupedAssociationResults: + is_a: Results + slots: + - counterpart_category + - items + slot_usage: + items: + range: Association + DirectionalAssociation: + is_a: Association + description: >- + An association that gives it's direction relative to a specified entity + slots: + - direction + ExpandedCurie: + description: A curie bundled along with its expanded url + slots: + - id + - url + Entity: + description: Represents an Entity in the Monarch KG data model + slots: + - id + - category + - name + - full_name + - deprecated + - description + - xref + - provided_by + - in_taxon + - in_taxon_label + - symbol + - synonym + - uri + - iri + - namespace + - has_phenotype + - has_phenotype_label + - has_phenotype_closure + - has_phenotype_closure_label + - has_phenotype_count + EntityResults: + is_a: Results + slots: + - items + slot_usage: + items: + range: Entity + FacetValue: + slots: + - label + - count + FacetField: + slots: + - label + - facet_values + HistoPheno: + slots: + - id + - items + slot_usage: + items: + range: HistoBin + HistoBin: + is_a: FacetValue + slots: + - id + Mapping: + description: >- + A minimal class to hold a SSSOM mapping + slots: + - subject_id + - subject_label + - predicate_id + - object_id + - object_label + - mapping_justification + - id + MappingResults: + description: SSSOM Mappings returned as a results collection + is_a: Results + slots: + - items + slot_usage: + items: + range: Mapping + MultiEntityAssociationResults: + is_a: Results + slots: + - id + - name + - associated_categories + Node: + description: UI container class extending Entity with additional information + is_a: Entity + slots: + - in_taxon + - in_taxon_label + - inheritance + - causal_gene + - causes_disease + - mappings + - external_links + - provided_by_link + - association_counts + - node_hierarchy + NodeHierarchy: + slots: + - super_classes + - sub_classes + Release: + description: >- + A class to hold information about a release of the Monarch KG + slots: + - version + - url + - kg + - sqlite + - solr + - neo4j + - metadata + - graph_stats + - qc_report + Results: + abstract: true + slots: + - limit + - offset + - total + SearchResult: + is_a: Entity + slots: + - highlight + - score + slot_usage: + category: + required: true + name: + required: true + SearchResults: + is_a: Results + slots: + - items + - facet_fields + - facet_queries + slot_usage: + items: + range: SearchResult + TextAnnotationResult: + slots: + - text + - tokens + - start + - end + +slots: + aggregator_knowledge_source: + multivalued: true + association_counts: + range: AssociationCount + multivalued: true + inlined_as_list: true + required: true + associated_categories: + range: CategoryGroupedAssociationResults + multivalued: true + inlined_as_list: true + required: true + category: + multivalued: false + causal_gene: + description: >- + A list of genes that are known to be causally associated with a disease + range: Entity + multivalued: true + inlined_as_list: true + causes_disease: + description: >- + A list of diseases that are known to be causally associated with a gene + range: Entity + multivalued: true + inlined_as_list: true + count: + description: count of 
documents + range: integer + counterpart_category: + description: >- + The category of the counterpart entity in a given association, + eg. the category of the entity that is not the subject + range: string + deprecated: + description: >- + A boolean flag indicating that an entity is no longer considered current or valid. + range: boolean + exact_mappings: + - oboInOwl:ObsoleteClass + description: + range: string + direction: + description: >- + The directionality of the association relative to a given entity for an association_count. + If the entity is the subject or in the subject closure, the direction is forwards, if it is + the object or in the object closure, the direction is backwards. + range: AssociationDirectionEnum + required: true + evidence_count: + description: count of supporting documents, evidence codes, and sources supplying evidence + range: integer + knowledge_level: + description: >- + Describes the level of knowledge expressed in a statement, based on the + reasoning or analysis methods used to generate the statement, or the + scope or specificity of what the statement expresses to be true. + slot_uri: biolink:knowledge_level + notes: >- + The range in this schema is represented as a string, but is constrained + to values from biolink:KnowledgeLevelEnum at ingest time + range: string + multivalued: false + required: true + agent_type: + description: >- + Describes the high-level category of agent who originally generated a + statement of knowledge or other type of information. + slot_uri: biolink:agent_type + notes: >- + The range in this schema is represented as a string, but is constrained + to values from biolink:AgentTypeEnum at ingest time + range: string + multivalued: false + required: true + external_links: + description: ExpandedCurie with id and url for xrefs + range: ExpandedCurie + multivalued: true + inlined_as_list: true + facet_fields: + description: Collection of facet field responses with the field values and counts + inlined: true + inlined_as_list: true + multivalued: true + range: FacetField + facet_queries: + description: Collection of facet query responses with the query string values and counts + inlined: true + inlined_as_list: true + multivalued: true + range: FacetValue + facet_values: + description: Collection of FacetValue label/value instances belonging to a FacetField + inlined: true + inlined_as_list: true + multivalued: true + range: FacetValue + frequency_qualifier: + range: string + full_name: + description: The long form name of an entity + range: string + grouping_key: + description: A concatenation of fields used to group associations with the same essential/defining properties + range: string + has_count: + description: count of out of has_total representing a frequency + range: integer + has_total: + description: total, devided by has_count, representing a frequency + range: integer + has_percentage: + description: percentage, which may be calculated from has_count and has_total, as 100 * quotient or provided directly, rounded to the integer level + range: float + has_quotient: + description: quotient, which should be 1/100 of has_percentage + range: float + has_evidence: + range: string + multivalued: true + has_evidence_links: + description: List of ExpandedCuries with id and url for evidence + range: ExpandedCurie + multivalued: true + inlined: true + inlined_as_list: true + has_phenotype: + description: >- + A list of phenotype identifiers that are known to be associated with this entity + range: string + multivalued: true + 
inlined_as_list: true + has_phenotype_label: + description: >- + A list of phenotype labels that are known to be associated with this entity + range: string + multivalued: true + inlined_as_list: true + has_phenotype_closure: + description: >- + A list of phenotype identifiers that are known to be associated with this entity expanded to include all ancestors + range: string + multivalued: true + inlined_as_list: true + has_phenotype_closure_label: + description: >- + A list of phenotype labels that are known to be associated with this entity expanded to include all ancestors + range: string + multivalued: true + inlined_as_list: true + has_phenotype_count: + description: >- + A count of the number of phenotypes that are known to be associated with this entity + range: integer + highlight: + description: matching text snippet containing html tags + range: string + id: + identifier: true + range: string + required: true + in_taxon: + description: The biolink taxon that the entity is in the closure of. + range: string + in_taxon_label: + description: The label of the biolink taxon that the entity is in the closure of. + range: string + inheritance: + range: Entity + inlined: true + items: + description: A collection of items, with the type to be overriden by slot_usage + range: string + inlined: true + inlined_as_list: true + multivalued: true + required: true + knowledge_source: + multivalued: true + label: + range: string + required: true + limit: + description: number of items to return in a response + range: integer + required: true + name: + range: string + namespace: + range: string + description: The namespace/prefix portion of this entity's identifier + negated: + range: boolean + node_hierarchy: + range: NodeHierarchy + inlined: true + object: + range: string + required: true + offset: + description: offset into the total number of items + range: integer + required: true + onset_qualifier: + range: string + original_object: + range: string + original_subject: + range: string + pathway: + range: string + predicate: + multivalued: false + range: string + required: true + primary_knowledge_source: + range: string + provided_by_link: + description: A link to the docs for the knowledge source that provided the node/edge. + range: ExpandedCurie + inlined: true + provided_by: + range: string + publications: + multivalued: true + publications_links: + description: List of ExpandedCuries with id and url for publications + range: ExpandedCurie + multivalued: true + inlined: true + inlined_as_list: true + score: + range: float + sex_qualifier: + range: string + stage_qualifier: + range: string + subject: + range: string + required: true + sub_classes: + range: Entity + multivalued: true + inlined: true + inlined_as_list: true + required: true + super_classes: + range: Entity + multivalued: true + inlined: true + inlined_as_list: true + required: true + symbol: + range: string + symmetric: + description: >- + Whether the association type is symmetric, i.e. the subject and object labels are interchangeable. 
+ range: boolean + synonym: + multivalued: true + total: + description: total number of items matching a query + range: integer + required: true + xref: + multivalued: true + range: string + uri: + description: The URI of the entity + url: + range: string + iri: + range: string + subject_label: + is_a: name + description: The name of the subject entity + subject_namespace: + range: string + description: The namespace/prefix of the subject entity + subject_category: + is_a: category + description: The category of the subject entity + subject_closure: + multivalued: true + description: Field containing subject id and the ids of all of it's ancestors + subject_closure_label: + multivalued: true + description: Field containing subject name and the names of all of it's ancestors + subject_taxon: + is_a: in_taxon + subject_taxon_label: + is_a: in_taxon_label + object_label: + is_a: name + description: The name of the object entity + object_namespace: + range: string + description: The namespace/prefix of the object entity + object_category: + is_a: category + description: The category of the object entity + object_closure: + multivalued: true + description: Field containing object id and the ids of all of it's ancestors + object_closure_label: + multivalued: true + description: Field containing object name and the names of all of it's ancestors + object_taxon: + is_a: in_taxon + object_taxon_label: + is_a: in_taxon_label + qualifiers: + multivalued: true + qualifiers_label: + is_a: name + description: The name of the frequency_qualifier entity + qualifiers_namespace: + range: string + description: The namespace/prefix of the frequency_qualifier entity + qualifiers_category: + is_a: category + description: The category of the frequency_qualifier entity + qualifiers_closure: + multivalued: true + description: Field containing frequency_qualifier id and the ids of all of it's ancestors + qualifiers_closure_label: + multivalued: true + description: Field containing frequency_qualifier name and the names of all of it's ancestors + qualifier: + multivalued: true + qualifier_label: + is_a: name + description: The name of the frequency_qualifier entity + qualifier_namespace: + range: string + description: The namespace/prefix of the frequency_qualifier entity + qualifier_category: + is_a: category + description: The category of the frequency_qualifier entity + qualifier_closure: + multivalued: true + description: Field containing frequency_qualifier id and the ids of all of it's ancestors + qualifier_closure_label: + multivalued: true + description: Field containing frequency_qualifier name and the names of all of it's ancestors + frequency_qualifier_label: + is_a: name + description: The name of the frequency_qualifier entity + frequency_qualifier_namespace: + range: string + description: The namespace/prefix of the frequency_qualifier entity + frequency_qualifier_category: + is_a: category + description: The category of the frequency_qualifier entity + frequency_qualifier_closure: + multivalued: true + description: Field containing frequency_qualifier id and the ids of all of it's ancestors + frequency_qualifier_closure_label: + multivalued: true + description: Field containing frequency_qualifier name and the names of all of it's ancestors + onset_qualifier_label: + is_a: name + description: The name of the onset_qualifier entity + onset_qualifier_namespace: + range: string + description: The namespace/prefix of the onset_qualifier entity + onset_qualifier_category: + is_a: category + description: 
The category of the onset_qualifier entity + onset_qualifier_closure: + multivalued: true + description: Field containing onset_qualifier id and the ids of all of it's ancestors + onset_qualifier_closure_label: + multivalued: true + description: Field containing onset_qualifier name and the names of all of it's ancestors + sex_qualifier_label: + is_a: name + description: The name of the sex_qualifier entity + sex_qualifier_namespace: + range: string + description: The namespace/prefix of the sex_qualifier entity + sex_qualifier_category: + is_a: category + description: The category of the sex_qualifier entity + sex_qualifier_closure: + multivalued: true + description: Field containing sex_qualifier id and the ids of all of it's ancestors + sex_qualifier_closure_label: + multivalued: true + description: Field containing sex_qualifier name and the names of all of it's ancestors + stage_qualifier_label: + is_a: name + description: The name of the stage_qualifier entity + stage_qualifier_namespace: + range: string + description: The namespace/prefix of the stage_qualifier entity + stage_qualifier_category: + is_a: category + description: The category of the stage_qualifier entity + stage_qualifier_closure: + multivalued: true + description: Field containing stage_qualifier id and the ids of all of it's ancestors + stage_qualifier_closure_label: + multivalued: true + description: Field containing stage_qualifier name and the names of all of it's ancestors + + # sssom slots + mappings: + description: List of ExpandedCuries with id and url for mapped entities + range: ExpandedCurie + multivalued: true + inlined_as_list: true + subject_id: + range: string + required: true + # subject label is already included in this schema + predicate_id: + range: string + required: true + object_id: + range: string + required: true + # object label is already included in this schema + mapping_justification: + range: string + + # Text annotation + text: + description: text without tokens + range: string + inlined: true + tokens: + description: A collection of entities or concepts + range: Entity + inlined: true + inlined_as_list: true + multivalued: true + start: + description: start position of the annotation + range: integer + end: + description: end position of the annotation + range: integer + + # release slots + version: string + kg: string + sqlite: string + solr: string + neo4j: string + metadata: string + graph_stats: string + qc_report: string \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index e462e67..aa24888 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,6 @@ linkml-runtime = ">=1.7.4" pysolr = "^3.9.0" linkml-dataops = "^0.1.0" jsonasobj = "^1.3.1" -cbor2 = "^5.4.0" duckdb = "*" requests = "^2.25.0" diff --git a/similarity.yaml b/similarity.yaml new file mode 100644 index 0000000..391baee --- /dev/null +++ b/similarity.yaml @@ -0,0 +1,217 @@ +id: https://w3id.org/monarch/monarch-py-similarity +name: monarch-py-similarity +description: Data models for the Monarch Initiative data access library +prefixes: + linkml: https://w3id.org/linkml/ +imports: + - linkml:types +default_range: string + +# This is a copy and paste from https://w3id.org/oak/similarity.yaml with some slots commented out because they +# conflicted with slots in the monarch-py schema + +classes: + PairwiseSimilarity: + abstract: true + description: >- + Abstract grouping for representing individual pairwise similarities + + TermPairwiseSimilarity: + is_a: PairwiseSimilarity + description: >- + 
A simple pairwise similarity between two atomic concepts/terms + slots: + - subject_id + - subject_label + - subject_source + - object_id + - object_label + - object_source + - ancestor_id + - ancestor_label + - ancestor_source + - object_information_content + - subject_information_content + - ancestor_information_content + - jaccard_similarity + - cosine_similarity + - dice_similarity + - phenodigm_score + + TermSetPairwiseSimilarity: + is_a: PairwiseSimilarity + description: >- + A simple pairwise similarity between two sets of concepts/terms + slots: + - subject_termset + - object_termset + - subject_best_matches + - object_best_matches + - average_score + - best_score + - metric + + TermInfo: + attributes: + id: + identifier: true + label: + slot_uri: rdfs:label + + BestMatch: + attributes: + match_source: + identifier: true + comments: + - note that the match_source is either the subject or the object + match_source_label: + match_target: + description: the entity matches + match_target_label: + score: + range: float + required: true + match_subsumer: + range: uriorcurie + match_subsumer_label: + similarity: + range: TermPairwiseSimilarity + required: true + + SemsimSearchResult: + slots: + - subject + - score + - similarity + slot_usage: + subject: + range: Entity + inlined: true + +types: + ZeroToOne: + typeof: float + minimum_value: 0 + maximum_value: 0 + NonNegativeFloat: + typeof: float + minimum_value: 0 + NegativeLogValue: + typeof: float + minimum_value: 0 + ItemCount: + typeof: integer + minimum_value: 0 + +slots: + similarity: + range: TermSetPairwiseSimilarity + # subject_id: + # slot_uri: sssom:subject_id + # required: true + # range: uriorcurie + # description: The first of the two entities being compared + # Excluded, since it conflicts with subject_label from this schema + # subject_label: + # slot_uri: sssom:subject_label + # description: the label or name for the first entity + subject_source: + slot_uri: sssom:subject_source + description: the source for the first entity + # object_id: + # slot_uri: sssom:object_id + # range: uriorcurie + # description: The second of the two entities being compared + # Excluded, since it conflicts with object_label from this schema + # object_label: + # slot_uri: sssom:object_label + # description: the label or name for the second entity + object_source: + slot_uri: sssom:object_source + description: the source for the second entity + ancestor_id: + range: uriorcurie + description: >- + the most recent common ancestor of the two compared entities. 
If there are multiple MRCAs then + the most informative one is selected + todos: + - decide on what to do when there are multiple possible ancestos + ancestor_label: + description: the name or label of the ancestor concept + ancestor_source: + # Excluded, conflicts with score from this schema + # score: + # abstract: true + # description: Abstract base slot for different kinds of scores + information_content: + abstract: true + aliases: + - IC + is_a: score + range: NegativeLogValue + description: The IC is the negative log of the probability of the concept + subject_information_content: + is_a: information_content + description: The IC of the subject + object_information_content: + is_a: information_content + description: The IC of the object + ancestor_information_content: + is_a: information_content + description: The IC of the object + jaccard_similarity: + is_a: score + range: ZeroToOne + description: The number of concepts in the intersection divided by the number in the union + cosine_similarity: + is_a: score + range: float + description: the dot product of two node embeddings divided by the product of their lengths + dice_similarity: + is_a: score + range: ZeroToOne + phenodigm_score: + is_a: score + range: NonNegativeFloat + description: the geometric mean of the jaccard similarity and the information content + equals_expression: sqrt({jaccard_similarity} * {information_content}) + overlap_coefficient: + is_a: score + range: ZeroToOne + subsumes_score: + is_a: score + range: ZeroToOne + subsumed_by_score: + is_a: score + range: ZeroToOne + intersection_count: + is_a: score + range: ItemCount + union_count: + is_a: score + range: ItemCount + # TermSets + subject_termset: + range: TermInfo + multivalued: true + inlined: true + object_termset: + range: TermInfo + multivalued: true + inlined: true + subject_best_matches: + range: BestMatch + multivalued: true + inlined: true + object_best_matches: + range: BestMatch + multivalued: true + inlined: true + metric: + range: uriorcurie + average_score: + range: float + required: false + best_score: + range: float + required: false diff --git a/test-docker-env b/test-docker-env new file mode 100644 index 0000000..e69de29 From ad97051b923958e7f869bc28fc86c89b069041cd Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Wed, 13 Aug 2025 11:06:14 -0700 Subject: [PATCH 09/19] Fix typo in import statement MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Correct 'solrschcema' to 'solrschema' in solrschemagen.py import. This was causing ModuleNotFoundError when running lsolr commands. 
🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/solrschemagen.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkml_solr/solrschemagen.py b/linkml_solr/solrschemagen.py index c342e1a..f57f1b1 100644 --- a/linkml_solr/solrschemagen.py +++ b/linkml_solr/solrschemagen.py @@ -9,7 +9,7 @@ from linkml.utils.generator import Generator, shared_arguments -from linkml_solr.solrschcema import * +from linkml_solr.solrschema import * # Map from underlying python data type to solr equivalent # Note: The underlying types are a union of any built-in python datatype + any type defined in From 1d6781dfb1dbee4c5c94d22fe88f9913c616dfcf Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Wed, 13 Aug 2025 11:10:54 -0700 Subject: [PATCH 10/19] Add timing information to bulkload command MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Show total time taken and commit time separately to help compare performance between different formats and settings. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/cli.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/linkml_solr/cli.py b/linkml_solr/cli.py index ddffba1..81ba646 100644 --- a/linkml_solr/cli.py +++ b/linkml_solr/cli.py @@ -10,6 +10,7 @@ from linkml_solr import SolrQueryEngine, SolrEndpoint, DEFAULT_CORE, DEFAULT_SOLR_URL from linkml_solr.utils.solr_bulkload import bulkload_file, bulkload_chunked import requests +import time @click.group() @@ -91,6 +92,7 @@ def bulkload(files, format, schema, url, core, processor, chunk_size, parallel_w configure_solr_performance(url, core, ram_buffer, disable_autocommit=True) total_loaded = 0 + start_time = time.time() try: for f in files: @@ -117,13 +119,21 @@ def bulkload(files, format, schema, url, core, processor, chunk_size, parallel_w # Commit all changes at the end print("Committing all changes...") + commit_start = time.time() if commit_solr(url, core): + commit_time = time.time() - commit_start + total_time = time.time() - start_time print(f"Successfully committed {total_loaded} documents to Solr") + print(f"Total time: {total_time:.2f}s (commit: {commit_time:.2f}s)") else: + total_time = time.time() - start_time print("Warning: Commit may have failed") + print(f"Total time: {total_time:.2f}s") except Exception as e: + total_time = time.time() - start_time print(f"Error during bulk loading: {e}") + print(f"Total time before error: {total_time:.2f}s") # Try to commit what we have commit_solr(url, core) raise From cff562621307aaa8e4d449dce1657cb488f735f6 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Wed, 13 Aug 2025 11:13:52 -0700 Subject: [PATCH 11/19] Add granular timing for preprocessing vs upload phases MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Break down timing to show: - Preprocessing time (chunk creation) - Upload time (parallel HTTP uploads) - Total processing time This helps identify whether bottlenecks are in data processing or network/Solr uploads. 
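As an illustration, a minimal sketch of the two-phase timing pattern (the `make_chunks`/`upload_chunk` callables and the helper name are placeholders, not this module's actual API):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def timed_load(make_chunks, upload_chunk, max_workers=4):
    """Illustrative only: report chunk-preparation time separately from upload time."""
    prep_start = time.time()
    chunks = list(make_chunks())            # phase 1: chunk creation / preprocessing
    prep_time = time.time() - prep_start

    upload_start = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(upload_chunk, c) for c in chunks]
        loaded = sum(f.result() for f in as_completed(futures))  # phase 2: parallel HTTP uploads
    upload_time = time.time() - upload_start

    print(f"Preprocessing: {prep_time:.2f}s, Upload: {upload_time:.2f}s, "
          f"Total: {time.time() - prep_start:.2f}s")
    return loaded
```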
🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/utils/solr_bulkload.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/linkml_solr/utils/solr_bulkload.py b/linkml_solr/utils/solr_bulkload.py index 3a5b36d..7206ea2 100644 --- a/linkml_solr/utils/solr_bulkload.py +++ b/linkml_solr/utils/solr_bulkload.py @@ -7,6 +7,7 @@ import os from concurrent.futures import ThreadPoolExecutor, as_completed import threading +import time from linkml_runtime.linkml_model.meta import SchemaDefinition, SlotDefinitionName import requests @@ -204,6 +205,7 @@ def bulkload_chunked(csv_file: str, print(f"Processing {total_rows} rows in chunks of {chunk_size} with {max_workers} parallel workers") total_loaded = 0 + preprocessing_start = time.time() # Submit all chunk processing and upload tasks in parallel with ThreadPoolExecutor(max_workers=max_workers) as executor: @@ -226,6 +228,10 @@ def bulkload_chunked(csv_file: str, ) futures.append(future) + preprocessing_time = time.time() - preprocessing_start + print(f"Preprocessing complete ({preprocessing_time:.2f}s) - starting parallel uploads...") + upload_start = time.time() + # Process results as they complete (truly parallel!) for future in as_completed(futures): try: @@ -234,6 +240,10 @@ def bulkload_chunked(csv_file: str, print(f"Progress: {total_loaded}/{total_rows} documents loaded") except Exception as e: print(f"Error in parallel chunk processing: {e}") + + upload_time = time.time() - upload_start + total_processing_time = time.time() - preprocessing_start + print(f"Upload complete! Processing: {preprocessing_time:.2f}s, Upload: {upload_time:.2f}s, Total: {total_processing_time:.2f}s") return total_loaded From 71a6eb75b010493153e40b8f4a78a7e8ac347f51 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Wed, 13 Aug 2025 11:33:21 -0700 Subject: [PATCH 12/19] Add comprehensive throughput metrics (docs/sec) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Show documents per second for: - Upload phase (pure HTTP throughput) - Processing phase (including preprocessing) - Overall end-to-end (including commit) This makes it easy to compare performance across different settings, formats, and optimizations. 
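For reference, a minimal sketch of the throughput calculation behind these log lines (the helper name is illustrative; the patch inlines the same zero-guard expression):

```python
def docs_per_second(total_docs: int, elapsed_seconds: float) -> float:
    """Avoid dividing by zero when the elapsed time is zero."""
    return total_docs / elapsed_seconds if elapsed_seconds > 0 else 0.0

# Formatting matches the log lines added in this patch (hypothetical numbers):
print(f"Overall throughput: {docs_per_second(1_250_000, 42.7):,.0f} docs/sec")
```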
🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/cli.py | 10 ++++--- linkml_solr/utils/solr_bulkload.py | 42 +++++++++++++++++++++++++++--- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/linkml_solr/cli.py b/linkml_solr/cli.py index 81ba646..5d0dcb4 100644 --- a/linkml_solr/cli.py +++ b/linkml_solr/cli.py @@ -60,9 +60,9 @@ def main(verbose: int, quiet: bool): show_default=True, help='Number of rows per chunk for large files') @click.option('--parallel-workers', '-w', - default=8, - show_default=True, - help='Number of parallel workers for chunked loading') + default=None, + type=int, + help='Number of parallel workers (default: auto-detect based on CPU cores)') @click.option('--chunked/--no-chunked', default=False, show_default=True, @@ -123,12 +123,16 @@ def bulkload(files, format, schema, url, core, processor, chunk_size, parallel_w if commit_solr(url, core): commit_time = time.time() - commit_start total_time = time.time() - start_time + overall_docs_per_sec = total_loaded / total_time if total_time > 0 else 0 print(f"Successfully committed {total_loaded} documents to Solr") print(f"Total time: {total_time:.2f}s (commit: {commit_time:.2f}s)") + print(f"Overall throughput: {overall_docs_per_sec:,.0f} docs/sec") else: total_time = time.time() - start_time + overall_docs_per_sec = total_loaded / total_time if total_time > 0 else 0 print("Warning: Commit may have failed") print(f"Total time: {total_time:.2f}s") + print(f"Overall throughput: {overall_docs_per_sec:,.0f} docs/sec") except Exception as e: total_time = time.time() - start_time diff --git a/linkml_solr/utils/solr_bulkload.py b/linkml_solr/utils/solr_bulkload.py index 7206ea2..92fa02f 100644 --- a/linkml_solr/utils/solr_bulkload.py +++ b/linkml_solr/utils/solr_bulkload.py @@ -8,6 +8,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed import threading import time +import os from linkml_runtime.linkml_model.meta import SchemaDefinition, SlotDefinitionName import requests @@ -35,6 +36,27 @@ def _get_multivalued_slots(schema: SchemaDefinition) -> List[SlotDefinitionName] return [s.name for s in schema.slots.values() if s.multivalued] +def get_optimal_worker_count(max_workers: Optional[int] = None) -> int: + """ + Determine optimal number of parallel workers based on system capabilities + + :param max_workers: Maximum workers to use, None for auto-detection + :return: Optimal number of workers + """ + if max_workers is not None: + return max_workers + + # Get CPU count + cpu_count = os.cpu_count() or 4 # Fallback to 4 if detection fails + + # For I/O bound HTTP uploads, use 2x CPU count but cap at reasonable limit + # This balances parallelism with system resource usage + optimal = min(cpu_count * 2, 16) + + # Minimum of 2 workers for any benefit + return max(optimal, 2) + + def bulkload_file(f, format='csv', base_url=None, @@ -175,7 +197,7 @@ def bulkload_chunked(csv_file: str, core: str, schema: SchemaDefinition, chunk_size: int = 500000, - max_workers: int = 8, + max_workers: Optional[int] = None, format: str = 'csv', processor: str = None) -> int: """ @@ -202,13 +224,21 @@ def bulkload_chunked(csv_file: str, """ total_rows = conn.execute(count_query).fetchone()[0] conn.close() - print(f"Processing {total_rows} rows in chunks of {chunk_size} with {max_workers} parallel workers") + + # Auto-detect optimal worker count + actual_workers = get_optimal_worker_count(max_workers) + cpu_count = os.cpu_count() or 'unknown' + + if max_workers is None: + 
print(f"Auto-detected {actual_workers} workers (CPU cores: {cpu_count})") + + print(f"Processing {total_rows} rows in chunks of {chunk_size} with {actual_workers} parallel workers") total_loaded = 0 preprocessing_start = time.time() # Submit all chunk processing and upload tasks in parallel - with ThreadPoolExecutor(max_workers=max_workers) as executor: + with ThreadPoolExecutor(max_workers=actual_workers) as executor: futures = [] for chunk_start in range(0, total_rows, chunk_size): @@ -243,7 +273,13 @@ def bulkload_chunked(csv_file: str, upload_time = time.time() - upload_start total_processing_time = time.time() - preprocessing_start + + # Calculate throughput metrics + docs_per_sec = total_loaded / total_processing_time if total_processing_time > 0 else 0 + upload_docs_per_sec = total_loaded / upload_time if upload_time > 0 else 0 + print(f"Upload complete! Processing: {preprocessing_time:.2f}s, Upload: {upload_time:.2f}s, Total: {total_processing_time:.2f}s") + print(f"Throughput: {docs_per_sec:,.0f} docs/sec overall, {upload_docs_per_sec:,.0f} docs/sec upload") return total_loaded From 71875575018bb73c75f1707edbaaeae62c9f17dd Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Wed, 13 Aug 2025 11:51:58 -0700 Subject: [PATCH 13/19] Add high-performance DuckDB to Solr bulk loading command MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement new 'bulkload-db' command with: - Read-only DuckDB connections for safety - Parallel query execution with OFFSET/LIMIT chunking - Direct streaming (no temp files): DuckDB → JSON → HTTP - SQL filtering support (WHERE, columns, ORDER BY) - Auto-detected optimal worker count - Comprehensive timing and throughput metrics Usage: lsolr bulkload-db data.duckdb table_name [options] Expected performance: 50k-100k+ docs/sec vs 30k from CSV approach. 
🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/cli.py | 100 +++++++++++++++- linkml_solr/utils/solr_bulkload.py | 178 +++++++++++++++++++++++++++++ 2 files changed, 277 insertions(+), 1 deletion(-) diff --git a/linkml_solr/cli.py b/linkml_solr/cli.py index 5d0dcb4..0f31007 100644 --- a/linkml_solr/cli.py +++ b/linkml_solr/cli.py @@ -8,7 +8,7 @@ from linkml_runtime.linkml_model import SchemaDefinition from linkml_runtime.loaders import yaml_loader from linkml_solr import SolrQueryEngine, SolrEndpoint, DEFAULT_CORE, DEFAULT_SOLR_URL -from linkml_solr.utils.solr_bulkload import bulkload_file, bulkload_chunked +from linkml_solr.utils.solr_bulkload import bulkload_file, bulkload_chunked, bulkload_duckdb import requests import time @@ -142,6 +142,104 @@ def bulkload(files, format, schema, url, core, processor, chunk_size, parallel_w commit_solr(url, core) raise + +@main.command() +@click.option('--core', '-C', + default=DEFAULT_CORE, + show_default=True, + help='solr core.') +@click.option('--url', '-u', + default=DEFAULT_SOLR_URL, + help='solr url.') +@click.option('--schema', '-s', + help='Path to schema.') +@click.option('--chunk-size', '-c', + default=500000, + show_default=True, + help='Number of rows per chunk') +@click.option('--parallel-workers', '-w', + default=None, + type=int, + help='Number of parallel workers (default: auto-detect based on CPU cores)') +@click.option('--where', + help='SQL WHERE clause for filtering data') +@click.option('--columns', + help='Comma-separated list of columns to export (default: all)') +@click.option('--order-by', + help='SQL ORDER BY clause for consistent chunking') +@click.option('--auto-configure/--no-auto-configure', + default=True, + show_default=True, + help='Automatically configure Solr for optimal performance') +@click.option('--ram-buffer', + default=2048, + show_default=True, + help='RAM buffer size in MB (used with auto-configure)') +@click.argument('db_path') +@click.argument('table_name') +def bulkload_db(db_path, table_name, core, url, schema, chunk_size, parallel_workers, + where, columns, order_by, auto_configure, ram_buffer): + """ + Bulk load data from DuckDB database to Solr with high-performance parallel processing + + DB_PATH: Path to the DuckDB database file + TABLE_NAME: Name of the table to export + """ + if schema is not None: + with open(schema) as stream: + schema_obj = yaml_loader.load(stream, target_class=SchemaDefinition) + else: + schema_obj = None + + # Auto-configure Solr for performance if requested + if auto_configure: + print("Configuring Solr for optimal bulk loading performance...") + configure_solr_performance(url, core, ram_buffer, disable_autocommit=True) + + start_time = time.time() + + try: + print(f"Loading from DuckDB: {db_path} → table: {table_name}") + + total_loaded = bulkload_duckdb( + db_path=db_path, + table_name=table_name, + base_url=url, + core=core, + schema=schema_obj, + chunk_size=chunk_size, + max_workers=parallel_workers, + where_clause=where, + columns=columns, + order_by=order_by + ) + + # Commit all changes at the end + print("Committing all changes...") + commit_start = time.time() + if commit_solr(url, core): + commit_time = time.time() - commit_start + total_time = time.time() - start_time + overall_docs_per_sec = total_loaded / total_time if total_time > 0 else 0 + print(f"Successfully committed {total_loaded} documents to Solr") + print(f"Total time: {total_time:.2f}s (commit: {commit_time:.2f}s)") + print(f"Overall throughput: 
{overall_docs_per_sec:,.0f} docs/sec") + else: + total_time = time.time() - start_time + overall_docs_per_sec = total_loaded / total_time if total_time > 0 else 0 + print("Warning: Commit may have failed") + print(f"Total time: {total_time:.2f}s") + print(f"Overall throughput: {overall_docs_per_sec:,.0f} docs/sec") + + except Exception as e: + total_time = time.time() - start_time + print(f"Error during bulk loading: {e}") + print(f"Total time before error: {total_time:.2f}s") + # Try to commit what we have + commit_solr(url, core) + raise + + @main.command() @click.option('--schema', '-s', help='Path to LinkML yaml schema.') diff --git a/linkml_solr/utils/solr_bulkload.py b/linkml_solr/utils/solr_bulkload.py index 92fa02f..1413edc 100644 --- a/linkml_solr/utils/solr_bulkload.py +++ b/linkml_solr/utils/solr_bulkload.py @@ -284,6 +284,184 @@ def bulkload_chunked(csv_file: str, return total_loaded +def query_duckdb_chunk(db_path: str, query: str, offset: int, chunk_size: int) -> tuple: + """ + Query a chunk of data from DuckDB with read-only connection + + :param db_path: Path to DuckDB database + :param query: Base SQL query (without LIMIT/OFFSET) + :param offset: Starting row offset + :param chunk_size: Number of rows to fetch + :return: (results, columns) tuple + """ + try: + # Open read-only connection for safety + conn = duckdb.connect(db_path, read_only=True) + + # Add LIMIT/OFFSET to the query + chunk_query = f"{query} LIMIT {chunk_size} OFFSET {offset}" + + result = conn.execute(chunk_query) + rows = result.fetchall() + columns = [desc[0] for desc in result.description] + + conn.close() + return rows, columns + except Exception as e: + print(f"Error querying DuckDB chunk at offset {offset}: {e}") + return [], [] + + +def upload_duckdb_chunk(db_path: str, query: str, offset: int, chunk_size: int, + base_url: str, core: str, schema: SchemaDefinition) -> int: + """ + Query DuckDB chunk and upload directly to Solr + + :param db_path: Path to DuckDB database + :param query: Base SQL query + :param offset: Starting row offset + :param chunk_size: Number of rows to process + :param base_url: Solr base URL + :param core: Solr core name + :param schema: LinkML schema definition + :return: Number of documents uploaded + """ + try: + # Query the chunk + rows, columns = query_duckdb_chunk(db_path, query, offset, chunk_size) + + if not rows: + return 0 + + # Convert to JSON for Solr + docs = [dict(zip(columns, row)) for row in rows] + json_data = json.dumps(docs) + + # Upload directly to Solr + session = get_http_session() + url = f'{base_url}/{core}/update/json/docs?commit=false' + + response = session.post(url, data=json_data, + headers={'Content-Type': 'application/json'}, + timeout=300) + + if response.status_code == 200: + print(f"Uploaded chunk offset {offset}: {len(docs)} docs") + return len(docs) + else: + print(f"Error uploading chunk offset {offset}: {response.status_code}") + print(f"Error response: {response.text}") + return 0 + + except Exception as e: + print(f"Error processing DuckDB chunk at offset {offset}: {e}") + return 0 + + +def bulkload_duckdb(db_path: str, + table_name: str, + base_url: str, + core: str, + schema: SchemaDefinition, + chunk_size: int = 500000, + max_workers: Optional[int] = None, + where_clause: Optional[str] = None, + columns: Optional[str] = None, + order_by: Optional[str] = None) -> int: + """ + Load data from DuckDB database to Solr with parallel processing + + :param db_path: Path to DuckDB database file + :param table_name: Name of table to export + 
:param base_url: Solr base URL + :param core: Solr core name + :param schema: LinkML schema definition + :param chunk_size: Number of rows per chunk + :param max_workers: Number of parallel workers (None for auto-detect) + :param where_clause: Optional SQL WHERE clause + :param columns: Optional comma-separated column list + :param order_by: Optional SQL ORDER BY clause + :return: Total number of documents loaded + """ + try: + # Build SQL query + column_list = columns if columns else "*" + base_query = f"SELECT {column_list} FROM {table_name}" + + if where_clause: + base_query += f" WHERE {where_clause}" + + if order_by: + base_query += f" ORDER BY {order_by}" + + # Get total row count with read-only connection + conn = duckdb.connect(db_path, read_only=True) + count_query = f"SELECT COUNT(*) FROM {table_name}" + if where_clause: + count_query += f" WHERE {where_clause}" + + total_rows = conn.execute(count_query).fetchone()[0] + conn.close() + + # Auto-detect optimal worker count + actual_workers = get_optimal_worker_count(max_workers) + cpu_count = os.cpu_count() or 'unknown' + + if max_workers is None: + print(f"Auto-detected {actual_workers} workers (CPU cores: {cpu_count})") + + print(f"Processing {total_rows} rows from DuckDB in chunks of {chunk_size} with {actual_workers} parallel workers") + print(f"Query: {base_query}") + + total_loaded = 0 + processing_start = time.time() + + # Submit all chunk queries and uploads in parallel + with ThreadPoolExecutor(max_workers=actual_workers) as executor: + futures = [] + + for offset in range(0, total_rows, chunk_size): + future = executor.submit( + upload_duckdb_chunk, + db_path, + base_query, + offset, + chunk_size, + base_url, + core, + schema + ) + futures.append(future) + + print(f"Submitted {len(futures)} parallel chunks - processing...") + upload_start = time.time() + + # Process results as they complete + for future in as_completed(futures): + try: + docs_loaded = future.result() + total_loaded += docs_loaded + print(f"Progress: {total_loaded}/{total_rows} documents loaded") + except Exception as e: + print(f"Error in parallel DuckDB chunk processing: {e}") + + upload_time = time.time() - upload_start + total_processing_time = time.time() - processing_start + + # Calculate throughput metrics + docs_per_sec = total_loaded / total_processing_time if total_processing_time > 0 else 0 + upload_docs_per_sec = total_loaded / upload_time if upload_time > 0 else 0 + + print(f"DuckDB upload complete! 
Upload: {upload_time:.2f}s, Total: {total_processing_time:.2f}s") + print(f"Throughput: {docs_per_sec:,.0f} docs/sec overall, {upload_docs_per_sec:,.0f} docs/sec upload") + + return total_loaded + + except Exception as e: + print(f"Error in DuckDB bulk loading: {e}") + return 0 + + def _create_csv_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_file: str) -> int: """ Create a CSV/TSV chunk file with header using DuckDB From 0dfe0dcdf3b863dfd2428bf67c0fcc36db20c1b1 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Wed, 13 Aug 2025 12:52:12 -0700 Subject: [PATCH 14/19] Adjust default performance settings for better balance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Reduce default chunk size from 500k to 100k rows for better memory usage - Make worker auto-detection less aggressive: CPU × 1.5 instead of × 2 - Cap workers at 12 instead of 16 to avoid over-parallelization - Maintains manual override capability for fine-tuning This provides better out-of-the-box performance while allowing customization. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/cli.py | 4 ++-- linkml_solr/utils/solr_bulkload.py | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/linkml_solr/cli.py b/linkml_solr/cli.py index 0f31007..c79fe21 100644 --- a/linkml_solr/cli.py +++ b/linkml_solr/cli.py @@ -56,7 +56,7 @@ def main(verbose: int, quiet: bool): @click.option('--processor', '-p', help='Processor argument to pass when bulk loading to Solr') @click.option('--chunk-size', '-c', - default=500000, + default=100000, show_default=True, help='Number of rows per chunk for large files') @click.option('--parallel-workers', '-w', @@ -154,7 +154,7 @@ def bulkload(files, format, schema, url, core, processor, chunk_size, parallel_w @click.option('--schema', '-s', help='Path to schema.') @click.option('--chunk-size', '-c', - default=500000, + default=100000, show_default=True, help='Number of rows per chunk') @click.option('--parallel-workers', '-w', diff --git a/linkml_solr/utils/solr_bulkload.py b/linkml_solr/utils/solr_bulkload.py index 1413edc..0adf2b6 100644 --- a/linkml_solr/utils/solr_bulkload.py +++ b/linkml_solr/utils/solr_bulkload.py @@ -49,9 +49,9 @@ def get_optimal_worker_count(max_workers: Optional[int] = None) -> int: # Get CPU count cpu_count = os.cpu_count() or 4 # Fallback to 4 if detection fails - # For I/O bound HTTP uploads, use 2x CPU count but cap at reasonable limit - # This balances parallelism with system resource usage - optimal = min(cpu_count * 2, 16) + # For I/O bound HTTP uploads, use CPU count + 50% but cap at reasonable limit + # This balances parallelism with system resource usage without being too aggressive + optimal = min(int(cpu_count * 1.5), 12) # Minimum of 2 workers for any benefit return max(optimal, 2) @@ -196,7 +196,7 @@ def bulkload_chunked(csv_file: str, base_url: str, core: str, schema: SchemaDefinition, - chunk_size: int = 500000, + chunk_size: int = 100000, max_workers: Optional[int] = None, format: str = 'csv', processor: str = None) -> int: @@ -363,7 +363,7 @@ def bulkload_duckdb(db_path: str, base_url: str, core: str, schema: SchemaDefinition, - chunk_size: int = 500000, + chunk_size: int = 100000, max_workers: Optional[int] = None, where_clause: Optional[str] = None, columns: Optional[str] = None, From fcddaab638a31725ed81c64210088a330f3ec0a4 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Thu, 14 Aug 2025 13:42:15 -0700 Subject: [PATCH 15/19] 
Fix CSV parsing errors with malformed rows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds null_padding and max_line_size parameters to DuckDB CSV reader to handle rows with inconsistent column counts. This prevents bulk loading failures when encountering malformed CSV/TSV data. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/utils/solr_bulkload.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/linkml_solr/utils/solr_bulkload.py b/linkml_solr/utils/solr_bulkload.py index 0adf2b6..3b78238 100644 --- a/linkml_solr/utils/solr_bulkload.py +++ b/linkml_solr/utils/solr_bulkload.py @@ -124,7 +124,9 @@ def csv_to_json_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_f SELECT * FROM read_csv_auto('{csv_file}', delim='{sep}', ignore_errors=true, - header=true) + header=true, + null_padding=true, + max_line_size=1048576) LIMIT {chunk_size} OFFSET {chunk_start} """ @@ -220,7 +222,9 @@ def bulkload_chunked(csv_file: str, SELECT COUNT(*) FROM read_csv_auto('{csv_file}', delim='{sep}', ignore_errors=true, - header=true) + header=true, + null_padding=true, + max_line_size=1048576) """ total_rows = conn.execute(count_query).fetchone()[0] conn.close() @@ -484,7 +488,9 @@ def _create_csv_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_f SELECT * FROM read_csv_auto('{csv_file}', delim='{sep}', ignore_errors=true, - header=true) + header=true, + null_padding=true, + max_line_size=1048576) LIMIT {chunk_size} OFFSET {chunk_start} ) TO '{output_file}' (FORMAT CSV, HEADER true) """ @@ -493,7 +499,7 @@ def _create_csv_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_f # Count rows to return count_query = f""" - SELECT COUNT(*) FROM read_csv_auto('{output_file}', header=true) + SELECT COUNT(*) FROM read_csv_auto('{output_file}', header=true, ignore_errors=true, null_padding=true) """ count = conn.execute(count_query).fetchone()[0] From b344da5e989a0eb762d5a8a92546a05fdde07f7c Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Thu, 14 Aug 2025 17:29:40 -0700 Subject: [PATCH 16/19] Fix field concatenation issue by using TSV format consistently MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The bulkload process was creating CSV files but Solr was configured to expect TSV (tab-separated) format with separator=%09. This caused all field names to be concatenated into one field and all values into a single array. 
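For illustration, a minimal sketch of the consistency requirement (file paths, core name, and URL are placeholders; it assumes Solr's CSV update handler with a tab separator, i.e. `%09`):

```python
# Illustrative sketch only: the delimiter used to write the chunk file must match
# the `separator` parameter sent to Solr's CSV update handler (%09 == tab).
import duckdb
import requests

chunk_file = "/tmp/chunk_0.tsv"  # hypothetical temp chunk

duckdb.connect().execute(
    "COPY (SELECT * FROM read_csv_auto('input.tsv', delim='\t', header=true) "
    "LIMIT 100000) "
    f"TO '{chunk_file}' (FORMAT CSV, DELIMITER '\t', HEADER true)"  # tab-separated output
)

with open(chunk_file, "rb") as fh:
    requests.post(
        "http://localhost:8983/solr/my_core/update?commit=false&separator=%09",
        data=fh,
        headers={"Content-Type": "application/csv"},
    ).raise_for_status()
```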
Changes: - Remove problematic null_padding and max_line_size DuckDB parameters - Use DELIMITER '\t' in DuckDB COPY command to create TSV files - Ensure format consistency between export and Solr import 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- linkml_solr/utils/solr_bulkload.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/linkml_solr/utils/solr_bulkload.py b/linkml_solr/utils/solr_bulkload.py index 3b78238..11da720 100644 --- a/linkml_solr/utils/solr_bulkload.py +++ b/linkml_solr/utils/solr_bulkload.py @@ -124,9 +124,7 @@ def csv_to_json_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_f SELECT * FROM read_csv_auto('{csv_file}', delim='{sep}', ignore_errors=true, - header=true, - null_padding=true, - max_line_size=1048576) + header=true) LIMIT {chunk_size} OFFSET {chunk_start} """ @@ -222,9 +220,7 @@ def bulkload_chunked(csv_file: str, SELECT COUNT(*) FROM read_csv_auto('{csv_file}', delim='{sep}', ignore_errors=true, - header=true, - null_padding=true, - max_line_size=1048576) + header=true) """ total_rows = conn.execute(count_query).fetchone()[0] conn.close() @@ -488,18 +484,16 @@ def _create_csv_chunk(csv_file: str, chunk_start: int, chunk_size: int, output_f SELECT * FROM read_csv_auto('{csv_file}', delim='{sep}', ignore_errors=true, - header=true, - null_padding=true, - max_line_size=1048576) + header=true) LIMIT {chunk_size} OFFSET {chunk_start} - ) TO '{output_file}' (FORMAT CSV, HEADER true) + ) TO '{output_file}' (FORMAT CSV, DELIMITER '\t', HEADER true) """ conn.execute(query) # Count rows to return count_query = f""" - SELECT COUNT(*) FROM read_csv_auto('{output_file}', header=true, ignore_errors=true, null_padding=true) + SELECT COUNT(*) FROM read_csv_auto('{output_file}', header=true, ignore_errors=true) """ count = conn.execute(count_query).fetchone()[0] From f21877145ad3ac14e133778c718c12d1eb4927f4 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Fri, 22 Aug 2025 15:02:03 -0700 Subject: [PATCH 17/19] add processor option for db bulkload --- linkml_solr/cli.py | 7 +++++-- linkml_solr/utils/solr_bulkload.py | 10 +++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/linkml_solr/cli.py b/linkml_solr/cli.py index c79fe21..4f24109 100644 --- a/linkml_solr/cli.py +++ b/linkml_solr/cli.py @@ -175,10 +175,12 @@ def bulkload(files, format, schema, url, core, processor, chunk_size, parallel_w default=2048, show_default=True, help='RAM buffer size in MB (used with auto-configure)') +@click.option('--processor', '-p', + help='Processor argument to pass when bulk loading to Solr') @click.argument('db_path') @click.argument('table_name') def bulkload_db(db_path, table_name, core, url, schema, chunk_size, parallel_workers, - where, columns, order_by, auto_configure, ram_buffer): + where, columns, order_by, auto_configure, ram_buffer, processor): """ Bulk load data from DuckDB database to Solr with high-performance parallel processing @@ -211,7 +213,8 @@ def bulkload_db(db_path, table_name, core, url, schema, chunk_size, parallel_wor max_workers=parallel_workers, where_clause=where, columns=columns, - order_by=order_by + order_by=order_by, + processor=processor ) # Commit all changes at the end diff --git a/linkml_solr/utils/solr_bulkload.py b/linkml_solr/utils/solr_bulkload.py index 11da720..a78b79d 100644 --- a/linkml_solr/utils/solr_bulkload.py +++ b/linkml_solr/utils/solr_bulkload.py @@ -313,7 +313,7 @@ def query_duckdb_chunk(db_path: str, query: str, offset: int, 
chunk_size: int) - def upload_duckdb_chunk(db_path: str, query: str, offset: int, chunk_size: int, - base_url: str, core: str, schema: SchemaDefinition) -> int: + base_url: str, core: str, schema: SchemaDefinition, processor: Optional[str] = None) -> int: """ Query DuckDB chunk and upload directly to Solr @@ -340,6 +340,8 @@ def upload_duckdb_chunk(db_path: str, query: str, offset: int, chunk_size: int, # Upload directly to Solr session = get_http_session() url = f'{base_url}/{core}/update/json/docs?commit=false' + if processor is not None: + url = f'{url}&processor={processor}' response = session.post(url, data=json_data, headers={'Content-Type': 'application/json'}, @@ -367,7 +369,8 @@ def bulkload_duckdb(db_path: str, max_workers: Optional[int] = None, where_clause: Optional[str] = None, columns: Optional[str] = None, - order_by: Optional[str] = None) -> int: + order_by: Optional[str] = None, + processor: Optional[str] = None) -> int: """ Load data from DuckDB database to Solr with parallel processing @@ -429,7 +432,8 @@ def bulkload_duckdb(db_path: str, chunk_size, base_url, core, - schema + schema, + processor ) futures.append(future) From 0f2f324819f28b9ed907eb19e31da7847daa9357 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Fri, 22 Aug 2025 15:27:12 -0700 Subject: [PATCH 18/19] batching improvements --- linkml_solr/utils/solr_bulkload.py | 61 ++++++++++++++++++------------ poetry.lock | 47 ++++++++++++++++++++++- 2 files changed, 82 insertions(+), 26 deletions(-) diff --git a/linkml_solr/utils/solr_bulkload.py b/linkml_solr/utils/solr_bulkload.py index a78b79d..7ccda5b 100644 --- a/linkml_solr/utils/solr_bulkload.py +++ b/linkml_solr/utils/solr_bulkload.py @@ -419,35 +419,46 @@ def bulkload_duckdb(db_path: str, total_loaded = 0 processing_start = time.time() - # Submit all chunk queries and uploads in parallel + # Process chunks in batches to control memory usage with ThreadPoolExecutor(max_workers=actual_workers) as executor: - futures = [] + # Create list of all chunk offsets + chunk_offsets = list(range(0, total_rows, chunk_size)) + total_chunks = len(chunk_offsets) - for offset in range(0, total_rows, chunk_size): - future = executor.submit( - upload_duckdb_chunk, - db_path, - base_query, - offset, - chunk_size, - base_url, - core, - schema, - processor - ) - futures.append(future) - - print(f"Submitted {len(futures)} parallel chunks - processing...") + print(f"Processing {total_chunks} chunks in batches of {actual_workers} workers") upload_start = time.time() - # Process results as they complete - for future in as_completed(futures): - try: - docs_loaded = future.result() - total_loaded += docs_loaded - print(f"Progress: {total_loaded}/{total_rows} documents loaded") - except Exception as e: - print(f"Error in parallel DuckDB chunk processing: {e}") + # Process chunks in batches of worker count + for batch_start in range(0, len(chunk_offsets), actual_workers): + batch_end = min(batch_start + actual_workers, len(chunk_offsets)) + batch_offsets = chunk_offsets[batch_start:batch_end] + + print(f"Processing batch {batch_start//actual_workers + 1}/{(total_chunks + actual_workers - 1)//actual_workers}: chunks {batch_start + 1}-{batch_end}") + + # Submit current batch + futures = [] + for offset in batch_offsets: + future = executor.submit( + upload_duckdb_chunk, + db_path, + base_query, + offset, + chunk_size, + base_url, + core, + schema, + processor + ) + futures.append(future) + + # Wait for current batch to complete + for future in as_completed(futures): + try: + docs_loaded 
= future.result() + total_loaded += docs_loaded + print(f"Progress: {total_loaded:,}/{total_rows:,} documents loaded") + except Exception as e: + print(f"Error in parallel DuckDB chunk processing: {e}") upload_time = time.time() - upload_start total_processing_time = time.time() - processing_start diff --git a/poetry.lock b/poetry.lock index 80ca592..4df6e58 100644 --- a/poetry.lock +++ b/poetry.lock @@ -269,6 +269,51 @@ files = [ {file = "distlib-0.3.8.tar.gz", hash = "sha256:1530ea13e350031b6312d8580ddb6b27a104275a31106523b8f123787f494f64"}, ] +[[package]] +name = "duckdb" +version = "1.3.2" +description = "DuckDB in-process database" +optional = false +python-versions = ">=3.7.0" +files = [ + {file = "duckdb-1.3.2-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:14676651b86f827ea10bf965eec698b18e3519fdc6266d4ca849f5af7a8c315e"}, + {file = "duckdb-1.3.2-cp310-cp310-macosx_12_0_universal2.whl", hash = "sha256:e584f25892450757919639b148c2410402b17105bd404017a57fa9eec9c98919"}, + {file = "duckdb-1.3.2-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:84a19f185ee0c5bc66d95908c6be19103e184b743e594e005dee6f84118dc22c"}, + {file = "duckdb-1.3.2-cp310-cp310-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:186fc3f98943e97f88a1e501d5720b11214695571f2c74745d6e300b18bef80e"}, + {file = "duckdb-1.3.2-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6b7e6bb613b73745f03bff4bb412f362d4a1e158bdcb3946f61fd18e9e1a8ddf"}, + {file = "duckdb-1.3.2-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:1c90646b52a0eccda1f76b10ac98b502deb9017569e84073da00a2ab97763578"}, + {file = "duckdb-1.3.2-cp310-cp310-win_amd64.whl", hash = "sha256:4cdffb1e60defbfa75407b7f2ccc322f535fd462976940731dfd1644146f90c6"}, + {file = "duckdb-1.3.2-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:e1872cf63aae28c3f1dc2e19b5e23940339fc39fb3425a06196c5d00a8d01040"}, + {file = "duckdb-1.3.2-cp311-cp311-macosx_12_0_universal2.whl", hash = "sha256:db256c206056468ae6a9e931776bdf7debaffc58e19a0ff4fa9e7e1e82d38b3b"}, + {file = "duckdb-1.3.2-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:1d57df2149d6e4e0bd5198689316c5e2ceec7f6ac0a9ec11bc2b216502a57b34"}, + {file = "duckdb-1.3.2-cp311-cp311-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:54f76c8b1e2a19dfe194027894209ce9ddb073fd9db69af729a524d2860e4680"}, + {file = "duckdb-1.3.2-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:45bea70b3e93c6bf766ce2f80fc3876efa94c4ee4de72036417a7bd1e32142fe"}, + {file = "duckdb-1.3.2-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:003f7d36f0d8a430cb0e00521f18b7d5ee49ec98aaa541914c6d0e008c306f1a"}, + {file = "duckdb-1.3.2-cp311-cp311-win_amd64.whl", hash = "sha256:0eb210cedf08b067fa90c666339688f1c874844a54708562282bc54b0189aac6"}, + {file = "duckdb-1.3.2-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:2455b1ffef4e3d3c7ef8b806977c0e3973c10ec85aa28f08c993ab7f2598e8dd"}, + {file = "duckdb-1.3.2-cp312-cp312-macosx_12_0_universal2.whl", hash = "sha256:9d0ae509713da3461c000af27496d5413f839d26111d2a609242d9d17b37d464"}, + {file = "duckdb-1.3.2-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:72ca6143d23c0bf6426396400f01fcbe4785ad9ceec771bd9a4acc5b5ef9a075"}, + {file = "duckdb-1.3.2-cp312-cp312-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b49a11afba36b98436db83770df10faa03ebded06514cb9b180b513d8be7f392"}, + {file = "duckdb-1.3.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = 
"sha256:36abdfe0d1704fe09b08d233165f312dad7d7d0ecaaca5fb3bb869f4838a2d0b"}, + {file = "duckdb-1.3.2-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:3380aae1c4f2af3f37b0bf223fabd62077dd0493c84ef441e69b45167188e7b6"}, + {file = "duckdb-1.3.2-cp312-cp312-win_amd64.whl", hash = "sha256:11af73963ae174aafd90ea45fb0317f1b2e28a7f1d9902819d47c67cc957d49c"}, + {file = "duckdb-1.3.2-cp313-cp313-macosx_12_0_arm64.whl", hash = "sha256:a3418c973b06ac4e97f178f803e032c30c9a9f56a3e3b43a866f33223dfbf60b"}, + {file = "duckdb-1.3.2-cp313-cp313-macosx_12_0_universal2.whl", hash = "sha256:2a741eae2cf110fd2223eeebe4151e22c0c02803e1cfac6880dbe8a39fecab6a"}, + {file = "duckdb-1.3.2-cp313-cp313-macosx_12_0_x86_64.whl", hash = "sha256:51e62541341ea1a9e31f0f1ade2496a39b742caf513bebd52396f42ddd6525a0"}, + {file = "duckdb-1.3.2-cp313-cp313-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b3e519de5640e5671f1731b3ae6b496e0ed7e4de4a1c25c7a2f34c991ab64d71"}, + {file = "duckdb-1.3.2-cp313-cp313-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:4732fb8cc60566b60e7e53b8c19972cb5ed12d285147a3063b16cc64a79f6d9f"}, + {file = "duckdb-1.3.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:97f7a22dcaa1cca889d12c3dc43a999468375cdb6f6fe56edf840e062d4a8293"}, + {file = "duckdb-1.3.2-cp313-cp313-win_amd64.whl", hash = "sha256:cd3d717bf9c49ef4b1016c2216517572258fa645c2923e91c5234053defa3fb5"}, + {file = "duckdb-1.3.2-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:18862e3b8a805f2204543d42d5f103b629cb7f7f2e69f5188eceb0b8a023f0af"}, + {file = "duckdb-1.3.2-cp39-cp39-macosx_12_0_universal2.whl", hash = "sha256:75ed129761b6159f0b8eca4854e496a3c4c416e888537ec47ff8eb35fda2b667"}, + {file = "duckdb-1.3.2-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:875193ae9f718bc80ab5635435de5b313e3de3ec99420a9b25275ddc5c45ff58"}, + {file = "duckdb-1.3.2-cp39-cp39-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:09b5fd8a112301096668903781ad5944c3aec2af27622bd80eae54149de42b42"}, + {file = "duckdb-1.3.2-cp39-cp39-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:10cb87ad964b989175e7757d7ada0b1a7264b401a79be2f828cf8f7c366f7f95"}, + {file = "duckdb-1.3.2-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:4389fc3812e26977034fe3ff08d1f7dbfe6d2d8337487b4686f2b50e254d7ee3"}, + {file = "duckdb-1.3.2-cp39-cp39-win_amd64.whl", hash = "sha256:07952ec6f45dd3c7db0f825d231232dc889f1f2490b97a4e9b7abb6830145a19"}, + {file = "duckdb-1.3.2.tar.gz", hash = "sha256:c658df8a1bc78704f702ad0d954d82a1edd4518d7a04f00027ec53e40f591ff5"}, +] + [[package]] name = "et-xmlfile" version = "1.1.0" @@ -1862,4 +1907,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = "^3.8.1" -content-hash = "6fc01e57c7c15380b7dd514737f1b1bb95a5b7d573b7d6e90ea6182df76036f7" +content-hash = "0aefa155d312654cf35bf78516b649fccd9a86096e9b7188d28055612a528446" From 01985b28a4c114d18aee3037c411fbb852274d8c Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Fri, 22 Aug 2025 21:29:44 -0700 Subject: [PATCH 19/19] increased request header size --- linkml_solr/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkml_solr/cli.py b/linkml_solr/cli.py index 4f24109..10f4bab 100644 --- a/linkml_solr/cli.py +++ b/linkml_solr/cli.py @@ -305,7 +305,7 @@ def start_server(schema, kill, container, url, core, port, sleep: int, create_sc '-e', f'SOLR_JAVA_MEM=-Xms{heap_size} -Xmx{heap_size}', '-e', - 
f'SOLR_OPTS=-Dsolr.ramBufferSizeMB={ram_buffer_mb}', + f'SOLR_OPTS=-Dsolr.ramBufferSizeMB={ram_buffer_mb} -Dsolr.jetty.request.header.size=65535', 'solr:8', 'solr-precreate', core]