Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 118 additions & 1 deletion astroml/features/graph/snapshot.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable, List, Sequence, Set, Tuple
from datetime import datetime, timedelta, timezone
from typing import Generator, Iterable, List, Optional, Sequence, Set, Tuple
import bisect


Expand Down Expand Up @@ -86,3 +87,119 @@ def snapshot_last_n_days(
if start_ts < 0:
start_ts = 0
return window_snapshot(edges, start_ts, now_ts, presorted=presorted)


# ---------------------------------------------------------------------------
# DB-backed time-windowed snapshot slicer
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class SnapshotWindow:
    """A discrete time window slice ready for training.

    Produced in chronological order by :func:`iter_db_snapshots`.

    NOTE(review): ``frozen=True`` makes instances immutable, but ``edges``
    and ``nodes`` are mutable containers, so ``hash()`` on an instance will
    fail at runtime — confirm no caller relies on hashing these.
    """
    # 0-based window index (t_0, t_1, …, t_now).
    index: int
    # Start of the window; edges are selected from this instant onward.
    start: datetime
    # End of the window (capped at t_now for the final slice).
    end: datetime
    # Edges whose timestamps fall within [start, end]; chronological when
    # produced by iter_db_snapshots (query orders by timestamp).
    edges: List[Edge]
    # Union of all src/dst account ids appearing in ``edges``.
    nodes: Set[str]


def _parse_window_size(window: str) -> timedelta:
"""Parse a window size string like '7d', '24h', '3600s' into a timedelta."""
unit = window[-1].lower()
value = int(window[:-1])
if unit == "d":
return timedelta(days=value)
if unit == "h":
return timedelta(hours=value)
if unit == "s":
return timedelta(seconds=value)
raise ValueError(f"Unknown window unit '{unit}'. Use 'd', 'h', or 's'.")


def iter_db_snapshots(
    window: str = "7d",
    t0: Optional[datetime] = None,
    t_now: Optional[datetime] = None,
    step: Optional[str] = None,
    session=None,
) -> Generator[SnapshotWindow, None, None]:
    """Yield discrete time-windowed graph snapshots from the database.

    Slices ``normalized_transactions`` into non-overlapping (or rolling)
    windows from ``t0`` to ``t_now``, each of size ``window``. Windows are
    half-open ``[start, end)`` so a transaction stamped exactly on a
    boundary lands in exactly one window; the final window is closed at
    ``t_now`` so the newest rows are not dropped.

    Args:
        window: Window size string, e.g. ``'7d'``, ``'24h'``, ``'3600s'``.
        t0: Start of the first window. Defaults to the earliest timestamp in DB.
        t_now: End of the last window. Defaults to ``datetime.now(UTC)``.
        step: Slide step between windows (defaults to ``window`` for non-overlapping).
            Set smaller than ``window`` for rolling windows.
        session: SQLAlchemy session. If None, one is created via ``get_session()``.

    Yields:
        :class:`SnapshotWindow` instances in chronological order.

    Raises:
        ValueError: If ``window`` or ``step`` is malformed or non-positive.
    """
    from astroml.db.schema import NormalizedTransaction
    from sqlalchemy import select, func as sqlfunc

    if session is None:
        from astroml.db.session import get_session
        session = get_session()

    win_delta = _parse_window_size(window)
    step_delta = _parse_window_size(step) if step else win_delta
    # A zero/negative step would make the while loop below spin forever.
    if step_delta <= timedelta(0):
        raise ValueError(f"step must be positive, got {step!r}")

    if t_now is None:
        t_now = datetime.now(timezone.utc)

    if t0 is None:
        earliest = session.execute(
            select(sqlfunc.min(NormalizedTransaction.timestamp))
        ).scalar()
        if earliest is None:
            return  # empty DB: nothing to slice
        t0 = earliest if earliest.tzinfo else earliest.replace(tzinfo=timezone.utc)

    # Normalize both endpoints to aware datetimes so the comparisons below
    # never mix naive and aware values (which raises TypeError).
    if t_now.tzinfo is None:
        t_now = t_now.replace(tzinfo=timezone.utc)
    if t0.tzinfo is None:
        t0 = t0.replace(tzinfo=timezone.utc)

    window_start = t0
    index = 0

    while window_start < t_now:
        window_end = min(window_start + win_delta, t_now)
        is_final = window_end >= t_now

        # Half-open [start, end) prevents boundary rows from being counted
        # in two adjacent windows; the final window is closed at t_now so
        # rows stamped exactly at t_now are still included.
        upper_bound = (
            NormalizedTransaction.timestamp <= window_end
            if is_final
            else NormalizedTransaction.timestamp < window_end
        )

        rows = session.execute(
            select(
                NormalizedTransaction.sender,
                NormalizedTransaction.receiver,
                NormalizedTransaction.timestamp,
            ).where(
                NormalizedTransaction.timestamp >= window_start,
                upper_bound,
                NormalizedTransaction.receiver.isnot(None),
                # Self-loops are excluded, consistent with the graph builder.
                NormalizedTransaction.sender != NormalizedTransaction.receiver,
            ).order_by(NormalizedTransaction.timestamp)
        ).all()

        edges = [
            Edge(src=r.sender, dst=r.receiver, timestamp=int(r.timestamp.timestamp()))
            for r in rows
        ]
        # Node set is the union of endpoints across all edges in the window.
        nodes: Set[str] = {acct for e in edges for acct in (e.src, e.dst)}

        yield SnapshotWindow(
            index=index,
            start=window_start,
            end=window_end,
            edges=edges,
            nodes=nodes,
        )

        window_start += step_delta
        index += 1
7 changes: 6 additions & 1 deletion astroml/features/transaction_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,19 @@ def add_transaction(
metadata: Optional[Dict[str, Any]] = None
) -> None:
"""Add a transaction edge to the graph.


Self-loop edges (from_account == to_account) are silently dropped to
prevent infinite loops during graph traversal.

Args:
from_account: Source account identifier
to_account: Destination account identifier
amount: Transaction amount (weight)
asset: Asset type (e.g., 'USD', 'BTC', 'ETH')
metadata: Optional transaction metadata
"""
if from_account == to_account:
return
self.nodes.add(from_account)
self.nodes.add(to_account)

Expand Down
46 changes: 40 additions & 6 deletions astroml/ingestion/normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,38 @@

from astroml.db.schema import NormalizedTransaction
from astroml.ingestion.parsers import (
_PATH_PAYMENT_TYPES,
_extract_amount,
_extract_asset,
_extract_destination,
_parse_datetime,
extract_path_payment_hops,
)


def normalize_operation(data: dict) -> NormalizedTransaction:
"""Transform raw horizon operation data into a NormalizedTransaction."""
"""Transform raw horizon operation data into a NormalizedTransaction.

For path payments use :func:`normalize_path_payment_hops` instead to
get one record per hop.
"""
op_type = data["type"]
sender = data["source_account"]
receiver = _extract_destination(data, op_type)

amount_str = _extract_amount(data)
amount = float(amount_str) if amount_str is not None else None

asset_code, asset_issuer = _extract_asset(data)

if asset_code == "XLM" and asset_issuer is None:
normalized_asset = "XLM"
else:
# Default fallback to "UNKNOWN" if no explicit asset info is found
normalized_asset = f"{asset_code}:{asset_issuer}" if asset_code and asset_issuer else "UNKNOWN"

timestamp = _parse_datetime(data["created_at"])
transaction_hash = data["transaction_hash"]

return NormalizedTransaction(
transaction_hash=transaction_hash,
sender=sender,
Expand All @@ -40,3 +45,32 @@ def normalize_operation(data: dict) -> NormalizedTransaction:
amount=amount,
timestamp=timestamp,
)


def normalize_path_payment_hops(data: dict) -> list[NormalizedTransaction]:
    """Return one NormalizedTransaction per hop for a path payment operation.

    Falls back to a single record (via :func:`normalize_operation`) for
    non-path-payment types so callers can use this function uniformly.
    """
    # Anything that is not a path payment — or a path payment whose hops
    # cannot be recovered — collapses to the single-record normalizer.
    if data.get("type") not in _PATH_PAYMENT_TYPES:
        return [normalize_operation(data)]

    hops = extract_path_payment_hops(data)
    if not hops:
        return [normalize_operation(data)]

    when = _parse_datetime(data["created_at"])
    tx_hash = data["transaction_hash"]

    records: list[NormalizedTransaction] = []
    for hop in hops:
        records.append(
            NormalizedTransaction(
                # Suffix the hash with the hop index so each hop's record
                # keeps a unique transaction_hash.
                transaction_hash=f"{tx_hash}_hop{hop['hop_index']}",
                sender=hop["from_account"],
                receiver=hop["to_account"],
                asset=hop["asset"],
                amount=hop["amount"],
                timestamp=when,
            )
        )
    return records
72 changes: 72 additions & 0 deletions astroml/ingestion/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ def _extract_amount(data: dict) -> Optional[str]:
return data["amount"]
if "starting_balance" in data:
return data["starting_balance"]
# For path payments: prefer destination_amount (what receiver gets)
if "destination_amount" in data:
return data["destination_amount"]
if "source_amount" in data:
return data["source_amount"]
return None


Expand All @@ -154,3 +159,70 @@ def _extract_asset(data: dict) -> tuple[Optional[str], Optional[str]]:
if asset_type == "native":
return ("XLM", None)
return (data.get("asset_code"), data.get("asset_issuer"))


def extract_path_payment_hops(data: dict) -> list[dict]:
    """Decompose a path payment into ordered per-hop dicts.

    Each hop dict has keys: from_account, to_account, asset, amount,
    hop_index, is_first_hop, is_last_hop.

    Returns an empty list for non-path-payment operations.
    """
    op_type = data.get("type")
    if op_type not in _PATH_PAYMENT_TYPES:
        return []

    origin = data["source_account"]
    destination = _extract_destination(data, op_type)

    def _format(asset_type, code, issuer) -> str:
        # Canonical asset label: "XLM" for native, else "CODE:ISSUER"
        # (or bare code when no issuer is present).
        if asset_type == "native":
            return "XLM"
        return f"{code}:{issuer}" if issuer else code

    # Source asset: source_asset_* keys, falling back to the plain asset_* keys.
    src_asset = _format(
        data.get("source_asset_type", data.get("asset_type", "")),
        data.get("source_asset_code", data.get("asset_code", "UNKNOWN")),
        data.get("source_asset_issuer", data.get("asset_issuer", "")),
    )
    # Destination asset: the operation's own asset_* keys.
    dst_asset = _format(
        data.get("asset_type", ""),
        data.get("asset_code", "UNKNOWN"),
        data.get("asset_issuer", ""),
    )
    # Intermediate assets come from the Horizon "path" array.
    mid_assets = [
        _format(p.get("asset_type"), p.get("asset_code", "UNKNOWN"), p.get("asset_issuer", ""))
        for p in data.get("path", [])
    ]
    chain = [src_asset, *mid_assets, dst_asset]

    # Amounts: source_amount on the first hop, destination_amount (or
    # "amount") on the last; intermediates are unknown (None).
    first_amount = data.get("source_amount")
    last_amount = data.get("destination_amount", data.get("amount"))

    total = len(chain) - 1
    result: list[dict] = []
    for idx in range(total):
        first = idx == 0
        last = idx == total - 1
        # Horizon does not expose intermediate accounts; synthesize
        # per-transaction "__path__" placeholder nodes so the graph
        # builder can tell them apart from real accounts.
        src_node = origin if first else f"__path__{data['transaction_hash']}_{idx}"
        dst_node = destination if last else f"__path__{data['transaction_hash']}_{idx + 1}"
        raw_amount = first_amount if first else (last_amount if last else None)
        result.append({
            "from_account": src_node,
            "to_account": dst_node,
            "asset": chain[idx],
            "amount": float(raw_amount) if raw_amount is not None else None,
            "hop_index": idx,
            "is_first_hop": first,
            "is_last_hop": last,
        })
    return result
Loading