2 changes: 2 additions & 0 deletions server/settings.py
@@ -146,6 +146,8 @@
"stt.stt_tt_new_parse_ninjs",
"stt.io.feed_parsers.stt_events_csv_parse",
"stt.io.feeding_services.stt_http_with_since",
"stt.io.feeding_services.stt_tt_content_api",
"stt.io.feed_parsers.stt_tt_parse_content_api",
]

MODULES.append("planning")
3 changes: 3 additions & 0 deletions server/stt/io/__init__.py
@@ -0,0 +1,3 @@
import logging

logging.basicConfig(level=logging.INFO)
81 changes: 53 additions & 28 deletions server/stt/io/feed_parsers/stt_parse_content_api.py
@@ -1,12 +1,12 @@
# -*- coding: utf-8 -*-
from __future__ import annotations

import logging
import hashlib
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from urllib.parse import unquote, urlparse

from dateutil import parser as dtparse
from superdesk.io.feed_parsers import FeedParser
@@ -51,6 +51,44 @@ def _to_int_or_none(v: Any) -> Optional[int]:
return None


GUID_PREFIX = "urn:newsml:stt.fi:contentapi:"
SOURCE_PREFIX = "urn:newsml:stt.fi:"


def _guid_from_value(value: Any) -> Optional[str]:
"""Return a normalized STT Content API GUID or None when value empty."""
if value is None:
return None
if isinstance(value, bytes):
candidate = value.decode("utf-8", errors="ignore").strip()
else:
candidate = str(value).strip()
if not candidate:
return None

if candidate.startswith(GUID_PREFIX):
return candidate

if candidate.startswith(SOURCE_PREFIX):
suffix = candidate[len(SOURCE_PREFIX) :].lstrip(":")
if not suffix:
suffix = hashlib.sha1(candidate.encode("utf-8")).hexdigest()
return f"{GUID_PREFIX}{suffix}"

if candidate.startswith(("http://", "https://")):
parsed = urlparse(candidate)
last_segment = parsed.path.rsplit("/", 1)[-1] or parsed.path.strip("/")
if last_segment:
decoded = unquote(last_segment)
guid = _guid_from_value(decoded)
if guid:
return guid
candidate = f"{parsed.netloc}{parsed.path}" or candidate

digest = hashlib.sha1(candidate.encode("utf-8")).hexdigest()
return f"{GUID_PREFIX}{digest}"
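A quick sketch of how the helper behaves for each input shape (the values are illustrative and the URL is hypothetical; digests are sha1 hexdigests exactly as computed above):

_guid_from_value("urn:newsml:stt.fi:contentapi:abc123")
# -> returned as-is: already in the Content API namespace

_guid_from_value("urn:newsml:stt.fi:101010")
# -> "urn:newsml:stt.fi:contentapi:101010" (source URN re-prefixed)

_guid_from_value("https://api.stt.fi/content/101010")
# -> the decoded last path segment "101010" is hashed, giving
#    "urn:newsml:stt.fi:contentapi:" + sha1(b"101010").hexdigest()

_guid_from_value("   ")
# -> None (blank input never produces a GUID)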


class ContentAPIItemParser(FeedParser):
NAME = "content_api_json"
label = "STT Content API"
@@ -135,10 +173,6 @@ def _parse_one(
# Apply default fields and normalize headline/body
self._apply_defaults(processed)

# Generate GUID if missing
if not processed.get("guid"):
processed["guid"] = self._ensure_guid(processed)

# Normalize all known timestamp fields
for tf in ("versioncreated", "firstcreated", "_updated", "_created"):
if processed.get(tf):
@@ -181,7 +215,7 @@ def _parse_one(
if not headline and not body_html:
logger.info(
"Skipping item without meaningful content: %s",
processed.get("guid", "unknown"),
processed.get("uri", "unknown"),
)
return None

@@ -204,27 +238,18 @@ def _apply_defaults(self, item: Dict[str, Any]) -> None:
item.get("headline") or item.get("name") or item.get("title") or "",
)
item.setdefault("body_html", item.get("body_html") or "")

def _ensure_guid(self, item: Dict[str, Any]) -> str:
uri = (
item.get("uri")
or item.get("guid")
or item.get("original_id")
or item.get("_id")
)
if isinstance(uri, (str, int)):
s = str(uri)
# If it's already a URN, preserve it
if s.startswith("urn:"):
return s
# Otherwise generate a new URN with our namespace
return f"urn:newsml:stt.fi:contentapi:{hashlib.sha256(s.encode('utf-8')).hexdigest()}"
try:
blob = json.dumps(item, ensure_ascii=False, sort_keys=True)
h = hashlib.sha256(blob.encode("utf-8")).hexdigest()
return f"urn:newsml:stt.fi:contentapi:{h}"
except Exception:
return f"urn:newsml:stt.fi:contentapi:{uuid.uuid4()}"
guid = _guid_from_value(item.get("guid"))
if not guid:
for key in ("uri", "original_id", "coverage_id", "_id", "id"):
guid = _guid_from_value(item.get(key))
if guid:
break
if not guid:
serialized = json.dumps(
item, sort_keys=True, default=str, separators=(",", ":")
)
guid = _guid_from_value(serialized)
item["guid"] = guid
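A minimal sketch of the fallback chain with a hypothetical item that has no "guid" key, so the "uri" fallback wins:

item = {"uri": "urn:newsml:stt.fi:101010", "headline": "x"}
# after _apply_defaults(item):
assert item["guid"] == "urn:newsml:stt.fi:contentapi:101010"

# An item carrying none of the id keys falls through to a sha1 of its
# canonical JSON serialization, so re-ingesting an identical payload
# still yields a stable, deduplicatable GUID.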

def _normalize_timestamp(self, value: Any) -> Optional[datetime]:
"""Normalize timestamps to tz-aware datetime (UTC)."""
100 changes: 100 additions & 0 deletions server/stt/io/feed_parsers/stt_tt_parse_content_api.py
@@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional
import inspect

from superdesk.io.registry import register_feed_parser
from .stt_parse_content_api import ContentAPIItemParser

logger = logging.getLogger(__name__)


class ContentAPITTItemParser(ContentAPIItemParser):
NAME = "stt_tt_parse_content_api"
label = "STT TT Content API"

async def parse(
self, item: Any, provider: Optional[dict] = None
) -> List[Dict[str, Any]]:
"""
        TT-specific parse method: handles a single item or a list of items
        from the feeding service. Must return a List[Dict] to satisfy
        Superdesk ingest expectations; async to match the base class contract.
"""
provider = provider or {}

# Helper to handle sync/async _parse_one uniformly
async def _parse_one_maybe_async(
elem: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
result = self._parse_one(elem, provider)
if inspect.isawaitable(result):
result = await result
if isinstance(result, dict) and result:
return result
return None

# Case 1: payload is a dict - parse one and return a single-item list
if isinstance(item, dict):
parsed = await _parse_one_maybe_async(item)
return [parsed] if parsed else []

# Case 2: payload is a list - parse each dict item, ignore non-dicts
if isinstance(item, list):
results: List[Dict[str, Any]] = []
            for elem in item:
if not isinstance(elem, dict):
continue
parsed = await _parse_one_maybe_async(elem)
if parsed is not None:
results.append(parsed)
return results
return []
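The isawaitable bridge above keeps this parser working whether the base class's _parse_one stays synchronous (as it is today) or becomes a coroutine later. A standalone sketch of the same pattern (call_maybe_async is a made-up name for illustration):

import asyncio
import inspect

async def call_maybe_async(fn, *args):
    result = fn(*args)          # plain value or coroutine, depending on fn
    if inspect.isawaitable(result):
        result = await result   # await only when the callee is async
    return result

print(asyncio.run(call_maybe_async(lambda x: x + 1, 41)))  # prints 42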

# ------------------------ TT-specific overrides -------------------------
def _parse_one(self, src: Dict[str, Any], provider: dict) -> Dict[str, Any]:
"""
TT-specific parsing that extends base class functionality.
Adds TT-specific preprocessing and uses custom GUID generation.
"""
if not isinstance(src, dict):
logger.error("TT Parser received non-dict source: %s", type(src))
return {}

        # TT-specific: drop MongoDB-incompatible keys (names starting with "$") first
cleaned_src = {k: v for k, v in src.items() if not k.startswith("$")}

# Use base class parsing for most functionality
processed = super()._parse_one(cleaned_src, provider)

# Validate base class returned proper dict
if not processed:
return {}

if not isinstance(processed, dict):
logger.error(
"Base class _parse_one returned non-dict: type=%s, value=%s",
type(processed),
processed,
)
return {}

# TT-specific: Additional body_html fallbacks
if not processed.get("body_html"):
processed["body_html"] = (
processed.get("body_html5") or processed.get("body_richhtml5") or ""
)

        # Coerce a non-string body_html to an empty string
        body_html = processed.get("body_html")
        processed["body_html"] = body_html if isinstance(body_html, str) else ""

        # The base class _apply_defaults has already guaranteed a GUID
        return processed


# Register the parser instance, as the BusinessWire example does: parse() returns List[Dict[str, Any]]
register_feed_parser(ContentAPITTItemParser.NAME, ContentAPITTItemParser())
3 changes: 2 additions & 1 deletion server/stt/io/feeding_services/stt_content_api.py
@@ -247,7 +247,8 @@ def _safe_json(self, response, provider) -> Any:
response.status_code,
response.headers.get("content-type", "unknown"),
)
raise IngestApiError.apiGeneralError(f"JSON parse error: {ex}", provider)
parse_error = Exception(f"JSON parse error: {ex}")
raise IngestApiError.apiGeneralError(parse_error, provider)
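Superdesk's ingest error factories take an exception instance rather than a bare message string, which is what the wrapper above accommodates. A condensed sketch of the resulting pattern (the surrounding try/except context and the helper name _safe_json_body are assumptions; the apiGeneralError(exception, provider) signature is inferred from this call site):

def _safe_json_body(response, provider):
    try:
        return response.json()
    except ValueError as ex:  # requests raises a ValueError subclass on bad JSON
        parse_error = Exception(f"JSON parse error: {ex}")
        raise IngestApiError.apiGeneralError(parse_error, provider)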

def _extract_batch(self, data: Any) -> List[Dict]:
if isinstance(data, list):