"""Unified AI Telegram Bot.
Combines general AI chat (streaming replies, conversation history, vision
models) and the AI Company bot (multi-role company discussions, /build,
/autorun, code review) into a single process that requires only one
``TELEGRAM_TOKEN``.
Commands
--------
General chat
/ask — ask the AI a question (streaming live updates)
/cancel — cancel your current in-progress request
/models — list available AI models
/settings — view / set your preferred AI model
AI company / build
/company — multi-role company discussion
/build — developer team discussion + code generation + GitHub commit
/autorun — fully autonomous build (AI picks the task)
/company_roles — list available roles
Weather
/weather — current Hong Kong weather + AI clothing suggestions
Universal
/followup — context-aware continuation:
• cancels an active stream and redirects (if one is running)
• build-session follow-up (if a /build or /autorun session exists)
supports unlimited chained follow-ups
• general chat follow-up (otherwise)
/about — show this help guide
Environment variables
---------------------
Required
TELEGRAM_TOKEN Telegram bot token from BotFather
POLLINATIONS_TOKEN Pollinations AI API key
Optional (needed for /build GitHub commit)
GITHUB_TOKEN GitHub PAT or Actions token with repo write access
GITHUB_REPOSITORY Repo in "owner/repo" format
Optional (Hong Kong weather auto-reminders)
WEATHER_CHAT_ID Telegram chat ID to post auto weather reminders
WEATHER_REMINDER_HOURS Comma-separated HKT hours to send reminders (default: 8)
Example: "8,20" sends at 08:00 and 20:00 HKT
"""
import asyncio
import base64
import datetime
import json
import logging
import os
import posixpath
import re
import xml.etree.ElementTree as ET
from typing import AsyncGenerator, Callable, Optional
import aiohttp
from telegram import (
Bot,
InlineKeyboardButton,
InlineKeyboardMarkup,
Message,
Update,
)
from telegram.constants import ParseMode
from telegram.error import BadRequest, TelegramError
from telegram.ext import (
Application,
ApplicationHandlerStop,
CallbackQueryHandler,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from zoneinfo import ZoneInfo
_HKT = ZoneInfo("Asia/Hong_Kong")
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
TELEGRAM_TOKEN: str = os.environ.get("TELEGRAM_TOKEN", "")
POLLINATIONS_TOKEN: str = os.environ.get("POLLINATIONS_TOKEN", "")
# GitHub integration (optional — only needed for /build file commits)
GITHUB_TOKEN: str = os.environ.get("GITHUB_TOKEN", "")
GITHUB_REPOSITORY: str = os.environ.get("GITHUB_REPOSITORY", "")
GITHUB_API_BASE = "https://api.github.com"
PROJECT_FOLDER = "project"
# Hong Kong weather auto-reminders (optional)
# WEATHER_CHAT_ID — Telegram chat ID to post auto reminders
# WEATHER_REMINDER_HOURS — comma-separated HKT hours, e.g. "8,20"
WEATHER_CHAT_ID: int = int(os.environ.get("WEATHER_CHAT_ID", "0") or "0")
_raw_hours = os.environ.get("WEATHER_REMINDER_HOURS", "8")
WEATHER_REMINDER_HOURS: list[int] = [
int(h.strip()) for h in _raw_hours.split(",") if h.strip().isdigit()
]
HKO_RSS_URL = "https://rss.weather.gov.hk/rss/CurrentWeather_uc.xml"
AI_API_URL = "https://gen.pollinations.ai/v1/chat/completions"
# Model fallback chain — tried in order; next model used on any error.
MODEL_CHAIN = [
"openai-fast",
"gemini-search",
"openai",
"glm",
"claude-fast",
"qwen-character",
"deepseek",
"qwen-safety",
]
TELEGRAM_MAX_LENGTH = 4096
STREAM_EDIT_INTERVAL = 2.0 # seconds between live edit updates
STREAM_DISPLAY_LIMIT = 4000 # chars shown in a streaming placeholder
MAX_HISTORY = 20 # max conversation messages per chat
# Weather parsing constants
_HKO_TEMP_RE = re.compile(r"氣溫\s*[::]\s*(\d+(?:\.\d+)?)")
_HKO_HUMID_RE = re.compile(r"相對濕度\s*[::]\s*(\d+)")
_HKO_SUMMARY_RE = re.compile(r"天氣\s*[::]\s*([^\n。.]+)")
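# Illustrative feed text these patterns are meant to match (the actual HKO
# wording may vary):
#   氣溫：25 度    → temperature group captures "25"
#   相對濕度：85 %  → humidity group captures "85"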
# Relative humidity at or above this level triggers an umbrella suggestion.
HIGH_HUMIDITY_THRESHOLD = 85
# ---------------------------------------------------------------------------
# Model metadata (general chat)
# ---------------------------------------------------------------------------
VISION_MODELS: set[str] = {
"openai",
"openai-fast",
"gemini",
"gemini-fast",
"gemini-large",
"gemini-search",
"claude-fast",
}
ALL_MODELS: set[str] = set(MODEL_CHAIN)
MODEL_INFO: dict[str, tuple[str, bool]] = {
"openai-fast": ("Fast OpenAI model — default first choice", True),
"openai": ("Full OpenAI model", True),
"gemini-search": ("Gemini with Google Search grounding", True),
"gemini": ("Standard Gemini model", True),
"gemini-fast": ("Faster Gemini variant", True),
"gemini-large": ("Larger Gemini variant", True),
"claude-fast": ("Fast Claude model", True),
"glm": ("GLM model", False),
"qwen-character": ("Qwen character model", False),
"deepseek": ("DeepSeek model", False),
"qwen-safety": ("Qwen safety-focused model", False),
}
# ---------------------------------------------------------------------------
# AI Company role configuration
# ---------------------------------------------------------------------------
DEFAULT_ROLES = [
"CEO", "CTO", "Product Manager", "Designer", "Engineer", "Marketing Manager",
]
DEFAULT_BUILD_ROLES = [
"CTO", "Backend Developer", "Frontend Developer", "QA Engineer", "DevOps Engineer",
]
ROLE_PROMPTS: dict[str, str] = {
"CEO": (
"You are the CEO of a technology company. "
"Focus on business strategy, return on investment, market opportunity, and high-level vision. "
"Be concise and decisive."
),
"CTO": (
"You are the CTO of a technology company. "
"Focus on technical architecture, feasibility, scalability, security, and technology stack choices. "
"Be precise and practical."
),
"Product Manager": (
"You are the Product Manager. "
"Focus on user needs, product requirements, prioritization, success metrics, and the product roadmap. "
"Be user-centric and data-driven."
),
"Designer": (
"You are the Lead UX/UI Designer. "
"Focus on user experience, interface design, accessibility, visual identity, and usability. "
"Be creative and empathetic."
),
"Engineer": (
"You are the Lead Software Engineer. "
"Focus on implementation details, technical challenges, development timelines, testing, and code quality. "
"Be realistic and thorough."
),
"Marketing Manager": (
"You are the Marketing Manager. "
"Focus on target audience, brand positioning, growth strategies, content, and messaging. "
"Be persuasive and market-aware."
),
"Data Scientist": (
"You are the Data Scientist. "
"Focus on data requirements, machine learning models, analytics, insights, and data-driven decisions. "
"Be analytical and evidence-based."
),
"Legal Counsel": (
"You are the Legal Counsel. "
"Focus on legal risks, regulatory compliance, intellectual property, privacy laws, and contracts. "
"Be cautious and thorough."
),
"Finance Manager": (
"You are the Finance Manager. "
"Focus on budget planning, cost estimation, revenue projections, financial risks, and ROI analysis. "
"Be precise and conservative."
),
"HR Manager": (
"You are the HR Manager. "
"Focus on team structure, talent requirements, company culture, onboarding, and people management. "
"Be people-focused and empathetic."
),
"Frontend Developer": (
"You are the Frontend Developer. "
"Focus on UI implementation with React/Vue/HTML/CSS/JavaScript, component design, "
"responsiveness, and browser compatibility. "
"Propose concrete frontend architecture and the key components to build."
),
"Backend Developer": (
"You are the Backend Developer. "
"Focus on server-side logic, REST/GraphQL API design, database schemas, authentication, "
"and backend performance. "
"Propose concrete API endpoints, data models, and backend architecture."
),
"Full Stack Developer": (
"You are the Full Stack Developer. "
"Focus on end-to-end implementation, bridging frontend and backend, data flow, "
"and integration points. "
"Provide a holistic view of the implementation."
),
"QA Engineer": (
"You are the QA Engineer. "
"Focus on testing strategies, unit tests, integration tests, edge cases, "
"bug prevention, and quality standards. "
"Outline the key test scenarios and quality gates for the project."
),
"DevOps Engineer": (
"You are the DevOps Engineer. "
"Focus on CI/CD pipelines, Docker/containerization, deployment strategies, "
"monitoring, and infrastructure as code. "
"Propose the deployment setup and toolchain."
),
}
# ---------------------------------------------------------------------------
# Bot application (set up after handlers are defined)
# ---------------------------------------------------------------------------
# Built at the bottom of this file via _build_application()
_app: Optional[Application] = None
# ---------------------------------------------------------------------------
# In-memory state
# ---------------------------------------------------------------------------
# Per-chat build sessions for /followup continuations.
# Structure: { chat_id: { task, discussion, final_outcome, code_files, ... } }
build_sessions: dict[int, dict] = {}
# Per-chat conversation history for general chat.
# Structure: { chat_id: [ {"role": ..., "content": ...}, ... ] }
conversation_history: dict[int, list[dict]] = {}
# Per-user preferred model (set via /settings).
user_preferred_models: dict[int, str] = {}
# Per-user active streaming tasks (used by /cancel and /followup interrupt mode).
active_requests: dict[int, asyncio.Task] = {}
# Per-chat asyncio locks for build-session follow-ups.
# Prevents two concurrent /followup invocations from racing on the same session.
followup_locks: dict[int, asyncio.Lock] = {}
# Pending interactive callbacks: maps a unique key → asyncio.Future
# Used to pass user replies (text or button presses) back to running handlers.
_interactive_futures: dict[str, "asyncio.Future[Optional[str]]"] = {}
# Tracks (date_ordinal, hour) tuples for which a weather reminder was already sent.
_weather_sent_hours: set[tuple[int, int]] = set()
# ---------------------------------------------------------------------------
# Shared text utilities
# ---------------------------------------------------------------------------
def _open_fence(text: str) -> str | None:
"""Return the opening fence token if *text* ends with an unclosed Markdown
code fence, otherwise ``None``."""
open_token: str | None = None
pos = 0
while True:
idx = text.find("```", pos)
if idx == -1:
break
if open_token is None:
rest = text[idx + 3:]
nl = rest.find("\n")
lang = rest[:nl].strip() if nl != -1 else rest.strip()
open_token = "```" + lang
else:
open_token = None
pos = idx + 3
return open_token
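# Illustrative examples (not from the original source):
#   _open_fence("```python\nprint('hi')")  -> "```python"  (fence left open)
#   _open_fence("```\ncode\n```")          -> None         (fence closed)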
def split_message(text: str, limit: int = TELEGRAM_MAX_LENGTH) -> list[str]:
"""Split *text* into chunks ≤ *limit* characters.
Prefers splitting on newlines, then spaces. Closes and re-opens Markdown
code fences across splits so formatting stays intact.
"""
if len(text) <= limit:
return [text]
chunks: list[str] = []
while len(text) > limit:
split_pos = text.rfind("\n", 0, limit)
if split_pos == -1:
split_pos = text.rfind(" ", 0, limit)
if split_pos == -1:
split_pos = limit
chunk = text[:split_pos]
fence = _open_fence(chunk)
if fence is not None:
chunk += "\n```"
text = fence + "\n" + text[split_pos:].lstrip("\n")
else:
text = text[split_pos:].lstrip("\n")
chunks.append(chunk)
if text:
chunks.append(text)
return chunks
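# Illustrative behaviour: a 5000-char reply containing an open ``` block is
# split into two chunks; the first gets a closing ``` appended and the second
# re-opens the fence with the same language tag, so both render correctly.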
def slugify(text: str) -> str:
"""Convert *text* to a filesystem/URL-safe slug (max 50 chars)."""
text = text.lower().strip()
text = re.sub(r"[^\w\s-]", "", text)
text = re.sub(r"[\s_]+", "-", text)
text = text.strip("-")
return text[:50] or "project"
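# Illustrative example: slugify("My Cool App!") -> "my-cool-app"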
def parse_code_files(text: str) -> list[tuple[str, str]]:
"""Extract ``(filename, content)`` pairs from AI output using the
``### File: <name>`` / ``` format."""
pattern = r"### File:\s*([^\n]+)\n```[^\n]*\n(.*?)```"
matches = re.findall(pattern, text, re.DOTALL)
return [(fn.strip(), code.rstrip()) for fn, code in matches]
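# Illustrative input the pattern above extracts from (hypothetical file name):
#   ### File: src/app.py
#   ```python
#   print("hello")
#   ```
# -> [("src/app.py", 'print("hello")')]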
def _prior_context_note(prior_count: int) -> str:
if prior_count == 0:
return "*(first to speak)*"
return f"*(read {prior_count} prior response{'s' if prior_count != 1 else ''})*"
# ---------------------------------------------------------------------------
# General chat AI helpers
# ---------------------------------------------------------------------------
def build_model_chain(preferred_model: str | None) -> list[str]:
if preferred_model is None:
return MODEL_CHAIN
rest = [m for m in MODEL_CHAIN if m != preferred_model]
return [preferred_model] + rest
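# Illustrative example: build_model_chain("glm")
#   -> ["glm", "openai-fast", "gemini-search", "openai", ...]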
def parse_model_prefix(content: str) -> tuple[str | None, str]:
"""Detect an optional ``@<model-name>``, ``@ai``, or ``@about`` prefix."""
stripped = content.strip()
candidate = stripped[1:] if stripped.startswith("@") else stripped
parts = candidate.split(None, 1)
if not parts:
return None, content
first_word = parts[0].lower()
remainder = parts[1].strip() if len(parts) > 1 else ""
if first_word == "about":
return "about", remainder
if first_word == "ai":
return None, remainder
if first_word in ALL_MODELS:
return first_word, remainder
return None, content
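# Illustrative examples:
#   parse_model_prefix("@openai explain DNS") -> ("openai", "explain DNS")
#   parse_model_prefix("@ai hello")           -> (None, "hello")
#   parse_model_prefix("plain question")      -> (None, "plain question")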
def _fallback_footer(
model_used: str | None, preferred_model: str | None, is_fallback: bool
) -> str:
if not model_used:
return ""
if is_fallback:
return f"\n\n*— Response generated by **{model_used}** (fallback)*"
if preferred_model and model_used != preferred_model:
return f"\n\n*— Response generated by **{model_used}** (fallback from {preferred_model})*"
return ""
def get_image_urls_from_tg(message: Message) -> list[str]:
"""Return image file IDs from a Telegram message (photos or image documents)."""
urls: list[str] = []
if message.photo:
# message.photo is a list of PhotoSize (smallest → largest); take the largest
largest = message.photo[-1]
urls.append(f"tg://{largest.file_id}")
if message.document and message.document.mime_type and message.document.mime_type.startswith("image/"):
urls.append(f"tg://{message.document.file_id}")
return urls
def _update_history(chat_id: int, user_text: str, assistant_reply: str) -> None:
history = conversation_history.setdefault(chat_id, [])
history.append({"role": "user", "content": user_text})
history.append({"role": "assistant", "content": assistant_reply})
if len(history) > MAX_HISTORY:
conversation_history[chat_id] = history[-MAX_HISTORY:]
async def _single_model_call(
session: aiohttp.ClientSession,
model: str,
user_message: str,
image_urls: list[str] | None = None,
history: list[dict] | None = None,
) -> str:
"""Make a single non-streaming call to one specific model."""
headers = {
"Authorization": f"Bearer {POLLINATIONS_TOKEN}",
"Content-Type": "application/json",
}
if image_urls and model in VISION_MODELS:
message_content: list[dict] | str = [{"type": "text", "text": user_message}]
for url in image_urls:
message_content.append({"type": "image_url", "image_url": {"url": url}})
else:
message_content = user_message
messages: list[dict] = list(history) if history else []
messages.append({"role": "user", "content": message_content})
payload = {"model": model, "messages": messages}
async with session.post(
AI_API_URL, json=payload, headers=headers,
timeout=aiohttp.ClientTimeout(total=30),
) as resp:
resp.raise_for_status()
data = await resp.json()
return data["choices"][0]["message"]["content"]
async def _iter_stream_chunks(
session: aiohttp.ClientSession,
model: str,
user_message: str,
image_urls: list[str] | None = None,
history: list[dict] | None = None,
) -> AsyncGenerator[str, None]:
"""Yield text tokens from the AI using SSE streaming."""
headers = {
"Authorization": f"Bearer {POLLINATIONS_TOKEN}",
"Content-Type": "application/json",
}
if image_urls and model in VISION_MODELS:
message_content: list[dict] | str = [{"type": "text", "text": user_message}]
for url in image_urls:
message_content.append({"type": "image_url", "image_url": {"url": url}})
else:
message_content = user_message
messages_list: list[dict] = list(history) if history else []
messages_list.append({"role": "user", "content": message_content})
payload = {"model": model, "messages": messages_list, "stream": True}
async with session.post(
AI_API_URL, json=payload, headers=headers,
timeout=aiohttp.ClientTimeout(total=120),
) as resp:
resp.raise_for_status()
pending = ""
async for raw in resp.content.iter_any():
pending += raw.decode("utf-8", errors="replace")
while "\n" in pending:
line, pending = pending.split("\n", 1)
line = line.strip()
if not line.startswith("data: "):
continue
data_str = line[6:]
if data_str == "[DONE]":
return
try:
obj = json.loads(data_str)
delta = obj["choices"][0]["delta"].get("content") or ""
if delta:
yield delta
except (json.JSONDecodeError, KeyError, IndexError):
pass
async def get_ai_reply_streaming(
user_message: str,
preferred_model: str | None = None,
image_urls: list[str] | None = None,
history: list[dict] | None = None,
progress_cb: Callable | None = None,
) -> tuple[str, str | None, bool]:
"""Stream the AI reply. Returns ``(reply_text, model_used, is_fallback)``."""
chain = build_model_chain(preferred_model)
if image_urls:
chain = [m for m in chain if m in VISION_MODELS]
if not chain:
return (
"⚠️ No vision-capable models are available right now.",
None, False,
)
first_model = chain[0]
last_progress = 0.0
async with aiohttp.ClientSession() as session:
for model in chain:
accumulated = ""
try:
async for chunk in _iter_stream_chunks(session, model, user_message, image_urls, history):
accumulated += chunk
if progress_cb is not None:
now = asyncio.get_running_loop().time()
if now - last_progress >= STREAM_EDIT_INTERVAL:
try:
await progress_cb(accumulated)
last_progress = now
except Exception:
pass
if accumulated:
return accumulated, model, (model != first_model)
# SSE returned nothing — fall back to non-streaming
accumulated = await _single_model_call(session, model, user_message, image_urls, history)
return accumulated, model, (model != first_model)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.warning("Model %s failed (streaming): %s. Trying next…", model, exc)
return "Sorry, all AI models are currently unavailable. Please try again later.", None, False
# ---------------------------------------------------------------------------
# AI Company helpers
# ---------------------------------------------------------------------------
async def call_ai(
session: aiohttp.ClientSession,
messages: list[dict],
) -> str:
"""Call the AI API with a messages list, trying each model in MODEL_CHAIN."""
headers = {
"Authorization": f"Bearer {POLLINATIONS_TOKEN}",
"Content-Type": "application/json",
}
for model in MODEL_CHAIN:
payload = {"model": model, "messages": messages}
try:
async with session.post(
AI_API_URL, json=payload, headers=headers,
timeout=aiohttp.ClientTimeout(total=60),
) as resp:
resp.raise_for_status()
data = await resp.json()
logger.info("Got reply from model %s", model)
return data["choices"][0]["message"]["content"]
except Exception as exc:
logger.warning("Model %s failed: %s. Trying next…", model, exc)
raise RuntimeError("All AI models failed.")
async def generate_project_name(session: aiohttp.ClientSession, task: str) -> str:
messages = [
{
"role": "system",
"content": (
"You generate concise, filesystem-safe project names. "
"Reply with ONLY the project name: lowercase letters, digits, and hyphens, "
"2–4 words maximum, no spaces, no punctuation, no extra text. "
"Example: 'todo-rest-api'."
),
},
{"role": "user", "content": f"Generate a project name for: {task}"},
]
try:
raw = await call_ai(session, messages)
first_token = raw.strip().split()[0] if raw.strip() else ""
slug = slugify(first_token)
if slug and slug != "project":
return slug
except Exception as exc:
logger.warning("Project name generation failed: %s", exc)
return slugify(task)
_AUTO_STACKS: dict[str, str] = {
"python": "Python (Flask, FastAPI, or a standalone CLI/script)",
"php": "PHP + HTML (a dynamic web page or small PHP web API)",
"actions": "GitHub Actions (a YAML workflow for CI/CD or automation)",
}
_STACK_EMOJI: dict[str, str] = {"python": "🐍", "php": "🐘", "actions": "⚙️"}
async def generate_auto_task(
session: aiohttp.ClientSession,
stack: str | None = None,
) -> tuple[str, str]:
if stack and stack in _AUTO_STACKS:
stack_hint = f"The project MUST use this tech stack: {_AUTO_STACKS[stack]}."
else:
options = " | ".join(f"{k}: {v}" for k, v in _AUTO_STACKS.items())
stack_hint = f"Choose ONE of these stacks: {options}."
messages = [
{
"role": "system",
"content": (
"You are a creative software architect who invents interesting, "
"self-contained programming projects that a small team can build in one sprint."
),
},
{
"role": "user",
"content": (
f"Invent a concrete, buildable programming project. {stack_hint}\n"
"Reply in EXACTLY this format — no extra text:\n"
"TASK: <one clear sentence describing what to build>\n"
"STACK: <python | php | actions>"
),
},
]
try:
raw = await call_ai(session, messages)
task_match = re.search(r"TASK:\s*(.+)", raw)
stack_match = re.search(r"STACK:\s*(\w+)", raw)
task = task_match.group(1).strip() if task_match else raw.strip()[:200]
chosen = stack_match.group(1).strip().lower() if stack_match else (stack or "python")
if chosen not in _AUTO_STACKS:
chosen = stack or "python"
return task, chosen
except Exception as exc:
logger.warning("Auto-task generation failed: %s", exc)
fallback_stack = stack if stack in _AUTO_STACKS else "python"
return "Build a simple Python CLI tool that converts CSV files to JSON", fallback_stack
async def run_company_discussion(
task: str,
roles: list[str],
role_done_cb: Optional[Callable] = None,
) -> tuple[list[tuple[str, str]], str]:
"""Run a multi-role AI company discussion. Returns ``(discussion, final_outcome)``."""
discussion: list[tuple[str, str]] = []
injected_inputs: list[str] = []
async with aiohttp.ClientSession() as session:
for role in roles:
system_prompt = ROLE_PROMPTS.get(
role,
f"You are the {role} of a technology company. Share your professional perspective.",
)
user_content = f"**Task:** {task}\n"
if injected_inputs:
user_content += "\n**Stakeholder Input (from the human in the room):**\n"
for idx, inp in enumerate(injected_inputs, 1):
user_content += f"> [{idx}] {inp}\n"
if discussion:
user_content += "\n**Discussion so far:**\n"
for prev_role, prev_reply in discussion:
truncated = prev_reply[:500] + "…" if len(prev_reply) > 500 else prev_reply
user_content += f"\n**{prev_role}:** {truncated}\n"
user_content += (
f"\nNow, as the **{role}**, respond to this task building on the "
"discussion above. Acknowledge specific points raised by other roles. "
"Be concise (2–4 sentences)."
)
else:
user_content += (
f"\nAs the **{role}**, what is your initial take on this task? "
"Be concise (2–4 sentences)."
)
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_content},
]
try:
reply = await call_ai(session, messages)
discussion.append((role, reply))
except Exception as exc:
logger.error("Role %s failed: %s", role, exc)
reply = "*[No response available]*"
discussion.append((role, reply))
if role_done_cb is not None:
user_input = await role_done_cb(role, reply)
if user_input:
injected_inputs.append(user_input)
# Facilitator synthesis
synthesis_context = f"**Task:** {task}\n\n**Company Discussion:**\n"
for role, reply in discussion:
synthesis_context += f"\n**{role}:** {reply}\n"
if injected_inputs:
synthesis_context += "\n**Stakeholder Interjections:**\n"
for idx, inp in enumerate(injected_inputs, 1):
synthesis_context += f"> [{idx}] {inp}\n"
synthesis_context += (
"\nAs the meeting **Facilitator**, synthesise all perspectives into a "
"clear, structured **Final Outcome** with:\n"
"1. Key decisions made\n"
"2. Recommended next steps (prioritised)\n"
"3. Important risks or considerations\n"
"Be actionable and concise."
)
facilitator_messages = [
{
"role": "system",
"content": (
"You are an expert meeting facilitator who synthesises company "
"discussions into clear, actionable outcomes."
),
},
{"role": "user", "content": synthesis_context},
]
try:
final_outcome = await call_ai(session, facilitator_messages)
except Exception as exc:
logger.error("Final synthesis failed: %s", exc)
final_outcome = "*Unable to generate the final outcome. Please try again.*"
return discussion, final_outcome
async def generate_code_files(
task: str,
discussion: list[tuple[str, str]],
final_outcome: str,
feedback: str | None = None,
) -> tuple[list[tuple[str, str]], str]:
"""Generate code files from the team discussion.
When *feedback* is provided (from a code review) it is injected into the
prompt so the AI fixes those issues in this new attempt.
"""
discussion_context = f"**Task:** {task}\n\n**Developer Team Discussion:**\n"
for role, reply in discussion:
discussion_context += f"\n**{role}:** {reply}\n"
discussion_context += f"\n**Final Plan:**\n{final_outcome}"
feedback_section = (
"\n\n**⚠️ Code Reviewer Feedback — ALL issues below MUST be fixed in this generation:**\n"
f"{feedback}\n\n"
"Fix every issue listed above. Do not leave any TODO, placeholder, or "
"unimplemented section in the output."
if feedback else ""
)
_model_list = "\n".join(f" {i}. {m}" for i, m in enumerate(MODEL_CHAIN, 1))
code_gen_prompt = (
f"{discussion_context}{feedback_section}\n\n"
"Based on the plan above, generate a complete, working codebase.\n\n"
"For EACH file, use EXACTLY this format:\n\n"
"### File: <filename with extension and any sub-path>\n"
"```<language>\n"
"<complete file content>\n"
"```\n\n"
"Include ALL necessary files: source code, configuration files "
"(e.g. package.json, requirements.txt), a Dockerfile if appropriate, "
"and a README.md. Make all code complete and functional — not just placeholders."
)
code_gen_messages = [
{
"role": "system",
"content": (
"You are an expert senior software engineer who writes complete, "
"production-quality code. Generate every file needed for the project. "
"Use exactly the `### File:` / ``` format so the output can be parsed "
"and committed to a repository automatically.\n\n"
"**AI integration — IMPORTANT:**\n"
"If the project uses AI/LLM features, use the Pollinations AI API:\n"
" POST https://gen.pollinations.ai/v1/chat/completions\n"
f" Authorization: Bearer {POLLINATIONS_TOKEN}\n\n"
f"Try models in order: {_model_list}\n"
"Implement the fallback loop so it retries on HTTP errors."
),
},
{"role": "user", "content": code_gen_prompt},
]
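    # NOTE: the system prompt above interpolates the live POLLINATIONS_TOKEN so
    # that generated projects can call the API out of the box; be aware the
    # token therefore ends up in any generated code committed to the repository.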
async with aiohttp.ClientSession() as session:
try:
raw_output = await call_ai(session, code_gen_messages)
except Exception as exc:
logger.error("Code generation failed: %s", exc)
return [], ""
code_files = parse_code_files(raw_output)
logger.info("Generated %d code file(s)", len(code_files))
return code_files, raw_output
async def review_code_files(
task: str,
code_files: list[tuple[str, str]],
final_outcome: str,
) -> str:
"""Ask a Code Reviewer AI to check generated code for critical issues.
Returns a non-empty string with issue descriptions when critical problems
are found, or an empty string when the code passes review (LGTM).
"""
if not code_files:
return ""
file_content = ""
for filename, content in code_files[:8]:
preview = content[:800] + "\n…(truncated)" if len(content) > 800 else content
file_content += f"\n### File: {filename}\n```\n{preview}\n```\n"
review_prompt = (
f"**Task:** {task}\n\n"
f"**Final Plan (summary):**\n{final_outcome[:400]}\n\n"
f"**Generated Code Files:**\n{file_content}\n\n"
"Review the code above. Identify ONLY critical issues that would prevent "
"the code from running or fulfilling the task:\n"
"• Syntax errors or obvious runtime bugs\n"
"• Missing required imports or undefined names (ImportError/NameError)\n"
"• Placeholder code left unimplemented (TODO, pass, raise NotImplementedError)\n"
"• Broken inter-file dependencies\n\n"
"If the code is complete and runnable, respond with exactly: LGTM\n"
"Otherwise list up to 5 specific critical issues. "
"Do NOT suggest style improvements or optional enhancements."
)
messages = [
{
"role": "system",
"content": (
"You are a strict senior code reviewer. "
"Find only critical bugs that would prevent the code from running. "
"If there are no critical issues, respond with exactly: LGTM"
),
},
{"role": "user", "content": review_prompt},
]
async with aiohttp.ClientSession() as session:
try:
review_output = await call_ai(session, messages)
except Exception as exc:
logger.warning("Code review call failed: %s — skipping review", exc)
return ""
review_text = review_output.strip()
lower = review_text.lower()
if (
review_text.upper().startswith("LGTM")
or lower.startswith("no critical")
or lower.startswith("looks good")
or lower.startswith("the code looks")
or lower.startswith("code looks good")
):
logger.info("Code review passed (LGTM)")
return ""
logger.info("Code review found issues: %.200s", review_text)
return review_text
async def generate_verified_code_files(
task: str,
discussion: list[tuple[str, str]],
final_outcome: str,
send_fn,
max_format_retries: int = 3,
max_review_rounds: int = 2,
) -> tuple[list[tuple[str, str]], str]:
"""Generate code with automatic format-retry and code-review loop.
1. Try to generate code up to *max_format_retries* times until ``### File:``
blocks are found.
2. Ask a Code Reviewer AI to check. If critical issues are found,
regenerate with reviewer feedback injected (up to *max_review_rounds*).
"""
review_feedback: str | None = None
for review_round in range(max_review_rounds + 1):
code_files: list[tuple[str, str]] = []
raw_output: str = ""
for fmt_try in range(1, max_format_retries + 1):
if review_round == 0 and fmt_try == 1:
await send_fn("💻 *Generating code files…*")
elif fmt_try > 1:
await send_fn(
f"🔄 *No `### File:` blocks found — retrying generation "
f"(attempt {fmt_try}/{max_format_retries})…*"
)
code_files, raw_output = await generate_code_files(
task, discussion, final_outcome, feedback=review_feedback
)
if code_files:
break
if not code_files:
return [], raw_output
is_last_round = review_round >= max_review_rounds
await send_fn(f"🔍 *Code Reviewer examining {len(code_files)} file(s)…*")
issues = await review_code_files(task, code_files, final_outcome)
if not issues:
await send_fn("✅ *Code review passed — no critical issues found.*")
return code_files, raw_output
if is_last_round:
await send_fn(
"⚠️ *Code Reviewer found issues but max retries reached — "
"proceeding with best available code:*\n"
f"```\n{issues[:500]}\n```"
)
return code_files, raw_output
await send_fn(
f"⚠️ *Code Reviewer found issues (round {review_round + 1}/{max_review_rounds}) "
"— regenerating with fixes applied:*\n"
f"```\n{issues[:400]}\n```"
)
review_feedback = issues
return code_files, raw_output
# ---------------------------------------------------------------------------
# Hong Kong weather helpers
# ---------------------------------------------------------------------------
async def fetch_hk_weather() -> dict | None:
"""Fetch and parse the current HK weather report from the HKO RSS feed.
Returns a dict with keys: title, description, temperature, humidity, summary.
Returns None on any network or parsing error.
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
HKO_RSS_URL,
timeout=aiohttp.ClientTimeout(total=15),
headers={"User-Agent": "Mozilla/5.0"},
) as resp:
resp.raise_for_status()
xml_bytes = await resp.read()
root = ET.fromstring(xml_bytes)
# Standard RSS 2.0: <rss><channel><item>…</item></channel></rss>
item = root.find(".//item")
if item is None:
logger.warning("HKO RSS: no <item> element found")
return None
title = (item.findtext("title") or "").strip()
description_raw = (item.findtext("description") or "").strip()
# Strip HTML tags and decode common HTML entities
description = re.sub(r"<[^>]+>", " ", description_raw)
description = re.sub(r"<", "<", description)
description = re.sub(r">", ">", description)
description = re.sub(r"&", "&", description)
description = re.sub(r" ", " ", description)
description = re.sub(r"&#\d+;", " ", description)