diff --git a/mcp/so_server_core.py b/mcp/so_server_core.py index ba9e165..bec6d8c 100644 --- a/mcp/so_server_core.py +++ b/mcp/so_server_core.py @@ -21,6 +21,7 @@ import duckdb import requests +from lab_connectors.duckdb import safe_connect from lab_connectors.http import HttpClient @@ -701,8 +702,7 @@ def find_by_url(url: str) -> dict[str, Any]: try: with _resolved_parquet(source_check_artifact) as (resolved_path, cache): parquet_path = str(resolved_path) - con = duckdb.connect() - try: + with safe_connect() as con: cols = _table_columns(con, parquet_path) url_cols = [c for c in cols if c in ("url", "url_checked", "distribution_url", "landing_page", "source_url")] if not url_cols: @@ -717,8 +717,6 @@ def find_by_url(url: str) -> dict[str, Any]: rows = con.execute(sql, [like] * len(url_cols)).fetchall() results["source_check_results"] = [dict(zip(cols, row)) for row in rows] results["source_check_cache"] = cache - finally: - con.close() except FileNotFoundError: results["source_check_error"] = f"{source_check_artifact.name} not found" @@ -726,8 +724,7 @@ def find_by_url(url: str) -> dict[str, Any]: try: with _resolved_parquet(catalog_artifact) as (resolved_path, cache): parquet_path = str(resolved_path) - con = duckdb.connect() - try: + with safe_connect() as con: cols = _table_columns(con, parquet_path) url_cols = [c for c in cols if c in ("url", "url_checked", "distribution_url", "landing_page", "source_url")] if not url_cols: @@ -742,8 +739,6 @@ def find_by_url(url: str) -> dict[str, Any]: rows = con.execute(sql, [like] * len(url_cols)).fetchall() results["catalog_inventory"] = [dict(zip(cols, row)) for row in rows] results["catalog_inventory_cache"] = cache - finally: - con.close() except FileNotFoundError: results["catalog_inventory_error"] = f"{catalog_artifact.name} not found" @@ -814,8 +809,7 @@ def query_inventory( try: with _resolved_parquet(artifact) as (resolved_path, cache): parquet_path = str(resolved_path) - con = duckdb.connect() - try: + with safe_connect() as con: cols = _table_columns(con, parquet_path) query = f'SELECT * FROM "{parquet_path}"' filters: list[str] = [] @@ -837,8 +831,6 @@ def query_inventory( query += f" ORDER BY intake_score DESC NULLS LAST LIMIT {safe_limit}" rows = con.execute(query, params).fetchall() - finally: - con.close() except FileNotFoundError: return _parquet_not_found(artifact) @@ -929,8 +921,7 @@ def catalog_inventory_search( try: with _resolved_parquet(artifact) as (resolved_path, cache): parquet_path = str(resolved_path) - con = duckdb.connect() - try: + with safe_connect() as con: columns = set(_table_columns(con, parquet_path)) search_columns = [ column @@ -992,8 +983,6 @@ def catalog_inventory_search( """ rows = con.execute(sql, params).fetchall() cols = [desc[0] for desc in con.description] - finally: - con.close() except FileNotFoundError: return _parquet_not_found(artifact) @@ -1157,8 +1146,7 @@ def _inventory_source_status(source_id: str) -> dict[str, Any] | None: def _read_sdmx_inventory_rows(parquet_path: Path) -> list[dict[str, Any]]: - con = duckdb.connect() - try: + with safe_connect() as con: rows = con.execute( f""" SELECT source_id, item_id, item_name, title, tags, api_base_url, source_url @@ -1166,8 +1154,6 @@ def _read_sdmx_inventory_rows(parquet_path: Path) -> list[dict[str, Any]]: WHERE source_id = 'istat_sdmx' """ ).fetchall() - finally: - con.close() cols = [ "source_id", "item_id", @@ -1331,8 +1317,7 @@ def recommend_sources(keyword: str, limit: int = 10) -> dict[str, Any]: try: artifact = _catalog_inventory_parquet() with _resolved_parquet(artifact) as (resolved_path, cache): - con = duckdb.connect() - try: + with safe_connect() as con: rows = con.execute( f''' SELECT source_id, source_kind, protocol, @@ -1352,8 +1337,6 @@ def recommend_sources(keyword: str, limit: int = 10) -> dict[str, Any]: ''', [f"%{keyword_low}%"] * 5 + [safe_limit], ).fetchall() - finally: - con.close() except FileNotFoundError: return _parquet_not_found(_catalog_inventory_parquet()) @@ -1406,15 +1389,12 @@ def inventory_diff(source_id: str) -> dict[str, Any]: try: artifact = _catalog_inventory_parquet() with _resolved_parquet(artifact) as (resolved_path, cache): - con = duckdb.connect() - try: + with safe_connect() as con: row = con.execute( f'SELECT COUNT(*) FROM "{resolved_path}" WHERE source_id = ?', [source_id], ).fetchone() current_count = row[0] if row else 0 - finally: - con.close() except FileNotFoundError: return _parquet_not_found(_catalog_inventory_parquet()) diff --git a/requirements.txt b/requirements.txt index c612522..efcfe71 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,5 +6,5 @@ pyarrow>=24.0.0,<25.0 ddgs>=9.14.2,<10.0 mcp>=1.27.1 fastmcp>=3.2.4 -lab-connectors @ git+https://github.com/dataciviclab/lab-connectors.git@v0.2.0 -dataciviclab-toolkit @ git+https://github.com/dataciviclab/toolkit.git@481b05e5af34974ac7c65b4a65a3fc59596c1002 +lab-connectors[duckdb] @ git+https://github.com/dataciviclab/lab-connectors.git@v0.4.0 +dataciviclab-toolkit @ git+https://github.com/dataciviclab/toolkit.git@a06005c1424bd381a63070058cb4beae463035c8 diff --git a/scripts/build_catalog_inventory.py b/scripts/build_catalog_inventory.py index 621ba6b..008ec31 100644 --- a/scripts/build_catalog_inventory.py +++ b/scripts/build_catalog_inventory.py @@ -9,9 +9,9 @@ from pathlib import Path from typing import Any, cast -import duckdb import pandas as pd +from lab_connectors.duckdb import safe_connect from collectors import dispatch, supported_protocols from collectors.base import inventory_cfg, now_utc_iso from collectors.ckan import ( @@ -336,11 +336,10 @@ def _record_timing(_sid: str = source_id) -> None: if sid not in report["sources"]: report["sources"][sid] = info - con = duckdb.connect() - con.register("inventory_df", df) - con.execute("CREATE TABLE inventory AS SELECT * FROM inventory_df") - con.execute("COPY inventory TO ? (FORMAT PARQUET)", [str(out_parquet)]) - con.close() + with safe_connect() as con: + con.register("inventory_df", df) + con.execute("CREATE TABLE inventory AS SELECT * FROM inventory_df") + con.execute("COPY inventory TO ? (FORMAT PARQUET)", [str(out_parquet)]) with out_report.open("w", encoding="utf-8") as fh: json.dump(report, fh, indent=2, ensure_ascii=False)