diff --git a/go.mod b/go.mod index 4442b28feb..acc958274d 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.7 require ( fyne.io/systray v1.12.0 github.com/adhocore/gronx v1.19.6 + github.com/Etriti00/agent-telepathy/tpcp-go v0.4.1 github.com/anthropics/anthropic-sdk-go v1.26.0 github.com/bwmarrin/discordgo v0.29.0 github.com/caarlos0/env/v11 v11.4.0 diff --git a/pkg/multiagent/tpcp/adapter.go b/pkg/multiagent/tpcp/adapter.go new file mode 100644 index 0000000000..4c9302670d --- /dev/null +++ b/pkg/multiagent/tpcp/adapter.go @@ -0,0 +1,258 @@ +// Package tpcp provides TPCP (Telepathy Communication Protocol) integration +// for PicoClaw multi-agent coordination across multiple devices. +// +// TPCP solves the network transport problem that the in-process blackboard +// cannot: agents running on different machines (Raspberry Pi, cloud VMs, +// laptops) can send signed messages, synchronize shared state via CRDT, and +// guarantee message delivery via a dead-letter queue. +// +// Usage — two PicoClaw instances communicating: +// +// // Device A (192.168.1.10) +// nodeA, _ := tpcp.NewAdapter("picoclaw-agent-a", nil) +// nodeA.OnMessage(func(from, content string) { +// fmt.Printf("Received from %s: %s\n", from, content) +// }) +// nodeA.ListenAsync(":8765") +// +// // Device B (192.168.1.20) — connects to Device A +// nodeB, _ := tpcp.NewAdapter("picoclaw-agent-b", nil) +// nodeB.Connect("ws://192.168.1.10:8765") +// nodeB.Send(context.Background(), "picoclaw-agent-a", "hello from device B") +// +// For relay-based routing (internet/NAT traversal), pass a relay URL as a peer: +// +// node.Connect("wss://relay.agent-telepathy.io") +// node.Send(ctx, "picoclaw-remote-agent", "hello over relay") +package tpcp + +import ( + "context" + "crypto/ed25519" + "encoding/base64" + "fmt" + "sync" + + tpcpgo "github.com/Etriti00/agent-telepathy/tpcp-go/tpcp" +) + +// Config holds optional configuration for an Adapter. All fields are optional; +// zero values select safe defaults. +type Config struct { + // PrivateKeySeed is a 32-byte Ed25519 seed. If nil, a new key is generated. + PrivateKeySeed []byte + + // Framework identifies the agent runtime in identity announcements. + // Defaults to "picoclaw". + Framework string + + // Capabilities advertises what this agent can do. + // Defaults to ["chat", "task"]. + Capabilities []string + + // AllowedOrigins restricts WebSocket browser origins for the listener. + // If empty, all non-browser connections are accepted. + AllowedOrigins []string +} + +// MessageHandler is called for every inbound TPCP message. +type MessageHandler func(from, content string) + +// Adapter wraps a TPCP node with a PicoClaw-friendly interface. +// It is safe for concurrent use. +type Adapter struct { + node *tpcpgo.TPCPNode + agentID string + handlers []MessageHandler + mu sync.RWMutex +} + +// NewAdapter creates a TPCP adapter for a PicoClaw agent. +// +// agentID should be the same stable identifier used by the PicoClaw instance +// (e.g. its configured agent name or a UUID). cfg may be nil to use defaults. +func NewAdapter(agentID string, cfg *Config) (*Adapter, error) { + if agentID == "" { + return nil, fmt.Errorf("tpcp: agentID must not be empty") + } + + c := resolveConfig(cfg) + + // Build Ed25519 keypair from seed or generate a fresh one. + var privKey ed25519.PrivateKey + var pubKey ed25519.PublicKey + + if len(c.PrivateKeySeed) > 0 { + if len(c.PrivateKeySeed) != 32 { + return nil, fmt.Errorf("tpcp: PrivateKeySeed must be exactly 32 bytes (got %d)", len(c.PrivateKeySeed)) + } + privKey = ed25519.NewKeyFromSeed(c.PrivateKeySeed) + pubKey = privKey.Public().(ed25519.PublicKey) + } else { + var err error + pubKey, privKey, err = ed25519.GenerateKey(nil) + if err != nil { + return nil, fmt.Errorf("tpcp: key generation failed: %w", err) + } + } + + identity := &tpcpgo.AgentIdentity{ + AgentID: agentID, + Framework: c.Framework, + PublicKey: base64.StdEncoding.EncodeToString(pubKey), + Capabilities: c.Capabilities, + Modality: []string{"text"}, + } + + node := tpcpgo.NewTPCPNode(identity, privKey) + node.AllowedOrigins = c.AllowedOrigins + + a := &Adapter{node: node, agentID: agentID} + + // Route all inbound BROADCAST and TASK_REQUEST messages to registered handlers. + for _, intent := range []tpcpgo.Intent{tpcpgo.IntentBroadcast, tpcpgo.IntentTaskRequest, tpcpgo.IntentCritique} { + intent := intent // capture + node.RegisterHandler(intent, func(env *tpcpgo.TPCPEnvelope) { + content := extractContent(env) + a.mu.RLock() + handlers := a.handlers + a.mu.RUnlock() + for _, h := range handlers { + h(env.Header.SenderID, content) + } + }) + } + + return a, nil +} + +// OnMessage registers a handler for inbound messages. Multiple handlers can be +// registered; they are called in registration order for each message. +func (a *Adapter) OnMessage(h MessageHandler) *Adapter { + a.mu.Lock() + a.handlers = append(a.handlers, h) + a.mu.Unlock() + return a +} + +// ListenAsync starts the TPCP WebSocket server in a background goroutine and +// returns immediately once the port is bound. addr follows net.Listen syntax +// (e.g. ":8765" or "0.0.0.0:8765"). +// +// Errors after startup (e.g. accept errors) are silently logged by the +// underlying TPCP node. Use Listen() if you need the goroutine under your +// own supervision. +func (a *Adapter) ListenAsync(addr string) error { + ready := make(chan error, 1) + go func() { + if err := a.node.Listen(addr); err != nil { + ready <- err + } + }() + + // Wait for the node's Ready channel (closed once the server is bound). + select { + case err := <-ready: + return fmt.Errorf("tpcp: listen on %s failed: %w", addr, err) + case <-a.node.Ready: + return nil + } +} + +// Connect opens a WebSocket connection to a peer or relay at url. +// url should be a WebSocket URL, e.g.: +// - "ws://192.168.1.10:8765" — direct P2P to another PicoClaw device +// - "wss://relay.example.com" — relay for NAT traversal +// +// Connect performs a TPCP HANDSHAKE so the remote node registers this agent. +// It returns once the connection is established; message exchange happens +// asynchronously. +func (a *Adapter) Connect(url string) error { + if err := a.node.Connect(url); err != nil { + return fmt.Errorf("tpcp: connect to %s failed: %w", url, err) + } + return nil +} + +// Send sends a text message to the agent identified by targetID. +// The message is signed with this node's Ed25519 key before transmission. +// +// If the target is not currently connected, the message is queued in the +// dead-letter queue (DLQ) and delivered when connectivity is restored. +func (a *Adapter) Send(_ context.Context, targetID, message string) error { + payload := &tpcpgo.TextPayload{ + PayloadType: "text", + Content: message, + Language: "en", + } + if err := a.node.SendMessage(a.agentID, targetID, tpcpgo.IntentBroadcast, payload); err != nil { + return fmt.Errorf("tpcp: send to %s failed: %w", targetID, err) + } + return nil +} + +// Broadcast sends a text message to all connected peers. +func (a *Adapter) Broadcast(_ context.Context, message string) error { + payload := &tpcpgo.TextPayload{ + PayloadType: "text", + Content: message, + Language: "en", + } + const broadcastID = "00000000-0000-0000-0000-000000000000" + if err := a.node.SendMessage(a.agentID, broadcastID, tpcpgo.IntentBroadcast, payload); err != nil { + return fmt.Errorf("tpcp: broadcast failed: %w", err) + } + return nil +} + +// AgentID returns the stable agent identifier for this node. +func (a *Adapter) AgentID() string { return a.agentID } + +// Stop shuts down the TPCP node, closing all connections. +func (a *Adapter) Stop() error { + return a.node.Stop() +} + +// ── helpers ─────────────────────────────────────────────────────────────────── + +func resolveConfig(cfg *Config) Config { + c := Config{ + Framework: "picoclaw", + Capabilities: []string{"chat", "task"}, + } + if cfg == nil { + return c + } + if cfg.Framework != "" { + c.Framework = cfg.Framework + } + if len(cfg.Capabilities) > 0 { + c.Capabilities = cfg.Capabilities + } + if len(cfg.PrivateKeySeed) > 0 { + c.PrivateKeySeed = cfg.PrivateKeySeed + } + if len(cfg.AllowedOrigins) > 0 { + c.AllowedOrigins = cfg.AllowedOrigins + } + return c +} + +func extractContent(env *tpcpgo.TPCPEnvelope) string { + if env.Payload == nil { + return "" + } + // Payload is interface{}; try common shapes. + switch p := env.Payload.(type) { + case map[string]interface{}: + if c, ok := p["content"].(string); ok { + return c + } + // Fallback: JSON-encode the whole payload + return fmt.Sprintf("%v", p) + case string: + return p + default: + return fmt.Sprintf("%v", p) + } +} diff --git a/pkg/multiagent/tpcp/adapter_test.go b/pkg/multiagent/tpcp/adapter_test.go new file mode 100644 index 0000000000..5f883a32ba --- /dev/null +++ b/pkg/multiagent/tpcp/adapter_test.go @@ -0,0 +1,157 @@ +package tpcp_test + +import ( + "context" + "testing" + "time" + + tpcpadapter "github.com/sipeed/picoclaw/pkg/multiagent/tpcp" +) + +func TestNewAdapter_EmptyAgentID(t *testing.T) { + _, err := tpcpadapter.NewAdapter("", nil) + if err == nil { + t.Fatal("expected error for empty agentID, got nil") + } +} + +func TestNewAdapter_InvalidSeedLength(t *testing.T) { + cfg := &tpcpadapter.Config{ + PrivateKeySeed: []byte("short"), + } + _, err := tpcpadapter.NewAdapter("test-agent", cfg) + if err == nil { + t.Fatal("expected error for seed with wrong length, got nil") + } +} + +func TestNewAdapter_ValidSeed(t *testing.T) { + seed := make([]byte, 32) + cfg := &tpcpadapter.Config{ + PrivateKeySeed: seed, + } + a, err := tpcpadapter.NewAdapter("test-agent", cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if a.AgentID() != "test-agent" { + t.Errorf("expected agentID 'test-agent', got %q", a.AgentID()) + } +} + +func TestNewAdapter_NilConfig(t *testing.T) { + a, err := tpcpadapter.NewAdapter("test-agent", nil) + if err != nil { + t.Fatalf("unexpected error with nil config: %v", err) + } + if a.AgentID() != "test-agent" { + t.Errorf("expected agentID 'test-agent', got %q", a.AgentID()) + } +} + +func TestNewAdapter_GeneratesKey(t *testing.T) { + a1, err1 := tpcpadapter.NewAdapter("agent-1", nil) + a2, err2 := tpcpadapter.NewAdapter("agent-2", nil) + if err1 != nil || err2 != nil { + t.Fatalf("unexpected errors: %v, %v", err1, err2) + } + _ = a1 + _ = a2 +} + +func TestOnMessage_Chainable(t *testing.T) { + a, err := tpcpadapter.NewAdapter("test-agent", nil) + if err != nil { + t.Fatal(err) + } + returned := a.OnMessage(func(from, content string) {}) + if returned != a { + t.Error("OnMessage should return the same adapter for chaining") + } +} + +func TestOnMessage_MultipleHandlers(t *testing.T) { + a, err := tpcpadapter.NewAdapter("test-agent", nil) + if err != nil { + t.Fatal(err) + } + count := 0 + a.OnMessage(func(from, content string) { count++ }) + a.OnMessage(func(from, content string) { count++ }) + _ = count +} + +func TestAgentID(t *testing.T) { + id := "picoclaw-device-001" + a, err := tpcpadapter.NewAdapter(id, nil) + if err != nil { + t.Fatal(err) + } + if got := a.AgentID(); got != id { + t.Errorf("AgentID() = %q, want %q", got, id) + } +} + +func TestStop_BeforeConnect(t *testing.T) { + a, err := tpcpadapter.NewAdapter("test-agent", nil) + if err != nil { + t.Fatal(err) + } + _ = a.Stop() +} + +func TestConnect_InvalidURL(t *testing.T) { + a, err := tpcpadapter.NewAdapter("test-agent", nil) + if err != nil { + t.Fatal(err) + } + err = a.Connect("ws://127.0.0.1:19999") + if err == nil { + t.Error("expected connection error for unreachable address") + } +} + +func TestSend_NotConnected(t *testing.T) { + a, err := tpcpadapter.NewAdapter("test-agent", nil) + if err != nil { + t.Fatal(err) + } + ctx := context.Background() + _ = a.Send(ctx, "other-agent", "hello") +} + +func TestListenAsync_ThenStop(t *testing.T) { + a, err := tpcpadapter.NewAdapter("test-agent", nil) + if err != nil { + t.Fatal(err) + } + + if err := a.ListenAsync(":0"); err != nil { + t.Fatalf("ListenAsync failed: %v", err) + } + + time.Sleep(20 * time.Millisecond) + if err := a.Stop(); err != nil { + t.Errorf("Stop failed: %v", err) + } +} + +func TestConfig_Defaults(t *testing.T) { + a, err := tpcpadapter.NewAdapter("test-agent", nil) + if err != nil { + t.Fatal(err) + } + _ = a +} + +func TestConfig_CustomFramework(t *testing.T) { + cfg := &tpcpadapter.Config{ + Framework: "custom-framework", + Capabilities: []string{"vision", "tool-use"}, + } + a, err := tpcpadapter.NewAdapter("test-agent", cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + _ = a +}