diff --git a/astroml/features/graph/snapshot.py b/astroml/features/graph/snapshot.py index 45ec817..bc115bf 100644 --- a/astroml/features/graph/snapshot.py +++ b/astroml/features/graph/snapshot.py @@ -1,7 +1,8 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Iterable, List, Sequence, Set, Tuple +from datetime import datetime, timedelta, timezone +from typing import Generator, Iterable, List, Optional, Sequence, Set, Tuple import bisect @@ -86,3 +87,119 @@ def snapshot_last_n_days( if start_ts < 0: start_ts = 0 return window_snapshot(edges, start_ts, now_ts, presorted=presorted) + + +# --------------------------------------------------------------------------- +# DB-backed time-windowed snapshot slicer +# --------------------------------------------------------------------------- + +@dataclass(frozen=True) +class SnapshotWindow: + """A discrete time window slice ready for training.""" + index: int # 0-based window index (t_0, t_1, …, t_now) + start: datetime + end: datetime + edges: List[Edge] + nodes: Set[str] + + +def _parse_window_size(window: str) -> timedelta: + """Parse a window size string like '7d', '24h', '3600s' into a timedelta.""" + unit = window[-1].lower() + value = int(window[:-1]) + if unit == "d": + return timedelta(days=value) + if unit == "h": + return timedelta(hours=value) + if unit == "s": + return timedelta(seconds=value) + raise ValueError(f"Unknown window unit '{unit}'. Use 'd', 'h', or 's'.") + + +def iter_db_snapshots( + window: str = "7d", + t0: Optional[datetime] = None, + t_now: Optional[datetime] = None, + step: Optional[str] = None, + session=None, +) -> Generator[SnapshotWindow, None, None]: + """Yield discrete time-windowed graph snapshots from the database. + + Slices ``normalized_transactions`` into non-overlapping (or rolling) + windows from ``t0`` to ``t_now``, each of size ``window``. + + Args: + window: Window size string, e.g. ``'7d'``, ``'24h'``, ``'3600s'``. + t0: Start of the first window. Defaults to the earliest timestamp in DB. + t_now: End of the last window. Defaults to ``datetime.now(UTC)``. + step: Slide step between windows (defaults to ``window`` for non-overlapping). + Set smaller than ``window`` for rolling windows. + session: SQLAlchemy session. If None, one is created via ``get_session()``. + + Yields: + :class:`SnapshotWindow` instances in chronological order. + """ + from astroml.db.schema import NormalizedTransaction + from sqlalchemy import select, func as sqlfunc + + if session is None: + from astroml.db.session import get_session + session = get_session() + + win_delta = _parse_window_size(window) + step_delta = _parse_window_size(step) if step else win_delta + + if t_now is None: + t_now = datetime.now(timezone.utc) + + if t0 is None: + result = session.execute( + select(sqlfunc.min(NormalizedTransaction.timestamp)) + ).scalar() + if result is None: + return # empty DB + t0 = result if result.tzinfo else result.replace(tzinfo=timezone.utc) + + if t_now.tzinfo is None: + t_now = t_now.replace(tzinfo=timezone.utc) + if t0.tzinfo is None: + t0 = t0.replace(tzinfo=timezone.utc) + + window_start = t0 + index = 0 + + while window_start < t_now: + window_end = min(window_start + win_delta, t_now) + + rows = session.execute( + select( + NormalizedTransaction.sender, + NormalizedTransaction.receiver, + NormalizedTransaction.timestamp, + ).where( + NormalizedTransaction.timestamp >= window_start, + NormalizedTransaction.timestamp <= window_end, + NormalizedTransaction.receiver.isnot(None), + NormalizedTransaction.sender != NormalizedTransaction.receiver, + ).order_by(NormalizedTransaction.timestamp) + ).all() + + edges = [ + Edge(src=r.sender, dst=r.receiver, timestamp=int(r.timestamp.timestamp())) + for r in rows + ] + nodes: Set[str] = set() + for e in edges: + nodes.add(e.src) + nodes.add(e.dst) + + yield SnapshotWindow( + index=index, + start=window_start, + end=window_end, + edges=edges, + nodes=nodes, + ) + + window_start += step_delta + index += 1 diff --git a/astroml/features/transaction_graph.py b/astroml/features/transaction_graph.py index b0c4f42..50048f0 100644 --- a/astroml/features/transaction_graph.py +++ b/astroml/features/transaction_graph.py @@ -30,7 +30,10 @@ def add_transaction( metadata: Optional[Dict[str, Any]] = None ) -> None: """Add a transaction edge to the graph. - + + Self-loop edges (from_account == to_account) are silently dropped to + prevent infinite loops during graph traversal. + Args: from_account: Source account identifier to_account: Destination account identifier @@ -38,6 +41,8 @@ def add_transaction( asset: Asset type (e.g., 'USD', 'BTC', 'ETH') metadata: Optional transaction metadata """ + if from_account == to_account: + return self.nodes.add(from_account) self.nodes.add(to_account) diff --git a/astroml/ingestion/normalizer.py b/astroml/ingestion/normalizer.py index e5dbda9..34b808c 100644 --- a/astroml/ingestion/normalizer.py +++ b/astroml/ingestion/normalizer.py @@ -5,33 +5,38 @@ from astroml.db.schema import NormalizedTransaction from astroml.ingestion.parsers import ( + _PATH_PAYMENT_TYPES, _extract_amount, _extract_asset, _extract_destination, _parse_datetime, + extract_path_payment_hops, ) def normalize_operation(data: dict) -> NormalizedTransaction: - """Transform raw horizon operation data into a NormalizedTransaction.""" + """Transform raw horizon operation data into a NormalizedTransaction. + + For path payments use :func:`normalize_path_payment_hops` instead to + get one record per hop. + """ op_type = data["type"] sender = data["source_account"] receiver = _extract_destination(data, op_type) - + amount_str = _extract_amount(data) amount = float(amount_str) if amount_str is not None else None - + asset_code, asset_issuer = _extract_asset(data) - + if asset_code == "XLM" and asset_issuer is None: normalized_asset = "XLM" else: - # Default fallback to "UNKNOWN" if no explicit asset info is found normalized_asset = f"{asset_code}:{asset_issuer}" if asset_code and asset_issuer else "UNKNOWN" timestamp = _parse_datetime(data["created_at"]) transaction_hash = data["transaction_hash"] - + return NormalizedTransaction( transaction_hash=transaction_hash, sender=sender, @@ -40,3 +45,32 @@ def normalize_operation(data: dict) -> NormalizedTransaction: amount=amount, timestamp=timestamp, ) + + +def normalize_path_payment_hops(data: dict) -> list[NormalizedTransaction]: + """Return one NormalizedTransaction per hop for a path payment operation. + + Falls back to a single record (via :func:`normalize_operation`) for + non-path-payment types so callers can use this function uniformly. + """ + if data.get("type") not in _PATH_PAYMENT_TYPES: + return [normalize_operation(data)] + + hops = extract_path_payment_hops(data) + if not hops: + return [normalize_operation(data)] + + timestamp = _parse_datetime(data["created_at"]) + transaction_hash = data["transaction_hash"] + + return [ + NormalizedTransaction( + transaction_hash=f"{transaction_hash}_hop{hop['hop_index']}", + sender=hop["from_account"], + receiver=hop["to_account"], + asset=hop["asset"], + amount=hop["amount"], + timestamp=timestamp, + ) + for hop in hops + ] diff --git a/astroml/ingestion/parsers.py b/astroml/ingestion/parsers.py index 49b6d47..e8a89c5 100644 --- a/astroml/ingestion/parsers.py +++ b/astroml/ingestion/parsers.py @@ -145,6 +145,11 @@ def _extract_amount(data: dict) -> Optional[str]: return data["amount"] if "starting_balance" in data: return data["starting_balance"] + # For path payments: prefer destination_amount (what receiver gets) + if "destination_amount" in data: + return data["destination_amount"] + if "source_amount" in data: + return data["source_amount"] return None @@ -154,3 +159,70 @@ def _extract_asset(data: dict) -> tuple[Optional[str], Optional[str]]: if asset_type == "native": return ("XLM", None) return (data.get("asset_code"), data.get("asset_issuer")) + + +def extract_path_payment_hops(data: dict) -> list[dict]: + """Decompose a path payment into ordered per-hop dicts. + + Each hop dict has keys: from_account, to_account, asset_code, + asset_issuer, amount, hop_index, is_first_hop, is_last_hop. + + Returns an empty list for non-path-payment operations. + """ + if data.get("type") not in _PATH_PAYMENT_TYPES: + return [] + + sender = data["source_account"] + receiver = _extract_destination(data, data["type"]) + path = data.get("path", []) # intermediate assets + + # Build asset chain: [source_asset, ...path_assets..., dest_asset] + def _asset_str(asset_dict: dict) -> str: + if asset_dict.get("asset_type") == "native": + return "XLM" + code = asset_dict.get("asset_code", "UNKNOWN") + issuer = asset_dict.get("asset_issuer", "") + return f"{code}:{issuer}" if issuer else code + + src_asset_type = data.get("source_asset_type", data.get("asset_type", "")) + if src_asset_type == "native": + src_asset = "XLM" + else: + src_code = data.get("source_asset_code", data.get("asset_code", "UNKNOWN")) + src_issuer = data.get("source_asset_issuer", data.get("asset_issuer", "")) + src_asset = f"{src_code}:{src_issuer}" if src_issuer else src_code + + dst_asset_type = data.get("asset_type", "") + if dst_asset_type == "native": + dst_asset = "XLM" + else: + dst_code = data.get("asset_code", "UNKNOWN") + dst_issuer = data.get("asset_issuer", "") + dst_asset = f"{dst_code}:{dst_issuer}" if dst_issuer else dst_code + + path_assets = [_asset_str(p) for p in path] + asset_chain = [src_asset] + path_assets + [dst_asset] + + # Amounts: source_amount on first hop, destination_amount on last hop, + # None for intermediate hops (not exposed by Horizon). + src_amount = data.get("source_amount") + dst_amount = data.get("destination_amount", data.get("amount")) + + # Intermediate accounts are not exposed by Horizon; use sentinel "__path__" + # so the graph builder can distinguish them from real accounts. + n_hops = len(asset_chain) - 1 + hops = [] + for i in range(n_hops): + from_acc = sender if i == 0 else f"__path__{data['transaction_hash']}_{i}" + to_acc = receiver if i == n_hops - 1 else f"__path__{data['transaction_hash']}_{i + 1}" + amount = src_amount if i == 0 else (dst_amount if i == n_hops - 1 else None) + hops.append({ + "from_account": from_acc, + "to_account": to_acc, + "asset": asset_chain[i], + "amount": float(amount) if amount is not None else None, + "hop_index": i, + "is_first_hop": i == 0, + "is_last_hop": i == n_hops - 1, + }) + return hops