Skip to content

Commit 0edcb94

Browse files
committed
fix: address security vulnerabilities identified by CodeRabbitAI review
- Prevent path traversal attacks with strict date format validation
- Improve data comparison logic to preserve comprehensive information
1 parent 18abfa4 commit 0edcb94

File tree

2 files changed

+114
-27
lines changed

2 files changed

+114
-27
lines changed

src/claude_monitor/data/history_manager.py

Lines changed: 50 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -40,20 +40,29 @@ def _get_daily_file_path(self, date_str: str) -> Path:
4040
4141
Returns:
4242
Path to the daily data file
43+
44+
Raises:
45+
ValueError: If date_str is not in valid YYYY-MM-DD format
4346
"""
44-
# Organize by year and month for better file management
47+
import re
48+
49+
# Strict validation to prevent path traversal attacks
50+
if not re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}$", date_str):
51+
raise ValueError(f"Invalid date format: {date_str}. Must be YYYY-MM-DD")
52+
53+
# Parse and validate the date
4554
try:
4655
date = datetime.strptime(date_str, "%Y-%m-%d")
47-
year = date.strftime("%Y")
48-
month = date.strftime("%m")
56+
except ValueError as e:
57+
raise ValueError(f"Invalid date: {date_str}. {e}")
4958

50-
month_dir = self.daily_dir / year / month
51-
month_dir.mkdir(parents=True, exist_ok=True)
59+
year = date.strftime("%Y")
60+
month = date.strftime("%m")
5261

53-
return month_dir / f"{date_str}.json"
54-
except ValueError:
55-
# Fallback for invalid date formats
56-
return self.daily_dir / f"{date_str}.json"
62+
month_dir = self.daily_dir / year / month
63+
month_dir.mkdir(parents=True, exist_ok=True)
64+
65+
return month_dir / f"{date_str}.json"
5766

5867
def save_daily_data(
5968
self, daily_data: List[Dict[str, Any]], overwrite: bool = False
@@ -85,34 +94,55 @@ def save_daily_data(
8594
if file_path.exists() and not overwrite:
8695
# Load existing data to check if it needs updating
8796
try:
88-
with open(file_path, "r") as f:
97+
with open(file_path, "r", encoding="utf-8") as f:
8998
existing_data = json.load(f)
9099

91100
# If the data is identical, skip
92101
if existing_data == day_data:
93102
self._saved_dates.add(date_str)
94103
continue
95104

96-
# If existing data has more information, keep it
97-
existing_tokens = existing_data.get(
98-
"input_tokens", 0
99-
) + existing_data.get("output_tokens", 0)
100-
new_tokens = day_data.get("input_tokens", 0) + day_data.get(
101-
"output_tokens", 0
105+
# Compare total information to decide which data to keep
106+
# Sum all token counts for comparison
107+
existing_total_tokens = (
108+
existing_data.get("input_tokens", 0)
109+
+ existing_data.get("output_tokens", 0)
110+
+ existing_data.get("cache_creation_tokens", 0)
111+
+ existing_data.get("cache_read_tokens", 0)
112+
)
113+
new_total_tokens = (
114+
day_data.get("input_tokens", 0)
115+
+ day_data.get("output_tokens", 0)
116+
+ day_data.get("cache_creation_tokens", 0)
117+
+ day_data.get("cache_read_tokens", 0)
102118
)
103119

104-
if existing_tokens >= new_tokens:
120+
# Compare entries count and cost
121+
existing_entries = existing_data.get("entries_count", 0)
122+
new_entries = day_data.get("entries_count", 0)
123+
existing_cost = existing_data.get("total_cost", 0.0)
124+
new_cost = day_data.get("total_cost", 0.0)
125+
126+
# Keep existing only if it has more total tokens, more entries, AND higher cost
127+
# This ensures we don't lose any valuable information
128+
if (
129+
existing_total_tokens > new_total_tokens
130+
and existing_entries >= new_entries
131+
and existing_cost >= new_cost
132+
):
105133
self._saved_dates.add(date_str)
106134
continue
107135

136+
# Otherwise, save the new data (it has more information)
137+
108138
except Exception as e:
109139
logger.warning(f"Error reading existing data for {date_str}: {e}")
110140

111141
# Save the data
112142
try:
113143
temp_file = file_path.with_suffix(".tmp")
114-
with open(temp_file, "w") as f:
115-
json.dump(day_data, f, indent=2, default=str)
144+
with open(temp_file, "w", encoding="utf-8") as f:
145+
json.dump(day_data, f, indent=2, default=str, ensure_ascii=False)
116146
temp_file.replace(file_path)
117147

118148
self._saved_dates.add(date_str)
@@ -198,7 +228,7 @@ def load_historical_daily_data(
198228
continue
199229

200230
# Load the data
201-
with open(file_path, "r") as f:
231+
with open(file_path, "r", encoding="utf-8") as f:
202232
data = json.load(f)
203233
historical_data.append(data)
204234

src/tests/test_history_manager.py

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,13 @@ def test_get_daily_file_path_invalid_date(
4646
) -> None:
4747
"""Test file path generation with invalid date format."""
4848
date_str = "invalid-date"
49-
expected_path = history_manager.daily_dir / "invalid-date.json"
50-
actual_path = history_manager._get_daily_file_path(date_str)
51-
assert actual_path == expected_path
49+
with pytest.raises(ValueError, match="Invalid date format"):
50+
history_manager._get_daily_file_path(date_str)
51+
52+
# Test path traversal attempt
53+
malicious_str = "../../../etc/passwd"
54+
with pytest.raises(ValueError, match="Invalid date format"):
55+
history_manager._get_daily_file_path(malicious_str)
5256

5357
def test_save_daily_data(self, history_manager: HistoryManager) -> None:
5458
"""Test saving daily data."""
@@ -349,33 +353,86 @@ def test_save_daily_data_missing_date(
349353
def test_save_daily_data_with_existing_better_data(
350354
self, history_manager: HistoryManager
351355
) -> None:
352-
"""Test that existing data with more tokens is preserved."""
353-
# Save initial data with more tokens
356+
"""Test that existing data with more total tokens is preserved."""
357+
# Save initial data with more total tokens
354358
initial_data = [
355359
{
356360
"date": "2024-12-15",
357361
"input_tokens": 2000,
358362
"output_tokens": 1000,
363+
"cache_creation_tokens": 100,
364+
"cache_read_tokens": 50,
365+
"total_cost": 0.10,
366+
"entries_count": 20,
359367
}
360368
]
361369
history_manager.save_daily_data(initial_data)
362370

363371
# Clear saved dates to allow checking existing file
364372
history_manager._saved_dates.clear()
365373

366-
# Try to save data with fewer tokens
374+
# Try to save data with fewer total tokens
367375
new_data = [
368376
{
369377
"date": "2024-12-15",
370378
"input_tokens": 500,
371379
"output_tokens": 250,
380+
"cache_creation_tokens": 50,
381+
"cache_read_tokens": 25,
382+
"total_cost": 0.05,
383+
"entries_count": 10,
372384
}
373385
]
374386
saved_count = history_manager.save_daily_data(new_data, overwrite=False)
375387
assert saved_count == 0
376388

377389
# Verify original data is preserved
378390
file_path = history_manager._get_daily_file_path("2024-12-15")
379-
with open(file_path, "r") as f:
391+
with open(file_path, "r", encoding="utf-8") as f:
380392
saved_data = json.load(f)
381393
assert saved_data["input_tokens"] == 2000
394+
assert saved_data["total_cost"] == 0.10
395+
396+
def test_save_daily_data_updates_with_more_info(
397+
self, history_manager: HistoryManager
398+
) -> None:
399+
"""Test that new data with more information replaces old data."""
400+
# Save initial data
401+
initial_data = [
402+
{
403+
"date": "2024-12-16",
404+
"input_tokens": 1000,
405+
"output_tokens": 500,
406+
"cache_creation_tokens": 100,
407+
"cache_read_tokens": 50,
408+
"total_cost": 0.05,
409+
"entries_count": 10,
410+
}
411+
]
412+
history_manager.save_daily_data(initial_data)
413+
414+
# Clear saved dates
415+
history_manager._saved_dates.clear()
416+
417+
# Save new data with more total tokens
418+
new_data = [
419+
{
420+
"date": "2024-12-16",
421+
"input_tokens": 900,
422+
"output_tokens": 600,
423+
"cache_creation_tokens": 200,
424+
"cache_read_tokens": 100,
425+
"total_cost": 0.08,
426+
"entries_count": 15,
427+
}
428+
]
429+
saved_count = history_manager.save_daily_data(new_data, overwrite=False)
430+
assert saved_count == 1
431+
432+
# Verify new data was saved
433+
file_path = history_manager._get_daily_file_path("2024-12-16")
434+
with open(file_path, "r", encoding="utf-8") as f:
435+
saved_data = json.load(f)
436+
assert saved_data["input_tokens"] == 900
437+
assert saved_data["cache_creation_tokens"] == 200
438+
assert saved_data["total_cost"] == 0.08

0 commit comments

Comments (0)