Skip to content

Add agentic platform: event bus, webhook API, scheduler, notifications#21

Merged
RichardAtCT merged 3 commits intomainfrom
feature/agentic-platform
Feb 13, 2026
Merged

Add agentic platform: event bus, webhook API, scheduler, notifications#21
RichardAtCT merged 3 commits intomainfrom
feature/agentic-platform

Conversation

@RichardAtCT
Copy link
Owner

Summary

  • Event bus (src/events/) — Async event bus with typed events (UserMessage, Webhook, Scheduled, AgentResponse), an AgentHandler that bridges events to ClaudeIntegration.run_command(), and security middleware wrapping existing validators
  • Webhook API (src/api/) — FastAPI server (POST /webhooks/{provider}) with GitHub HMAC-SHA256 signature verification and generic Bearer token auth
  • Scheduler (src/scheduler/) — APScheduler-based cron system with SQLite persistence for job definitions, publishes ScheduledEvents to the bus
  • Notification service (src/notifications/) — Subscribes to AgentResponseEvent, delivers to Telegram with per-chat rate limiting (1 msg/sec) and automatic message splitting
  • Config & wiring — 6 new settings fields, 2 new feature flags (enable_api_server, enable_scheduler), DB migration 3 (scheduled_jobs + webhook_events tables, WAL mode), concurrent task orchestration in main.py with ordered shutdown

All new features are disabled by default via feature flags. The existing Telegram handler path is untouched — the event bus runs alongside it. No custom plugin system; leverages Claude Code's native Skills + MCP servers for integrations.

Test plan

  • 40 new unit tests covering event bus, webhook auth, API endpoints, notification service, handler routing
  • All 240 tests passing (poetry run pytest)
  • black + isort + flake8 clean on all new/modified files
  • mypy clean on all new modules
  • Manual test: send Telegram message → bot responds (existing path unbroken)
  • Manual test: curl -X POST localhost:8080/webhooks/test with ENABLE_API_SERVER=true

🤖 Generated with Claude Code

Transform the bot from a reactive chat relay into a proactive agentic
platform. New components run alongside the existing Telegram handler
path, controlled by feature flags (disabled by default).

New modules:
- src/events/ - Async event bus with typed events (UserMessage, Webhook,
  Scheduled, AgentResponse), handlers that bridge events to Claude, and
  security middleware wrapping existing validators
- src/api/ - FastAPI webhook server (POST /webhooks/{provider}) with
  GitHub HMAC-SHA256 signature verification and generic Bearer auth
- src/scheduler/ - APScheduler-based cron scheduler that publishes
  ScheduledEvents, with SQLite persistence for job definitions
- src/notifications/ - Delivery service subscribing to AgentResponseEvent,
  sends via Telegram with per-chat rate limiting and message splitting

Changes to existing files:
- settings.py: enable_api_server, api_server_port, enable_scheduler,
  github_webhook_secret, webhook_api_secret, notification_chat_ids
- features.py: api_server_enabled, scheduler_enabled properties
- database.py: Migration 3 adds scheduled_jobs + webhook_events tables,
  enables WAL mode for concurrent writes
- main.py: Wires event bus, notification service, API server, and
  scheduler as concurrent tasks with ordered shutdown
- bot/core.py: Make initialize() idempotent for pre-init access to
  Telegram Bot instance
- pyproject.toml: Add fastapi, uvicorn, apscheduler dependencies

Tests: 40 new tests (240 total), all passing
@RichardAtCT
Copy link
Owner Author

PR Review
Reviewed head: 26b4d3a4245974bfd0701eaae977eab8269b7189

Summary

  • Adds an event-driven sidecar architecture (bus + handlers) alongside the existing Telegram flow.
  • Introduces webhook ingestion via FastAPI and recurring jobs via APScheduler.
  • Adds a notification pipeline for sending agent responses back to Telegram chats.

What looks good

  • Clean separation of concerns between ingestion (api / scheduler), processing (events/handlers), and delivery (notifications/service).
  • Feature flags (enable_api_server, enable_scheduler) reduce rollout risk and keep existing Telegram path intact.
  • Good unit test coverage for core new modules, especially event bus and webhook auth behavior.

Issues / questions

  1. [Blocker] src/api/server.py — generic webhooks are accepted with no auth when webhook_api_secret is unset (if secret and not verify_shared_secret(...)). Since events are forwarded into the agent execution path, enabling ENABLE_API_SERVER=true without a secret effectively exposes an unauthenticated remote trigger surface. I’d strongly recommend fail-closed (require auth by default) or at minimum restrict unauthenticated mode to localhost/dev only.

  2. [Important] src/api/server.py, src/storage/database.py — you create webhook_events with unique delivery_id, but there’s no dedupe check/write before publishing to the bus. Provider retries (e.g., GitHub redelivery) will be re-processed and can duplicate downstream agent runs/notifications. Suggest persisting (provider, delivery_id) first and short-circuiting if already seen.

  3. [Important] src/main.py — after asyncio.wait(..., FIRST_COMPLETED), the completed task result is never inspected. If bot_task/api_task exits with an exception, shutdown will proceed without surfacing root cause clearly. Consider explicitly checking done tasks (task.exception()) and logging/re-raising non-cancellation failures.

Suggested tests (if needed)

  • API server enabled + webhook_api_secret unset: assert generic webhook requests are rejected (or only allowed in explicit dev mode).
  • Duplicate webhook delivery: same X-Delivery-ID twice should publish only once.
  • Task failure propagation: force API task failure and assert app exits with logged exception details.

Verdict

  • ⚠️ Merge after fixes

1. Generic webhooks now require WEBHOOK_API_SECRET (fail-closed). Without
   a configured secret, the endpoint returns 500 instead of accepting
   unauthenticated requests.

2. Webhook deduplication via delivery_id: before publishing to the bus,
   check webhook_events table for existing delivery_id and short-circuit
   with {"status": "duplicate"} if already seen. Records events for audit.

3. After asyncio.wait(FIRST_COMPLETED), inspect completed tasks for
   exceptions and log them with task name and error type before proceeding
   to shutdown.
@RichardAtCT
Copy link
Owner Author

PR Review
Reviewed head: 3e37d5bc19aac7ff9767cfa706714fb7776252dc

Summary

  • Re-review of changes since prior review marker.
  • You addressed the fail-closed auth concern for generic webhooks and added task failure inspection/logging in main.py.
  • You also added webhook-event recording + duplicate detection scaffolding.

What looks good

  • src/api/server.py now requires WEBHOOK_API_SECRET for non-GitHub providers (good security default).
  • src/main.py now inspects completed tasks and logs explicit failures instead of silently continuing.
  • ✅ Tests were updated to reflect stricter auth behavior.

Issues / questions

  1. [Important] src/api/server.py — dedupe is still non-atomic under concurrency. Current flow does SELECT delivery_id (_check_and_record_delivery) and later INSERT OR IGNORE (_record_webhook_event), with publish in between. Two concurrent identical deliveries can both pass the SELECT before either insert, and both will publish. Suggest making dedupe a single atomic insert-first gate (attempt INSERT on unique delivery_id, then only publish when insert actually succeeded).

Suggested tests (if needed)

  • Concurrent duplicate delivery test (same X-Delivery-ID sent in parallel) should result in exactly one published event.

Verdict

  • ⚠️ Merge after fixes

Replace the SELECT-then-INSERT dedup flow with a single atomic
INSERT OR IGNORE followed by SELECT changes(). The insert only
succeeds when delivery_id is new (UNIQUE constraint), so concurrent
identical deliveries cannot both pass the gate. Only publish to the
event bus when the insert actually wrote a row.
@RichardAtCT
Copy link
Owner Author

PR Review
Reviewed head: 912e104ac4473dc5bd05610de772d54805c7b292

Summary

  • Re-review of incremental changes since last review.
  • You replaced the check-then-insert dedupe flow with an insert-first atomic gate.

What looks good

  • src/api/server.py now does atomic dedupe via INSERT OR IGNORE + SELECT changes(), then only publishes when inserted.
  • ✅ This closes the concurrency race where duplicate deliveries could both be published.
  • ✅ Refactor is clear and keeps behavior readable.

Issues / questions

  • No additional issues from this delta.

Suggested tests (if needed)

  • Optional hardening: add a concurrency test for same X-Delivery-ID in parallel to lock in this behavior.

Verdict

  • ✅ Ready to merge

@RichardAtCT RichardAtCT merged commit 139fa3a into main Feb 13, 2026
1 check passed
@RichardAtCT RichardAtCT deleted the feature/agentic-platform branch February 13, 2026 13:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant