-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmanager.py
More file actions
47 lines (39 loc) · 1.8 KB
/
manager.py
File metadata and controls
47 lines (39 loc) · 1.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from .channels import DeliveryChannel, DesktopChannel, QueuedFileChannel
from .models import DeliveryMessage, DeliveryReceipt
class DeliveryGateway:
def __init__(self, root_dir: str | Path = "sessions/delivery") -> None:
self.root_dir = Path(root_dir)
self._channels: dict[str, DeliveryChannel] = {}
self.register(DesktopChannel(root_dir=self.root_dir))
self.register(QueuedFileChannel("heartbeat", root_dir=self.root_dir))
self.register(QueuedFileChannel("telegram", root_dir=self.root_dir))
def register(self, channel: DeliveryChannel) -> None:
self._channels[str(channel.name)] = channel
def list_channels(self) -> list[str]:
return sorted(self._channels.keys())
def deliver(self, channel_name: str, message: DeliveryMessage) -> DeliveryReceipt:
key = str(channel_name or "").strip().lower()
if key not in self._channels:
raise ValueError(f"Unknown delivery channel: {channel_name}")
return self._channels[key].deliver(message)
def list_messages(self, channel_name: str, *, box: str = "outbox", limit: int = 20) -> list[dict[str, Any]]:
path = self.root_dir / str(channel_name or "").strip().lower() / f"{box}.jsonl"
if not path.exists():
return []
rows = []
with path.open("r", encoding="utf-8") as handle:
for line in handle:
raw = line.strip()
if not raw:
continue
try:
row = json.loads(raw)
except Exception:
continue
if isinstance(row, dict):
rows.append(row)
return rows[-max(1, int(limit or 20)) :]