docs: Add replay protection example (#9)#163
docs: Add replay protection example (#9)#163yuzengbaao wants to merge 1 commit intoScottcjn:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds a new reference example demonstrating replay-protected signature verification for Beacon-style message handling (nonce uniqueness + timestamp TTL window), addressing the replay-protection gap described in issue #9.
Changes:
- Introduces
SecureMessageHandlerthat verifies Ed25519 signatures, enforces a TTL window, and rejects duplicate nonces. - Implements a simple in-memory
NonceCachewith TTL-based cleanup. - Provides a runnable set of test vectors (valid, duplicate nonce, stale timestamp, future timestamp, missing fields).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| """Beacon Replay Protection Example. | ||
|
|
There was a problem hiding this comment.
All other scripts in examples/ include a #!/usr/bin/env python3 shebang at the top; adding it here would keep the examples consistent and makes the file directly executable on Unix-like systems.
| ts = payload.get("ts") | ||
| nonce = payload.get("nonce") | ||
| sender = payload.get("agent_id") | ||
|
|
||
| if not ts or not nonce or not sender: | ||
| return {"error": "Missing replay protection fields (ts, nonce, agent_id)", "status": 400} | ||
|
|
||
| # 3. Validate Timestamp (TTL validation) | ||
| now = int(time.time()) | ||
| age = now - ts | ||
|
|
||
| # Reject future timestamps (allowing 5s clock skew) | ||
| if age < -5: | ||
| return {"error": "Timestamp in the future", "status": 400} | ||
|
|
||
| if age > TIMESTAMP_TTL_SECONDS: | ||
| return {"error": f"Message expired. Age: {age}s, TTL: {TIMESTAMP_TTL_SECONDS}s", "status": 401} | ||
|
|
||
| # 4. Check Nonce (duplicate prevention) | ||
| if not self.nonce_cache.check_and_add(sender, nonce, now): | ||
| return {"error": "Replayed message (duplicate nonce)", "status": 401} |
There was a problem hiding this comment.
This example trusts agent_id from the signed payload for nonce-scoping. An attacker can sign with a valid key but vary agent_id to bypass duplicate-nonce detection (and/or poison the cache). Consider deriving agent_id from pubkey_hex (via agent_id_from_pubkey) and rejecting the message if the payload’s agent_id doesn’t match the derived value; then use the derived ID for the cache key.
|
|
||
| if not ts or not nonce or not sender: | ||
| return {"error": "Missing replay protection fields (ts, nonce, agent_id)", "status": 400} | ||
|
|
||
| # 3. Validate Timestamp (TTL validation) | ||
| now = int(time.time()) | ||
| age = now - ts |
There was a problem hiding this comment.
payload.get("ts") is used as an int later (age = now - ts), but JSON may supply it as a string/float and this will raise a TypeError. Also, if not ts will treat ts=0 as “missing” even though the field exists. Consider validating/coercing ts to an int (and handling ValueError), and checking required fields with is None / explicit string checks rather than truthiness.
| if not ts or not nonce or not sender: | |
| return {"error": "Missing replay protection fields (ts, nonce, agent_id)", "status": 400} | |
| # 3. Validate Timestamp (TTL validation) | |
| now = int(time.time()) | |
| age = now - ts | |
| # Explicitly check for missing fields; allow values like 0 for ts | |
| if ts is None or nonce is None or sender is None: | |
| return {"error": "Missing replay protection fields (ts, nonce, agent_id)", "status": 400} | |
| # Coerce timestamp to int and validate type | |
| try: | |
| ts_int = int(ts) | |
| except (TypeError, ValueError): | |
| return {"error": "Invalid ts field; must be integer Unix timestamp", "status": 400} | |
| # 3. Validate Timestamp (TTL validation) | |
| now = int(time.time()) | |
| age = now - ts_int |
| try: | ||
| payload = json.loads(raw_payload.decode("utf-8")) | ||
| except json.JSONDecodeError: | ||
| return {"error": "Invalid JSON format", "status": 400} | ||
|
|
||
| # 2. Check for required replay protection fields | ||
| ts = payload.get("ts") | ||
| nonce = payload.get("nonce") | ||
| sender = payload.get("agent_id") |
There was a problem hiding this comment.
raw_payload.decode("utf-8") can raise UnicodeDecodeError, which currently isn’t caught, and json.loads(...) can return a non-dict JSON type (e.g., list) leading to an AttributeError on .get(...). Consider catching UnicodeDecodeError alongside JSONDecodeError and validating that the decoded JSON is an object/dict before accessing .get.
🎯 Claiming this Bounty!Plan:
💰 Payment InformationRTC Address: Quality Commitment:
Let's build this! 🛡️ |
💰 Payment Information (补充)PayPal: 979749654@qq.com
|
💰 Payment InformationPayPal: 979749654@qq.com |
|
Closing — @yuzengbaao, we have seen a pattern of template/auto-generated PRs from this account across multiple Elyan Labs repos. beacon-skill already has 15 working transports and a complete HeartbeatManager with Ed25519 signing, peer tracking, and on-chain anchoring. New transports need to follow existing patterns in If you want to contribute genuinely: read the existing code, pick ONE transport, and submit a PR that works end-to-end with tests. Quality over quantity. This is Sophia's House — we teach, we don't just reject. But we do require real code. |
This PR provides a reference implementation for replay-protected signature verification as requested in #9.
Included in
examples/replay_protection.py:Closes #9.