-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmcp_bridge.py
More file actions
1257 lines (1083 loc) · 44.9 KB
/
mcp_bridge.py
File metadata and controls
1257 lines (1083 loc) · 44.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""MCP server for agent chat tools — runs alongside the web server.
Serves two transports for compatibility:
- streamable-http on port 8200 (Claude Code, Codex)
- SSE on port 8201 (Gemini)
"""
import json
import os
import time
import logging
import threading
from pathlib import Path
from mcp.server.fastmcp import Context, FastMCP
log = logging.getLogger(__name__)
# Shared state — set by run.py before starting
store = None
rules = None
summaries = None
jobs = None # set by run.py — JobStore instance
room_settings = None # set by run.py — dict with "channels" list etc.
registry = None # set by run.py — RuntimeRegistry instance
config = None # set by run.py — full config.toml dict
router = None # set by run.py — Router instance
agents = None # set by run.py — AgentManager instance
_presence: dict[str, float] = {}
_activity: dict[str, bool] = {} # True = screen changed on last poll
_activity_ts: dict[str, float] = {} # timestamp of last active=True heartbeat
ACTIVITY_TIMEOUT = 8 # auto-expire activity after 8s without a fresh active=True
_presence_lock = threading.Lock() # guards both _presence and _activity
_renamed_from: set[str] = set() # old names from renames — suppress leave messages
_cursors: dict[str, dict[str, int]] = {} # agent_name → {channel_name → last_id}
_cursors_lock = threading.Lock()
_empty_read_count: dict[str, int] = {} # sender → consecutive empty reads
PRESENCE_TIMEOUT = 10 # ~2 missed heartbeats (5s interval) = offline
# Roles — per-instance, persisted to roles.json
_roles: dict[str, str] = {} # agent_name → role string
_ROLES_FILE: Path | None = None
# Cursor persistence — set by run.py to enable saving cursors across restarts
_CURSORS_FILE: Path | None = None
_MCP_INSTRUCTIONS = (
"agentchattr — a shared chat channel for coordinating development between AI agents and humans. "
"Use chat_send to post messages. Use chat_read to check recent messages. "
"Use chat_join when you start a session to announce your presence. "
"Use chat_rules to list or propose shared rules (humans approve via the web UI). "
"Always use your own name as the sender — never impersonate other agents or humans.\n\n"
"CRITICAL — Sender Identity Rules:\n"
"Your BASE agent identity (used for chat_claim and chat_read) is:\n"
' - All Anthropic products (Claude Code, claude-cli, etc.) → base: "claude"\n'
' - All OpenAI products (Codex CLI, codex, chatgpt-cli, etc.) → base: "codex"\n'
' - All Google products (Gemini CLI, gemini-cli, aistudio, etc.) → base: "gemini"\n'
' - Humans use their own name (e.g. "user")\n'
'Do NOT use your CLI tool name (e.g. "gemini-cli", "claude-code") — use the base name above.\n'
'IMPORTANT: When multiple instances run, the server renames slot 1 (e.g. "claude" → "claude-1"). '
"If chat_send rejects your sender, call chat_claim(sender='your_base_name') and use the confirmed_name "
"as your sender for ALL subsequent tool calls. The confirmed_name overrides the base name.\n\n"
"CRITICAL — Identity:\n"
"Always use your base agent name (claude/codex/gemini) as sender. "
"Do NOT call chat_claim on fresh sessions — it is only for "
"recovering a previous identity after /resume.\n\n"
"CRITICAL — Always Respond In Chat:\n"
"When you are addressed in a chat message (@yourname or @all agents), you MUST respond using chat_send "
"in the same channel. NEVER respond only in your terminal/console output. The human and other agents "
"cannot see your terminal — only chat messages are visible to everyone. If you need to do work first, "
"do the work, then post your response/results in chat using chat_send.\n\n"
"CRITICAL — Token-Aware Reading:\n"
"Each chat_read call costs tokens. Default: one read per relevant channel per turn. "
"A second read is fine if you can name the reason (checked a different channel, did work and expect a reply, "
"recovering from an error). After an empty read ('No new messages'), do NOT read the same channel again — "
"stop and wait for your next prompt. Never use chat_read as a sleep/wait loop.\n\n"
"Rules are the shared working style for your agents. They are short imperative instructions that all agents should follow. "
"At session start, call chat_rules(action='list') to read active rules — treat them as authoritative guidance. "
"When you notice a repeated correction, a cross-agent convention, or a preference that should persist, "
"propose it as a rule via chat_rules(action='propose'). Keep rules short and imperative (max 160 chars). "
"Don't propose trivial or session-specific things. chat_decision is an alias for chat_rules (backward compat).\n\n"
"Messages belong to channels (default: 'general'). Use the 'channel' parameter in chat_send and "
"chat_read to target a specific channel. Omit channel or pass empty string to read from all channels.\n\n"
"If you are addressed in chat, respond in chat — use chat_send to reply in the same channel. "
"Do not take the answer back to your terminal session. "
"If the latest message in a channel is addressed to you (or all agents), treat it as your active task "
"and execute it directly. Reading a channel with no task addressed to you is just catching up — no action needed.\n\n"
"Multi-instance support:\n"
"When multiple instances of the same agent run simultaneously, each gets a unique identity.\n"
"The server assigns names like claude-1, claude-2 automatically.\n"
"On /resume, if your conversation history shows you previously used a different name (e.g. 'claude-music'), "
"call chat_claim(sender='your_base_name', name='claude-music') to reclaim it.\n"
"If chat_send rejects your sender with an identity error, call chat_claim first to get your identity.\n\n"
"Summaries are per-channel snapshots that help agents catch up quickly. "
"Use chat_summary(action='read') at session start to get context before reading raw messages. "
"Use chat_summary(action='write', text='...') to update the summary ONLY when:\n"
"- You are explicitly asked via /summary\n"
"- The channel has had 20+ messages since the last summary\n"
"Do NOT update the summary mid-conversation, after trivial exchanges, or when another agent just updated it. "
"Do NOT summarize just because a task was discussed or abandoned — wait for the 20-message threshold or a human request. "
"Keep summaries factual and concise (under 150 words) — focus on decisions made, tasks completed, and open questions.\n\n"
"Jobs are bounded work conversations — like Slack threads with status tracking. "
"When you are triggered with job_id=N, use chat_read(job_id=N) to read the job conversation, "
"then use chat_send(job_id=N, message='...') to reply within it. "
"Job conversations are separate from the main timeline — your response should go to the job, not the channel.\n\n"
"CRITICAL — Proposing Jobs:\n"
"Agents must ONLY propose jobs using chat_propose_job when explicitly asked by the user, OR when the request is a clearly 'scoped task'. "
"A task is scoped if it has: 1) Concrete outcome, 2) Specific boundary, 3) Clear done criteria, 4) Explicit owner/intention, and 5) Appropriate size. "
"If these 5 checks do not pass, do NOT propose a job; instead, reply in chat to ask for clarification. "
"This prevents over-use of the jobs feature for vague requests.\n\n"
"To post a suggestion (Accept/Dismiss card) in a job, prefix your message with [suggestion]: "
"chat_send(job_id=N, message='[suggestion] I recommend we refactor the auth module'). "
"The human can Accept (triggers you with context) or Dismiss."
)
# --- Tool implementations (shared between both servers) ---
def _request_headers(ctx: Context | None):
if ctx is None:
return None
try:
request = ctx.request_context.request
except Exception:
return None
return getattr(request, "headers", None)
def _extract_agent_token(ctx: Context | None) -> str:
headers = _request_headers(ctx)
if not headers:
return ""
auth = headers.get("authorization", "")
if auth and auth.lower().startswith("bearer "):
return auth[7:].strip()
return headers.get("x-agent-token", "").strip()
def _authenticated_instance(ctx: Context | None) -> dict | None:
if not registry:
return None
token = _extract_agent_token(ctx)
if not token:
return None
return registry.resolve_token(token)
def _resolve_tool_identity(
raw_name: str,
ctx: Context | None,
*,
field_name: str,
required: bool = False,
) -> tuple[str, str | None]:
provided = raw_name.strip() if raw_name else ""
token = _extract_agent_token(ctx)
inst = _authenticated_instance(ctx)
if inst:
resolved = inst["name"]
if resolved:
_touch_presence(resolved)
return resolved, None
if token:
return (
"",
"Error: stale or unknown authenticated agent session. Re-register and retry.",
)
if not provided:
if required:
return "", f"Error: {field_name} is required."
return "", None
if registry:
resolved = registry.resolve_name(provided)
if resolved != provided and registry.is_registered(resolved):
provided = resolved
if registry.is_agent_family(provided):
return "", f"Error: authenticated agent session required for '{provided}'."
if provided:
_touch_presence(provided)
return provided, None
def chat_send(
sender: str,
message: str,
image_path: str = "",
reply_to: int = -1,
channel: str = "general",
job_id: int = 0,
ctx: Context | None = None,
) -> str:
"""Send a message to the agentchattr chat. Use your name as sender (claude/codex/user).
Optionally attach a local image by providing image_path (absolute path).
Optionally reply to a message by providing reply_to (message ID).
Optionally specify a channel (default: 'general').
Optionally specify a job_id to post into a job conversation instead of the main timeline."""
sender, err = _resolve_tool_identity(
sender, ctx, field_name="sender", required=True
)
if err:
return err
# Block pending instances (identity not yet confirmed)
if registry and registry.is_pending(sender):
return "Error: identity not confirmed. Call chat_claim(sender=your_base_name) to get your identity."
# Block base family names when multi-instance is active
# (but allow if sender is a registered+active instance — e.g. slot-1 'claude' that already claimed)
if (
registry
and sender in registry.get_bases()
and registry.family_instance_count(sender) >= 2
):
inst = registry.get_instance(sender)
if not inst or inst.get("state") != "active":
return (
f"Error: multiple {sender} instances are registered. "
f"Call chat_claim(sender='{sender}') to get your unique identity, then use the confirmed_name as sender."
)
# Block unregistered agent names (stale identity from resumed session)
if (
registry
and registry.is_agent_family(sender)
and not registry.is_registered(sender)
):
return f"Error: sender '{sender}' is not registered. Call chat_claim(sender=your_base_name) to get your identity."
if not message.strip() and not image_path:
return "Empty message, not sent."
# Job-scoped send: post into a job conversation instead of main timeline
if job_id and jobs:
# Detect suggestion type from [suggestion] prefix
text = message.strip()
msg_type = "chat"
if text.lower().startswith("[suggestion]"):
msg_type = "suggestion"
text = text[len("[suggestion]") :].strip()
# Handle image attachment for job messages
job_attachments = None
if image_path:
import shutil, uuid
src = Path(image_path)
if not src.exists():
return f"Image not found: {image_path}"
if src.suffix.lower() not in (
".png",
".jpg",
".jpeg",
".gif",
".webp",
".bmp",
".svg",
):
return f"Unsupported image type: {src.suffix}"
raw_dir = "./uploads"
if config and "images" in config:
raw_dir = config["images"].get("upload_dir", raw_dir)
upload_dir = Path(raw_dir)
upload_dir.mkdir(parents=True, exist_ok=True)
filename = f"{uuid.uuid4().hex[:8]}{src.suffix}"
shutil.copy2(str(src), str(upload_dir / filename))
job_attachments = [{"name": src.name, "url": f"/uploads/{filename}"}]
msg = jobs.add_message(
job_id, sender, text, msg_type=msg_type, attachments=job_attachments
)
if msg is None:
return f"Error: job #{job_id} not found."
with _presence_lock:
_presence[sender] = time.time()
# Route @mentions in job messages to trigger other agents
if router and agents:
job = jobs.get(job_id)
if job:
job_channel = job.get("channel", "general")
raw_targets = router.get_targets(sender, text, job_channel)
targets = []
for t in raw_targets:
if registry:
targets.extend(registry.resolve_to_instances(t))
else:
targets.append(t)
targets = list(dict.fromkeys(targets))
chat_msg = f"{sender}: {text}" if text else ""
for target in targets:
if registry:
inst = registry.get_instance(target)
if inst and inst.get("state") == "pending":
continue
if agents.is_available(target):
agents.trigger_sync(
target, message=chat_msg, channel=job_channel, job_id=job_id
)
return f"Sent to job #{job_id} (msg_id={msg['id']})" + (
" [suggestion]" if msg_type == "suggestion" else ""
)
attachments = []
if image_path:
import shutil
import uuid
from pathlib import Path
src = Path(image_path)
if not src.exists():
return f"Image not found: {image_path}"
if src.suffix.lower() not in (
".png",
".jpg",
".jpeg",
".gif",
".webp",
".bmp",
".svg",
):
return f"Unsupported image type: {src.suffix}"
# Get upload dir from config (fall back to ./uploads)
raw_dir = "./uploads"
if config and "images" in config:
raw_dir = config["images"].get("upload_dir", raw_dir)
upload_dir = Path(raw_dir)
upload_dir.mkdir(parents=True, exist_ok=True)
filename = f"{uuid.uuid4().hex[:8]}{src.suffix}"
shutil.copy2(str(src), str(upload_dir / filename))
attachments.append({"name": src.name, "url": f"/uploads/{filename}"})
reply_id = reply_to if reply_to >= 0 else None
if reply_id is not None and store.get_by_id(reply_id) is None:
return f"Message #{reply_to} not found."
msg = store.add(
sender,
message.strip(),
attachments=attachments,
reply_to=reply_id,
channel=channel,
)
_update_cursor(sender, [msg], channel)
with _presence_lock:
_presence[sender] = time.time()
return f"Sent (id={msg['id']})"
def chat_thread_list(
channel: str = "",
status: str = "",
ctx: Context | None = None,
) -> str:
"""List threads in a channel. Returns JSON array of thread summaries.
Args:
channel: Filter by channel name (default: all channels).
status: Filter by status - 'open', 'resolved', or omit for all.
"""
import app
from thread_store import build_thread_index, ThreadStore
thread_store: ThreadStore = getattr(app, "thread_state", None)
if not thread_store:
return "Error: thread store not available."
message_store = getattr(app, "store", None)
if not message_store:
return "Error: message store not available."
ch = channel.strip() if channel.strip() else None
status_filter = status.strip() if status.strip() else None
messages = message_store.get_all()
threads = build_thread_index(
messages, thread_store, channel=ch, status=status_filter
)
out = []
for thread in threads:
out.append(
{
"root_id": thread["root_id"],
"channel": thread["channel"],
"owner": thread.get("owner", ""),
"status": thread.get("status", "open"),
"message_count": thread.get("message_count", 0),
"reply_count": thread.get("reply_count", 0),
"participants": thread.get("participants", []),
"updated_at": thread.get("updated_at", 0),
"root_message": thread.get("root_message", {}),
}
)
return json.dumps(out, ensure_ascii=False) if out else "No threads found."
def chat_thread_read(
thread_id: int,
limit: int = 50,
ctx: Context | None = None,
) -> str:
"""Read all messages in a thread by root message ID.
Args:
thread_id: The root message ID of the thread.
limit: Maximum messages to return (default 50).
"""
import app
from thread_store import resolve_thread_root_id, ThreadStore
thread_store: ThreadStore = getattr(app, "thread_state", None)
if not thread_store:
return "Error: thread store not available."
message_store = getattr(app, "store", None)
if not message_store:
return "Error: message store not available."
root_id = int(thread_id)
messages = message_store.get_all()
message_index = {int(m["id"]): m for m in messages}
thread_messages = []
for msg in messages:
msg_root = resolve_thread_root_id(msg, message_index)
if msg_root == root_id:
thread_messages.append(msg)
thread_messages.sort(key=lambda m: m["id"])
thread_messages = thread_messages[-limit:]
thread_state = thread_store.get(root_id)
out = {
"root_id": root_id,
"status": thread_state.get("status", "open") if thread_state else "open",
"owner": thread_state.get("owner", "") if thread_state else "",
"channel": thread_state.get("channel", "general")
if thread_state
else "general",
"messages": [_compact_thread_message(m) for m in thread_messages],
}
return json.dumps(out, ensure_ascii=False)
def _compact_thread_message(message: dict) -> dict:
"""Compact message for thread output."""
entry = {
"id": message["id"],
"sender": message.get("sender", ""),
"text": message.get("text", ""),
"type": message.get("type", "chat"),
"time": message.get("time", ""),
"channel": message.get("channel", "general"),
}
if message.get("reply_to") is not None:
entry["reply_to"] = message["reply_to"]
if message.get("attachments"):
entry["attachments"] = _resolve_attachments(message["attachments"])
return entry
def chat_thread_create(
sender: str,
channel: str,
title: str,
message: str = "",
ctx: Context | None = None,
) -> str:
"""Create a new thread with an initial message.
Args:
sender: Your name (used as message sender).
channel: Channel to create thread in.
title: Thread title (for identification).
message: First message text (optional - can just create thread).
"""
sender, err = _resolve_tool_identity(
sender, ctx, field_name="sender", required=True
)
if err:
return err
import app
from thread_store import ThreadStore
thread_store: ThreadStore = getattr(app, "thread_state", None)
if not thread_store:
return "Error: thread store not available."
message_store = getattr(app, "store", None)
if not message_store:
return "Error: message store not available."
ch = channel.strip() if channel.strip() else "general"
title = title.strip()
msg = message_store.add(
sender, message.strip() if message.strip() else f"Thread: {title}", channel=ch
)
thread_store.update_thread(
msg["id"],
owner=sender,
status="open",
channel=ch,
last_message_id=msg["id"],
)
# Broadcast with owner set (the auto-sync fires before owner is set)
updated = thread_store.get(msg["id"])
if updated:
import asyncio
updated["root_id"] = msg["id"]
if app._event_loop:
asyncio.run_coroutine_threadsafe(
app.broadcast_thread_update(updated), app._event_loop
)
return json.dumps(
{
"root_id": msg["id"],
"channel": ch,
"title": title,
"message_id": msg["id"],
},
ensure_ascii=False,
)
def chat_thread_update(
thread_id: int,
owner: str = "",
status: str = "",
ctx: Context | None = None,
) -> str:
"""Update a thread's owner or status.
Args:
thread_id: The root message ID of the thread.
owner: New thread owner (agent name). Pass empty to leave unchanged.
status: New status - 'open', 'resolved', or 'done'. Pass empty to leave unchanged.
"""
import asyncio
import app
from thread_store import ThreadStore
thread_store: ThreadStore = getattr(app, "thread_state", None)
if not thread_store:
return "Error: thread store not available."
message_store = getattr(app, "store", None)
if not message_store:
return "Error: message store not available."
root_id = int(thread_id)
root_message = message_store.get_by_id(root_id)
if not root_message:
return f"Error: thread root message {root_id} not found."
existing = thread_store.get(root_id) or {}
new_owner = owner.strip() if owner.strip() else existing.get("owner", "")
new_status = status.strip() if status.strip() else existing.get("status", "open")
if new_status not in ("open", "resolved", "done"):
return f"Error: invalid status '{new_status}'. Use 'open', 'resolved', or 'done'."
updated = thread_store.update_thread(
root_id,
owner=new_owner,
status=new_status,
channel=root_message.get("channel", "general"),
last_message_id=existing.get("last_message_id", root_id),
)
updated["root_id"] = root_id
if app._event_loop:
asyncio.run_coroutine_threadsafe(
app.broadcast_thread_update(updated), app._event_loop
)
return json.dumps(updated, ensure_ascii=False)
def chat_propose_job(
sender: str,
title: str,
body: str = "",
channel: str = "general",
ctx: Context | None = None,
) -> str:
"""Propose a job for human approval. Posts a proposal card in the timeline.
The human can Accept (creates the job) or Dismiss. Agents must NOT create jobs
directly — always propose and let the human decide.
Args:
title: Short job title (max 80 chars)
body: Detailed description of the work (max 1000 chars)
channel: Channel to post the proposal in
"""
sender, err = _resolve_tool_identity(
sender, ctx, field_name="sender", required=True
)
if err:
return err
if not title.strip():
return "Error: title is required."
title = title.strip()[:80]
body = (body or "").strip()[:1000]
msg = store.add(
sender,
f"Job proposal: {title}",
msg_type="job_proposal",
channel=channel,
metadata={"title": title, "body": body, "status": "pending"},
)
_update_cursor(sender, [msg], channel)
with _presence_lock:
_presence[sender] = time.time()
return f"Proposed job (msg_id={msg['id']}): {title}"
def _resolve_attachments(attachments: list[dict]) -> list[dict]:
"""Add absolute file_path to attachments so agents can read images."""
if not attachments:
return attachments
raw_dir = "./uploads"
if config and "images" in config:
raw_dir = config["images"].get("upload_dir", raw_dir)
upload_dir = Path(raw_dir).resolve()
resolved = []
for att in attachments:
a = dict(att)
url = a.get("url", "")
if url.startswith("/uploads/"):
filename = url.split("/")[-1]
a["file_path"] = str(upload_dir / filename)
resolved.append(a)
return resolved
def _serialize_messages(msgs: list[dict]) -> str:
"""Serialize store messages into MCP chat_read output shape."""
out = []
for m in msgs:
entry = {
"id": m["id"],
"sender": m["sender"],
"text": m["text"],
"type": m["type"],
"time": m["time"],
"channel": m.get("channel", "general"),
}
if m.get("attachments"):
entry["attachments"] = _resolve_attachments(m["attachments"])
if m.get("reply_to") is not None:
entry["reply_to"] = m["reply_to"]
out.append(entry)
return json.dumps(out, ensure_ascii=False) if out else ""
def _load_cursors():
"""Load cursor state from disk (called by run.py after store init)."""
global _cursors
if _CURSORS_FILE is None or not _CURSORS_FILE.exists():
return
try:
data = json.loads(_CURSORS_FILE.read_text("utf-8"))
with _cursors_lock:
_cursors.update(data)
except Exception:
log.warning("Failed to load cursor state from %s", _CURSORS_FILE)
def _save_cursors():
"""Persist cursor state to disk atomically (write temp + rename)."""
if _CURSORS_FILE is None:
return
try:
with _cursors_lock:
snapshot = dict(_cursors)
_CURSORS_FILE.parent.mkdir(parents=True, exist_ok=True)
tmp = _CURSORS_FILE.with_suffix(".tmp")
tmp.write_text(json.dumps(snapshot), "utf-8")
os.replace(tmp, _CURSORS_FILE) # atomic on POSIX
except Exception:
log.warning("Failed to save cursor state to %s", _CURSORS_FILE)
def _load_roles():
"""Load persisted roles from disk."""
global _roles
if _ROLES_FILE is None or not _ROLES_FILE.exists():
return
try:
_roles = json.loads(_ROLES_FILE.read_text("utf-8"))
except Exception:
log.warning("Failed to load roles from %s", _ROLES_FILE)
def _save_roles():
"""Persist roles to disk atomically."""
if _ROLES_FILE is None:
return
try:
_ROLES_FILE.parent.mkdir(parents=True, exist_ok=True)
tmp = _ROLES_FILE.with_suffix(".tmp")
tmp.write_text(json.dumps(_roles), "utf-8")
os.replace(tmp, _ROLES_FILE)
except Exception:
log.warning("Failed to save roles to %s", _ROLES_FILE)
def set_role(name: str, role: str):
"""Set or clear an agent's role. Empty string clears."""
if role:
_roles[name] = role
else:
_roles.pop(name, None)
_save_roles()
def get_role(name: str) -> str:
"""Get an agent's current role, or empty string."""
return _roles.get(name, "")
def get_all_roles() -> dict[str, str]:
"""All active roles."""
return dict(_roles)
def migrate_identity(old_name: str, new_name: str):
"""Migrate all runtime state when an agent is renamed (presence, cursors, activity, roles)."""
with _presence_lock:
if old_name in _presence:
_presence[new_name] = _presence.pop(old_name)
if old_name in _activity:
_activity[new_name] = _activity.pop(old_name)
if old_name in _activity_ts:
_activity_ts[new_name] = _activity_ts.pop(old_name)
_renamed_from.add(old_name) # suppress leave message for old name
with _cursors_lock:
if old_name in _cursors:
_cursors[new_name] = _cursors.pop(old_name)
if old_name in _roles:
_roles[new_name] = _roles.pop(old_name)
_save_roles()
_save_cursors()
def purge_identity(name: str):
"""Remove all runtime state for a deregistered agent (presence, activity, cursors, roles)."""
with _presence_lock:
_presence.pop(name, None)
_activity.pop(name, None)
_activity_ts.pop(name, None)
with _cursors_lock:
_cursors.pop(name, None)
if name in _roles:
del _roles[name]
_save_roles()
_save_cursors()
def migrate_cursors_rename(old_name: str, new_name: str):
"""Move cursor entries from old channel name to new channel name."""
with _cursors_lock:
for agent_cursors in _cursors.values():
if old_name in agent_cursors:
agent_cursors[new_name] = agent_cursors.pop(old_name)
_save_cursors()
def migrate_cursors_delete(channel: str):
"""Remove cursor entries for a deleted channel."""
with _cursors_lock:
for agent_cursors in _cursors.values():
agent_cursors.pop(channel, None)
_save_cursors()
def _update_cursor(sender: str, msgs: list[dict], channel: str | None):
if sender and msgs:
ch_key = channel if channel else "__all__"
with _cursors_lock:
agent_cursors = _cursors.setdefault(sender, {})
agent_cursors[ch_key] = msgs[-1]["id"]
_save_cursors()
def chat_read(
sender: str = "",
since_id: int = 0,
limit: int = 20,
channel: str = "",
job_id: int = 0,
thread_id: int = 0,
ctx: Context | None = None,
) -> str:
"""Read chat messages. Returns JSON array with: id, sender, text, type, time, channel.
Smart defaults:
- First call with sender: returns last `limit` messages (full context).
- Subsequent calls with same sender: returns only NEW messages since last read.
- Pass since_id to override and read from a specific point.
- Omit sender to always get the last `limit` messages (no cursor).
- Pass channel to filter by channel name (default: all channels).
- Pass job_id to read messages from a specific job.
- Pass thread_id to read only messages belonging to a specific thread (root message + replies)."""
sender, err = _resolve_tool_identity(
sender, ctx, field_name="sender", required=False
)
if err:
return err
# Job-scoped read: return messages from the job store
if job_id and jobs:
msgs = jobs.get_messages(job_id)
if msgs is None:
return f"Error: job #{job_id} not found."
out = []
for m in msgs:
entry = {
"id": m["id"],
"sender": m["sender"],
"text": m["text"],
"time": m.get("time", ""),
"job_id": job_id,
}
if m.get("attachments"):
entry["attachments"] = _resolve_attachments(m["attachments"])
if m.get("type"):
entry["type"] = m["type"]
if m.get("resolved"):
entry["resolved"] = m["resolved"]
out.append(entry)
return (
json.dumps(out, ensure_ascii=False)
if out
else "No messages in this job yet."
)
# Thread-scoped read: return only messages in a specific thread
if thread_id and store and thread_state:
from thread_store import resolve_thread_root_id
all_msgs = store.get_all()
message_index = {int(m["id"]): m for m in all_msgs}
thread_msgs = [
m for m in all_msgs if resolve_thread_root_id(m, message_index) == thread_id
]
thread_msgs.sort(key=lambda m: m["id"])
if not thread_msgs:
return f"No messages found for thread {thread_id}."
thread_record = thread_state.get(thread_id)
header = f"Thread {thread_id}"
if thread_record:
header += f" | status: {thread_record.get('status', 'open')}"
if thread_record.get("owner"):
header += f" | owner: {thread_record['owner']}"
serialized = _serialize_messages(thread_msgs)
return f"{header}\n{serialized}"
ch = channel if channel else None
if since_id:
msgs = store.get_since(since_id, channel=ch)
elif sender:
ch_key = ch if ch else "__all__"
with _cursors_lock:
agent_cursors = _cursors.get(sender, {})
cursor = agent_cursors.get(ch_key, 0)
if cursor:
msgs = store.get_since(cursor, channel=ch)
else:
msgs = store.get_recent(limit, channel=ch)
else:
msgs = store.get_recent(limit, channel=ch)
msgs = msgs[-limit:]
_update_cursor(sender, msgs, ch)
serialized = _serialize_messages(msgs)
# Escalating empty-read hints to discourage polling loops
if not serialized and sender:
_empty_read_count[sender] = _empty_read_count.get(sender, 0) + 1
n = _empty_read_count[sender]
if n == 1:
serialized = "No new messages. Do not poll — wait for your next prompt."
elif n == 2:
serialized = (
"No new messages. You have read with no results twice — "
"stop polling and wait for a trigger."
)
else:
serialized = (
"No new messages. STOP. Repeated empty reads waste tokens. "
"Wait for your next prompt."
)
elif sender:
_empty_read_count[sender] = 0
# Prepend identity breadcrumb if multi-instance
if sender and registry and registry.is_registered(sender):
multi = registry.family_instance_count(sender) >= 2
if multi:
inst = registry.get_instance(sender)
if inst:
breadcrumb = f"[identity: {inst['name']} | label: {inst['label']}]"
serialized = f"{breadcrumb}\n{serialized}"
return serialized
def chat_resync(
sender: str,
limit: int = 50,
channel: str = "",
ctx: Context | None = None,
) -> str:
"""Explicit full-context fetch.
Returns the latest `limit` messages and resets the sender cursor
to the latest returned message id.
Pass channel to filter by channel name (default: all channels).
"""
sender, err = _resolve_tool_identity(
sender, ctx, field_name="sender", required=True
)
if err:
return err
ch = channel if channel else None
msgs = store.get_recent(limit, channel=ch)
_update_cursor(sender, msgs, ch)
serialized = _serialize_messages(msgs)
return serialized
def chat_join(name: str, channel: str = "general", ctx: Context | None = None) -> str:
"""Announce that you've connected to agentchattr."""
name, err = _resolve_tool_identity(name, ctx, field_name="name", required=True)
if err:
return err
# Block pending instances (identity not yet confirmed)
if registry and registry.is_pending(name):
return "Error: identity not confirmed. Call chat_claim(sender=your_base_name) to get your identity."
# Block base family names when multi-instance is active
# (but allow if name is a registered+active instance — e.g. slot-1 'claude' that already claimed)
if (
registry
and name in registry.get_bases()
and registry.family_instance_count(name) >= 2
):
inst = registry.get_instance(name)
if not inst or inst.get("state") != "active":
return (
f"Error: multiple {name} instances registered. "
f"Call chat_claim(sender='{name}') to get your unique identity first."
)
# Block unregistered agent names (stale identity from resumed session)
if registry and registry.is_agent_family(name) and not registry.is_registered(name):
return f"Error: '{name}' is not registered. Call chat_claim(sender=your_base_name) to get your identity."
store.add(name, f"{name} is online", msg_type="join", channel="general")
online = _get_online()
return f"Joined. Online: {', '.join(online)}"
def chat_who() -> str:
"""Check who's currently online in agentchattr."""
online = _get_online()
return f"Online: {', '.join(online)}" if online else "Nobody online."
def _touch_presence(name: str):
"""Update presence timestamp — called on any MCP tool use."""
with _presence_lock:
_presence[name] = time.time()
def _get_online() -> list[str]:
now = time.time()
with _presence_lock:
return [name for name, ts in _presence.items() if now - ts < PRESENCE_TIMEOUT]
def is_online(name: str) -> bool:
now = time.time()
with _presence_lock:
return name in _presence and now - _presence.get(name, 0) < PRESENCE_TIMEOUT
def set_active(name: str, active: bool):
with _presence_lock:
_activity[name] = active
if active:
_activity_ts[name] = __import__("time").time()
def is_active(name: str) -> bool: