5 changes: 3 additions & 2 deletions .github/workflows/observatory.yml
@@ -76,7 +76,8 @@ jobs:
         run: |
           python scripts/build_catalog_inventory.py \
             --out-dir data/catalog_inventory/generated \
-            --workers 8
+            --workers 8 \
+            --skip-red-sources

       - name: Build catalog signals + diff
         run: |
@@ -146,7 +147,7 @@ jobs:
           python scripts/bulk_source_check.py \
             --skip-red-sources \
             --no-sdmx-years \
-            --max-items 500 \
+            --max-items 1000 \
             --workers 8

       # ═══════════════════════════════════════════════════════════════════
147 changes: 36 additions & 111 deletions scripts/build_catalog_inventory.py
@@ -5,7 +5,7 @@
 import logging
 import sys
 import time
-from concurrent.futures import ThreadPoolExecutor, as_completed, wait
+from concurrent.futures import ThreadPoolExecutor, wait
 from pathlib import Path
 from typing import Any, cast

@@ -98,98 +98,6 @@ def _run() -> None:
     res = _result[0]
     return source_id, res.rows, res.warning, res.summary, None


-# Formats for which a lightweight sniff of the data file is worthwhile
-_SNIFFABLE_FORMATS = {"csv", "xlsx", "xls", "tsv"}
-
-
-def _is_sniffable(format_str: Any) -> bool:
-    """True if format contains one of the sniffable formats (e.g. 'csv', 'csv,xml')."""
-    if not isinstance(format_str, str):
-        return False
-    return any(f in format_str.lower() for f in _SNIFFABLE_FORMATS)
-
-
-def _sniff_csv_rows(rows: list[dict[str, Any]], logger: logging.Logger) -> None:
-    """Lightweight CSV sniff for items with a direct URL to a data file.
-
-    Downloads the first ~10KB and sniffs encoding/delim/decimal/skip with
-    ``toolkit.profile.raw.sniff_source_file`` (pure Python, no DuckDB).
-    The results update the rows in place (encoding_suggested, etc.).
-
-    Cost: ~0.5s per download + ~0.02s per sniff. Skipped for non-sniffable
-    formats or non-HTTP URLs.
-    """
-    from pathlib import Path
-    import tempfile
-
-    from lab_connectors.http import HttpClient
-    from toolkit.profile.raw import sniff_source_file
-
-    # Filter rows with a sniffable format and a direct URL
-    targets = [
-        (i, row) for i, row in enumerate(rows)
-        if _is_sniffable(row.get("format"))
-        and isinstance(row.get("distribution_url"), str)
-        and row["distribution_url"].startswith("http")
-    ]
-    if not targets:
-        return
-
-    # No limit: the 5s per-download timeout and 8 workers make the cost
-    # negligible (~0.5s per item with 8 workers). The real slowdown was the
-    # 15s timeout on HTML pages, fixed separately.
-    logger.info(" sniff CSV: %d items", len(targets))
-    sniffed = 0
-
-    def _infer_sniff_ext(dist_url: str) -> str:
-        """Infer the tempfile extension from the URL (handles query strings)."""
-        from urllib.parse import urlparse
-        ext = Path(urlparse(dist_url).path).suffix.lower()
-        return ext if ext in (".csv", ".tsv", ".xlsx", ".xls") else ".csv"
-
-    def _sniff_one(dist_url: str) -> dict[str, Any]:
-        client = HttpClient(timeout=(3, 5))
-        fetch = client.get(dist_url, headers={"Range": "bytes=0-10239"})
-        if not fetch.is_ok or fetch.response is None or fetch.response.status_code >= 400:
-            return {}
-        content = fetch.response.content[:10 * 1024]  # 10KB max
-        with tempfile.NamedTemporaryFile(suffix=_infer_sniff_ext(dist_url), delete=False) as tmp:
-            tmp.write(content)
-            tmp_path = Path(tmp.name)
-        try:
-            sniff = sniff_source_file(tmp_path)
-            return {
-                "encoding_suggested": sniff.get("encoding_suggested"),
-                "delim_suggested": sniff.get("delim_suggested"),
-                "decimal_suggested": sniff.get("decimal_suggested"),
-                "skip_suggested": sniff.get("skip_suggested", 0),
-            }
-        finally:
-            tmp_path.unlink(missing_ok=True)
-
-    _SNIFF_BATCH_TIMEOUT = 300  # 5 min per CSV sniff batch (matches _SOURCE_TIMEOUT)
-    pool = ThreadPoolExecutor(max_workers=8)
-    try:
-        fut_to_idx = {pool.submit(_sniff_one, row["distribution_url"]): idx for idx, row in targets}
-        for fut in as_completed(fut_to_idx, timeout=_SNIFF_BATCH_TIMEOUT):
-            idx = fut_to_idx[fut]
-            try:
-                result = fut.result()
-                if result and result.get("encoding_suggested"):
-                    rows[idx].update(result)
-                    sniffed += 1
-            except Exception:
-                pass
-    except TimeoutError:
-        logger.warning(" sniff CSV timeout after %ds (%d/%d processed)",
-                       _SNIFF_BATCH_TIMEOUT, sniffed, len(targets))
-    finally:
-        pool.shutdown(wait=False, cancel_futures=True)
-
-    logger.info(" sniff CSV OK: %d/%d", sniffed, len(targets))


 def parse_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser(
         description="Builds the catalog inventory derived from the source-observatory registry."
@@ -214,6 +122,12 @@ def parse_args() -> argparse.Namespace:
         metavar="SOURCE_ID",
         help="Limit the build to these source_ids (space-separated).",
     )
+    parser.add_argument(
+        "--skip-red-sources",
+        action="store_true",
+        default=False,
+        help="Skip sources whose status is RED in radar_summary.json (avoids timeouts on sources that are down).",
+    )
     return parser.parse_args()


@@ -267,6 +181,25 @@ def main() -> None:

         inventoriable.append((source_id, source_cfg))

+    # ── Skip RED sources from radar ───────────────────────────────────────────
+    if args.skip_red_sources:
+        radar_path = REPO_ROOT / "data" / "radar" / "radar_summary.json"
+        if radar_path.exists():
+            try:
+                with radar_path.open() as fh:
+                    radar = json.load(fh)
+                red_ids = [s["id"] for s in radar.get("sources", []) if s.get("status") == "RED"]
+                if red_ids:
+                    before = len(inventoriable)
+                    inventoriable = [(sid, cfg) for sid, cfg in inventoriable if sid not in red_ids]
+                    skipped = before - len(inventoriable)
+                    if skipped:
+                        print(f" skip RED sources (radar): {red_ids} — {skipped} sources skipped", file=sys.stderr)
+            except Exception as exc:
+                print(f" skip-red-sources: cannot read radar_summary: {exc}", file=sys.stderr)
+        else:
+            print(" skip-red-sources: radar_summary.json not found", file=sys.stderr)
+
     collected: dict[str, tuple[list[dict[str, Any]], dict[str, Any] | None, dict[str, Any] | None, Exception | None]] = {}
     source_timing: dict[str, float] = {}
     executor = ThreadPoolExecutor(max_workers=args.workers)
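Note: the filter above only assumes that radar_summary.json exposes a top-level "sources" list whose entries carry "id" and "status" fields; nothing else about the file is relied on. A minimal sketch of that assumed shape, with hypothetical source ids:

# Assumed shape of data/radar/radar_summary.json, inferred from the filter
# above. The source ids below are hypothetical examples.
radar_example = {
    "sources": [
        {"id": "example_red_source", "status": "RED"},      # dropped by --skip-red-sources
        {"id": "example_green_source", "status": "GREEN"},  # kept
    ],
}
red_ids = [s["id"] for s in radar_example["sources"] if s.get("status") == "RED"]
assert red_ids == ["example_red_source"]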
@@ -296,10 +229,10 @@ def _record_timing(_sid: str = source_id) -> None:
logger.warning("Source %s non completato entro %ds (batch timeout), treat as failed", sid, _BATCH_TIMEOUT)
collected[sid] = ([], None, None, TimeoutError(f"Batch timeout after {_BATCH_TIMEOUT}s"))
for f in done:
sid, rows, warning, summary, exc = f.result()
sid, rows, warning, summary, err = f.result()
if sid not in source_timing:
source_timing[sid] = time.time() - submit_times[sid]
collected[sid] = (rows, warning, summary, exc)
collected[sid] = (rows, warning, summary, err)
     finally:
         # shutdown(wait=False) does not wait for blocked tasks; the HTTP
         # timeout (5s) will terminate them eventually, but it does not block the workflow.
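For context, a minimal self-contained sketch of the bounded-wait pattern this hunk settles on: wait() with a batch timeout, stragglers recorded as failures, then a non-blocking shutdown. The timings, names, and task body are illustrative stand-ins, not the script's real values.

import time
from concurrent.futures import ThreadPoolExecutor, wait

BATCH_TIMEOUT = 1  # seconds; the real script uses a much larger value

def fetch(source_id: str) -> str:
    # "slow" stands in for a hung source; the real task is an HTTP fetch.
    time.sleep(3 if source_id == "slow" else 0.1)
    return f"{source_id}: ok"

pool = ThreadPoolExecutor(max_workers=4)
try:
    futs = {pool.submit(fetch, sid): sid for sid in ("a", "b", "slow")}
    done, not_done = wait(futs, timeout=BATCH_TIMEOUT)
    results = {futs[f]: f.result() for f in done}
    for f in not_done:  # still running after the timeout: treat as failed
        results[futs[f]] = f"error: batch timeout after {BATCH_TIMEOUT}s"
finally:
    # Non-blocking: does not join the hung worker; cancel_futures drops
    # tasks that never started. The interpreter still joins running threads
    # at exit, which is why a short per-request HTTP timeout matters.
    pool.shutdown(wait=False, cancel_futures=True)

print(results)  # e.g. {'a': 'a: ok', 'b': 'b: ok', 'slow': 'error: batch timeout after 1s'}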
@@ -325,20 +258,20 @@ def _record_timing(_sid: str = source_id) -> None:
     )

     for source_id, source_cfg in inventoriable:
-        rows, warning, summary, exc = collected[source_id]
-        if exc is not None:
+        rows, warning, summary, err = collected[source_id]
+        if err is not None:
             # Source failed: preserve existing rows as stale
             report["sources"][source_id] = {
                 "status": "error",
                 "protocol": source_cfg.get("protocol"),
-                "error": str(exc),
+                "error": str(err),
                 "method": source_cfg.get("catalog_baseline", {}).get("method"),
             }
             if existing_df is not None:
                 stale_rows = existing_df[existing_df["source_id"] == source_id].copy()
                 if not stale_rows.empty:
                     stale_rows["source_status"] = "stale"
-                    stale_rows["stale_reason"] = stale_reason_from_exception(exc)
+                    stale_rows["stale_reason"] = stale_reason_from_exception(err)
                     all_rows.extend(cast(list[dict[str, Any]], stale_rows.to_dict(orient="records")))
             continue

@@ -348,14 +281,6 @@ def _record_timing(_sid: str = source_id) -> None:
row["stale_reason"] = None
row["last_successful_fetch"] = captured_at

-        # Lightweight CSV sniff: for each item with a direct URL to a data
-        # file, download ~10KB and sniff encoding/delim/decimal/skip.
-        # No DuckDB involved: pure Python only, ~0.02s per sniff + ~0.5s per download.
-        # The results (encoding_suggested, delim_suggested, ...) are written
-        # directly into the catalog inventory, ready for source-check and scoring.
-        if rows:
-            _sniff_csv_rows(rows, logger)
-
         all_rows.extend(rows)

         source_report: dict[str, Any] = {
@@ -425,16 +350,16 @@ def _record_timing(_sid: str = source_id) -> None:
print(f"{'Source':<24} {'Status':<12} {'Items':<8} {'Time':<8} {'Note'}")
print("-" * 72)
for source_id, source_cfg in inventoriable:
rows_count, _warning, _summary, exc = collected[source_id]
rows_count, _warning, _summary, err = collected[source_id]
elapsed = source_timing.get(source_id, 0)
if exc is not None:
err_str = str(exc)
if err is not None:
err_str = str(err)
if "timed out" in err_str.lower():
status = "TIMEOUT"
note = err_str[:70]
else:
status = "ERROR"
note = type(exc).__name__
note = type(err).__name__
else:
status = "OK"
note = f"{len(rows_count)} items" if rows_count else "empty"