Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 8 additions & 28 deletions mcp/so_server_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import duckdb
import requests
from lab_connectors.duckdb import safe_connect

from lab_connectors.http import HttpClient

Expand Down Expand Up @@ -701,8 +702,7 @@ def find_by_url(url: str) -> dict[str, Any]:
try:
with _resolved_parquet(source_check_artifact) as (resolved_path, cache):
parquet_path = str(resolved_path)
con = duckdb.connect()
try:
with safe_connect() as con:
cols = _table_columns(con, parquet_path)
url_cols = [c for c in cols if c in ("url", "url_checked", "distribution_url", "landing_page", "source_url")]
if not url_cols:
Expand All @@ -717,17 +717,14 @@ def find_by_url(url: str) -> dict[str, Any]:
rows = con.execute(sql, [like] * len(url_cols)).fetchall()
results["source_check_results"] = [dict(zip(cols, row)) for row in rows]
results["source_check_cache"] = cache
finally:
con.close()
except FileNotFoundError:
results["source_check_error"] = f"{source_check_artifact.name} not found"

catalog_artifact = _catalog_inventory_parquet()
try:
with _resolved_parquet(catalog_artifact) as (resolved_path, cache):
parquet_path = str(resolved_path)
con = duckdb.connect()
try:
with safe_connect() as con:
cols = _table_columns(con, parquet_path)
url_cols = [c for c in cols if c in ("url", "url_checked", "distribution_url", "landing_page", "source_url")]
if not url_cols:
Expand All @@ -742,8 +739,6 @@ def find_by_url(url: str) -> dict[str, Any]:
rows = con.execute(sql, [like] * len(url_cols)).fetchall()
results["catalog_inventory"] = [dict(zip(cols, row)) for row in rows]
results["catalog_inventory_cache"] = cache
finally:
con.close()
except FileNotFoundError:
results["catalog_inventory_error"] = f"{catalog_artifact.name} not found"

Expand Down Expand Up @@ -814,8 +809,7 @@ def query_inventory(
try:
with _resolved_parquet(artifact) as (resolved_path, cache):
parquet_path = str(resolved_path)
con = duckdb.connect()
try:
with safe_connect() as con:
cols = _table_columns(con, parquet_path)
query = f'SELECT * FROM "{parquet_path}"'
filters: list[str] = []
Expand All @@ -837,8 +831,6 @@ def query_inventory(
query += f" ORDER BY intake_score DESC NULLS LAST LIMIT {safe_limit}"

rows = con.execute(query, params).fetchall()
finally:
con.close()
except FileNotFoundError:
return _parquet_not_found(artifact)

Expand Down Expand Up @@ -929,8 +921,7 @@ def catalog_inventory_search(
try:
with _resolved_parquet(artifact) as (resolved_path, cache):
parquet_path = str(resolved_path)
con = duckdb.connect()
try:
with safe_connect() as con:
columns = set(_table_columns(con, parquet_path))
search_columns = [
column
Expand Down Expand Up @@ -992,8 +983,6 @@ def catalog_inventory_search(
"""
rows = con.execute(sql, params).fetchall()
cols = [desc[0] for desc in con.description]
finally:
con.close()
except FileNotFoundError:
return _parquet_not_found(artifact)

Expand Down Expand Up @@ -1157,17 +1146,14 @@ def _inventory_source_status(source_id: str) -> dict[str, Any] | None:


def _read_sdmx_inventory_rows(parquet_path: Path) -> list[dict[str, Any]]:
con = duckdb.connect()
try:
with safe_connect() as con:
rows = con.execute(
f"""
SELECT source_id, item_id, item_name, title, tags, api_base_url, source_url
FROM "{parquet_path}"
WHERE source_id = 'istat_sdmx'
"""
).fetchall()
finally:
con.close()
cols = [
"source_id",
"item_id",
Expand Down Expand Up @@ -1331,8 +1317,7 @@ def recommend_sources(keyword: str, limit: int = 10) -> dict[str, Any]:
try:
artifact = _catalog_inventory_parquet()
with _resolved_parquet(artifact) as (resolved_path, cache):
con = duckdb.connect()
try:
with safe_connect() as con:
rows = con.execute(
f'''
SELECT source_id, source_kind, protocol,
Expand All @@ -1352,8 +1337,6 @@ def recommend_sources(keyword: str, limit: int = 10) -> dict[str, Any]:
''',
[f"%{keyword_low}%"] * 5 + [safe_limit],
).fetchall()
finally:
con.close()
except FileNotFoundError:
return _parquet_not_found(_catalog_inventory_parquet())

Expand Down Expand Up @@ -1406,15 +1389,12 @@ def inventory_diff(source_id: str) -> dict[str, Any]:
try:
artifact = _catalog_inventory_parquet()
with _resolved_parquet(artifact) as (resolved_path, cache):
con = duckdb.connect()
try:
with safe_connect() as con:
row = con.execute(
f'SELECT COUNT(*) FROM "{resolved_path}" WHERE source_id = ?',
[source_id],
).fetchone()
current_count = row[0] if row else 0
finally:
con.close()
except FileNotFoundError:
return _parquet_not_found(_catalog_inventory_parquet())

Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ pyarrow>=24.0.0,<25.0
ddgs>=9.14.2,<10.0
mcp>=1.27.1
fastmcp>=3.2.4
lab-connectors @ git+https://github.com/dataciviclab/lab-connectors.git@v0.2.0
dataciviclab-toolkit @ git+https://github.com/dataciviclab/toolkit.git@481b05e5af34974ac7c65b4a65a3fc59596c1002
lab-connectors[duckdb] @ git+https://github.com/dataciviclab/lab-connectors.git@v0.4.0
dataciviclab-toolkit @ git+https://github.com/dataciviclab/toolkit.git@a06005c1424bd381a63070058cb4beae463035c8
11 changes: 5 additions & 6 deletions scripts/build_catalog_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
from pathlib import Path
from typing import Any, cast

import duckdb
import pandas as pd

from lab_connectors.duckdb import safe_connect
from collectors import dispatch, supported_protocols
from collectors.base import inventory_cfg, now_utc_iso
from collectors.ckan import (
Expand Down Expand Up @@ -336,11 +336,10 @@ def _record_timing(_sid: str = source_id) -> None:
if sid not in report["sources"]:
report["sources"][sid] = info

con = duckdb.connect()
con.register("inventory_df", df)
con.execute("CREATE TABLE inventory AS SELECT * FROM inventory_df")
con.execute("COPY inventory TO ? (FORMAT PARQUET)", [str(out_parquet)])
con.close()
with safe_connect() as con:
con.register("inventory_df", df)
con.execute("CREATE TABLE inventory AS SELECT * FROM inventory_df")
con.execute("COPY inventory TO ? (FORMAT PARQUET)", [str(out_parquet)])

with out_report.open("w", encoding="utf-8") as fh:
json.dump(report, fh, indent=2, ensure_ascii=False)
Expand Down