Skip to content

Commit 849fa11

Browse files
committed
fix: address 5 code review issues from vulnerability audit
1. Admin auth: allow loopback access when OPENCODE_MEM_ADMIN_TOKEN is unset (dev convenience), require token for all IPs when set (prod) 2. CLI serve: update curl shutdown instruction with Content-Type header and JSON body required after CSRF fix 3. MCP SPOT violation: extract shared MaintenanceServices scheduler to service crate, used by both HTTP cron and MCP background loop — MCP now runs all 7 maintenance tasks instead of only compression 4. Pipeline dead code: fetch all unsummarized events per run (10K limit) instead of 100-event batches that split buckets into overlapping summaries; compress_events_chunked is now reachable for >200 events 5. Circuit breaker bypass: route run_full_compression and run_compression_pipeline through CB so DB failures are tracked
1 parent 692d37e commit 849fa11

File tree

8 files changed

+298
-247
lines changed

8 files changed

+298
-247
lines changed

crates/cli/src/commands/mcp.rs

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,24 @@ use opencode_mem_core::AppConfig;
33
use opencode_mem_embeddings::LazyEmbeddingService;
44
use opencode_mem_llm::LlmClient;
55
use opencode_mem_mcp::run_mcp_server;
6+
use opencode_mem_service::maintenance::{MaintenanceServices, run_maintenance_tick};
67
use opencode_mem_service::{
7-
InfiniteMemoryService, KnowledgeService, ObservationService, SearchService, SessionService,
8+
InfiniteMemoryService, KnowledgeService, ObservationService, QueueService, SearchService,
9+
SessionService,
810
};
911
use std::sync::Arc;
1012
use tokio::sync::broadcast;
1113

12-
fn start_mcp_background_tasks(infinite_mem: Option<Arc<InfiniteMemoryService>>) {
13-
if let Some(mem) = infinite_mem {
14-
tokio::spawn(async move {
15-
let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
16-
loop {
17-
interval.tick().await;
18-
match mem.run_full_compression().await {
19-
Ok((five_min, hour, day)) => {
20-
if five_min > 0 || hour > 0 || day > 0 {
21-
tracing::info!(
22-
"MCP cron: created {} 5min, {} hour, {} day summaries",
23-
five_min,
24-
hour,
25-
day,
26-
);
27-
}
28-
}
29-
Err(e) => tracing::warn!("MCP cron: infinite memory error: {e:?}"),
30-
}
31-
}
32-
});
33-
}
14+
fn start_mcp_background_tasks(services: Arc<MaintenanceServices>) {
15+
tokio::spawn(async move {
16+
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
17+
let mut loop_count: u64 = 0;
18+
loop {
19+
interval.tick().await;
20+
loop_count = loop_count.wrapping_add(1);
21+
run_maintenance_tick(&services, loop_count).await;
22+
}
23+
});
3424
}
3525

3626
pub(crate) async fn run(config: Arc<AppConfig>) -> Result<()> {
@@ -99,7 +89,7 @@ pub(crate) async fn run(config: Arc<AppConfig>) -> Result<()> {
9989
let session_service = Arc::new(SessionService::new(storage.clone(), llm.clone()));
10090
let knowledge_service = Arc::new(KnowledgeService::new(storage.clone(), embeddings.clone()));
10191
let search_service = Arc::new(SearchService::new(
102-
storage,
92+
storage.clone(),
10393
embeddings,
10494
infinite_mem.clone(),
10595
config.dedup_threshold,
@@ -109,7 +99,19 @@ pub(crate) async fn run(config: Arc<AppConfig>) -> Result<()> {
10999

110100
let pending_writes = Arc::new(opencode_mem_service::PendingWriteQueue::new());
111101

112-
start_mcp_background_tasks(infinite_mem.clone());
102+
let queue_service = Arc::new(QueueService::new(storage, pending_writes.clone(), &config));
103+
104+
let maintenance = Arc::new(MaintenanceServices {
105+
observation_service: observation_service.clone(),
106+
session_service: session_service.clone(),
107+
knowledge_service: knowledge_service.clone(),
108+
search_service: search_service.clone(),
109+
queue_service,
110+
infinite_mem: infinite_mem.clone(),
111+
config: config.clone(),
112+
});
113+
114+
start_mcp_background_tasks(maintenance);
113115

114116
run_mcp_server(
115117
infinite_mem,

crates/cli/src/commands/serve.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,9 @@ pub(crate) async fn run(port: u16, host: String, config: Arc<AppConfig>) -> Resu
140140
"Port {} is already in use.\n\n\
141141
Another instance of opencode-mem is likely running.\n\
142142
To stop it, run:\n\
143-
curl -X POST http://127.0.0.1:{}/api/admin/shutdown\n\
143+
curl -X POST -H 'Content-Type: application/json' -d 'null' \
144+
http://127.0.0.1:{}/api/admin/shutdown\n\
145+
If OPENCODE_MEM_ADMIN_TOKEN is set, add: -H 'X-Admin-Token: <token>'\n\
144146
Or kill the process manually before starting a new server.",
145147
port,
146148
port

crates/http/src/handlers/cron.rs

Lines changed: 12 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,19 @@ use std::sync::Arc;
22
use std::sync::atomic::Ordering;
33

44
use crate::AppState;
5+
use opencode_mem_service::maintenance::{MaintenanceServices, run_maintenance_tick};
56

67
pub async fn start_cron_scheduler(state: Arc<AppState>) {
8+
let services = MaintenanceServices {
9+
observation_service: Arc::clone(&state.observation_service),
10+
session_service: Arc::clone(&state.session_service),
11+
knowledge_service: Arc::clone(&state.knowledge_service),
12+
search_service: Arc::clone(&state.search_service),
13+
queue_service: Arc::clone(&state.queue_service),
14+
infinite_mem: state.infinite_mem.clone(),
15+
config: Arc::clone(&state.config),
16+
};
17+
718
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
819
let mut shutdown_rx = state.shutdown_tx.subscribe();
920
let mut loop_count: u64 = 0;
@@ -22,126 +33,6 @@ pub async fn start_cron_scheduler(state: Arc<AppState>) {
2233

2334
loop_count = loop_count.wrapping_add(1);
2435

25-
if loop_count.is_multiple_of(60)
26-
&& let Some(ref infinite_mem) = state.infinite_mem
27-
{
28-
tracing::debug!("Cron: running infinite memory compression...");
29-
let mem = Arc::clone(infinite_mem);
30-
state.background_tasks.lock().await.spawn(async move {
31-
match mem.run_full_compression().await {
32-
Ok((five_min, hour, day)) => {
33-
if five_min > 0 || hour > 0 || day > 0 {
34-
tracing::info!(
35-
"Cron: created {} 5min, {} hour, {} day summaries",
36-
five_min,
37-
hour,
38-
day,
39-
);
40-
}
41-
}
42-
Err(e) => tracing::warn!("Cron: infinite memory error: {e:?}"),
43-
}
44-
});
45-
}
46-
47-
if loop_count.is_multiple_of(180) {
48-
tracing::debug!("Cron: running embedding backfill...");
49-
let state_clone = Arc::clone(&state);
50-
state.background_tasks.lock().await.spawn(async move {
51-
match state_clone.search_service.run_embedding_backfill(100).await {
52-
Ok(generated) if generated > 0 => {
53-
tracing::info!("Cron: generated {} embeddings", generated);
54-
}
55-
Ok(_) => {}
56-
Err(e) => tracing::warn!("Cron: embedding backfill failed: {}", e),
57-
}
58-
});
59-
}
60-
61-
if loop_count.is_multiple_of(360) {
62-
let state_clone = Arc::clone(&state);
63-
state.background_tasks.lock().await.spawn(async move {
64-
match state_clone.observation_service.run_dedup_sweep().await {
65-
Ok(merged) if merged > 0 => {
66-
tracing::info!(merged, "Cron: dedup sweep completed");
67-
}
68-
Ok(_) => {}
69-
Err(e) => tracing::warn!(error = %e, "Cron: dedup sweep failed"),
70-
}
71-
});
72-
}
73-
74-
if loop_count.is_multiple_of(720) {
75-
let state_clone = Arc::clone(&state);
76-
state.background_tasks.lock().await.spawn(async move {
77-
if let Err(e) = state_clone
78-
.observation_service
79-
.cleanup_old_injections()
80-
.await
81-
{
82-
tracing::warn!(error = %e, "Cron: injection cleanup failed");
83-
}
84-
});
85-
}
86-
87-
if loop_count.is_multiple_of(17280) {
88-
let ttl_secs = state.config.dlq_ttl_secs();
89-
let state_clone = Arc::clone(&state);
90-
state.background_tasks.lock().await.spawn(async move {
91-
match state_clone
92-
.queue_service
93-
.clear_stale_failed_messages(ttl_secs)
94-
.await
95-
{
96-
Ok(deleted) if deleted > 0 => {
97-
tracing::info!(deleted, "Cron: DLQ garbage collection completed");
98-
}
99-
Ok(_) => {}
100-
Err(e) => tracing::warn!(error = %e, "Cron: DLQ garbage collection failed"),
101-
}
102-
});
103-
}
104-
105-
if loop_count.is_multiple_of(2160) {
106-
let state_clone = Arc::clone(&state);
107-
state.background_tasks.lock().await.spawn(async move {
108-
match state_clone
109-
.knowledge_service
110-
.run_confidence_lifecycle()
111-
.await
112-
{
113-
Ok((decayed, archived)) if decayed > 0 || archived > 0 => {
114-
tracing::info!(
115-
decayed,
116-
archived,
117-
"Cron: knowledge confidence lifecycle completed"
118-
);
119-
}
120-
Ok(_) => {}
121-
Err(e) => {
122-
tracing::warn!(error = %e, "Cron: knowledge confidence lifecycle failed");
123-
}
124-
}
125-
});
126-
}
127-
128-
if loop_count.is_multiple_of(360) {
129-
let state_clone = Arc::clone(&state);
130-
state.background_tasks.lock().await.spawn(async move {
131-
match state_clone
132-
.session_service
133-
.generate_pending_summaries(10)
134-
.await
135-
{
136-
Ok(n) if n > 0 => {
137-
tracing::info!(generated = n, "Cron: session summary generation completed");
138-
}
139-
Ok(_) => {}
140-
Err(e) => {
141-
tracing::warn!(error = %e, "Cron: session summary generation failed");
142-
}
143-
}
144-
});
145-
}
36+
run_maintenance_tick(&services, loop_count).await;
14637
}
14738
}

crates/http/src/handlers/mod.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,29 @@
1212
reason = "HTTP handlers are called once from router"
1313
)]
1414

15+
/// Admin access control with two modes:
16+
/// - **Token configured** (`OPENCODE_MEM_ADMIN_TOKEN` set): require the token
17+
/// for ALL requests regardless of source IP. Secure for production.
18+
/// - **No token configured**: allow admin access from loopback IPs only
19+
/// (127.0.0.1, ::1). Convenient for local development where the CLI
20+
/// `serve` command tells users to `curl -X POST .../api/admin/shutdown`.
1521
pub(crate) fn check_admin_access(
16-
_addr: &std::net::SocketAddr,
22+
addr: &std::net::SocketAddr,
1723
headers: &axum::http::HeaderMap,
1824
config: &opencode_mem_core::AppConfig,
1925
) -> bool {
20-
if let Some(ref token) = config.admin_token
21-
&& let Some(provided) = headers.get("x-admin-token").and_then(|h| h.to_str().ok())
22-
&& subtle::ConstantTimeEq::ct_eq(provided.as_bytes(), token.as_bytes()).into()
23-
{
24-
return true;
26+
if let Some(ref token) = config.admin_token {
27+
// Token mode: require valid token regardless of IP
28+
if let Some(provided) = headers.get("x-admin-token").and_then(|h| h.to_str().ok())
29+
&& subtle::ConstantTimeEq::ct_eq(provided.as_bytes(), token.as_bytes()).into()
30+
{
31+
return true;
32+
}
33+
false
34+
} else {
35+
// No token mode: allow loopback only
36+
addr.ip().is_loopback()
2537
}
26-
27-
false
2838
}
2939

3040
pub mod admin;

crates/service/src/infinite_memory_service/mod.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,22 @@ impl InfiniteMemoryService {
205205
}
206206

207207
pub async fn run_compression_pipeline(&self) -> Result<u32> {
208-
pipeline::run_compression_pipeline(&self.pool, &self.llm).await
208+
if !self.circuit_breaker.should_allow() {
209+
return Err(anyhow::anyhow!(
210+
"circuit breaker open, {} seconds until probe",
211+
self.circuit_breaker.seconds_until_probe()
212+
));
213+
}
214+
let result = pipeline::run_compression_pipeline(&self.pool, &self.llm).await;
215+
match &result {
216+
Ok(_) => {
217+
self.circuit_breaker.record_success();
218+
}
219+
Err(_) => {
220+
self.circuit_breaker.record_failure();
221+
}
222+
}
223+
result
209224
}
210225

211226
pub async fn create_hour_summary(
@@ -253,7 +268,22 @@ impl InfiniteMemoryService {
253268
}
254269

255270
pub async fn run_full_compression(&self) -> Result<(u32, u32, u32)> {
256-
pipeline::run_full_compression(&self.pool, &self.llm).await
271+
if !self.circuit_breaker.should_allow() {
272+
return Err(anyhow::anyhow!(
273+
"circuit breaker open, {} seconds until probe",
274+
self.circuit_breaker.seconds_until_probe()
275+
));
276+
}
277+
let result = pipeline::run_full_compression(&self.pool, &self.llm).await;
278+
match &result {
279+
Ok(_) => {
280+
self.circuit_breaker.record_success();
281+
}
282+
Err(_) => {
283+
self.circuit_breaker.record_failure();
284+
}
285+
}
286+
result
257287
}
258288

259289
pub async fn guarded<F, Fut, T>(&self, op_f: F) -> Result<T, StorageError>

0 commit comments

Comments
 (0)