Skip to content

Commit 94ae8a3

Browse files
committed
feat: implement feed injection for ADR-013 Milestone 2
1 parent 395a3a1 commit 94ae8a3

File tree

14 files changed

+1150
-8
lines changed

14 files changed

+1150
-8
lines changed

cmd/cllama/main.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func run(args []string, stdout, stderr io.Writer) error {
6262

6363
apiServer := &http.Server{
6464
Addr: cfg.APIAddr,
65-
Handler: newAPIHandler(cfg.ContextRoot, reg, logger, acc, pricing),
65+
Handler: newAPIHandler(cfg.ContextRoot, reg, logger, acc, pricing, cfg.PodName),
6666
ReadHeaderTimeout: 10 * time.Second,
6767
}
6868
uiServer := &http.Server{
@@ -98,11 +98,15 @@ func run(args []string, stdout, stderr io.Writer) error {
9898
return nil
9999
}
100100

101-
func newAPIHandler(contextRoot string, reg *provider.Registry, logger *logging.Logger, acc *cost.Accumulator, pricing *cost.Pricing) http.Handler {
101+
func newAPIHandler(contextRoot string, reg *provider.Registry, logger *logging.Logger, acc *cost.Accumulator, pricing *cost.Pricing, podName string) http.Handler {
102102
mux := http.NewServeMux()
103+
opts := []proxy.HandlerOption{proxy.WithCostTracking(acc, pricing)}
104+
if podName != "" {
105+
opts = append(opts, proxy.WithFeeds(podName))
106+
}
103107
proxyHandler := proxy.NewHandler(reg, func(agentID string) (*agentctx.AgentContext, error) {
104108
return agentctx.Load(contextRoot, agentID)
105-
}, logger, proxy.WithCostTracking(acc, pricing))
109+
}, logger, opts...)
106110
mux.Handle("POST /v1/chat/completions", proxyHandler)
107111
mux.Handle("POST /v1/messages", proxyHandler)
108112
mux.HandleFunc("GET /health", func(w http.ResponseWriter, r *http.Request) {

cmd/cllama/main_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func TestDualServerIntegrationSmoke(t *testing.T) {
7575
}
7676
pricing := cost.DefaultPricing()
7777
acc := cost.NewAccumulator()
78-
apiHandler := newAPIHandler(contextRoot, reg, logging.New(io.Discard), acc, pricing)
78+
apiHandler := newAPIHandler(contextRoot, reg, logging.New(io.Discard), acc, pricing, "test-pod")
7979
uiHandler := newUIHandler(reg, acc, contextRoot)
8080

8181
apiServer := &http.Server{Handler: apiHandler}

internal/agentctx/agentctx.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
// AgentContext holds the per-agent mounted contract and metadata files.
1111
type AgentContext struct {
1212
AgentID string
13+
ContextDir string
1314
AgentsMD []byte
1415
ClawdapusMD []byte
1516
Metadata map[string]any
@@ -41,6 +42,7 @@ func Load(contextRoot, agentID string) (*AgentContext, error) {
4142

4243
return &AgentContext{
4344
AgentID: agentID,
45+
ContextDir: dir,
4446
AgentsMD: agentsMD,
4547
ClawdapusMD: clawdapusMD,
4648
Metadata: meta,
@@ -65,6 +67,14 @@ func (a *AgentContext) MetadataString(key string) string {
6567
return v
6668
}
6769

70+
// FeedsPath returns the path to the agent's feeds.json manifest.
71+
func (a *AgentContext) FeedsPath() string {
72+
if a == nil || a.ContextDir == "" {
73+
return ""
74+
}
75+
return filepath.Join(a.ContextDir, "feeds.json")
76+
}
77+
6878
// AgentSummary is a lightweight view of an agent for listing purposes.
6979
type AgentSummary struct {
7080
AgentID string

internal/agentctx/agentctx_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,33 @@ func TestLoadMissingDirErrors(t *testing.T) {
4646
t.Error("expected error for missing dir")
4747
}
4848
}
49+
50+
func TestAgentContextFeedsPath(t *testing.T) {
51+
dir := t.TempDir()
52+
agentDir := filepath.Join(dir, "weston")
53+
if err := os.MkdirAll(agentDir, 0o700); err != nil {
54+
t.Fatal(err)
55+
}
56+
if err := os.WriteFile(filepath.Join(agentDir, "AGENTS.md"), []byte("# C"), 0o644); err != nil {
57+
t.Fatal(err)
58+
}
59+
if err := os.WriteFile(filepath.Join(agentDir, "CLAWDAPUS.md"), []byte("# I"), 0o644); err != nil {
60+
t.Fatal(err)
61+
}
62+
if err := os.WriteFile(filepath.Join(agentDir, "metadata.json"), []byte(`{"token":"weston:x"}`), 0o644); err != nil {
63+
t.Fatal(err)
64+
}
65+
if err := os.WriteFile(filepath.Join(agentDir, "feeds.json"), []byte(`[{"name":"test"}]`), 0o644); err != nil {
66+
t.Fatal(err)
67+
}
68+
69+
ctx, err := Load(dir, "weston")
70+
if err != nil {
71+
t.Fatal(err)
72+
}
73+
74+
expected := filepath.Join(agentDir, "feeds.json")
75+
if ctx.FeedsPath() != expected {
76+
t.Errorf("expected %q, got %q", expected, ctx.FeedsPath())
77+
}
78+
}

internal/feeds/fetcher.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package feeds
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"strings"
9+
"sync"
10+
"time"
11+
12+
"github.com/mostlydev/cllama/internal/logging"
13+
)
14+
15+
// FeedResult is the outcome of a single feed fetch.
16+
type FeedResult struct {
17+
Name string
18+
Source string
19+
Content string
20+
FetchedAt time.Time
21+
Stale bool
22+
Truncated bool
23+
Unavailable bool
24+
}
25+
26+
type cacheEntry struct {
27+
content string
28+
fetchedAt time.Time
29+
expiresAt time.Time
30+
truncated bool
31+
}
32+
33+
// Fetcher fetches feed content with TTL caching.
34+
type Fetcher struct {
35+
podName string
36+
client *http.Client
37+
logger *logging.Logger
38+
mu sync.RWMutex
39+
cache map[string]*cacheEntry
40+
nowFunc func() time.Time
41+
}
42+
43+
// NewFetcher creates a feed fetcher. Pass nil for the default http.Client.
44+
func NewFetcher(podName string, client *http.Client, logger *logging.Logger) *Fetcher {
45+
if client == nil {
46+
client = &http.Client{Timeout: FetchTimeout}
47+
}
48+
return &Fetcher{
49+
podName: podName,
50+
client: client,
51+
logger: logger,
52+
cache: make(map[string]*cacheEntry),
53+
nowFunc: time.Now,
54+
}
55+
}
56+
57+
func (f *Fetcher) cacheKey(agentID, feedURL string) string {
58+
return agentID + "|" + feedURL
59+
}
60+
61+
func (f *Fetcher) now() time.Time {
62+
return f.nowFunc()
63+
}
64+
65+
// Fetch returns feed content, using cache when fresh. On fetch failure, returns
66+
// stale cached content if available, otherwise an unavailable marker.
67+
func (f *Fetcher) Fetch(ctx context.Context, agentID string, entry FeedEntry) (FeedResult, error) {
68+
key := f.cacheKey(agentID, entry.URL)
69+
now := f.now()
70+
71+
f.mu.RLock()
72+
cached, hasCached := f.cache[key]
73+
f.mu.RUnlock()
74+
75+
if hasCached && now.Before(cached.expiresAt) {
76+
return FeedResult{
77+
Name: entry.Name,
78+
Source: entry.Source,
79+
Content: cached.content,
80+
FetchedAt: cached.fetchedAt,
81+
Truncated: cached.truncated,
82+
}, nil
83+
}
84+
85+
content, truncated, err := f.doFetch(ctx, agentID, entry)
86+
if err != nil {
87+
if hasCached {
88+
return FeedResult{
89+
Name: entry.Name,
90+
Source: entry.Source,
91+
Content: cached.content,
92+
FetchedAt: cached.fetchedAt,
93+
Stale: true,
94+
Truncated: cached.truncated,
95+
}, nil
96+
}
97+
return FeedResult{
98+
Name: entry.Name,
99+
Source: entry.Source,
100+
Unavailable: true,
101+
}, nil
102+
}
103+
104+
ttl := entry.TTL
105+
if ttl <= 0 {
106+
ttl = DefaultTTLSeconds
107+
}
108+
109+
newEntry := &cacheEntry{
110+
content: content,
111+
fetchedAt: now,
112+
expiresAt: now.Add(time.Duration(ttl) * time.Second),
113+
truncated: truncated,
114+
}
115+
f.mu.Lock()
116+
f.cache[key] = newEntry
117+
f.mu.Unlock()
118+
119+
return FeedResult{
120+
Name: entry.Name,
121+
Source: entry.Source,
122+
Content: content,
123+
FetchedAt: now,
124+
Truncated: truncated,
125+
}, nil
126+
}
127+
128+
func (f *Fetcher) doFetch(ctx context.Context, agentID string, entry FeedEntry) (content string, truncated bool, err error) {
129+
start := time.Now()
130+
statusCode := 0
131+
defer func() {
132+
if f.logger != nil {
133+
f.logger.LogFeedFetch(agentID, entry.Name, entry.URL, statusCode, time.Since(start).Milliseconds(), err)
134+
}
135+
}()
136+
137+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, entry.URL, nil)
138+
if err != nil {
139+
return "", false, fmt.Errorf("build feed request: %w", err)
140+
}
141+
req.Header.Set("X-Claw-ID", agentID)
142+
req.Header.Set("X-Claw-Pod", f.podName)
143+
req.Header.Set("Accept", "text/plain, text/markdown, application/json")
144+
req.Header.Set("X-Forwarded-Proto", "https")
145+
146+
resp, err := f.client.Do(req)
147+
if err != nil {
148+
return "", false, fmt.Errorf("fetch feed %q: %w", entry.Name, err)
149+
}
150+
defer resp.Body.Close()
151+
statusCode = resp.StatusCode
152+
153+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
154+
return "", false, fmt.Errorf("feed %q returned status %d", entry.Name, resp.StatusCode)
155+
}
156+
157+
limited := io.LimitReader(resp.Body, int64(MaxFeedResponseBytes+1))
158+
body, err := io.ReadAll(limited)
159+
if err != nil {
160+
return "", false, fmt.Errorf("read feed %q body: %w", entry.Name, err)
161+
}
162+
163+
if len(body) > MaxFeedResponseBytes {
164+
body = body[:MaxFeedResponseBytes]
165+
truncated = true
166+
}
167+
168+
return formatFeedContent(body, resp.Header.Get("Content-Type")), truncated, nil
169+
}
170+
171+
func formatFeedContent(body []byte, contentType string) string {
172+
ct := strings.ToLower(strings.TrimSpace(contentType))
173+
if strings.Contains(ct, "application/json") {
174+
var b strings.Builder
175+
b.WriteString("```json\n")
176+
b.Write(body)
177+
if len(body) == 0 || body[len(body)-1] != '\n' {
178+
b.WriteByte('\n')
179+
}
180+
b.WriteString("```")
181+
return b.String()
182+
}
183+
return string(body)
184+
}

0 commit comments

Comments
 (0)