diff --git a/.gitignore b/.gitignore index 713a642..09aca21 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,9 @@ report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json # Finder (MacOS) folder config .DS_Store + +# test results +test-results/ + +# local backup of full telegram provider (pre-strip) +telegram-full-backup/ diff --git a/README.md b/README.md index f0ebbca..c7d51b0 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,7 @@ Visit **[docs.photon.codes](https://docs.photon.codes)** to view the full docume | Platform | Package | |----------|---------| | iMessage | `spectrum-ts/providers/imessage` | +| Telegram | `spectrum-ts/providers/telegram` | | WhatsApp | `spectrum-ts/providers/whatsapp` | | Terminal | `spectrum-ts/providers/terminal` | | Custom | `definePlatform` from `spectrum-ts` | diff --git a/bun.lock b/bun.lock index d7e16b7..756d235 100644 --- a/bun.lock +++ b/bun.lock @@ -19,7 +19,7 @@ }, "packages/spectrum-ts": { "name": "spectrum-ts", - "version": "1.5.0", + "version": "1.6.0", "dependencies": { "@photon-ai/advanced-imessage": "^0.7.0", "@photon-ai/imessage-kit": "^3.0.0", @@ -28,6 +28,7 @@ "better-grpc": "^0.3.2", "mime-types": "^3.0.1", "open-graph-scraper": "^6.11.0", + "quick-lru": "^7.3.0", "type-fest": "^5.4.1", "vcf": "^2.1.2", "zod": "^4.2.1", @@ -481,6 +482,8 @@ "pump": ["pump@3.0.4", "", { "dependencies": { "end-of-stream": "^1.1.0", "once": "^1.3.1" } }, "sha512-VS7sjc6KR7e1ukRFhQSY5LM2uBWAUPiOPa/A3mkKmiMwSmRFUITt0xuj+/lesgnCv+dPIEYlkzrcyXgquIHMcA=="], + "quick-lru": ["quick-lru@7.3.0", "", {}, "sha512-k9lSsjl36EJdK7I06v7APZCbyGT2vMTsYSRX1Q2nbYmnkBqgUhRkAuzH08Ciotteu/PLJmIF2+tti7o3C/ts2g=="], + "rc": ["rc@1.2.8", "", { "dependencies": { "deep-extend": "^0.6.0", "ini": "~1.3.0", "minimist": "^1.2.0", "strip-json-comments": "~2.0.1" }, "bin": { "rc": "./cli.js" } }, "sha512-y3bGgqKj3QBdxLbLkomlohkvsA8gdAiUQlSBJnBhfn+BPxg4bc62d8TcBW15wavDfgexCgccckhcZvywyQYPOw=="], "readable-stream": ["readable-stream@3.6.2", "", { "dependencies": { "inherits": "^2.0.3", "string_decoder": "^1.1.1", "util-deprecate": "^1.0.1" } }, "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA=="], diff --git a/package.json b/package.json index 00f19fc..2b28176 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,8 @@ "build": "turbo build", "dev": "turbo dev", "check": "ultracite check", - "fix": "ultracite fix" + "fix": "ultracite fix", + "gen:telegram": "bun run packages/spectrum-ts/src/providers/telegram/bot-api-spec/generators/typescript.ts" }, "workspaces": [ "packages/*", diff --git a/packages/spectrum-ts/package.json b/packages/spectrum-ts/package.json index c69c86c..301d784 100644 --- a/packages/spectrum-ts/package.json +++ b/packages/spectrum-ts/package.json @@ -44,7 +44,8 @@ "scripts": { "build": "tsup && cp ../../README.md ../../LICENSE .", "dev": "tsup --watch", - "generate:emoji": "bun scripts/generate-emoji.ts" + "generate:emoji": "bun scripts/generate-emoji.ts", + "gen:telegram": "bun src/providers/telegram/bot-api-spec/generators/typescript.ts" }, "dependencies": { "@photon-ai/advanced-imessage": "^0.7.0", @@ -54,6 +55,7 @@ "better-grpc": "^0.3.2", "mime-types": "^3.0.1", "open-graph-scraper": "^6.11.0", + "quick-lru": "^7.3.0", "type-fest": "^5.4.1", "vcf": "^2.1.2", "zod": "^4.2.1" diff --git a/packages/spectrum-ts/src/providers/telegram/bot-api-spec/README.md b/packages/spectrum-ts/src/providers/telegram/bot-api-spec/README.md new file mode 100644 index 0000000..75d7981 --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/bot-api-spec/README.md @@ -0,0 +1,98 @@ +# Telegram Bot API Spec + +JSON schema of the Telegram Bot API subset used by the Telegram provider, plus the generator that emits `providers/telegram/generated/*.ts`. + +## Layout + +```text +packages/spectrum-ts/src/providers/telegram/bot-api-spec/ +├── schema/ +│ └── telegram.json +├── generators/ +│ └── typescript.ts +└── README.md +``` + +## Scope + +The schema covers the Bot API surface that maps onto Spectrum's `Platform`: + +- `send` — `text`, `richlink`, `attachment`, `voice`, `contact`, `poll`, `reply`, `edit`, `reaction`, `typing`, `group` +- `messages` — long-polling stream of inbound `Update`s +- Lifecycle: `getMe`, `getChat`, `getFile` + +Notes on a few non-obvious mappings: + +- `typing` with `state: "stop"` is a no-op. `sendChatAction` is one-shot and the indicator auto-expires after ~5s; there's no cancel endpoint. +- `Update.poll_answer` is diffed against the bot's prior cached vote vector and emitted as per-option `poll_option` events. `sendPoll` always pins `is_anonymous: false` to enable this. `Update.poll` aggregate state is unmapped. +- Outbound `richlink` drops `title` / `summary` / `cover` — Telegram's preview scraper owns that metadata; the Bot API exposes only layout knobs. + +### Provider-local cache + +Bridges the gaps in the Bot API (no fetch-by-id, no album update kind, no chat on `poll_answer`): + +- **Messages cache** — bounded LRU written by every inbound `Update.message` and outbound send. `space.getMessage(id)` reads from it. Reaction events also use it to hydrate `reaction.target`. +- **Poll cache** — maps `poll_id` to the original Spectrum poll + chat snapshot, plus per-voter vote vectors for diffing. +- **Album buffer** — with `coalesceAlbums: true`, debounces members sharing `media_group_id` into a single `group`; off by default. + +In-process, capacity-bounded, no TTL, torn down with the runtime. Defaults: 5000 messages, 500 polls, 5000 vote vectors, 100 concurrent albums, 500ms debounce, 2s ceiling. Set any capacity to `0` to disable that slot. + +Inline queries, callback queries, payments, passports, forum topics, and admin operations are out of scope; surface them via custom events if needed. + +## Schema format + +TypeRef DSL: + +- Primitives: `string`, `integer`, `float`, `boolean`, `any` +- Reference: `Ref:` +- Composition: `Array:`, `Union:||...` + +`InputFile` is a marker; the generator emits `string | Blob`. + +### Type + +```json +"Message": { + "description": "A message.", + "fields": [ + { "name": "message_id", "type": "integer", "required": true }, + { "name": "text", "type": "string", "required": false }, + { "name": "chat", "type": "Ref:Chat", "required": true } + ] +} +``` + +Field options: `required`, `description`, `enum`, `const`. + +### Method + +```json +"sendMessage": { + "description": "Sends a text message.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { "name": "text", "type": "string", "required": true } + ], + "returns": "Ref:Message" +} +``` + +## Regeneration + +```bash +bun run gen:telegram +``` + +Rewrites `providers/telegram/generated/*.ts`. Generated files are committed; CI runs `git diff --exit-code` after regen. + +## Adding a method + +1. Add any new type under `types`. +2. Add the method under `methods` with params and return type. +3. `bun run gen:telegram`. +4. Commit schema + generated output together. + +## Version + +Tracking **Bot API 9.6** (April 3, 2026). diff --git a/packages/spectrum-ts/src/providers/telegram/bot-api-spec/generators/typescript.ts b/packages/spectrum-ts/src/providers/telegram/bot-api-spec/generators/typescript.ts new file mode 100644 index 0000000..d00fe68 --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/bot-api-spec/generators/typescript.ts @@ -0,0 +1,239 @@ +#!/usr/bin/env bun +import { mkdir, readFile, writeFile } from "node:fs/promises"; +import { resolve } from "node:path"; + +interface SchemaField { + const?: string; + description?: string; + enum?: string[]; + name: string; + required: boolean; + type: string; +} + +interface SchemaType { + description?: string; + fields: SchemaField[]; + primitive?: "input-file"; +} + +interface SchemaMethod { + description?: string; + httpMethod: "GET" | "POST"; + params: SchemaField[]; + returns: string; +} + +interface Schema { + baseUrl: string; + methods: Record; + types: Record; + version: string; +} + +const SCHEMA_PATH = resolve(import.meta.dir, "..", "schema", "telegram.json"); +// `bot-api-spec/` lives under the Telegram provider directory, so the +// generated output sits as a sibling (`../../generated`) of the bot-api-spec +// folder rather than far away in the package tree. +const OUTPUT_DIR = resolve(import.meta.dir, "..", "..", "generated"); + +const FILE_BANNER = `// GENERATED FILE — do not edit by hand. +// Source: providers/telegram/bot-api-spec/schema/telegram.json +// Regenerate with: bun run gen:telegram +`; + +const isRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null && !Array.isArray(value); + +const loadSchema = async (): Promise => { + const raw = await readFile(SCHEMA_PATH, "utf8"); + const parsed = JSON.parse(raw) as unknown; + // Fail fast with a clear message if the schema file is malformed rather + // than letting downstream generators throw confusing property-access errors. + if ( + !(isRecord(parsed) && isRecord(parsed.types) && isRecord(parsed.methods)) + ) { + throw new Error( + `Invalid schema at ${SCHEMA_PATH}: expected object with "types" and "methods" object maps` + ); + } + return parsed as unknown as Schema; +}; + +// Translate the schema's TypeRef DSL into a TypeScript type expression. +// Supported forms: +// string | integer | boolean | float | any +// Ref: +// Array: +// Union:||... (members may be primitives or Ref:X) +const toTs = (type: string): string => { + if (type === "string") { + return "string"; + } + if (type === "integer" || type === "float") { + return "number"; + } + if (type === "boolean") { + return "boolean"; + } + if (type === "any") { + return "unknown"; + } + // `InputFile` is documented in README as a top-level TypeRef primitive so + // bare occurrences (e.g. `"type": "InputFile"` or `"Union:InputFile|string"`) + // resolve without requiring the `Ref:` prefix. + if (type === "InputFile") { + return "InputFile"; + } + if (type.startsWith("Ref:")) { + const name = type.slice("Ref:".length); + if (name === "InputFile") { + return "InputFile"; + } + return name; + } + if (type.startsWith("Array:")) { + const inner = type.slice("Array:".length); + return `Array<${toTs(inner)}>`; + } + if (type.startsWith("Union:")) { + const rest = type.slice("Union:".length); + return rest.split("|").map(toTs).join(" | "); + } + throw new Error(`Unknown type expression: ${type}`); +}; + +const renderFieldType = (field: SchemaField): string => { + if (field.const !== undefined) { + return JSON.stringify(field.const); + } + if (field.enum && field.enum.length > 0) { + return field.enum.map((value) => JSON.stringify(value)).join(" | "); + } + return toTs(field.type); +}; + +const renderTypes = (schema: Schema): string => { + const lines: string[] = []; + lines.push(FILE_BANNER); + lines.push(""); + lines.push("export type InputFile = string | Blob;"); + lines.push(""); + + for (const [name, def] of Object.entries(schema.types)) { + if (def.primitive === "input-file") { + continue; + } + if (def.description) { + lines.push(`/** ${def.description} */`); + } + lines.push(`export interface ${name} {`); + for (const field of def.fields) { + if (field.description) { + lines.push(` /** ${field.description} */`); + } + const optional = field.required ? "" : "?"; + lines.push(` ${field.name}${optional}: ${renderFieldType(field)};`); + } + lines.push("}"); + lines.push(""); + } + + return lines.join("\n"); +}; + +// Build the TS interface name for a method's params (e.g. `sendMessage` → +// `SendMessageParams`). The schema is hand-written, but we still fail loudly +// on an empty key so a malformed entry can't silently produce an unusable +// `undefinedParams` interface that compiles but is broken at every call site. +const paramsInterfaceName = (methodName: string): string => { + if (!methodName) { + throw new Error( + "Telegram schema contains a method with an empty name; refusing to generate" + ); + } + return `${methodName[0]?.toUpperCase()}${methodName.slice(1)}Params`; +}; + +const renderMethods = (schema: Schema): string => { + const lines: string[] = []; + lines.push(FILE_BANNER); + lines.push(""); + lines.push("import type {"); + const typeImports = Object.keys(schema.types) + .filter((name) => schema.types[name]?.primitive !== "input-file") + .sort(); + for (const typeName of typeImports) { + lines.push(` ${typeName},`); + } + lines.push(" InputFile,"); + lines.push('} from "./types";'); + lines.push(""); + + // Emit one params interface per method. + for (const [methodName, def] of Object.entries(schema.methods)) { + const interfaceName = paramsInterfaceName(methodName); + if (def.description) { + lines.push(`/** ${def.description} */`); + } + if (def.params.length === 0) { + lines.push(`export type ${interfaceName} = Record;`); + } else { + lines.push(`export interface ${interfaceName} {`); + for (const field of def.params) { + if (field.description) { + lines.push(` /** ${field.description} */`); + } + const optional = field.required ? "" : "?"; + lines.push(` ${field.name}${optional}: ${renderFieldType(field)};`); + } + lines.push("}"); + } + lines.push(""); + } + + // Emit the Methods map: method name -> { params, result }. + lines.push( + "/** Bot API method map. Used by the runtime client for type-safe invoke(). */" + ); + lines.push("export interface Methods {"); + for (const [methodName, def] of Object.entries(schema.methods)) { + const paramsName = paramsInterfaceName(methodName); + const returnType = toTs(def.returns); + lines.push(` ${methodName}: {`); + lines.push(` params: ${paramsName};`); + lines.push(` result: ${returnType};`); + lines.push(" };"); + } + lines.push("}"); + lines.push(""); + + lines.push("export type MethodName = keyof Methods;"); + lines.push(""); + lines.push(`export const BASE_URL = ${JSON.stringify(schema.baseUrl)};`); + lines.push(`export const API_VERSION = ${JSON.stringify(schema.version)};`); + lines.push(""); + + return lines.join("\n"); +}; + +const main = async (): Promise => { + const schema = await loadSchema(); + await mkdir(OUTPUT_DIR, { recursive: true }); + + const typesPath = resolve(OUTPUT_DIR, "types.ts"); + const methodsPath = resolve(OUTPUT_DIR, "methods.ts"); + + await writeFile(typesPath, renderTypes(schema)); + await writeFile(methodsPath, renderMethods(schema)); + + console.log(`Wrote ${typesPath}`); + console.log(`Wrote ${methodsPath}`); +}; + +if (import.meta.main) { + main().catch((err) => { + console.error(err); + process.exit(1); + }); +} diff --git a/packages/spectrum-ts/src/providers/telegram/bot-api-spec/schema/telegram.json b/packages/spectrum-ts/src/providers/telegram/bot-api-spec/schema/telegram.json new file mode 100644 index 0000000..408c70c --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/bot-api-spec/schema/telegram.json @@ -0,0 +1,604 @@ +{ + "version": "9.6", + "baseUrl": "https://api.telegram.org", + "types": { + "Update": { + "description": "Incoming update. At most one of the optional parameters can be present in any given update.", + "fields": [ + { "name": "update_id", "type": "integer", "required": true }, + { "name": "message", "type": "Ref:Message", "required": false }, + { "name": "edited_message", "type": "Ref:Message", "required": false }, + { "name": "channel_post", "type": "Ref:Message", "required": false }, + { + "name": "edited_channel_post", + "type": "Ref:Message", + "required": false + }, + { + "name": "message_reaction", + "type": "Ref:MessageReactionUpdated", + "required": false + }, + { + "name": "poll", + "type": "Ref:Poll", + "required": false, + "description": "New aggregate poll state. Bots receive only updates about stopped polls and polls they sent." + }, + { + "name": "poll_answer", + "type": "Ref:PollAnswer", + "required": false, + "description": "User changed their vote in a non-anonymous poll. Anonymous polls do NOT emit poll_answer." + } + ] + }, + "User": { + "description": "Telegram user or bot.", + "fields": [ + { "name": "id", "type": "integer", "required": true }, + { "name": "is_bot", "type": "boolean", "required": true }, + { "name": "first_name", "type": "string", "required": true }, + { "name": "last_name", "type": "string", "required": false }, + { "name": "username", "type": "string", "required": false }, + { "name": "language_code", "type": "string", "required": false } + ] + }, + "Chat": { + "description": "Chat (private, group, supergroup, or channel).", + "fields": [ + { "name": "id", "type": "integer", "required": true }, + { + "name": "type", + "type": "string", + "required": true, + "enum": ["private", "group", "supergroup", "channel"] + }, + { "name": "title", "type": "string", "required": false }, + { "name": "username", "type": "string", "required": false }, + { + "name": "first_name", + "type": "string", + "required": false, + "description": "Present for private chats; readable name of the user." + }, + { + "name": "last_name", + "type": "string", + "required": false, + "description": "Present for private chats when the user supplied a last name." + } + ] + }, + "Message": { + "description": "A message.", + "fields": [ + { "name": "message_id", "type": "integer", "required": true }, + { + "name": "date", + "type": "integer", + "required": true, + "description": "Unix time" + }, + { "name": "chat", "type": "Ref:Chat", "required": true }, + { "name": "from", "type": "Ref:User", "required": false }, + { + "name": "sender_chat", + "type": "Ref:Chat", + "required": false, + "description": "Sender of the message, sent on behalf of a chat (channel posts, anonymous group admins)." + }, + { "name": "text", "type": "string", "required": false }, + { "name": "caption", "type": "string", "required": false }, + { + "name": "entities", + "type": "Array:Ref:MessageEntity", + "required": false + }, + { + "name": "link_preview_options", + "type": "Ref:LinkPreviewOptions", + "required": false + }, + { "name": "photo", "type": "Array:Ref:PhotoSize", "required": false }, + { "name": "document", "type": "Ref:Document", "required": false }, + { "name": "audio", "type": "Ref:Audio", "required": false }, + { "name": "video", "type": "Ref:Video", "required": false }, + { "name": "voice", "type": "Ref:Voice", "required": false }, + { "name": "contact", "type": "Ref:Contact", "required": false }, + { "name": "poll", "type": "Ref:Poll", "required": false }, + { + "name": "media_group_id", + "type": "string", + "required": false, + "description": "Set on each member of an album. All messages sent together share the same id; absent on standalone messages." + } + ] + }, + "MessageEntity": { + "description": "A special entity in a text message (URL, mention, hashtag, etc.).", + "fields": [ + { "name": "type", "type": "string", "required": true }, + { "name": "offset", "type": "integer", "required": true }, + { "name": "length", "type": "integer", "required": true }, + { + "name": "url", + "type": "string", + "required": false, + "description": "For text_link only: the URL to open" + } + ] + }, + "LinkPreviewOptions": { + "description": "Describes link-preview generation options for a message.", + "fields": [ + { "name": "is_disabled", "type": "boolean", "required": false }, + { "name": "url", "type": "string", "required": false }, + { "name": "prefer_small_media", "type": "boolean", "required": false }, + { "name": "prefer_large_media", "type": "boolean", "required": false }, + { "name": "show_above_text", "type": "boolean", "required": false } + ] + }, + "PhotoSize": { + "description": "One size of a photo.", + "fields": [ + { "name": "file_id", "type": "string", "required": true }, + { "name": "width", "type": "integer", "required": true }, + { "name": "height", "type": "integer", "required": true }, + { "name": "file_size", "type": "integer", "required": false } + ] + }, + "Document": { + "description": "General file (as opposed to photo/voice/video/audio).", + "fields": [ + { "name": "file_id", "type": "string", "required": true }, + { "name": "file_name", "type": "string", "required": false }, + { "name": "mime_type", "type": "string", "required": false }, + { "name": "file_size", "type": "integer", "required": false } + ] + }, + "Audio": { + "description": "Audio file to be treated as music.", + "fields": [ + { "name": "file_id", "type": "string", "required": true }, + { "name": "file_name", "type": "string", "required": false }, + { "name": "mime_type", "type": "string", "required": false }, + { "name": "file_size", "type": "integer", "required": false } + ] + }, + "Video": { + "description": "Video file.", + "fields": [ + { "name": "file_id", "type": "string", "required": true }, + { "name": "file_name", "type": "string", "required": false }, + { "name": "mime_type", "type": "string", "required": false }, + { "name": "file_size", "type": "integer", "required": false } + ] + }, + "Voice": { + "description": "Voice note.", + "fields": [ + { "name": "file_id", "type": "string", "required": true }, + { "name": "duration", "type": "integer", "required": true }, + { "name": "mime_type", "type": "string", "required": false }, + { "name": "file_size", "type": "integer", "required": false } + ] + }, + "Contact": { + "description": "Phone contact.", + "fields": [ + { "name": "phone_number", "type": "string", "required": true }, + { "name": "first_name", "type": "string", "required": true }, + { "name": "last_name", "type": "string", "required": false }, + { "name": "user_id", "type": "integer", "required": false }, + { "name": "vcard", "type": "string", "required": false } + ] + }, + "ReactionType": { + "description": "Reaction type. Flat union shape: discriminate at runtime via `type`.", + "fields": [ + { + "name": "type", + "type": "string", + "required": true, + "enum": ["emoji", "custom_emoji", "paid"] + }, + { + "name": "emoji", + "type": "string", + "required": false, + "description": "Set when type === \"emoji\"." + }, + { + "name": "custom_emoji_id", + "type": "string", + "required": false, + "description": "Set when type === \"custom_emoji\"." + } + ] + }, + "MessageReactionUpdated": { + "description": "A reaction on a message changed by a user.", + "fields": [ + { "name": "chat", "type": "Ref:Chat", "required": true }, + { "name": "message_id", "type": "integer", "required": true }, + { + "name": "date", + "type": "integer", + "required": true, + "description": "Unix time of the change" + }, + { "name": "user", "type": "Ref:User", "required": false }, + { "name": "actor_chat", "type": "Ref:Chat", "required": false }, + { + "name": "old_reaction", + "type": "Array:Ref:ReactionType", + "required": true + }, + { + "name": "new_reaction", + "type": "Array:Ref:ReactionType", + "required": true + } + ] + }, + "PollOption": { + "description": "Option in a Telegram poll.", + "fields": [ + { "name": "text", "type": "string", "required": true }, + { "name": "voter_count", "type": "integer", "required": true } + ] + }, + "InputPollOption": { + "description": "Outbound option used by `sendPoll`.", + "fields": [{ "name": "text", "type": "string", "required": true }] + }, + "Poll": { + "description": "A native Telegram poll (regular or quiz). Spectrum exposes only regular polls; quiz/explanation/period fields are passed through but not modelled in the universal Content.", + "fields": [ + { "name": "id", "type": "string", "required": true }, + { "name": "question", "type": "string", "required": true }, + { + "name": "options", + "type": "Array:Ref:PollOption", + "required": true + }, + { "name": "total_voter_count", "type": "integer", "required": true }, + { "name": "is_closed", "type": "boolean", "required": true }, + { "name": "is_anonymous", "type": "boolean", "required": true }, + { + "name": "type", + "type": "string", + "required": true, + "enum": ["regular", "quiz"] + }, + { + "name": "allows_multiple_answers", + "type": "boolean", + "required": true + }, + { + "name": "correct_option_ids", + "type": "Array:integer", + "required": false, + "description": "Bot API 9.6+: 0-based identifiers of the correct answer options. Quiz polls only; available when the poll is closed or sent (not forwarded) by the bot. Replaces the pre-9.6 singular `correct_option_id`." + }, + { + "name": "allows_revoting", + "type": "boolean", + "required": false, + "description": "Bot API 9.6+: True if the poll allows changing the chosen answer after voting." + } + ] + }, + "PollAnswer": { + "description": "Answer of a user in a non-anonymous poll. Spectrum keys vote-state caches by `poll_id`; chat/message_id are not provided by Telegram in this update so the poll cache must be populated when the poll is created or first observed.", + "fields": [ + { "name": "poll_id", "type": "string", "required": true }, + { + "name": "voter_chat", + "type": "Ref:Chat", + "required": false, + "description": "Set when the vote came from an anonymous channel admin acting on behalf of the channel." + }, + { + "name": "user", + "type": "Ref:User", + "required": false, + "description": "Set when the vote came from a regular user. Either `user` or `voter_chat` is present." + }, + { + "name": "option_ids", + "type": "Array:integer", + "required": true, + "description": "Zero-based indices into the original poll options. Empty array means the user retracted all of their votes." + } + ] + }, + "ReplyParameters": { + "description": "Reply parameters for outgoing messages.", + "fields": [{ "name": "message_id", "type": "integer", "required": true }] + }, + "ResponseParameters": { + "description": "Error response parameters.", + "fields": [ + { "name": "retry_after", "type": "integer", "required": false }, + { "name": "migrate_to_chat_id", "type": "integer", "required": false } + ] + }, + "File": { + "description": "Downloadable file descriptor.", + "fields": [ + { "name": "file_id", "type": "string", "required": true }, + { "name": "file_size", "type": "integer", "required": false }, + { "name": "file_path", "type": "string", "required": false } + ] + }, + "InputFile": { + "description": "File to upload. Sent as either a file_id/URL string or a multipart attachment.", + "fields": [], + "primitive": "input-file" + } + }, + "methods": { + "getMe": { + "description": "Returns basic info about the bot.", + "httpMethod": "POST", + "params": [], + "returns": "Ref:User" + }, + "getUpdates": { + "description": "Receives incoming updates using long polling.", + "httpMethod": "POST", + "params": [ + { "name": "offset", "type": "integer", "required": false }, + { + "name": "timeout", + "type": "integer", + "required": false, + "description": "Long-polling timeout in seconds" + }, + { + "name": "allowed_updates", + "type": "Array:string", + "required": false + } + ], + "returns": "Array:Ref:Update" + }, + "sendMessage": { + "description": "Sends a text message.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { "name": "text", "type": "string", "required": true }, + { + "name": "link_preview_options", + "type": "Ref:LinkPreviewOptions", + "required": false + }, + { + "name": "reply_parameters", + "type": "Ref:ReplyParameters", + "required": false + } + ], + "returns": "Ref:Message" + }, + "sendPhoto": { + "description": "Sends a photo.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { "name": "photo", "type": "Ref:InputFile", "required": true }, + { + "name": "reply_parameters", + "type": "Ref:ReplyParameters", + "required": false + } + ], + "returns": "Ref:Message" + }, + "sendDocument": { + "description": "Sends a general file.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { "name": "document", "type": "Ref:InputFile", "required": true }, + { + "name": "reply_parameters", + "type": "Ref:ReplyParameters", + "required": false + } + ], + "returns": "Ref:Message" + }, + "sendAudio": { + "description": "Sends an audio file shown as music.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { "name": "audio", "type": "Ref:InputFile", "required": true }, + { + "name": "reply_parameters", + "type": "Ref:ReplyParameters", + "required": false + } + ], + "returns": "Ref:Message" + }, + "sendVideo": { + "description": "Sends a video.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { "name": "video", "type": "Ref:InputFile", "required": true }, + { + "name": "reply_parameters", + "type": "Ref:ReplyParameters", + "required": false + } + ], + "returns": "Ref:Message" + }, + "sendVoice": { + "description": "Sends an audio file to be displayed as a voice note.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { "name": "voice", "type": "Ref:InputFile", "required": true }, + { "name": "duration", "type": "integer", "required": false }, + { + "name": "reply_parameters", + "type": "Ref:ReplyParameters", + "required": false + } + ], + "returns": "Ref:Message" + }, + "sendContact": { + "description": "Sends a phone contact.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { "name": "phone_number", "type": "string", "required": true }, + { "name": "first_name", "type": "string", "required": true }, + { "name": "last_name", "type": "string", "required": false }, + { "name": "vcard", "type": "string", "required": false }, + { + "name": "reply_parameters", + "type": "Ref:ReplyParameters", + "required": false + } + ], + "returns": "Ref:Message" + }, + "sendPoll": { + "description": "Sends a native Telegram poll. Spectrum sends regular (non-quiz) polls with `is_anonymous: false` so per-user vote events surface as poll_option content. Private 1:1 chats, groups, supergroups, and channels are all supported; the only chat type Telegram rejects is channel direct-message chats (the per-user threads on top of broadcast channels), which return BAD_REQUEST.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { "name": "question", "type": "string", "required": true }, + { + "name": "options", + "type": "Array:Ref:InputPollOption", + "required": true + }, + { + "name": "is_anonymous", + "type": "boolean", + "required": false, + "description": "Defaults to true on the Telegram side. Spectrum forces this to `false` on outbound sends so vote events are attributed." + }, + { + "name": "type", + "type": "string", + "required": false, + "enum": ["regular"], + "description": "Spectrum only sends regular polls. Quiz mode is intentionally out of scope: it requires `correct_option_ids`, and Spectrum's universal `poll` content has no notion of quiz correct-answer metadata. Callers needing quizzes should use the raw client directly." + }, + { + "name": "allows_multiple_answers", + "type": "boolean", + "required": false + }, + { + "name": "allows_revoting", + "type": "boolean", + "required": false, + "description": "Bot API 9.6+: pass true to allow voters to change their selection after voting." + }, + { + "name": "shuffle_options", + "type": "boolean", + "required": false, + "description": "Bot API 9.6+: pass true to randomize the order of options when displaying the poll." + }, + { + "name": "allow_adding_options", + "type": "boolean", + "required": false, + "description": "Bot API 9.6+: pass true to let users add options after the poll has been created." + }, + { + "name": "hide_results_until_closes", + "type": "boolean", + "required": false, + "description": "Bot API 9.6+: pass true to hide aggregate results until the poll is closed." + }, + { + "name": "reply_parameters", + "type": "Ref:ReplyParameters", + "required": false + } + ], + "returns": "Ref:Message" + }, + "stopPoll": { + "description": "Stops a poll the bot started. Returns the final Poll state. Not exposed through Spectrum's universal API; available via raw client for callers who need it.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { "name": "message_id", "type": "integer", "required": true } + ], + "returns": "Ref:Poll" + }, + "editMessageText": { + "description": "Edits a text message.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { "name": "message_id", "type": "integer", "required": true }, + { "name": "text", "type": "string", "required": true }, + { + "name": "link_preview_options", + "type": "Ref:LinkPreviewOptions", + "required": false + } + ], + "returns": "Union:Ref:Message|boolean" + }, + "setMessageReaction": { + "description": "Sets reactions on a message.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { "name": "message_id", "type": "integer", "required": true }, + { + "name": "reaction", + "type": "Array:Ref:ReactionType", + "required": false + } + ], + "returns": "boolean" + }, + "sendChatAction": { + "description": "Broadcasts a chat action.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true }, + { + "name": "action", + "type": "string", + "required": true, + "const": "typing" + } + ], + "returns": "boolean" + }, + "getChat": { + "description": "Gets a chat by ID.", + "httpMethod": "POST", + "params": [ + { "name": "chat_id", "type": "Union:integer|string", "required": true } + ], + "returns": "Ref:Chat" + }, + "getFile": { + "description": "Gets basic info about a file. Compose with baseUrl + /file/bot/ to download.", + "httpMethod": "POST", + "params": [{ "name": "file_id", "type": "string", "required": true }], + "returns": "Ref:File" + } + } +} diff --git a/packages/spectrum-ts/src/providers/telegram/events/inbound.ts b/packages/spectrum-ts/src/providers/telegram/events/inbound.ts new file mode 100644 index 0000000..d1d115c --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/events/inbound.ts @@ -0,0 +1,345 @@ +import { asAttachment } from "../../../content/attachment"; +import { asContact } from "../../../content/contact"; +import { asGroup } from "../../../content/group"; +import { asPoll } from "../../../content/poll"; +import { asRichlink } from "../../../content/richlink"; +import { asText } from "../../../content/text"; +import type { Content } from "../../../content/types"; +import { asVoice } from "../../../content/voice"; +import { fromVCard } from "../../../utils/vcard"; +import type { + Audio, + Document, + LinkPreviewOptions, + Message, + MessageEntity, + PhotoSize, + Contact as TgContact, + Poll as TgPoll, + Voice as TgVoice, + Video, +} from "../generated/types"; + +import { chatToSender, chatToSpace, userToSender } from "../identity"; +import { toGroupItems } from "../messages"; +import type { TelegramClient } from "../runtime/client"; +import type { TelegramMessage } from "../types"; + +// `getFile` URLs are valid for ~1h; resolve lazily on each read so +// messages stay readable after the event stream tears down. +const fetchFileBytes = async ( + client: TelegramClient, + fileId: string +): Promise => { + const file = await client.invoke("getFile", { file_id: fileId }); + if (!file.file_path) { + throw new Error( + `Telegram getFile returned no file_path for file_id=${fileId}` + ); + } + const response = await client.downloadFile(file.file_path); + if (!response.ok) { + throw new Error( + `Telegram file download for file_id=${fileId} returned HTTP ${response.status}` + ); + } + return response; +}; + +const attachmentFromFile = ( + client: TelegramClient, + fileId: string, + name: string, + mimeType: string, + size?: number +): Content => + asAttachment({ + name, + mimeType, + ...(size === undefined ? {} : { size }), + read: async () => + Buffer.from(await (await fetchFileBytes(client, fileId)).arrayBuffer()), + stream: async () => { + const response = await fetchFileBytes(client, fileId); + if (!response.body) { + throw new Error( + `Telegram file response has no body (file_id=${fileId})` + ); + } + return response.body; + }, + }); + +const voiceFromFile = (client: TelegramClient, voice: TgVoice): Content => { + const mimeType = voice.mime_type ?? "audio/ogg"; + return asVoice({ + mimeType, + ...(voice.duration === undefined ? {} : { duration: voice.duration }), + ...(voice.file_size === undefined ? {} : { size: voice.file_size }), + read: async () => + Buffer.from( + await (await fetchFileBytes(client, voice.file_id)).arrayBuffer() + ), + stream: async () => { + const response = await fetchFileBytes(client, voice.file_id); + if (!response.body) { + throw new Error( + `Telegram voice file has no body (file_id=${voice.file_id})` + ); + } + return response.body; + }, + }); +}; + +const largestPhoto = (photos: PhotoSize[]): PhotoSize | undefined => { + let best: PhotoSize | undefined; + for (const photo of photos) { + const area = photo.width * photo.height; + if (!best || area > best.width * best.height) { + best = photo; + } + } + return best; +}; + +const photoName = (photo: PhotoSize): string => `photo-${photo.file_id}.jpg`; +const documentName = (doc: Document): string => + doc.file_name ?? `document-${doc.file_id}`; +const audioName = (audio: Audio): string => + audio.file_name ?? `audio-${audio.file_id}`; +const videoName = (video: Video): string => + video.file_name ?? `video-${video.file_id}.mp4`; + +const parseVCardSafe = ( + vcard: string +): Parameters[0] | undefined => { + try { + return fromVCard(vcard); + } catch { + return; + } +}; + +const contactToContent = (contact: TgContact): Content => { + const formatted = [contact.first_name, contact.last_name] + .filter((p): p is string => Boolean(p)) + .join(" "); + const fromCard = + contact.vcard === undefined ? undefined : parseVCardSafe(contact.vcard); + const input: Parameters[0] = { + ...(fromCard ?? {}), + raw: contact, + name: { + ...(fromCard?.name ?? {}), + formatted: formatted || contact.first_name, + first: contact.first_name, + }, + phones: fromCard?.phones?.length + ? fromCard.phones + : [{ value: contact.phone_number }], + }; + if (contact.last_name !== undefined && input.name) { + input.name.last = contact.last_name; + } + return asContact(input); +}; + +// Richlink only when the message body is effectively bare: either the +// trimmed text equals the preview URL, or the text is exactly one +// `url`/`text_link` entity spanning the whole string. Mixed prose + +// preview keeps the text content so callers don't lose context. +const extractRichlinkUrl = ( + text: string, + entities: MessageEntity[] | undefined, + linkPreview: LinkPreviewOptions | undefined +): string | undefined => { + if (linkPreview?.is_disabled === true) { + return; + } + if (linkPreview?.url && text.trim() === linkPreview.url) { + return linkPreview.url; + } + if (!entities || entities.length !== 1) { + return; + } + const [entity] = entities; + if (!entity) { + return; + } + const covers = entity.offset === 0 && entity.length === text.length; + if (!covers) { + return; + } + if (entity.type === "text_link") { + return entity.url; + } + if (entity.type === "url") { + return text; + } + return; +}; + +const richlinkFromMessage = (msg: Message): Content | undefined => { + if (msg.text === undefined) { + return; + } + const url = extractRichlinkUrl( + msg.text, + msg.entities, + msg.link_preview_options + ); + if (url === undefined) { + return; + } + try { + return asRichlink({ url }); + } catch { + return; + } +}; + +const pollFromTelegramPoll = (poll: TgPoll): Content | undefined => { + try { + return asPoll({ + title: poll.question, + options: poll.options.map((opt) => ({ title: opt.text })), + }); + } catch { + return; + } +}; + +// Media + caption → media content with caption as a `TelegramMessage` +// extra; caption-only → text content. +const messageToContent = ( + client: TelegramClient, + msg: Message +): Content | undefined => { + if (msg.text !== undefined) { + const richlink = richlinkFromMessage(msg); + if (richlink) { + return richlink; + } + return asText(msg.text); + } + if (msg.voice) { + return voiceFromFile(client, msg.voice); + } + if (msg.photo && msg.photo.length > 0) { + const largest = largestPhoto(msg.photo); + if (largest) { + return attachmentFromFile( + client, + largest.file_id, + photoName(largest), + "image/jpeg", + largest.file_size + ); + } + } + if (msg.document) { + return attachmentFromFile( + client, + msg.document.file_id, + documentName(msg.document), + msg.document.mime_type ?? "application/octet-stream", + msg.document.file_size + ); + } + if (msg.audio) { + return attachmentFromFile( + client, + msg.audio.file_id, + audioName(msg.audio), + msg.audio.mime_type ?? "audio/mpeg", + msg.audio.file_size + ); + } + if (msg.video) { + return attachmentFromFile( + client, + msg.video.file_id, + videoName(msg.video), + msg.video.mime_type ?? "video/mp4", + msg.video.file_size + ); + } + if (msg.contact) { + return contactToContent(msg.contact); + } + if (msg.poll) { + return pollFromTelegramPoll(msg.poll); + } + if (msg.caption !== undefined) { + return asText(msg.caption); + } + return; +}; + +export const toTelegramMessage = ( + client: TelegramClient, + msg: Message +): TelegramMessage | undefined => { + let sender: TelegramMessage["sender"] | undefined; + if (msg.from) { + sender = userToSender(msg.from); + } else if (msg.sender_chat) { + sender = chatToSender(msg.sender_chat); + } + if (!sender) { + return; + } + const content = messageToContent(client, msg); + if (!content) { + return; + } + const built: TelegramMessage = { + id: String(msg.message_id), + content, + sender, + space: chatToSpace(msg.chat), + timestamp: new Date(msg.date * 1000), + }; + if (msg.media_group_id !== undefined) { + built.mediaGroupId = msg.media_group_id; + } + if (msg.caption !== undefined && content.type !== "text") { + built.caption = msg.caption; + } + return built; +}; + +// Wrapper id = lead member's `message_id` so reply/edit/setMessageReaction +// (all numeric-id-only) keep working on the coalesced message. +export const coalesceAlbumGroup = ( + members: TelegramMessage[] +): TelegramMessage | undefined => { + if (members.length === 0) { + return; + } + // `asGroup` requires >= 2 items; ceiling-flush race can leave us with 1. + if (members.length === 1) { + return members[0]; + } + const sorted = [...members].sort((a, b) => Number(a.id) - Number(b.id)); + const head = sorted[0]; + if (!head?.mediaGroupId) { + return; + } + const items = toGroupItems(sorted); + const wrapper: TelegramMessage = { + id: head.id, + content: asGroup({ items }), + sender: head.sender, + space: head.space, + ...(head.timestamp ? { timestamp: head.timestamp } : {}), + mediaGroupId: head.mediaGroupId, + }; + // Telegram attaches the album caption to exactly one member; promote it. + const captioned = sorted.find((m) => m.caption !== undefined); + if (captioned?.caption !== undefined) { + wrapper.caption = captioned.caption; + } + return wrapper; +}; diff --git a/packages/spectrum-ts/src/providers/telegram/events/index.ts b/packages/spectrum-ts/src/providers/telegram/events/index.ts new file mode 100644 index 0000000..01068c6 --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/events/index.ts @@ -0,0 +1,160 @@ +import { type ManagedStream, stream } from "../../../utils/stream"; +import type { Update } from "../generated/types"; +import { AlbumBuffer, messageCacheKey } from "../runtime/cache"; +import { pollUpdates } from "../runtime/polling"; +import type { TelegramMessage, TelegramRuntime } from "../types"; +import { coalesceAlbumGroup, toTelegramMessage } from "./inbound"; +import { pollAnswerEvents } from "./polls"; +import { reactionEventsFromUpdate } from "./reactions"; + +const pickMessage = (update: Update) => + update.message ?? + update.edited_message ?? + update.channel_post ?? + update.edited_channel_post; + +interface BuildOutput { + cacheable: boolean; + messages: TelegramMessage[]; +} + +const buildMessages = ( + runtime: TelegramRuntime, + update: Update +): BuildOutput => { + if (update.message_reaction) { + return { + messages: reactionEventsFromUpdate( + update.message_reaction, + update.update_id, + runtime.cache + ), + cacheable: false, + }; + } + if (update.poll) { + // Internal cache sync only — no Spectrum event surfaced. Keeps the + // cached poll's option list aligned with `allow_adding_options` polls. + runtime.cache.polls.refreshPollOptions( + update.poll.id, + update.poll.options.map((o) => ({ title: o.text })) + ); + return { messages: [], cacheable: false }; + } + if (update.poll_answer) { + return { + messages: pollAnswerEvents(update.poll_answer, runtime.cache, update), + cacheable: false, + }; + } + const tgMessage = pickMessage(update); + if (!tgMessage) { + return { messages: [], cacheable: false }; + } + const message = toTelegramMessage(runtime.client, tgMessage); + return { + messages: message ? [message] : [], + cacheable: true, + }; +}; + +export interface MessagesOptions { + allowedUpdates?: string[]; + dropPendingUpdates?: boolean; + timeout?: number; +} + +export const messages = ( + runtime: TelegramRuntime, + signal: AbortSignal, + options: MessagesOptions = {} +): ManagedStream => + stream((emit, end) => { + const abortController = new AbortController(); + const onSignalAbort = () => abortController.abort(signal.reason); + if (signal.aborted) { + abortController.abort(signal.reason); + } else { + signal.addEventListener("abort", onSignalAbort, { once: true }); + } + + // Per-stream because the flush callback closes over `emit`. + const albumBuffer = runtime.cache.coalesceAlbums + ? new AlbumBuffer({ + ...runtime.cache.albumOptions, + flush: async (members) => { + const grouped = coalesceAlbumGroup(members); + if (grouped) { + // Each album member was cached individually as it streamed in; + // overwrite every member's slot with the coalesced wrapper so + // later lookups (reactions, edits) on any child id resolve to + // the group instead of a stale standalone record. + for (const member of members) { + runtime.cache.messages.set( + messageCacheKey(grouped.space.id, member.id), + grouped + ); + } + await emit(grouped); + } + }, + }) + : undefined; + + const drainAlbumsSafely = async (): Promise => { + if (!albumBuffer) { + return; + } + try { + await albumBuffer.flushAll(); + } catch { + // logged inside flushAll() + } + }; + + const pump = (async () => { + try { + for await (const update of pollUpdates( + runtime.client, + abortController.signal, + options + )) { + const built = buildMessages(runtime, update); + for (const message of built.messages) { + if (built.cacheable) { + runtime.cache.messages.set( + messageCacheKey(message.space.id, message.id), + message + ); + } + if ( + albumBuffer && + built.cacheable && + message.mediaGroupId !== undefined + ) { + albumBuffer.push(message.mediaGroupId, message); + continue; + } + await emit(message); + } + } + await drainAlbumsSafely(); + end(); + } catch (err) { + // Drain inside the pump so consumers always see a final `end()`. + if (abortController.signal.aborted) { + await drainAlbumsSafely(); + end(); + return; + } + await drainAlbumsSafely(); + end(err); + } + })(); + + return async () => { + signal.removeEventListener("abort", onSignalAbort); + abortController.abort(); + await pump; + }; + }); diff --git a/packages/spectrum-ts/src/providers/telegram/events/polls.ts b/packages/spectrum-ts/src/providers/telegram/events/polls.ts new file mode 100644 index 0000000..d0d6062 --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/events/polls.ts @@ -0,0 +1,98 @@ +import { asPollOption } from "../../../content/poll"; +import type { PollAnswer, Update } from "../generated/types"; +import { chatToSender, userToSender } from "../identity"; +import type { CachedPoll, TelegramCache } from "../runtime/cache"; +import type { TelegramMessage } from "../types"; + +// `poll_answer` ships the full post-vote vector; Spectrum wants per-option +// selected/deselected diffs, so we cache each voter's prior vector. +const computeDiff = ( + prior: readonly number[], + next: readonly number[] +): { added: number[]; removed: number[] } => { + const priorSet = new Set(prior); + const nextSet = new Set(next); + const added = next.filter((id) => !priorSet.has(id)); + const removed = prior.filter((id) => !nextSet.has(id)); + return { added, removed }; +}; + +const buildPollOptionEvent = ( + cached: CachedPoll, + optionIndex: number, + selected: boolean, + context: { + eventId: string; + sender: ReturnType; + space: CachedPoll["chat"]; + timestamp: Date; + } +): TelegramMessage | undefined => { + const option = cached.poll.options[optionIndex]; + if (!option) { + return; + } + return { + id: context.eventId, + content: asPollOption({ poll: cached.poll, option, selected }), + sender: context.sender, + space: context.space, + timestamp: context.timestamp, + }; +}; + +export const pollAnswerEvents = ( + answer: PollAnswer, + cache: TelegramCache, + update: Update +): TelegramMessage[] => { + // `user` (regular voter) or `voter_chat` (anonymous channel admin) is + // populated. Chat ids are negative so they can't collide with user ids. + let sender: ReturnType; + let voterId: number; + if (answer.user) { + sender = userToSender(answer.user); + voterId = answer.user.id; + } else if (answer.voter_chat) { + sender = chatToSender(answer.voter_chat); + voterId = answer.voter_chat.id; + } else { + return []; + } + const cached = cache.polls.resolvePoll(answer.poll_id); + if (!cached) { + return []; + } + const prior = cache.polls.priorVote(answer.poll_id, voterId); + const { added, removed } = computeDiff(prior, answer.option_ids); + cache.polls.recordVote(answer.poll_id, voterId, answer.option_ids); + + // `poll_answer` carries no chat or timestamp. + const space = cached.chat; + const timestamp = new Date(); + + const events: TelegramMessage[] = []; + for (const optionIndex of added) { + const event = buildPollOptionEvent(cached, optionIndex, true, { + eventId: `poll_answer:${update.update_id}:add:${optionIndex}`, + sender, + space, + timestamp, + }); + if (event) { + events.push(event); + } + } + for (const optionIndex of removed) { + const event = buildPollOptionEvent(cached, optionIndex, false, { + eventId: `poll_answer:${update.update_id}:remove:${optionIndex}`, + sender, + space, + timestamp, + }); + if (event) { + events.push(event); + } + } + return events; +}; diff --git a/packages/spectrum-ts/src/providers/telegram/events/reactions.ts b/packages/spectrum-ts/src/providers/telegram/events/reactions.ts new file mode 100644 index 0000000..b4fd5f4 --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/events/reactions.ts @@ -0,0 +1,93 @@ +import { asCustom } from "../../../content/custom"; +import { asReaction } from "../../../content/reaction"; +import type { ProviderMessageRecord } from "../../../platform/build"; +import type { Message as SpectrumMessage } from "../../../types/message"; +import type { MessageReactionUpdated, ReactionType } from "../generated/types"; +import { chatToSender, chatToSpace, userToSender } from "../identity"; +import { messageCacheKey, type TelegramCache } from "../runtime/cache"; +import type { TelegramMessage } from "../types"; + +// Spectrum reaction content is add-only and plain-unicode, so we emit +// only newly-added emoji reactions from the `old_reaction`→`new_reaction` +// diff. Removes, custom emoji, and paid reactions are dropped. +const extractEmoji = (reaction: ReactionType): string | undefined => { + if (reaction.type !== "emoji") { + return; + } + return reaction.emoji; +}; + +const newlyAddedEmojis = (update: MessageReactionUpdated): string[] => { + const previous = new Set( + update.old_reaction.map(extractEmoji).filter((e): e is string => !!e) + ); + const added: string[] = []; + for (const reaction of update.new_reaction) { + const emoji = extractEmoji(reaction); + if (emoji && !previous.has(emoji)) { + added.push(emoji); + } + } + return added; +}; + +// Updates carry only the target's message id; return the cached record +// or a minimal stub if we never saw it. +const reactionTargetStub = ( + messageId: number, + space: ReturnType, + timestamp: Date +): ProviderMessageRecord => ({ + id: String(messageId), + content: asCustom({ telegram_type: "reaction-target", stub: true }), + sender: { id: "__unknown__" }, + space, + timestamp, +}); + +const resolveReactionTarget = ( + cache: TelegramCache, + messageId: number, + space: ReturnType, + timestamp: Date +): TelegramMessage | ProviderMessageRecord => { + const cached = cache.messages.get(messageCacheKey(space.id, messageId)); + if (cached) { + return cached; + } + return reactionTargetStub(messageId, space, timestamp); +}; + +export const reactionEventsFromUpdate = ( + update: MessageReactionUpdated, + updateId: number, + cache: TelegramCache +): TelegramMessage[] => { + // Anonymous channel admins arrive as `actor_chat`. + let sender: ReturnType; + if (update.user) { + sender = userToSender(update.user); + } else if (update.actor_chat) { + sender = chatToSender(update.actor_chat); + } else { + return []; + } + const space = chatToSpace(update.chat); + const timestamp = new Date(update.date * 1000); + const target = resolveReactionTarget( + cache, + update.message_id, + space, + timestamp + ); + return newlyAddedEmojis(update).map((emoji, index) => ({ + id: `reaction:${updateId}:${index}`, + content: asReaction({ + emoji, + target: target as unknown as SpectrumMessage, + }), + sender, + space, + timestamp, + })); +}; diff --git a/packages/spectrum-ts/src/providers/telegram/generated/methods.ts b/packages/spectrum-ts/src/providers/telegram/generated/methods.ts new file mode 100644 index 0000000..8a58465 --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/generated/methods.ts @@ -0,0 +1,225 @@ +// GENERATED FILE — do not edit by hand. +// Source: providers/telegram/bot-api-spec/schema/telegram.json +// Regenerate with: bun run gen:telegram + + +import type { + Audio, + Chat, + Contact, + Document, + File, + InputPollOption, + LinkPreviewOptions, + Message, + MessageEntity, + MessageReactionUpdated, + PhotoSize, + Poll, + PollAnswer, + PollOption, + ReactionType, + ReplyParameters, + ResponseParameters, + Update, + User, + Video, + Voice, + InputFile, +} from "./types"; + +/** Returns basic info about the bot. */ +export type GetMeParams = Record; + +/** Receives incoming updates using long polling. */ +export interface GetUpdatesParams { + offset?: number; + /** Long-polling timeout in seconds */ + timeout?: number; + allowed_updates?: Array; +} + +/** Sends a text message. */ +export interface SendMessageParams { + chat_id: number | string; + text: string; + link_preview_options?: LinkPreviewOptions; + reply_parameters?: ReplyParameters; +} + +/** Sends a photo. */ +export interface SendPhotoParams { + chat_id: number | string; + photo: InputFile; + reply_parameters?: ReplyParameters; +} + +/** Sends a general file. */ +export interface SendDocumentParams { + chat_id: number | string; + document: InputFile; + reply_parameters?: ReplyParameters; +} + +/** Sends an audio file shown as music. */ +export interface SendAudioParams { + chat_id: number | string; + audio: InputFile; + reply_parameters?: ReplyParameters; +} + +/** Sends a video. */ +export interface SendVideoParams { + chat_id: number | string; + video: InputFile; + reply_parameters?: ReplyParameters; +} + +/** Sends an audio file to be displayed as a voice note. */ +export interface SendVoiceParams { + chat_id: number | string; + voice: InputFile; + duration?: number; + reply_parameters?: ReplyParameters; +} + +/** Sends a phone contact. */ +export interface SendContactParams { + chat_id: number | string; + phone_number: string; + first_name: string; + last_name?: string; + vcard?: string; + reply_parameters?: ReplyParameters; +} + +/** Sends a native Telegram poll. Spectrum sends regular (non-quiz) polls with `is_anonymous: false` so per-user vote events surface as poll_option content. Private 1:1 chats, groups, supergroups, and channels are all supported; the only chat type Telegram rejects is channel direct-message chats (the per-user threads on top of broadcast channels), which return BAD_REQUEST. */ +export interface SendPollParams { + chat_id: number | string; + question: string; + options: Array; + /** Defaults to true on the Telegram side. Spectrum forces this to `false` on outbound sends so vote events are attributed. */ + is_anonymous?: boolean; + /** Spectrum only sends regular polls. Quiz mode is intentionally out of scope: it requires `correct_option_ids`, and Spectrum's universal `poll` content has no notion of quiz correct-answer metadata. Callers needing quizzes should use the raw client directly. */ + type?: "regular"; + allows_multiple_answers?: boolean; + /** Bot API 9.6+: pass true to allow voters to change their selection after voting. */ + allows_revoting?: boolean; + /** Bot API 9.6+: pass true to randomize the order of options when displaying the poll. */ + shuffle_options?: boolean; + /** Bot API 9.6+: pass true to let users add options after the poll has been created. */ + allow_adding_options?: boolean; + /** Bot API 9.6+: pass true to hide aggregate results until the poll is closed. */ + hide_results_until_closes?: boolean; + reply_parameters?: ReplyParameters; +} + +/** Stops a poll the bot started. Returns the final Poll state. Not exposed through Spectrum's universal API; available via raw client for callers who need it. */ +export interface StopPollParams { + chat_id: number | string; + message_id: number; +} + +/** Edits a text message. */ +export interface EditMessageTextParams { + chat_id: number | string; + message_id: number; + text: string; + link_preview_options?: LinkPreviewOptions; +} + +/** Sets reactions on a message. */ +export interface SetMessageReactionParams { + chat_id: number | string; + message_id: number; + reaction?: Array; +} + +/** Broadcasts a chat action. */ +export interface SendChatActionParams { + chat_id: number | string; + action: "typing"; +} + +/** Gets a chat by ID. */ +export interface GetChatParams { + chat_id: number | string; +} + +/** Gets basic info about a file. Compose with baseUrl + /file/bot/ to download. */ +export interface GetFileParams { + file_id: string; +} + +/** Bot API method map. Used by the runtime client for type-safe invoke(). */ +export interface Methods { + getMe: { + params: GetMeParams; + result: User; + }; + getUpdates: { + params: GetUpdatesParams; + result: Array; + }; + sendMessage: { + params: SendMessageParams; + result: Message; + }; + sendPhoto: { + params: SendPhotoParams; + result: Message; + }; + sendDocument: { + params: SendDocumentParams; + result: Message; + }; + sendAudio: { + params: SendAudioParams; + result: Message; + }; + sendVideo: { + params: SendVideoParams; + result: Message; + }; + sendVoice: { + params: SendVoiceParams; + result: Message; + }; + sendContact: { + params: SendContactParams; + result: Message; + }; + sendPoll: { + params: SendPollParams; + result: Message; + }; + stopPoll: { + params: StopPollParams; + result: Poll; + }; + editMessageText: { + params: EditMessageTextParams; + result: Message | boolean; + }; + setMessageReaction: { + params: SetMessageReactionParams; + result: boolean; + }; + sendChatAction: { + params: SendChatActionParams; + result: boolean; + }; + getChat: { + params: GetChatParams; + result: Chat; + }; + getFile: { + params: GetFileParams; + result: File; + }; +} + +export type MethodName = keyof Methods; + +export const BASE_URL = "https://api.telegram.org"; +export const API_VERSION = "9.6"; diff --git a/packages/spectrum-ts/src/providers/telegram/generated/types.ts b/packages/spectrum-ts/src/providers/telegram/generated/types.ts new file mode 100644 index 0000000..a5e0511 --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/generated/types.ts @@ -0,0 +1,210 @@ +// GENERATED FILE — do not edit by hand. +// Source: providers/telegram/bot-api-spec/schema/telegram.json +// Regenerate with: bun run gen:telegram + + +export type InputFile = string | Blob; + +/** Incoming update. At most one of the optional parameters can be present in any given update. */ +export interface Update { + update_id: number; + message?: Message; + edited_message?: Message; + channel_post?: Message; + edited_channel_post?: Message; + message_reaction?: MessageReactionUpdated; + /** New aggregate poll state. Bots receive only updates about stopped polls and polls they sent. */ + poll?: Poll; + /** User changed their vote in a non-anonymous poll. Anonymous polls do NOT emit poll_answer. */ + poll_answer?: PollAnswer; +} + +/** Telegram user or bot. */ +export interface User { + id: number; + is_bot: boolean; + first_name: string; + last_name?: string; + username?: string; + language_code?: string; +} + +/** Chat (private, group, supergroup, or channel). */ +export interface Chat { + id: number; + type: "private" | "group" | "supergroup" | "channel"; + title?: string; + username?: string; + /** Present for private chats; readable name of the user. */ + first_name?: string; + /** Present for private chats when the user supplied a last name. */ + last_name?: string; +} + +/** A message. */ +export interface Message { + message_id: number; + /** Unix time */ + date: number; + chat: Chat; + from?: User; + /** Sender of the message, sent on behalf of a chat (channel posts, anonymous group admins). */ + sender_chat?: Chat; + text?: string; + caption?: string; + entities?: Array; + link_preview_options?: LinkPreviewOptions; + photo?: Array; + document?: Document; + audio?: Audio; + video?: Video; + voice?: Voice; + contact?: Contact; + poll?: Poll; + /** Set on each member of an album. All messages sent together share the same id; absent on standalone messages. */ + media_group_id?: string; +} + +/** A special entity in a text message (URL, mention, hashtag, etc.). */ +export interface MessageEntity { + type: string; + offset: number; + length: number; + /** For text_link only: the URL to open */ + url?: string; +} + +/** Describes link-preview generation options for a message. */ +export interface LinkPreviewOptions { + is_disabled?: boolean; + url?: string; + prefer_small_media?: boolean; + prefer_large_media?: boolean; + show_above_text?: boolean; +} + +/** One size of a photo. */ +export interface PhotoSize { + file_id: string; + width: number; + height: number; + file_size?: number; +} + +/** General file (as opposed to photo/voice/video/audio). */ +export interface Document { + file_id: string; + file_name?: string; + mime_type?: string; + file_size?: number; +} + +/** Audio file to be treated as music. */ +export interface Audio { + file_id: string; + file_name?: string; + mime_type?: string; + file_size?: number; +} + +/** Video file. */ +export interface Video { + file_id: string; + file_name?: string; + mime_type?: string; + file_size?: number; +} + +/** Voice note. */ +export interface Voice { + file_id: string; + duration: number; + mime_type?: string; + file_size?: number; +} + +/** Phone contact. */ +export interface Contact { + phone_number: string; + first_name: string; + last_name?: string; + user_id?: number; + vcard?: string; +} + +/** Reaction type. Flat union shape: discriminate at runtime via `type`. */ +export interface ReactionType { + type: "emoji" | "custom_emoji" | "paid"; + /** Set when type === "emoji". */ + emoji?: string; + /** Set when type === "custom_emoji". */ + custom_emoji_id?: string; +} + +/** A reaction on a message changed by a user. */ +export interface MessageReactionUpdated { + chat: Chat; + message_id: number; + /** Unix time of the change */ + date: number; + user?: User; + actor_chat?: Chat; + old_reaction: Array; + new_reaction: Array; +} + +/** Option in a Telegram poll. */ +export interface PollOption { + text: string; + voter_count: number; +} + +/** Outbound option used by `sendPoll`. */ +export interface InputPollOption { + text: string; +} + +/** A native Telegram poll (regular or quiz). Spectrum exposes only regular polls; quiz/explanation/period fields are passed through but not modelled in the universal Content. */ +export interface Poll { + id: string; + question: string; + options: Array; + total_voter_count: number; + is_closed: boolean; + is_anonymous: boolean; + type: "regular" | "quiz"; + allows_multiple_answers: boolean; + /** Bot API 9.6+: 0-based identifiers of the correct answer options. Quiz polls only; available when the poll is closed or sent (not forwarded) by the bot. Replaces the pre-9.6 singular `correct_option_id`. */ + correct_option_ids?: Array; + /** Bot API 9.6+: True if the poll allows changing the chosen answer after voting. */ + allows_revoting?: boolean; +} + +/** Answer of a user in a non-anonymous poll. Spectrum keys vote-state caches by `poll_id`; chat/message_id are not provided by Telegram in this update so the poll cache must be populated when the poll is created or first observed. */ +export interface PollAnswer { + poll_id: string; + /** Set when the vote came from an anonymous channel admin acting on behalf of the channel. */ + voter_chat?: Chat; + /** Set when the vote came from a regular user. Either `user` or `voter_chat` is present. */ + user?: User; + /** Zero-based indices into the original poll options. Empty array means the user retracted all of their votes. */ + option_ids: Array; +} + +/** Reply parameters for outgoing messages. */ +export interface ReplyParameters { + message_id: number; +} + +/** Error response parameters. */ +export interface ResponseParameters { + retry_after?: number; + migrate_to_chat_id?: number; +} + +/** Downloadable file descriptor. */ +export interface File { + file_id: string; + file_size?: number; + file_path?: string; +} diff --git a/packages/spectrum-ts/src/providers/telegram/identity.ts b/packages/spectrum-ts/src/providers/telegram/identity.ts new file mode 100644 index 0000000..3d4b88e --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/identity.ts @@ -0,0 +1,83 @@ +import type { Chat, Message, User } from "./generated/types"; +import type { TelegramMessage } from "./types"; + +const chatIdToSpaceId = (chatId: number): string => String(chatId); +const userIdToSpectrumId = (userId: number): string => String(userId); + +export const userToSender = (user: User): TelegramMessage["sender"] => { + const sender: TelegramMessage["sender"] = { + id: userIdToSpectrumId(user.id), + chatId: user.id, + isBot: user.is_bot, + firstName: user.first_name, + }; + if (user.last_name !== undefined) { + sender.lastName = user.last_name; + } + if (user.username !== undefined) { + sender.username = user.username; + } + if (user.language_code !== undefined) { + sender.languageCode = user.language_code; + } + return sender; +}; + +// For channel posts and anonymous group-admin messages, `from` is absent +// and `sender_chat` carries the author. +export const chatToSender = ( + chat: NonNullable | Chat +): TelegramMessage["sender"] => { + const sender: TelegramMessage["sender"] = { + id: chatIdToSpaceId(chat.id), + chatId: chat.id, + isBot: false, + firstName: chat.title ?? chat.username ?? "Telegram chat", + }; + if (chat.username !== undefined) { + sender.username = chat.username; + } + return sender; +}; + +// Must be `type`, not `interface`: callers assign into `Record` +// slots which interfaces don't satisfy. +// biome-ignore lint/style/useConsistentTypeDefinitions: see comment above. +type TelegramSpaceShape = { + chatId: number; + id: string; + title?: string; + type: Chat["type"]; + username?: string; +}; + +// Private chats arrive without a `title`; synthesize a readable one +// from first_name + last_name so DM spaces still surface a usable name. +const privateChatTitle = (chat: Chat): string | undefined => { + if (chat.type !== "private") { + return; + } + const parts = [chat.first_name, chat.last_name].filter( + (value): value is string => typeof value === "string" && value.length > 0 + ); + if (parts.length === 0) { + return chat.username; + } + return parts.join(" "); +}; + +export const chatToSpace = (chat: Chat): TelegramSpaceShape => { + const space: TelegramSpaceShape = { + id: chatIdToSpaceId(chat.id), + chatId: chat.id, + type: chat.type, + }; + const title = chat.title ?? privateChatTitle(chat); + if (title !== undefined) { + space.title = title; + } + if (chat.username !== undefined) { + space.username = chat.username; + } + return space; +}; diff --git a/packages/spectrum-ts/src/providers/telegram/index.ts b/packages/spectrum-ts/src/providers/telegram/index.ts new file mode 100644 index 0000000..1ee802c --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/index.ts @@ -0,0 +1,115 @@ +import { definePlatform } from "../../platform/define"; +import { messages as telegramMessages } from "./events"; +import { chatToSpace } from "./identity"; +import { send as telegramSend } from "./messages"; +import { + createTelegramCache, + DEFAULT_CACHE_OPTIONS, + messageCacheKey, + type TelegramCacheOptions, +} from "./runtime/cache"; +import { TelegramClient } from "./runtime/client"; +import { + configSchema, + messageSchema, + spaceParamsSchema, + spaceSchema, + type TelegramConfig, + type TelegramRuntime, + userSchema, +} from "./types"; + +const asRuntime = (client: unknown): TelegramRuntime => + client as TelegramRuntime; + +const resolveCacheOptions = ( + cfg: TelegramConfig["cache"] +): TelegramCacheOptions => ({ + capacity: { + messages: cfg?.messages ?? DEFAULT_CACHE_OPTIONS.capacity.messages, + polls: cfg?.polls ?? DEFAULT_CACHE_OPTIONS.capacity.polls, + pollVotes: cfg?.pollVotes ?? DEFAULT_CACHE_OPTIONS.capacity.pollVotes, + albumConcurrent: + cfg?.albumConcurrent ?? DEFAULT_CACHE_OPTIONS.capacity.albumConcurrent, + }, + albumDebounceMs: + cfg?.albumDebounceMs ?? DEFAULT_CACHE_OPTIONS.albumDebounceMs, + albumCeilingMs: cfg?.albumCeilingMs ?? DEFAULT_CACHE_OPTIONS.albumCeilingMs, + coalesceAlbums: cfg?.coalesceAlbums ?? DEFAULT_CACHE_OPTIONS.coalesceAlbums, +}); + +export const telegram = definePlatform("Telegram", { + config: configSchema, + + user: { + schema: userSchema, + resolve: async ({ input }) => ({ id: input.userID }), + }, + + message: { + schema: messageSchema, + }, + + space: { + schema: spaceSchema, + params: spaceParamsSchema, + resolve: async ({ input, client }) => { + const runtime = asRuntime(client); + const chatIdSource = + input.params?.chatId ?? + (input.users.length === 1 ? input.users[0]?.id : undefined); + if (chatIdSource === undefined) { + throw new Error( + "Telegram space() requires params.chatId or a single resolved user" + ); + } + const chat = await runtime.client.invoke("getChat", { + chat_id: + typeof chatIdSource === "string" + ? chatIdSource + : Number(chatIdSource), + }); + return chatToSpace(chat); + }, + }, + + lifecycle: { + createClient: async ({ config }): Promise => { + const client = new TelegramClient({ + token: config.token, + ...(config.apiBaseUrl ? { baseUrl: config.apiBaseUrl } : {}), + }); + const me = await client.invoke("getMe", {}); + const cache = createTelegramCache(resolveCacheOptions(config.cache)); + return { client, abort: new AbortController(), cache, me }; + }, + + destroyClient: async ({ client }) => { + const runtime = asRuntime(client); + runtime.abort.abort(); + runtime.cache.destroy(); + }, + }, + + messages: ({ client, config }) => { + const runtime = asRuntime(client); + return telegramMessages(runtime, runtime.abort.signal, { + ...(config.pollingTimeout === undefined + ? {} + : { timeout: config.pollingTimeout }), + ...(config.dropPendingUpdates === undefined + ? {} + : { dropPendingUpdates: config.dropPendingUpdates }), + }); + }, + + send: async ({ space, content, client }) => + await telegramSend(asRuntime(client), space.id, content), + + actions: { + getMessage: async ({ space, messageId, client }) => + asRuntime(client).cache.messages.get( + messageCacheKey(space.id, messageId) + ), + }, +}); diff --git a/packages/spectrum-ts/src/providers/telegram/messages.ts b/packages/spectrum-ts/src/providers/telegram/messages.ts new file mode 100644 index 0000000..397375f --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/messages.ts @@ -0,0 +1,595 @@ +import type { Attachment } from "../../content/attachment"; +import type { Contact } from "../../content/contact"; +import type { Edit } from "../../content/edit"; +import { asGroup, type Group } from "../../content/group"; +import type { Poll as SpectrumPoll } from "../../content/poll"; +import type { Reaction } from "../../content/reaction"; +import type { Reply } from "../../content/reply"; +import type { Richlink } from "../../content/richlink"; +import type { Content } from "../../content/types"; +import type { Typing } from "../../content/typing"; +import type { Voice } from "../../content/voice"; +import type { Message as SpectrumMessage } from "../../types/message"; +import { UnsupportedError } from "../../utils/errors"; +import { toVCard } from "../../utils/vcard"; +import type { LinkPreviewOptions, Message } from "./generated/types"; +import { chatToSender, chatToSpace, userToSender } from "./identity"; +import { messageCacheKey } from "./runtime/cache"; +import type { TelegramMessage, TelegramRuntime } from "./types"; + +const PLATFORM_NAME = "Telegram"; + +// Telegram caps poll option text at 100 chars; pollSchema doesn't. +const TG_POLL_OPTION_MAX_LEN = 100; + +const toChatId = (spaceId: string): number | string => { + const asNumber = Number(spaceId); + if (Number.isInteger(asNumber) && String(asNumber) === spaceId) { + return asNumber; + } + return spaceId; +}; + +const DECIMAL_DIGITS = /^\d+$/; + +const toMessageId = (messageId: string): number => { + if (!DECIMAL_DIGITS.test(messageId)) { + throw new Error(`Invalid Telegram message_id: ${messageId}`); + } + const parsed = Number.parseInt(messageId, 10); + // Telegram message_ids are positive integers starting at 1; reject 0 + // and anything outside the safe-integer range up front. + if (!Number.isSafeInteger(parsed) || parsed <= 0) { + throw new Error(`Invalid Telegram message_id: ${messageId}`); + } + return parsed; +}; + +const recordOutbound = ( + runtime: TelegramRuntime, + message: Message, + content: Content +): TelegramMessage => { + let sender: TelegramMessage["sender"]; + if (message.from) { + sender = userToSender(message.from); + } else if (message.sender_chat) { + sender = chatToSender(message.sender_chat); + } else { + sender = userToSender(runtime.me); + } + const record: TelegramMessage = { + id: String(message.message_id), + content, + sender, + space: chatToSpace(message.chat), + timestamp: new Date(message.date * 1000), + }; + if (message.media_group_id !== undefined) { + record.mediaGroupId = message.media_group_id; + } + if (message.caption !== undefined && content.type !== "text") { + record.caption = message.caption; + } + runtime.cache.messages.set( + messageCacheKey(record.space.id, record.id), + record + ); + return record; +}; + +const attachmentToFile = async (att: Attachment): Promise => { + const buffer = await att.read(); + return new File([buffer], att.name, { type: att.mimeType }); +}; + +const voiceFile = async (voice: Voice): Promise => { + const buffer = await voice.read(); + const name = voice.name ?? "voice.ogg"; + return new File([buffer], name, { type: voice.mimeType }); +}; + +type AttachmentRoute = "photo" | "video" | "audio" | "document"; + +// `sendPhoto` only reliably accepts JPEG / PNG / WEBP; other image +// subtypes get rejected with opaque "WRONG_FILE_FORMAT" errors. Everything +// else falls through to `sendDocument`, which has no MIME constraints. +const PHOTO_MIME_TYPES = new Set(["image/jpeg", "image/png", "image/webp"]); + +const routeAttachment = (mime: string): AttachmentRoute => { + if (PHOTO_MIME_TYPES.has(mime)) { + return "photo"; + } + if (mime.startsWith("video/")) { + return "video"; + } + if (mime.startsWith("audio/")) { + return "audio"; + } + return "document"; +}; + +interface SendOpts { + replyToMessageId?: number; +} + +const replyParams = ( + opts: SendOpts +): { reply_parameters: { message_id: number } } | Record => + opts.replyToMessageId === undefined + ? {} + : { reply_parameters: { message_id: opts.replyToMessageId } }; + +const sendText = async ( + runtime: TelegramRuntime, + spaceId: string, + content: Content & { type: "text" }, + opts: SendOpts +): Promise => { + const message = await runtime.client.invoke("sendMessage", { + chat_id: toChatId(spaceId), + text: content.text, + ...replyParams(opts), + }); + return recordOutbound(runtime, message, content); +}; + +// `prefer_large_media` only takes effect when `url` is pinned; otherwise +// Telegram falls back to a thumbnail. +const richlinkPreviewOptions = (url: string): LinkPreviewOptions => ({ + is_disabled: false, + url, + prefer_large_media: true, + show_above_text: true, +}); + +// Bot API has no fields for caller-provided title / summary / cover — +// previews come from Telegram's server-side scraper. +const sendRichlinkContent = async ( + runtime: TelegramRuntime, + spaceId: string, + richlink: Richlink, + opts: SendOpts +): Promise => { + const message = await runtime.client.invoke("sendMessage", { + chat_id: toChatId(spaceId), + text: richlink.url, + link_preview_options: richlinkPreviewOptions(richlink.url), + ...replyParams(opts), + }); + return recordOutbound(runtime, message, richlink); +}; + +const sendAttachment = async ( + runtime: TelegramRuntime, + spaceId: string, + att: Attachment, + opts: SendOpts +): Promise => { + const chat_id = toChatId(spaceId); + const file = await attachmentToFile(att); + const reply = replyParams(opts); + const route = routeAttachment(att.mimeType); + const client = runtime.client; + + let message: Message; + switch (route) { + case "photo": + message = await client.invoke("sendPhoto", { + chat_id, + photo: file, + ...reply, + }); + break; + case "video": + message = await client.invoke("sendVideo", { + chat_id, + video: file, + ...reply, + }); + break; + case "audio": + message = await client.invoke("sendAudio", { + chat_id, + audio: file, + ...reply, + }); + break; + case "document": + message = await client.invoke("sendDocument", { + chat_id, + document: file, + ...reply, + }); + break; + default: { + const _exhaustive: never = route; + throw new Error(`Unhandled attachment route: ${String(_exhaustive)}`); + } + } + return recordOutbound(runtime, message, att); +}; + +const sendVoiceContent = async ( + runtime: TelegramRuntime, + spaceId: string, + voice: Voice, + opts: SendOpts +): Promise => { + const file = await voiceFile(voice); + const message = await runtime.client.invoke("sendVoice", { + chat_id: toChatId(spaceId), + voice: file, + ...(voice.duration === undefined + ? {} + : { duration: Math.round(voice.duration) }), + ...replyParams(opts), + }); + return recordOutbound(runtime, message, voice); +}; + +const VCARD_MAX_BYTES = 2048; +const VCARD_HEADER = "BEGIN:VCARD"; + +// Prefer a caller-supplied vCard verbatim (string or nested `raw.vcard`) so +// outbound contacts preserve any custom fields toVCard would normalize away. +const preservedVCard = (contact: Contact): string | undefined => { + const raw = contact.raw; + if (typeof raw === "string" && raw.startsWith(VCARD_HEADER)) { + return raw; + } + if (raw && typeof raw === "object" && "vcard" in raw) { + const candidate = (raw as { vcard: unknown }).vcard; + if (typeof candidate === "string" && candidate.startsWith(VCARD_HEADER)) { + return candidate; + } + } + return; +}; + +const hasExtraContactData = (contact: Contact): boolean => + (contact.phones?.length ?? 0) > 1 || + (contact.emails?.length ?? 0) > 0 || + (contact.addresses?.length ?? 0) > 0 || + (contact.urls?.length ?? 0) > 0 || + contact.org !== undefined || + contact.birthday !== undefined || + contact.note !== undefined || + contact.photo !== undefined || + preservedVCard(contact) !== undefined; + +const buildVCardField = async ( + contact: Contact +): Promise => { + if (!hasExtraContactData(contact)) { + return; + } + const preserved = preservedVCard(contact); + if (preserved !== undefined) { + return Buffer.byteLength(preserved, "utf8") > VCARD_MAX_BYTES + ? undefined + : preserved; + } + let card: string; + try { + card = await toVCard(contact); + } catch { + return; + } + if (Buffer.byteLength(card, "utf8") > VCARD_MAX_BYTES) { + return; + } + return card; +}; + +const sendContactContent = async ( + runtime: TelegramRuntime, + spaceId: string, + contact: Contact, + opts: SendOpts +): Promise => { + const phone = contact.phones?.[0]?.value; + if (!phone) { + throw new Error( + "Telegram sendContact requires at least one phone number on the contact" + ); + } + const firstName = contact.name?.first ?? contact.name?.formatted ?? "Contact"; + const params: { + chat_id: number | string; + phone_number: string; + first_name: string; + last_name?: string; + vcard?: string; + reply_parameters?: { message_id: number }; + } = { + chat_id: toChatId(spaceId), + phone_number: phone, + first_name: firstName, + }; + if (contact.name?.last !== undefined) { + params.last_name = contact.name.last; + } + const vcard = await buildVCardField(contact); + if (vcard !== undefined) { + params.vcard = vcard; + } + const reply = replyParams(opts); + if ("reply_parameters" in reply) { + params.reply_parameters = reply.reply_parameters; + } + const message = await runtime.client.invoke("sendContact", params); + return recordOutbound(runtime, message, contact); +}; + +// `setMessageReaction` returns a boolean. Single-element array overwrites +// any prior bot reaction with this emoji; `[]` would clear (not modeled). +const sendReactionContent = async ( + runtime: TelegramRuntime, + spaceId: string, + reaction: Reaction +): Promise => { + await runtime.client.invoke("setMessageReaction", { + chat_id: toChatId(spaceId), + message_id: toMessageId(reaction.target.id), + reaction: [{ type: "emoji", emoji: reaction.emoji }], + }); + return; +}; + +const validatePollOptionTitles = (poll: SpectrumPoll): void => { + for (const opt of poll.options) { + if (opt.title.length > TG_POLL_OPTION_MAX_LEN) { + throw new Error( + `Telegram poll option titles must be <= ${TG_POLL_OPTION_MAX_LEN} chars, got ${opt.title.length}: "${opt.title.slice(0, 32)}…"` + ); + } + } +}; + +// `is_anonymous: false` is required — Telegram only delivers `poll_answer` +// updates for non-anonymous polls the bot itself sent. +const sendPollContent = async ( + runtime: TelegramRuntime, + spaceId: string, + poll: SpectrumPoll, + opts: SendOpts +): Promise => { + validatePollOptionTitles(poll); + const message = await runtime.client.invoke("sendPoll", { + chat_id: toChatId(spaceId), + question: poll.title, + options: poll.options.map((o) => ({ text: o.title })), + is_anonymous: false, + ...replyParams(opts), + }); + if (message.poll) { + runtime.cache.polls.rememberPoll(message.poll.id, { + chat: { + id: String(message.chat.id), + chatId: message.chat.id, + type: message.chat.type, + ...(message.chat.title === undefined + ? {} + : { title: message.chat.title }), + ...(message.chat.username === undefined + ? {} + : { username: message.chat.username }), + }, + messageId: message.message_id, + poll, + }); + } + return recordOutbound(runtime, message, poll); +}; + +// `sendMediaGroup` only accepts homogeneous photo/video/audio/document +// buckets; Spectrum `group` is heterogeneous, so we iterate children +// through the normal send pipeline. Wrapper id = lead child id so +// replies / reactions target the album's first message. + +// Platform inflation adds `react()`/`reply()` closures downstream; the +// provider-side records intentionally lack them. +export const toGroupItems = (records: TelegramMessage[]): SpectrumMessage[] => + records as unknown as SpectrumMessage[]; + +const MIN_GROUP_ITEMS = 2; + +const sendGroupContent = async ( + runtime: TelegramRuntime, + spaceId: string, + group: Group, + opts: SendOpts +): Promise => { + // `asGroup` requires >= 2 items, but a hand-rolled `group` content could + // skip that check. Catch it here before any child is sent so we never + // partial-send a "group". + if (group.items.length < MIN_GROUP_ITEMS) { + throw new Error( + `Telegram group send requires at least ${MIN_GROUP_ITEMS} items, got ${group.items.length}` + ); + } + // Pre-validate every child — Telegram has no atomic undo for partial + // album sends. + for (const item of group.items) { + const child = item.content; + if (child.type === "group" || child.type === "reaction") { + throw UnsupportedError.content( + child.type, + PLATFORM_NAME, + `nested ${child.type} inside group is not allowed` + ); + } + if ( + child.type === "reply" || + child.type === "edit" || + child.type === "typing" + ) { + throw UnsupportedError.content( + child.type, + PLATFORM_NAME, + `${child.type} cannot be an album member` + ); + } + if (child.type === "custom") { + throw UnsupportedError.content("custom", PLATFORM_NAME); + } + if (child.type === "poll_option") { + throw UnsupportedError.content( + "poll_option", + PLATFORM_NAME, + "poll_option is an inbound-only content type" + ); + } + if (child.type === "effect") { + throw UnsupportedError.content( + "effect", + PLATFORM_NAME, + "effect is an iMessage-only content type" + ); + } + } + + const childRecords: TelegramMessage[] = []; + // Reply edge attaches to the first child only. + let firstOpts: SendOpts = opts; + for (const item of group.items) { + const child = item.content; + const childRecord = await dispatchSend(runtime, spaceId, child, firstOpts); + if (!childRecord) { + throw new Error( + `Telegram group send: child of type "${child.type}" did not produce a message` + ); + } + childRecords.push(childRecord); + firstOpts = {}; + } + const first = childRecords[0]; + if (!first) { + throw new Error("Telegram group send: empty items"); + } + const wrapper: TelegramMessage = { + ...first, + content: asGroup({ items: toGroupItems(childRecords) }), + }; + // Overwrite the lead child's per-message cache entry so + // `getMessage(wrapper.id)` returns what `send` returned. + runtime.cache.messages.set( + messageCacheKey(first.space.id, wrapper.id), + wrapper + ); + return wrapper; +}; + +// Telegram only edits text/richlink bodies; no edit endpoint for media +// or polls. +const sendEditContent = async ( + runtime: TelegramRuntime, + spaceId: string, + editContent: Edit +): Promise => { + const inner = editContent.content; + if (inner.type !== "text" && inner.type !== "richlink") { + throw UnsupportedError.content( + "edit", + PLATFORM_NAME, + `only text/richlink edits are supported, got "${inner.type}"` + ); + } + const text = inner.type === "text" ? inner.text : inner.url; + // Result is a boolean only for inline messages, which we never send. + const result = await runtime.client.invoke("editMessageText", { + chat_id: toChatId(spaceId), + message_id: toMessageId(editContent.target.id), + text, + ...(inner.type === "richlink" + ? { link_preview_options: richlinkPreviewOptions(inner.url) } + : {}), + }); + if (typeof result !== "boolean") { + recordOutbound(runtime, result, inner); + } + return; +}; + +// `sendChatAction` is fire-and-forget — Telegram auto-expires after ~5s +// and has no cancel API, so `stop` is a no-op. +const sendTypingContent = async ( + runtime: TelegramRuntime, + spaceId: string, + typing: Typing +): Promise => { + if (typing.state === "start") { + await runtime.client.invoke("sendChatAction", { + chat_id: toChatId(spaceId), + action: "typing", + }); + } + return; +}; + +const sendReplyContent = async ( + runtime: TelegramRuntime, + spaceId: string, + replyContent: Reply +): Promise => + await dispatchSend(runtime, spaceId, replyContent.content, { + replyToMessageId: toMessageId(replyContent.target.id), + }); + +const dispatchSend = async ( + runtime: TelegramRuntime, + spaceId: string, + content: Content, + opts: SendOpts +): Promise => { + switch (content.type) { + case "text": + return await sendText(runtime, spaceId, content, opts); + case "richlink": + return await sendRichlinkContent(runtime, spaceId, content, opts); + case "attachment": + return await sendAttachment(runtime, spaceId, content, opts); + case "voice": + return await sendVoiceContent(runtime, spaceId, content, opts); + case "contact": + return await sendContactContent(runtime, spaceId, content, opts); + case "poll": + return await sendPollContent(runtime, spaceId, content, opts); + case "reaction": + return await sendReactionContent(runtime, spaceId, content); + case "reply": + return await sendReplyContent(runtime, spaceId, content); + case "edit": + return await sendEditContent(runtime, spaceId, content); + case "typing": + return await sendTypingContent(runtime, spaceId, content); + case "custom": + throw UnsupportedError.content("custom", PLATFORM_NAME); + case "group": + return await sendGroupContent(runtime, spaceId, content, opts); + case "poll_option": + throw UnsupportedError.content( + "poll_option", + PLATFORM_NAME, + "poll_option is an inbound-only content type" + ); + case "effect": + throw UnsupportedError.content( + "effect", + PLATFORM_NAME, + "effect is an iMessage-only content type" + ); + default: { + content satisfies never; + throw UnsupportedError.content("unknown", PLATFORM_NAME); + } + } +}; + +export const send = ( + runtime: TelegramRuntime, + spaceId: string, + content: Content +): Promise => + dispatchSend(runtime, spaceId, content, {}); diff --git a/packages/spectrum-ts/src/providers/telegram/runtime/cache.ts b/packages/spectrum-ts/src/providers/telegram/runtime/cache.ts new file mode 100644 index 0000000..d1b5883 --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/runtime/cache.ts @@ -0,0 +1,225 @@ +import QuickLRU from "quick-lru"; +import type { Poll as SpectrumPoll } from "../../../content/poll"; +import type { TelegramMessage } from "../types"; + +// Album members arrive as separate `Update`s sharing a `media_group_id`, +// with no "album finished" signal. Each new member arms a debounce; flush +// fires after `debounceMs` of quiet or when `ceilingMs` is reached. +export interface AlbumBufferEntry { + ceilingTimer: ReturnType; + debounceTimer: ReturnType; + members: TelegramMessage[]; +} + +export interface AlbumBufferOptions { + ceilingMs: number; + /** Max concurrent in-flight albums; oldest flushes early on overflow. */ + concurrentCapacity: number; + debounceMs: number; + flush: (members: TelegramMessage[]) => Promise | void; +} + +export class AlbumBuffer { + private readonly inFlight = new Map(); + private readonly options: AlbumBufferOptions; + + constructor(options: AlbumBufferOptions) { + this.options = options; + } + + push(mediaGroupId: string, member: TelegramMessage): void { + const existing = this.inFlight.get(mediaGroupId); + if (existing) { + existing.members.push(member); + clearTimeout(existing.debounceTimer); + existing.debounceTimer = this.armDebounce(mediaGroupId); + return; + } + if (this.inFlight.size >= this.options.concurrentCapacity) { + const oldestKey = this.inFlight.keys().next().value; + if (oldestKey !== undefined) { + this.flush(oldestKey); + } + } + const entry: AlbumBufferEntry = { + members: [member], + debounceTimer: this.armDebounce(mediaGroupId), + ceilingTimer: this.armCeiling(mediaGroupId), + }; + this.inFlight.set(mediaGroupId, entry); + } + + async flushAll(): Promise { + await Promise.all([...this.inFlight.keys()].map((key) => this.flush(key))); + } + + private armDebounce(mediaGroupId: string): ReturnType { + return setTimeout(() => { + this.flush(mediaGroupId).catch(() => { + // logged inside flush() + }); + }, this.options.debounceMs); + } + + private armCeiling(mediaGroupId: string): ReturnType { + return setTimeout(() => { + this.flush(mediaGroupId).catch(() => { + // logged inside flush() + }); + }, this.options.ceilingMs); + } + + private async flush(mediaGroupId: string): Promise { + const entry = this.inFlight.get(mediaGroupId); + if (!entry) { + return; + } + clearTimeout(entry.debounceTimer); + clearTimeout(entry.ceilingTimer); + this.inFlight.delete(mediaGroupId); + try { + await this.options.flush(entry.members); + } catch (err) { + console.error("Telegram album flush failed:", err); + } + } +} + +export interface CachedPoll { + // `poll_answer` updates carry no chat info, so we restore it from here. + chat: { + chatId: number; + id: string; + title?: string; + type: "private" | "group" | "supergroup" | "channel"; + username?: string; + }; + messageId: number; + poll: SpectrumPoll; +} + +const voteKey = (pollId: string, userId: number): string => + `${pollId}:${userId}`; + +export class PollStore { + readonly polls: QuickLRU; + readonly votes: QuickLRU; + + constructor(pollCapacity: number, voteCapacity: number) { + this.polls = new QuickLRU({ maxSize: pollCapacity }); + this.votes = new QuickLRU({ maxSize: voteCapacity }); + } + + rememberPoll(pollId: string, value: CachedPoll): void { + this.polls.set(pollId, value); + } + + resolvePoll(pollId: string): CachedPoll | undefined { + return this.polls.get(pollId); + } + + // Telegram delivers `Update.poll` whenever the bot-sent poll changes + // (e.g. `allow_adding_options` participants appending options). Refresh + // the cached option list so `poll_answer` indexes that fall past the + // original array still resolve. + refreshPollOptions( + pollId: string, + options: readonly { title: string }[] + ): void { + const cached = this.polls.peek(pollId); + if (!cached) { + return; + } + this.polls.set(pollId, { + ...cached, + poll: { ...cached.poll, options: [...options] }, + }); + } + + priorVote(pollId: string, userId: number): readonly number[] { + return this.votes.get(voteKey(pollId, userId)) ?? []; + } + + recordVote( + pollId: string, + userId: number, + optionIds: readonly number[] + ): void { + this.votes.set(voteKey(pollId, userId), optionIds); + } +} + +export interface TelegramCacheCapacities { + albumConcurrent: number; + messages: number; + polls: number; + pollVotes: number; +} + +export interface TelegramCacheTimings { + albumCeilingMs: number; + albumDebounceMs: number; +} + +export interface TelegramCacheOptions extends TelegramCacheTimings { + capacity: TelegramCacheCapacities; + coalesceAlbums: boolean; +} + +// `quick-lru` may hold up to 2× `maxSize` between promotions, so caps below +// are halved relative to the desired hard ceiling (e.g. 2500 → up to 5000). +export const DEFAULT_CACHE_OPTIONS: TelegramCacheOptions = { + capacity: { + messages: 2500, + polls: 250, + pollVotes: 2500, + albumConcurrent: 100, + }, + albumDebounceMs: 500, + albumCeilingMs: 2000, + coalesceAlbums: false, +}; + +// `message_id` is unique per-chat, so the key namespaces by `spaceId`. +export const messageCacheKey = ( + spaceId: string, + messageId: number | string +): string => `${spaceId}:${messageId}`; + +export interface TelegramCache { + readonly albumOptions: Pick< + AlbumBufferOptions, + "ceilingMs" | "concurrentCapacity" | "debounceMs" + >; + readonly coalesceAlbums: boolean; + destroy(): void; + readonly messages: QuickLRU; + readonly polls: PollStore; +} + +export const createTelegramCache = ( + options: TelegramCacheOptions +): TelegramCache => { + const messages = new QuickLRU({ + maxSize: options.capacity.messages, + }); + const polls = new PollStore( + options.capacity.polls, + options.capacity.pollVotes + ); + return { + messages, + polls, + coalesceAlbums: options.coalesceAlbums, + albumOptions: { + concurrentCapacity: options.capacity.albumConcurrent, + debounceMs: options.albumDebounceMs, + ceilingMs: options.albumCeilingMs, + }, + destroy() { + messages.clear(); + polls.polls.clear(); + polls.votes.clear(); + }, + }; +}; diff --git a/packages/spectrum-ts/src/providers/telegram/runtime/client.ts b/packages/spectrum-ts/src/providers/telegram/runtime/client.ts new file mode 100644 index 0000000..ca30c61 --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/runtime/client.ts @@ -0,0 +1,376 @@ +import type { MethodName, Methods } from "../generated/methods"; +import { BASE_URL } from "../generated/methods"; +import { TelegramApiError, TelegramNetworkError } from "./errors"; +import { mergeRetryPolicy, type RetryPolicy, withRetry } from "./retry"; + +export interface TelegramClientOptions { + baseUrl?: string; + fetch?: typeof fetch; + /** Per-request deadline in ms. Null disables the timeout. Default: 60_000. */ + requestTimeoutMs?: number | null; + retry?: Partial; + token: string; +} + +const DEFAULT_REQUEST_TIMEOUT_MS = 60_000; + +// Telegram has no idempotency key, so mutating methods get at-most-once +// semantics: 429 retries only (server explicitly asks via `retry_after`). +// Reads keep the full policy. +const MUTATING_METHOD_PREFIXES = [ + "send", + "edit", + "delete", + "forward", + "copy", + "pin", + "unpin", + "ban", + "unban", + "restrict", + "promote", + "approve", + "decline", + "answer", + "set", + "create", + "close", + "reopen", + "stop", + "leave", + "uploadStickerFile", + "addStickerToSet", +] as const; + +const isMutatingMethod = (method: string): boolean => { + for (const prefix of MUTATING_METHOD_PREFIXES) { + if (method.startsWith(prefix)) { + return true; + } + } + return false; +}; + +const policyForMethod = (method: string, base: RetryPolicy): RetryPolicy => { + if (!isMutatingMethod(method)) { + return base; + } + return { + ...base, + retryNetworkErrors: false, + retryServerErrors: false, + }; +}; + +// Recognise aborts — either caller-driven cancellations or the internal +// request-timeout — so they propagate as control flow instead of being +// wrapped in `TelegramNetworkError` (which would trigger retries). +// `AbortSignal.timeout` rejects with a `DOMException` named `TimeoutError` +// on Node/undici, so both names are treated as terminal aborts. +const isAbortLike = (err: unknown, signal?: AbortSignal): boolean => { + if (signal?.aborted) { + return true; + } + if (typeof err !== "object" || err === null || !("name" in err)) { + return false; + } + const name = (err as { name: unknown }).name; + return name === "AbortError" || name === "TimeoutError"; +}; + +const combineSignals = ( + signals: (AbortSignal | undefined)[] +): AbortSignal | undefined => { + const present = signals.filter((s): s is AbortSignal => s !== undefined); + if (present.length === 0) { + return; + } + if (present.length === 1) { + return present[0]; + } + return AbortSignal.any(present); +}; + +interface ApiResponseOk { + ok: true; + result: T; +} + +interface ApiResponseErr { + description: string; + error_code: number; + ok: false; + parameters?: import("../generated/types").ResponseParameters; +} + +type ApiResponse = ApiResponseOk | ApiResponseErr; + +const hasTopLevelBlob = (params: Record): boolean => { + for (const value of Object.values(params)) { + if (value instanceof Blob) { + return true; + } + } + return false; +}; + +// `appendFormField` JSON-stringifies non-primitive values, which would +// silently drop binary content if a `Blob` were nested inside an object or +// array. Telegram's Bot API only accepts `InputFile`s at the top level of +// the multipart body, so a nested `Blob` is always a caller bug — fail +// loudly instead of letting it serialize to "{}". +const containsBlobDeep = (value: unknown): boolean => { + if (value instanceof Blob) { + return true; + } + if (Array.isArray(value)) { + return value.some(containsBlobDeep); + } + if (value && typeof value === "object") { + return Object.values(value as Record).some( + containsBlobDeep + ); + } + return false; +}; + +const appendFormField = (form: FormData, key: string, value: unknown): void => { + if (value === undefined || value === null) { + return; + } + if (value instanceof Blob) { + form.append(key, value); + return; + } + if ( + typeof value === "string" || + typeof value === "number" || + typeof value === "boolean" + ) { + form.append(key, String(value)); + return; + } + form.append(key, JSON.stringify(value)); +}; + +const buildBody = ( + params: Record +): { body: string | FormData; headers: Record } => { + if (hasTopLevelBlob(params)) { + for (const [key, value] of Object.entries(params)) { + if (!(value instanceof Blob) && containsBlobDeep(value)) { + throw new Error( + `Telegram client: parameter "${key}" contains a nested Blob; lift uploads into top-level multipart fields` + ); + } + } + const form = new FormData(); + for (const [key, value] of Object.entries(params)) { + appendFormField(form, key, value); + } + return { body: form, headers: {} }; + } + if (containsBlobDeep(params)) { + throw new Error( + "Telegram client: request params contain a nested Blob; lift uploads into top-level multipart fields" + ); + } + return { + body: JSON.stringify(params), + headers: { "content-type": "application/json" }, + }; +}; + +export class TelegramClient { + private readonly token: string; + private readonly baseUrl: string; + private readonly retryPolicy: RetryPolicy; + private readonly fetchImpl: typeof fetch; + private readonly requestTimeoutMs: number | null; + + constructor(opts: TelegramClientOptions) { + const token = typeof opts.token === "string" ? opts.token.trim() : ""; + if (!token) { + throw new Error( + "TelegramClient: token is required and cannot be empty or whitespace" + ); + } + this.token = token; + this.baseUrl = (opts.baseUrl ?? BASE_URL).replace(/\/+$/, ""); + this.retryPolicy = mergeRetryPolicy(opts.retry); + this.fetchImpl = opts.fetch ?? fetch; + const requestTimeoutMs = + opts.requestTimeoutMs === undefined + ? DEFAULT_REQUEST_TIMEOUT_MS + : opts.requestTimeoutMs; + if ( + requestTimeoutMs !== null && + (!Number.isFinite(requestTimeoutMs) || requestTimeoutMs < 0) + ) { + throw new RangeError( + `TelegramClient.requestTimeoutMs must be null or a non-negative finite number; got ${String(requestTimeoutMs)}` + ); + } + this.requestTimeoutMs = requestTimeoutMs; + } + + async invoke( + method: M, + params: Methods[M]["params"], + signal?: AbortSignal + ): Promise { + // Timeout signal is built once per invoke so the deadline covers + // retries, backoff, and any migration follow-ups. + const timeoutSignal = + this.requestTimeoutMs === null + ? undefined + : AbortSignal.timeout(this.requestTimeoutMs); + const combined = combineSignals([signal, timeoutSignal]); + return await this.invokeOnce(method, params, combined, 0); + } + + private async invokeOnce( + method: M, + params: Methods[M]["params"], + combined: AbortSignal | undefined, + migrations: number + ): Promise { + const url = `${this.baseUrl}/bot${this.token}/${method}`; + const { body, headers } = buildBody(params as Record); + + try { + return await withRetry( + async () => { + let response: Response; + try { + response = await this.fetchImpl(url, { + method: "POST", + headers, + body, + signal: combined, + }); + } catch (err) { + if (isAbortLike(err, combined)) { + throw err; + } + throw new TelegramNetworkError(method, err); + } + + let payload: ApiResponse; + try { + payload = (await response.json()) as ApiResponse< + Methods[M]["result"] + >; + } catch (err) { + if (isAbortLike(err, combined)) { + throw err; + } + throw new TelegramNetworkError(method, err); + } + + if (!payload.ok) { + throw new TelegramApiError({ + method, + errorCode: payload.error_code, + description: payload.description, + parameters: payload.parameters, + }); + } + + return payload.result; + }, + { policy: policyForMethod(method, this.retryPolicy), signal: combined } + ); + } catch (err) { + const newChatId = migrationTargetFor(err); + if (newChatId !== undefined && migrations < 1) { + const migrated = { + ...(params as Record), + chat_id: newChatId, + } as Methods[M]["params"]; + return await this.invokeOnce( + method, + migrated, + combined, + migrations + 1 + ); + } + throw err; + } + } + + fileUrl(filePath: string): string { + return `${this.baseUrl}/file/bot${this.token}/${filePath}`; + } + + async downloadFile( + filePath: string, + signal?: AbortSignal + ): Promise { + const url = this.fileUrl(filePath); + const timeoutSignal = + this.requestTimeoutMs === null + ? undefined + : AbortSignal.timeout(this.requestTimeoutMs); + const combined = combineSignals([signal, timeoutSignal]); + + return await withRetry( + async () => { + let response: Response; + try { + response = await this.fetchImpl(url, { signal: combined }); + } catch (err) { + if (isAbortLike(err, combined)) { + throw err; + } + throw new TelegramNetworkError("downloadFile", err); + } + + if (!response.ok) { + const snippet = await readErrorSnippet(response, combined); + if (combined?.aborted) { + throw combined.reason ?? new DOMException("Aborted", "AbortError"); + } + const base = `Telegram file download failed with HTTP ${response.status}`; + throw new TelegramApiError({ + method: "downloadFile", + errorCode: response.status, + description: snippet ? `${base}: ${snippet}` : base, + }); + } + + return response; + }, + { policy: this.retryPolicy, signal: combined } + ); + } +} + +const ERROR_SNIPPET_MAX_LEN = 200; + +const readErrorSnippet = async ( + response: Response, + signal?: AbortSignal +): Promise => { + try { + const text = await response.text(); + if (!text) { + return; + } + return text.length > ERROR_SNIPPET_MAX_LEN + ? `${text.slice(0, ERROR_SNIPPET_MAX_LEN)}…` + : text; + } catch (err) { + if (isAbortLike(err, signal)) { + throw err; + } + return; + } +}; + +const migrationTargetFor = (err: unknown): number | undefined => { + if (!(err instanceof TelegramApiError)) { + return; + } + return err.migrateToChatId; +}; diff --git a/packages/spectrum-ts/src/providers/telegram/runtime/errors.ts b/packages/spectrum-ts/src/providers/telegram/runtime/errors.ts new file mode 100644 index 0000000..c16d904 --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/runtime/errors.ts @@ -0,0 +1,59 @@ +import type { ResponseParameters } from "../generated/types"; + +export interface TelegramApiErrorOptions { + description: string; + errorCode: number; + method: string; + parameters?: ResponseParameters; +} + +export class TelegramApiError extends Error { + readonly method: string; + readonly errorCode: number; + readonly description: string; + readonly parameters?: ResponseParameters; + + constructor(opts: TelegramApiErrorOptions) { + super( + `Telegram ${opts.method} failed (${opts.errorCode}): ${opts.description}` + ); + this.name = "TelegramApiError"; + this.method = opts.method; + this.errorCode = opts.errorCode; + this.description = opts.description; + this.parameters = opts.parameters; + } + + get isRateLimit(): boolean { + return this.errorCode === 429; + } + + get isServerError(): boolean { + return this.errorCode >= 500 && this.errorCode < 600; + } + + get isClientError(): boolean { + return ( + this.errorCode >= 400 && this.errorCode < 500 && this.errorCode !== 429 + ); + } + + get retryAfter(): number | undefined { + return this.parameters?.retry_after; + } + + get migrateToChatId(): number | undefined { + return this.parameters?.migrate_to_chat_id; + } +} + +export class TelegramNetworkError extends Error { + readonly method: string; + + constructor(method: string, cause: unknown) { + const reason = cause instanceof Error ? cause.message : String(cause); + super(`Telegram ${method} network error: ${reason}`, { cause }); + this.name = "TelegramNetworkError"; + this.method = method; + } +} diff --git a/packages/spectrum-ts/src/providers/telegram/runtime/polling.ts b/packages/spectrum-ts/src/providers/telegram/runtime/polling.ts new file mode 100644 index 0000000..a134eb7 --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/runtime/polling.ts @@ -0,0 +1,101 @@ +import type { Update } from "../generated/types"; +import type { TelegramClient } from "./client"; + +export interface PollingOptions { + /** Omitted = `DEFAULT_ALLOWED_UPDATES`. Empty array = Telegram's default. */ + allowedUpdates?: string[]; + dropPendingUpdates?: boolean; + /** Long-polling timeout in seconds; Telegram caps at 50. */ + timeout?: number; +} + +const DEFAULT_TIMEOUT_SECONDS = 30; +const MAX_TIMEOUT_SECONDS = 50; + +const sanitizeTimeout = (raw: number | undefined): number => { + if (raw === undefined || !Number.isFinite(raw)) { + return DEFAULT_TIMEOUT_SECONDS; + } + const normalized = Math.trunc(raw); + if (normalized < 0) { + return 0; + } + if (normalized > MAX_TIMEOUT_SECONDS) { + return MAX_TIMEOUT_SECONDS; + } + return normalized; +}; + +// `message_reaction`, `poll`, and `poll_answer` are opt-in. +// `message_reaction_count` aggregates are excluded — no Spectrum content +// type maps to them. `poll` is consumed internally to keep the cached +// option list in sync for `allow_adding_options` polls. +const DEFAULT_ALLOWED_UPDATES: readonly string[] = [ + "message", + "edited_message", + "channel_post", + "edited_channel_post", + "message_reaction", + "poll", + "poll_answer", +]; + +// `offset: -1` returns the most recent pending update; advancing past it +// confirms-and-drops every older queued update. +const discardPendingUpdates = async ( + client: TelegramClient, + signal: AbortSignal +): Promise => { + const tail = await client.invoke( + "getUpdates", + { offset: -1, timeout: 0 }, + signal + ); + const last = tail.at(-1); + return last ? last.update_id + 1 : 0; +}; + +export async function* pollUpdates( + client: TelegramClient, + signal: AbortSignal, + options: PollingOptions = {} +): AsyncIterable { + const timeout = sanitizeTimeout(options.timeout); + const allowedUpdates = options.allowedUpdates ?? [...DEFAULT_ALLOWED_UPDATES]; + let offset = 0; + + if (options.dropPendingUpdates) { + try { + offset = await discardPendingUpdates(client, signal); + } catch (err) { + if (signal.aborted) { + return; + } + throw err; + } + } + + while (!signal.aborted) { + let batch: Update[]; + try { + batch = await client.invoke( + "getUpdates", + { offset, timeout, allowed_updates: allowedUpdates }, + signal + ); + } catch (err) { + if (signal.aborted) { + return; + } + throw err; + } + + for (const update of batch) { + if (signal.aborted) { + return; + } + yield update; + offset = update.update_id + 1; + } + } +} diff --git a/packages/spectrum-ts/src/providers/telegram/runtime/retry.ts b/packages/spectrum-ts/src/providers/telegram/runtime/retry.ts new file mode 100644 index 0000000..332ec91 --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/runtime/retry.ts @@ -0,0 +1,135 @@ +import { TelegramApiError, TelegramNetworkError } from "./errors"; + +export interface RetryPolicy { + baseDelayMs: number; + maxAttempts: number; + maxDelayMs: number; + retryNetworkErrors: boolean; + retryServerErrors: boolean; +} + +export const DEFAULT_RETRY_POLICY: RetryPolicy = { + maxAttempts: 5, + baseDelayMs: 500, + maxDelayMs: 30_000, + retryServerErrors: true, + retryNetworkErrors: true, +}; + +// Naive `{ ...defaults, ...override }` lets an explicit `undefined` in +// `override` blow away the default. Drop undefined entries first so +// callers can spread a sparse partial without breaking the policy. +export const mergeRetryPolicy = ( + override: Partial | undefined +): RetryPolicy => { + if (!override) { + return { ...DEFAULT_RETRY_POLICY }; + } + const merged: RetryPolicy = { ...DEFAULT_RETRY_POLICY }; + for (const [key, value] of Object.entries(override) as [ + keyof RetryPolicy, + RetryPolicy[keyof RetryPolicy] | undefined, + ][]) { + if (value !== undefined) { + (merged[key] as RetryPolicy[typeof key]) = + value as RetryPolicy[typeof key]; + } + } + return merged; +}; + +const sleep = (ms: number, signal?: AbortSignal): Promise => + new Promise((resolve, reject) => { + let settled = false; + let timer: ReturnType | undefined; + + const finalize = (fn: () => void) => { + if (settled) { + return; + } + settled = true; + if (timer !== undefined) { + clearTimeout(timer); + } + signal?.removeEventListener("abort", onAbort); + fn(); + }; + + const onAbort = () => { + finalize(() => reject(signal?.reason)); + }; + + signal?.addEventListener("abort", onAbort, { once: true }); + // Re-check after attaching the listener to close the race window where + // the signal aborts between entry and listener registration. + if (signal?.aborted) { + onAbort(); + return; + } + + timer = setTimeout(() => { + finalize(resolve); + }, ms); + }); + +const backoffDelay = (attempt: number, policy: RetryPolicy): number => { + const exp = policy.baseDelayMs * 2 ** (attempt - 1); + const capped = Math.min(exp, policy.maxDelayMs); + // Full jitter in [50%, 100%] of the capped delay so callers never get a + // 0ms "retry immediately" window that defeats the backoff. + return Math.floor(capped * (0.5 + Math.random() * 0.5)); +}; + +const nextDelay = ( + error: unknown, + attempt: number, + policy: RetryPolicy +): number | undefined => { + if (attempt >= policy.maxAttempts) { + return; + } + if (error instanceof TelegramApiError) { + if (error.isRateLimit) { + const retryAfter = error.retryAfter; + if (retryAfter !== undefined) { + // `retry_after` is Telegram telling us exactly when the rate limit + // window opens. Clamping it to `maxDelayMs` would retry too early, + // earn another 429, and burn through `maxAttempts` without ever + // respecting the limit. Callers wanting to bound how long a single + // request can hang can use `TelegramClientOptions.requestTimeoutMs`, + // which still applies via the merged abort signal. + return retryAfter * 1000; + } + return backoffDelay(attempt, policy); + } + if (error.isServerError && policy.retryServerErrors) { + return backoffDelay(attempt, policy); + } + return; + } + if (error instanceof TelegramNetworkError && policy.retryNetworkErrors) { + return backoffDelay(attempt, policy); + } + return; +}; + +export const withRetry = async ( + op: () => Promise, + opts: { policy?: Partial; signal?: AbortSignal } = {} +): Promise => { + const policy: RetryPolicy = mergeRetryPolicy(opts.policy); + let attempt = 1; + + while (true) { + try { + return await op(); + } catch (error) { + const delay = nextDelay(error, attempt, policy); + if (delay === undefined) { + throw error; + } + await sleep(delay, opts.signal); + attempt += 1; + } + } +}; diff --git a/packages/spectrum-ts/src/providers/telegram/types.ts b/packages/spectrum-ts/src/providers/telegram/types.ts new file mode 100644 index 0000000..5a9964a --- /dev/null +++ b/packages/spectrum-ts/src/providers/telegram/types.ts @@ -0,0 +1,72 @@ +import z from "zod"; +import type { SchemaMessage } from "../../platform/types"; +import type { User } from "./generated/types"; +import type { TelegramCache } from "./runtime/cache"; +import type { TelegramClient } from "./runtime/client"; + +export const cacheConfigSchema = z + .object({ + messages: z.number().int().positive().optional(), + polls: z.number().int().positive().optional(), + pollVotes: z.number().int().positive().optional(), + albumConcurrent: z.number().int().positive().optional(), + albumDebounceMs: z.number().int().nonnegative().optional(), + albumCeilingMs: z.number().int().nonnegative().optional(), + coalesceAlbums: z.boolean().optional(), + }) + .optional(); + +export const configSchema = z.object({ + token: z.string().trim().min(1), + apiBaseUrl: z.string().trim().url().optional(), + pollingTimeout: z.number().int().positive().max(50).optional(), + dropPendingUpdates: z.boolean().optional(), + cache: cacheConfigSchema, +}); + +export type TelegramConfig = z.infer; + +// All fields optional: `telegram.user({ userID })` resolves with no chat +// context and the Bot API has no "fetch user by id" endpoint. +export const userSchema = z.object({ + chatId: z.number().int().optional(), + firstName: z.string().optional(), + isBot: z.boolean().optional(), + lastName: z.string().optional(), + username: z.string().optional(), + languageCode: z.string().optional(), +}); + +export const spaceSchema = z.object({ + id: z.string(), + chatId: z.number().int(), + type: z.enum(["private", "group", "supergroup", "channel"]), + title: z.string().optional(), + username: z.string().optional(), +}); + +export const spaceParamsSchema = z.object({ + chatId: z.union([z.string().trim().min(1), z.number().int()]), +}); + +// `mediaGroupId`: shared id across all members of a Telegram album. +// `caption`: present on media messages that ship with caption text. +export const messageSchema = z.object({ + mediaGroupId: z.string().optional(), + caption: z.string().optional(), +}); + +export type TelegramMessage = SchemaMessage< + typeof userSchema, + typeof spaceSchema +> & { + mediaGroupId?: string; + caption?: string; +}; + +export interface TelegramRuntime { + abort: AbortController; + cache: TelegramCache; + client: TelegramClient; + me: User; +} diff --git a/packages/spectrum-ts/tsup.config.ts b/packages/spectrum-ts/tsup.config.ts index 10de15e..b2b26ba 100644 --- a/packages/spectrum-ts/tsup.config.ts +++ b/packages/spectrum-ts/tsup.config.ts @@ -5,6 +5,7 @@ export default defineConfig({ index: "src/index.ts", "providers/index": "src/providers/index.ts", "providers/imessage/index": "src/providers/imessage/index.ts", + "providers/telegram/index": "src/providers/telegram/index.ts", "providers/terminal/index": "src/providers/terminal/index.ts", "providers/whatsapp-business/index": "src/providers/whatsapp-business/index.ts",