diff --git a/Cargo.lock b/Cargo.lock index e82843bc..e6b1ea0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1594,9 +1594,9 @@ dependencies = [ [[package]] name = "parakeet-rs" -version = "0.2.9" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7667842fd2f3b97b029a30fb9a00138867c6915229f5acd6bd809d08250d2ee" +checksum = "447e20c2f0ecc35e581aa30da451e4238221a0dfd020c4ee6bdba7cb8c7b52dd" dependencies = [ "eyre", "hound", diff --git a/Cargo.toml b/Cargo.toml index b55a26c1..0cd52846 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,7 +67,7 @@ rodio = { version = "0.19", default-features = false, features = ["wav"] } whisper-rs = "0.15.1" # Parakeet speech-to-text (optional, ONNX-based) -parakeet-rs = { version = "0.2.9", optional = true } +parakeet-rs = { version = "0.3.1", optional = true } # CPU count for thread detection @@ -89,7 +89,7 @@ gpu-hipblas = ["whisper-rs/hipblas"] parakeet = ["dep:parakeet-rs"] parakeet-cuda = ["parakeet", "parakeet-rs/cuda"] parakeet-tensorrt = ["parakeet", "parakeet-rs/tensorrt"] -parakeet-rocm = ["parakeet", "parakeet-rs/rocm"] +parakeet-rocm = ["parakeet", "parakeet-rs/migraphx"] # Dynamic loading for system ONNX Runtime (used by Nix builds) parakeet-load-dynamic = ["parakeet", "parakeet-rs/load-dynamic"] diff --git a/contrib/nemotron-streaming-test-config.toml b/contrib/nemotron-streaming-test-config.toml new file mode 100644 index 00000000..095cdf8c --- /dev/null +++ b/contrib/nemotron-streaming-test-config.toml @@ -0,0 +1,46 @@ +# Voxtype test config: Nemotron streaming transcription +# +# Setup: +# 1. Build with parakeet support: +# cargo build --features parakeet +# +# 2. Download the Nemotron model: +# voxtype setup model +# (select "nemotron-speech-streaming-en-0.6b") +# +# 3. 
Run the daemon: +# voxtype -c contrib/nemotron-streaming-test-config.toml daemon + +engine = "parakeet" + +[hotkey] +key = "Super_R" +enabled = true +mode = "push-to-talk" + +[audio] +device = "default" +max_duration_secs = 30 + +[audio.feedback] +enabled = true + +[parakeet] +# Downloaded via: voxtype setup model +model = "nemotron-speech-streaming-en-0.6b" + +# Auto-detected from model files, but you can force it: +# model_type = "nemotron" + +# Streaming is auto-enabled for Nemotron models. +# Set to false to use batch transcription instead: +# streaming = false + +[output] +mode = "type" +state_file = "auto" + +[output.notification] +on_recording_start = true +on_recording_stop = true +on_transcription = true diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 67889192..a9f49320 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -61,6 +61,8 @@ The main key to hold for recording. Must be a valid Linux evdev key name. - `PAUSE` - Pause/Break key - `RIGHTALT` - Right Alt key - `F13` through `F24` - Extended function keys +- `MEDIA` - Media key (often a dedicated button on multimedia keyboards) +- `RECORD` - Record key - `INSERT` - Insert key - `HOME` - Home key - `END` - End key @@ -74,10 +76,25 @@ The main key to hold for recording. Must be a valid Linux evdev key name. key = "PAUSE" ``` +**Numeric keycodes:** + +You can also specify keys by their numeric keycode if the key name isn't in the built-in list. Use a prefix to indicate the source tool, since different tools report different numbers for the same key: + +- `WEV_234` or `X11_234` or `XEV_234` - XKB keycode as shown by `wev` or `xev` (offset by 8 from the kernel value) +- `EVTEST_226` - kernel keycode as shown by `evtest` +- Hex values are also accepted: `WEV_0xEA`, `EVTEST_0xE2` + +Bare numeric values (e.g. `226`) are not accepted because `wev`/`xev` and `evtest` report different numbers for the same key. 
+ **Finding key names:** ```bash +# Using evtest (shows kernel keycodes): sudo evtest # Select keyboard, press desired key, note KEY_XXXX name + +# Using wev on Wayland (shows XKB keycodes): +wev +# Press the key, note the keycode number — use with WEV_ prefix ``` ### modifiers @@ -593,6 +610,100 @@ voxtype --whisper-context-optimization daemon **Note:** This setting only applies when using the local whisper backend (`backend = "local"`). It has no effect with remote transcription. +### eager_processing + +**Type:** Boolean +**Default:** `false` +**Required:** No + +Enable eager input processing. When enabled, audio is split into chunks and transcribed in parallel with continued recording, reducing perceived latency on slower machines. + +**Values:** +- `false` (default) - Traditional mode: record all audio, then transcribe +- `true` - Eager mode: transcribe chunks while recording continues + +**How it works:** + +1. While recording, audio is split into fixed-size chunks (default 5 seconds) +2. Each chunk is sent for transcription as soon as it's ready +3. Recording continues while earlier chunks are being transcribed +4. When recording stops, all chunk results are combined + +**When to use eager processing:** +- You have a slower CPU where transcription takes several seconds +- You regularly dictate longer passages (10+ seconds) +- You want to minimize the delay between speaking and text output + +**When to keep default (`false`):** +- You have a fast CPU or GPU acceleration +- Your recordings are typically short (under 5 seconds) +- You want maximum transcription accuracy (single-pass is more consistent) + +**Example:** +```toml +[whisper] +model = "base.en" +eager_processing = true +eager_chunk_secs = 5.0 # 5 second chunks +eager_overlap_secs = 0.5 # 0.5 second overlap +``` + +**CLI override:** +```bash +voxtype --eager-processing daemon +``` + +**Note:** Eager processing is experimental. 
There may be occasional word duplications or omissions at chunk boundaries. + +### eager_chunk_secs + +**Type:** Float +**Default:** `5.0` +**Required:** No + +Duration of each audio chunk in seconds when eager processing is enabled. + +**Example:** +```toml +[whisper] +eager_processing = true +eager_chunk_secs = 3.0 # Shorter chunks for faster feedback +``` + +**CLI override:** +```bash +voxtype --eager-processing --eager-chunk-secs 3.0 daemon +``` + +**Trade-offs:** +- Shorter chunks: Faster feedback, but more boundary artifacts +- Longer chunks: Better accuracy, but less parallelism benefit + +### eager_overlap_secs + +**Type:** Float +**Default:** `0.5` +**Required:** No + +Overlap duration in seconds between adjacent chunks when eager processing is enabled. Overlap helps catch words that span chunk boundaries. + +**Example:** +```toml +[whisper] +eager_processing = true +eager_chunk_secs = 5.0 +eager_overlap_secs = 1.0 # More overlap for better boundary handling +``` + +**CLI override:** +```bash +voxtype --eager-processing --eager-overlap-secs 1.0 daemon +``` + +**Trade-offs:** +- More overlap: Better word boundary handling, slightly more processing +- Less overlap: Faster processing, but may miss words at boundaries + ### initial_prompt **Type:** String @@ -893,6 +1004,7 @@ The model architecture type. 
Usually auto-detected based on files present in the **Values:** - `tdt` - Token-Duration-Transducer (recommended, proper punctuation) - `ctc` - Connectionist Temporal Classification (faster, character-level) +- `nemotron` - Nemotron streaming transducer (supports real-time streaming output) **Example:** ```toml @@ -901,6 +1013,42 @@ model = "parakeet-tdt-0.6b-v3" model_type = "tdt" ``` +**Auto-detection:** + +The model type is detected from the files in the model directory: + +| Model Type | Required Files | +|-----------|---------------| +| TDT | `encoder-model.onnx`, `decoder_joint-model.onnx`, `vocab.txt` | +| CTC | `model.onnx` (or `model_int8.onnx`), `tokenizer.json` | +| Nemotron | `encoder.onnx`, `decoder_joint.onnx`, `tokenizer.model` | + +### streaming + +**Type:** Boolean +**Default:** Auto (enabled for Nemotron, disabled for TDT/CTC) +**Required:** No + +When enabled, text is typed live during recording as the model produces output. Each audio chunk (560ms) is processed incrementally, and the resulting text is typed immediately at the cursor position. + +Streaming is automatically enabled when using a Nemotron model and can be explicitly overridden. 
+ +**Example:** +```toml +[parakeet] +model = "/path/to/nemotron-model" +model_type = "nemotron" +streaming = true # This is the default for Nemotron +``` + +**To disable streaming for a Nemotron model (batch mode instead):** +```toml +[parakeet] +model = "/path/to/nemotron-model" +model_type = "nemotron" +streaming = false # Wait until recording stops, then transcribe all at once +``` + ### on_demand_loading **Type:** Boolean @@ -916,8 +1064,9 @@ model = "parakeet-tdt-0.6b-v3" on_demand_loading = true # Free memory when not transcribing ``` -### Complete Example +### Complete Examples +**TDT model (recommended for batch transcription):** ```toml engine = "parakeet" @@ -926,6 +1075,16 @@ model = "parakeet-tdt-0.6b-v3" on_demand_loading = false # Keep model loaded for fast response ``` +**Nemotron model (streaming transcription):** +```toml +engine = "parakeet" + +[parakeet] +model = "/path/to/nemotron-0.6b" +model_type = "nemotron" +# streaming = true is the default for Nemotron +``` + --- ## [output] @@ -1016,20 +1175,21 @@ fallback_to_clipboard = true # Use clipboard if typing drivers fail ### driver_order **Type:** Array of strings -**Default:** `["wtype", "dotool", "ydotool", "clipboard", "xclip"]` +**Default:** `["wtype", "eitype", "dotool", "ydotool", "clipboard", "xclip"]` **Required:** No Custom order of output drivers to try when `mode = "type"`. Each driver is tried in sequence until one succeeds. This allows you to prefer specific drivers or exclude others entirely. 
**Available drivers:** -- `wtype` - Wayland virtual keyboard (best CJK/Unicode support, wlroots compositors only) +- `wtype` - Wayland virtual keyboard protocol (best CJK/Unicode support, wlroots compositors only) +- `eitype` - Wayland via libei/EI protocol (works on GNOME, KDE, and compositors with libei support) - `dotool` - uinput-based typing (supports keyboard layouts, works on X11/Wayland/TTY) - `ydotool` - uinput-based typing (requires daemon, X11/Wayland/TTY) - `clipboard` - Wayland clipboard via wl-copy - `xclip` - X11 clipboard via xclip **Default behavior (no driver_order set):** -The default chain is: wtype → dotool → ydotool → clipboard → xclip +The default chain is: wtype → eitype → dotool → ydotool → clipboard → xclip **Examples:** @@ -1046,8 +1206,8 @@ driver_order = ["dotool", "ydotool", "xclip"] # Force single driver (no fallback) driver_order = ["ydotool"] -# KDE/GNOME Wayland (wtype doesn't work) -driver_order = ["dotool", "ydotool", "clipboard"] +# GNOME/KDE Wayland (prefer eitype, wtype doesn't work) +driver_order = ["eitype", "dotool", "clipboard"] ``` **CLI override:** diff --git a/docs/SMOKE_TESTS.md b/docs/SMOKE_TESTS.md index 045e4ac4..a1f8d0ab 100644 --- a/docs/SMOKE_TESTS.md +++ b/docs/SMOKE_TESTS.md @@ -270,6 +270,47 @@ voxtype record start && sleep 3 && voxtype record stop journalctl --user -u voxtype --since "1 minute ago" | grep -E "Loading|Unloading" ``` +## Eager Processing + +Tests parallel transcription of audio chunks during recording: + +```bash +# 1. Enable eager processing in config.toml: +# [whisper] +# eager_processing = true +# eager_chunk_secs = 3.0 # Use short chunks for visible testing +# eager_overlap_secs = 0.5 + +# 2. Restart daemon +systemctl --user restart voxtype + +# 3. Record for 10+ seconds (to generate multiple chunks) +voxtype record start +sleep 12 +voxtype record stop + +# 4. 
Check logs for chunk processing: +journalctl --user -u voxtype --since "1 minute ago" | grep -iE "eager|chunk" +# Expected: "Spawning eager transcription for chunk 0" +# "Spawning eager transcription for chunk 1" +# "Chunk 0 completed" +# "Combined eager chunks" + +# 5. Verify combined output is coherent (no obvious word duplication) +# The final transcription should read naturally + +# 6. Test cancellation during eager recording +voxtype record start +sleep 5 +voxtype record cancel +journalctl --user -u voxtype --since "30 seconds ago" | grep -iE "cancel|abort" +# Expected: chunk tasks are cancelled, no transcription output + +# 7. Restore default (disabled) when done testing: +# [whisper] +# eager_processing = false +``` + ## Model Switching ```bash diff --git a/docs/USER_MANUAL.md b/docs/USER_MANUAL.md index ce7310a5..50f20fd8 100644 --- a/docs/USER_MANUAL.md +++ b/docs/USER_MANUAL.md @@ -17,6 +17,7 @@ Voxtype is a push-to-talk voice-to-text tool for Linux. Optimized for Wayland, w - [Whisper Models](#whisper-models) - [Remote Whisper Servers](#remote-whisper-servers) - [CLI Backend (whisper-cli)](#cli-backend-whisper-cli) +- [Eager Processing](#eager-processing) - [Output Modes](#output-modes) - [Post-Processing with LLMs](#post-processing-with-llms) - [Profiles](#profiles) @@ -334,6 +335,8 @@ Any key supported by the Linux evdev system can be used as a hotkey: | Right Alt | `RIGHTALT` | | Right Ctrl | `RIGHTCTRL` | | F13-F24 | `F13`, `F14`, ... `F24` | +| Media | `MEDIA` | +| Record | `RECORD` | | Insert | `INSERT` | | Home | `HOME` | | End | `END` | @@ -355,6 +358,19 @@ sudo evtest # Look for "KEY_XXXXX" - use the part after KEY_ ``` +### Numeric Keycodes + +If your key isn't in the built-in list, you can specify it by numeric keycode. 
Use a prefix to indicate which tool you got the number from, since `wev`/`xev` and `evtest` report different numbers for the same key (XKB keycodes are offset by 8 from kernel keycodes): + +```toml +[hotkey] +key = "WEV_234" # XKB keycode from wev/xev (KEY_MEDIA) +key = "EVTEST_226" # Kernel keycode from evtest (KEY_MEDIA) +key = "WEV_0xEA" # Hex also works +``` + +Prefixes: `WEV_`, `X11_`, `XEV_` (XKB keycode), `EVTEST_` (kernel keycode). + ### Using Modifier Keys Require modifier keys to be held along with your hotkey: @@ -576,6 +592,36 @@ Parakeet is NVIDIA's FastConformer-based ASR model. It offers: See [PARAKEET.md](PARAKEET.md) for detailed setup instructions. +### Nemotron Streaming (Experimental) + +Nemotron is a streaming variant of the Parakeet architecture. Unlike TDT and CTC models that transcribe after you stop recording, Nemotron types text live as you speak. + +**How it works:** +- Audio is split into 560ms chunks during recording +- Each chunk is fed to the Nemotron model immediately +- Recognized text is typed at the cursor in real-time +- When you release the hotkey, any remaining audio is flushed + +**Setup:** +1. Download a Nemotron ONNX model (contains `encoder.onnx`, `decoder_joint.onnx`, `tokenizer.model`) +2. Configure voxtype to use it: + +```toml +engine = "parakeet" + +[parakeet] +model = "/path/to/nemotron-model" +# model_type = "nemotron" is auto-detected from model files +``` + +3. Start the daemon. Text will appear live as you speak. 
+ +**Notes:** +- Streaming mode is automatically enabled when a Nemotron model is detected +- Set `streaming = false` in `[parakeet]` to disable streaming and use batch mode instead +- Post-processing commands are not applied during streaming (text is output raw) +- Requires a Parakeet-enabled binary (`voxtype-*-parakeet-*`) + --- ## Multi-Model Support @@ -938,6 +984,107 @@ This adds minimal overhead compared to the FFI approach since file I/O is fast o --- +## Eager Processing + +Eager processing transcribes audio in chunks while you're still recording. Instead of waiting until you release the hotkey to start transcription, voxtype begins processing audio in the background as you speak. When you stop recording, the final chunk is transcribed and all results are combined. + +### When to Use Eager Processing + +Eager processing is most valuable when: + +1. **You have slow transcription hardware**: On machines where transcription takes longer than the recording itself, eager processing parallelizes the work to reduce overall wait time. + +2. **You make long recordings**: For recordings over 15-30 seconds, starting transcription early means less waiting when you're done speaking. + +3. **You use large models**: Larger Whisper models (medium, large-v3) are slower. Eager processing helps hide some of that latency. + +**Not recommended when:** +- You use fast models (tiny, base) on modern hardware with GPU acceleration +- Your recordings are typically short (under 5 seconds) +- You're on a laptop and want to minimize battery usage + +### How It Works + +With eager processing enabled: + +1. Audio accumulates as you record +2. Every `eager_chunk_secs` (default: 5 seconds), a chunk is extracted and sent for transcription +3. Chunks overlap by `eager_overlap_secs` (default: 0.5 seconds) to avoid missing words at boundaries +4. When you stop recording, all chunk results are combined and deduplicated +5. 
The final text is output + +The overlap region helps catch words that might be split across chunk boundaries. The deduplication logic matches overlapping text to produce a clean result. + +### Configuration + +Enable eager processing in `~/.config/voxtype/config.toml`: + +```toml +[whisper] +model = "medium.en" + +# Enable eager input processing +eager_processing = true + +# Chunk duration (default: 5.0 seconds) +eager_chunk_secs = 5.0 + +# Overlap between chunks (default: 0.5 seconds) +eager_overlap_secs = 0.5 +``` + +Or via CLI flags: + +```bash +voxtype --eager-processing --eager-chunk-secs 5.0 daemon +``` + +### Tuning Chunk Size + +The chunk duration affects the trade-off between parallelization and overhead: + +| Chunk Size | Pros | Cons | +|------------|------|------| +| 3 seconds | More parallelization, faster for slow models | More boundary handling, slightly higher CPU | +| 5 seconds | Good balance for most cases | - | +| 10 seconds | Fewer chunks to combine | Less parallelization benefit | + +For testing, try `eager_chunk_secs = 3.0` to see more chunk messages in the logs. + +### Trade-offs + +**Benefits:** +- Reduced perceived latency on slow hardware +- Better experience for long recordings +- Parallelizes transcription work across recording time + +**Limitations:** +- Boundary handling may occasionally produce artifacts (repeated or dropped words at chunk edges) +- Slightly higher CPU usage during recording +- Adds complexity to the transcription pipeline + +For most users with modern hardware and GPU acceleration, the default (disabled) provides the cleanest results. Enable eager processing when latency is a problem that outweighs the small risk of boundary artifacts. + +### Verifying It Works + +Run voxtype with verbose logging: + +```bash +voxtype -vv +``` + +Then record for 10+ seconds. 
You should see log messages like: + +``` +[DEBUG] Spawning eager transcription for chunk 0 +[DEBUG] Spawning eager transcription for chunk 1 +[DEBUG] Chunk 0 completed: "This is the first part of my recording" +[DEBUG] Chunk 1 completed: "the first part of my recording and here is more" +[DEBUG] Combined eager chunks with deduplication +``` + +--- + ## Output Modes ### Type Mode (Default) @@ -973,16 +1120,23 @@ systemctl --user enable --now ydotool **Compositor Compatibility:** -wtype does not work on all Wayland compositors. KDE Plasma and GNOME do not support the virtual keyboard protocol that wtype requires. +wtype does not work on all Wayland compositors. KDE Plasma and GNOME do not support the virtual keyboard protocol that wtype requires. However, eitype uses the libei/EI protocol which is supported by GNOME and KDE. -| Desktop | wtype | dotool | ydotool | clipboard | Notes | -|---------|-------|--------|---------|-----------|-------| -| Hyprland, Sway, River | ✓ | ✓ | ✓ | wl-copy | wtype recommended (best CJK support) | -| KDE Plasma (Wayland) | ✗ | ✓ | ✓ | wl-copy | dotool recommended (keyboard layout support) | -| GNOME (Wayland) | ✗ | ✓ | ✓ | wl-copy | dotool recommended (keyboard layout support) | -| X11 (any) | ✗ | ✓ | ✓ | xclip | dotool or ydotool; xclip for clipboard | +| Desktop | wtype | eitype | dotool | ydotool | clipboard | Notes | +|---------|-------|--------|--------|---------|-----------|-------| +| Hyprland, Sway, River | ✓ | * | ✓ | ✓ | wl-copy | wtype recommended (best CJK support) | +| KDE Plasma (Wayland) | ✗ | ✓ | ✓ | ✓ | wl-copy | eitype recommended (native EI protocol) | +| GNOME (Wayland) | ✗ | ✓ | ✓ | ✓ | wl-copy | eitype recommended (native EI protocol) | +| X11 (any) | ✗ | ✗ | ✓ | ✓ | xclip | dotool or ydotool; xclip for clipboard | -**KDE Plasma and GNOME users:** Install dotool (recommended) or set up ydotool for type mode to work. +\* eitype works on wlroots compositors with libei support. 
+ +**KDE Plasma and GNOME users:** Install eitype (recommended) or dotool for type mode to work. + +For eitype (recommended for GNOME/KDE): +```bash +cargo install eitype +``` For dotool (recommended for non-US keyboards): ```bash @@ -1054,7 +1208,7 @@ mode = "paste" ### Fallback Behavior -Voxtype uses a fallback chain: wtype → dotool → ydotool → clipboard (wl-copy) → xclip +Voxtype uses a fallback chain: wtype → eitype → dotool → ydotool → clipboard (wl-copy) → xclip ```toml [output] @@ -1062,7 +1216,7 @@ mode = "type" fallback_to_clipboard = true # Falls back to clipboard if typing fails ``` -On Wayland, wtype is tried first (best CJK support), then dotool (supports keyboard layouts), then ydotool, then wl-copy (Wayland clipboard). On X11, xclip is available as an additional clipboard fallback. +On Wayland, wtype is tried first (best CJK support), then eitype (libei protocol, works on GNOME/KDE), then dotool (supports keyboard layouts), then ydotool, then wl-copy (Wayland clipboard). On X11, xclip is available as an additional clipboard fallback. 
### Custom Driver Order @@ -1075,7 +1229,7 @@ mode = "type" driver_order = ["ydotool", "wtype", "clipboard"] ``` -**Available drivers:** `wtype`, `dotool`, `ydotool`, `clipboard` (wl-copy), `xclip` (X11) +**Available drivers:** `wtype`, `eitype`, `dotool`, `ydotool`, `clipboard` (wl-copy), `xclip` (X11) **Examples:** @@ -1086,8 +1240,8 @@ driver_order = ["ydotool", "xclip"] # Force ydotool only (no fallback) driver_order = ["ydotool"] -# KDE/GNOME Wayland (wtype doesn't work) -driver_order = ["dotool", "ydotool", "clipboard"] +# GNOME/KDE Wayland (prefer eitype, wtype doesn't work) +driver_order = ["eitype", "dotool", "clipboard"] ``` **CLI override:** diff --git a/src/cli.rs b/src/cli.rs index 23e6c63b..af0f22d7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -68,11 +68,23 @@ pub struct Cli { #[arg(long, value_name = "PROMPT")] pub initial_prompt: Option, + /// Enable eager input processing (transcribe chunks while recording continues) + #[arg(long)] + pub eager_processing: bool, + + /// Chunk duration in seconds for eager processing (default: 5.0) + #[arg(long, value_name = "SECS")] + pub eager_chunk_secs: Option, + + /// Overlap between chunks in seconds for eager processing (default: 0.5) + #[arg(long, value_name = "SECS")] + pub eager_overlap_secs: Option, + /// Override transcription engine: "whisper" (default) or "parakeet" (EXPERIMENTAL) #[arg(long, value_name = "ENGINE")] pub engine: Option, - /// Override hotkey (e.g., SCROLLLOCK, PAUSE, F13) + /// Override hotkey (e.g., SCROLLLOCK, PAUSE, F13, MEDIA, WEV_234, EVTEST_226) #[arg(long, value_name = "KEY")] pub hotkey: Option, diff --git a/src/config.rs b/src/config.rs index b2f5e1bb..03973099 100644 --- a/src/config.rs +++ b/src/config.rs @@ -120,6 +120,18 @@ translate = false # Default: 300 (5 minutes). Only applies when gpu_isolation = false. 
# cold_model_timeout_secs = 300 +# --- Eager processing settings --- +# +# Enable eager input processing (transcribe chunks while recording continues) +# Reduces perceived latency on slower machines by processing audio in parallel. +# eager_processing = false +# +# Duration of each audio chunk in seconds (default: 5.0) +# eager_chunk_secs = 5.0 +# +# Overlap between chunks in seconds (helps catch words at boundaries, default: 0.5) +# eager_overlap_secs = 0.5 + # --- Remote backend settings (used when backend = "remote") --- # # Remote server endpoint URL (required for remote backend) @@ -390,6 +402,14 @@ fn default_cold_model_timeout() -> u64 { 300 // 5 minutes } +fn default_eager_chunk_secs() -> f32 { + 5.0 +} + +fn default_eager_overlap_secs() -> f32 { + 0.5 +} + fn default_whisper_model() -> String { "base.en".to_string() } @@ -710,6 +730,22 @@ pub struct WhisperConfig { #[serde(default = "default_context_window_optimization")] pub context_window_optimization: bool, + // --- Eager processing settings --- + /// Enable eager input processing (transcribe chunks while recording continues) + /// When enabled, audio is split into chunks and transcribed in parallel with + /// continued recording. This reduces perceived latency on slower machines. + #[serde(default)] + pub eager_processing: bool, + + /// Duration of each audio chunk in seconds for eager processing + #[serde(default = "default_eager_chunk_secs")] + pub eager_chunk_secs: f32, + + /// Overlap between adjacent chunks in seconds for eager processing + /// Overlap helps catch words at chunk boundaries + #[serde(default = "default_eager_overlap_secs")] + pub eager_overlap_secs: f32, + /// Initial prompt to provide context for transcription /// Use this to hint at terminology, proper nouns, or formatting conventions. /// Example: "Technical discussion about Rust, TypeScript, and Kubernetes." 
@@ -717,7 +753,6 @@ pub struct WhisperConfig { pub initial_prompt: Option, // --- Multi-model settings --- - /// Secondary model to use when hotkey.model_modifier is held /// Example: "large-v3-turbo" for difficult audio #[serde(default)] @@ -774,9 +809,7 @@ impl WhisperConfig { } // Fall back to deprecated `backend` with warning if let Some(backend) = self.backend { - tracing::warn!( - "DEPRECATED: [whisper] backend is deprecated, use 'mode' instead" - ); + tracing::warn!("DEPRECATED: [whisper] backend is deprecated, use 'mode' instead"); tracing::warn!( " Change 'backend = \"{}\"' to 'mode = \"{}\"' in config.toml", match backend { @@ -808,6 +841,9 @@ impl Default for WhisperConfig { on_demand_loading: default_on_demand_loading(), gpu_isolation: false, context_window_optimization: default_context_window_optimization(), + eager_processing: false, + eager_chunk_secs: default_eager_chunk_secs(), + eager_overlap_secs: default_eager_overlap_secs(), initial_prompt: None, secondary_model: None, available_models: vec![], @@ -831,6 +867,8 @@ pub enum ParakeetModelType { /// TDT (Token-Duration-Transducer) - recommended, proper punctuation and word boundaries #[default] Tdt, + /// Nemotron (streaming transducer) - supports real-time streaming transcription + Nemotron, } /// Parakeet speech-to-text configuration (ONNX-based, alternative to Whisper) @@ -840,9 +878,10 @@ pub struct ParakeetConfig { /// Path to model directory containing ONNX model files /// For TDT: encoder-model.onnx, decoder_joint-model.onnx, vocab.txt /// For CTC: model.onnx, tokenizer.json + /// For Nemotron: encoder.onnx, encoder.onnx.data, decoder_joint.onnx, tokenizer.model pub model: String, - /// Model architecture type: "tdt" (default, recommended) or "ctc" + /// Model architecture type: "tdt" (default, recommended), "ctc", or "nemotron" /// Auto-detected from model directory structure if not specified #[serde(default)] pub model_type: Option, @@ -850,6 +889,11 @@ pub struct ParakeetConfig { /// 
Load model on-demand when recording starts (true) or keep loaded (false) #[serde(default = "default_on_demand_loading")] pub on_demand_loading: bool, + + /// Enable streaming transcription (text typed live during recording) + /// Default: auto (enabled when model_type is Nemotron, disabled otherwise) + #[serde(default)] + pub streaming: Option, } impl Default for ParakeetConfig { @@ -858,10 +902,19 @@ impl Default for ParakeetConfig { model: "parakeet-tdt-0.6b-v3".to_string(), model_type: None, // Auto-detect on_demand_loading: false, + streaming: None, // Auto: enabled for Nemotron, disabled otherwise } } } +impl ParakeetConfig { + /// Check if streaming is enabled (auto-detects based on model type) + pub fn streaming_enabled(&self, detected_model_type: ParakeetModelType) -> bool { + self.streaming + .unwrap_or(matches!(detected_model_type, ParakeetModelType::Nemotron)) + } +} + /// Transcription engine selection (which ASR technology to use) #[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)] #[serde(rename_all = "lowercase")] @@ -1100,8 +1153,10 @@ pub enum OutputMode { #[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum OutputDriver { - /// wtype - Wayland-native, best Unicode/CJK support + /// wtype - Wayland-native via virtual-keyboard protocol, best Unicode/CJK support Wtype, + /// eitype - Wayland via libei/EI protocol, works on GNOME/KDE + Eitype, /// dotool - Works on X11/Wayland/TTY, supports keyboard layouts Dotool, /// ydotool - Works on X11/Wayland/TTY, requires daemon @@ -1116,6 +1171,7 @@ impl std::fmt::Display for OutputDriver { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { OutputDriver::Wtype => write!(f, "wtype"), + OutputDriver::Eitype => write!(f, "eitype"), OutputDriver::Dotool => write!(f, "dotool"), OutputDriver::Ydotool => write!(f, "ydotool"), OutputDriver::Clipboard => write!(f, "clipboard"), @@ -1130,12 +1186,13 @@ 
impl std::str::FromStr for OutputDriver { fn from_str(s: &str) -> Result { match s.to_lowercase().as_str() { "wtype" => Ok(OutputDriver::Wtype), + "eitype" => Ok(OutputDriver::Eitype), "dotool" => Ok(OutputDriver::Dotool), "ydotool" => Ok(OutputDriver::Ydotool), "clipboard" => Ok(OutputDriver::Clipboard), "xclip" => Ok(OutputDriver::Xclip), _ => Err(format!( - "Unknown driver '{}'. Valid options: wtype, dotool, ydotool, clipboard, xclip", + "Unknown driver '{}'. Valid options: wtype, eitype, dotool, ydotool, clipboard, xclip", s )), } @@ -1184,6 +1241,9 @@ impl Default for Config { on_demand_loading: default_on_demand_loading(), gpu_isolation: false, context_window_optimization: default_context_window_optimization(), + eager_processing: false, + eager_chunk_secs: default_eager_chunk_secs(), + eager_overlap_secs: default_eager_overlap_secs(), initial_prompt: None, secondary_model: None, available_models: vec![], @@ -1975,7 +2035,10 @@ mod tests { let config: Config = toml::from_str(toml_str).unwrap(); assert_eq!(config.engine, TranscriptionEngine::Parakeet); assert!(config.parakeet.is_some()); - assert_eq!(config.parakeet.as_ref().unwrap().model, "parakeet-tdt-0.6b-v3"); + assert_eq!( + config.parakeet.as_ref().unwrap().model, + "parakeet-tdt-0.6b-v3" + ); } #[test] @@ -2003,15 +2066,39 @@ mod tests { #[test] fn test_output_driver_from_str() { - assert_eq!("wtype".parse::().unwrap(), OutputDriver::Wtype); - assert_eq!("dotool".parse::().unwrap(), OutputDriver::Dotool); - assert_eq!("ydotool".parse::().unwrap(), OutputDriver::Ydotool); - assert_eq!("clipboard".parse::().unwrap(), OutputDriver::Clipboard); - assert_eq!("xclip".parse::().unwrap(), OutputDriver::Xclip); + assert_eq!( + "wtype".parse::().unwrap(), + OutputDriver::Wtype + ); + assert_eq!( + "dotool".parse::().unwrap(), + OutputDriver::Dotool + ); + assert_eq!( + "ydotool".parse::().unwrap(), + OutputDriver::Ydotool + ); + assert_eq!( + "clipboard".parse::().unwrap(), + OutputDriver::Clipboard + ); + 
assert_eq!( + "xclip".parse::().unwrap(), + OutputDriver::Xclip + ); // Case insensitive - assert_eq!("WTYPE".parse::().unwrap(), OutputDriver::Wtype); - assert_eq!("Ydotool".parse::().unwrap(), OutputDriver::Ydotool); - assert_eq!("XCLIP".parse::().unwrap(), OutputDriver::Xclip); + assert_eq!( + "WTYPE".parse::().unwrap(), + OutputDriver::Wtype + ); + assert_eq!( + "Ydotool".parse::().unwrap(), + OutputDriver::Ydotool + ); + assert_eq!( + "XCLIP".parse::().unwrap(), + OutputDriver::Xclip + ); // Invalid assert!("invalid".parse::().is_err()); } @@ -2405,11 +2492,17 @@ mod tests { assert_eq!(config.profiles.len(), 2); let slack = config.get_profile("slack").unwrap(); - assert_eq!(slack.post_process_command, Some("cleanup-for-slack.sh".to_string())); + assert_eq!( + slack.post_process_command, + Some("cleanup-for-slack.sh".to_string()) + ); assert!(slack.output_mode.is_none()); let code = config.get_profile("code").unwrap(); - assert_eq!(code.post_process_command, Some("cleanup-for-code.sh".to_string())); + assert_eq!( + code.post_process_command, + Some("cleanup-for-code.sh".to_string()) + ); assert_eq!(code.output_mode, Some(OutputMode::Clipboard)); } @@ -2438,7 +2531,10 @@ mod tests { let config: Config = toml::from_str(toml_str).unwrap(); let slow = config.get_profile("slow").unwrap(); - assert_eq!(slow.post_process_command, Some("slow-llm-command".to_string())); + assert_eq!( + slow.post_process_command, + Some("slow-llm-command".to_string()) + ); assert_eq!(slow.post_process_timeout_ms, Some(60000)); } diff --git a/src/daemon.rs b/src/daemon.rs index 2eb40877..679dd749 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -6,12 +6,14 @@ use crate::audio::feedback::{AudioFeedback, SoundEvent}; use crate::audio::{self, AudioCapture}; use crate::config::{ActivationMode, Config, FileMode, OutputMode}; +use crate::eager::{self, EagerConfig}; use crate::error::Result; use crate::hotkey::{self, HotkeyEvent}; use crate::model_manager::ModelManager; use crate::output; +use 
crate::output::TextOutput; use crate::output::post_process::PostProcessor; -use crate::state::State; +use crate::state::{ChunkResult, State}; use crate::text::TextProcessor; use crate::transcribe::Transcriber; use pidlock::Pidlock; @@ -21,9 +23,15 @@ use std::sync::Arc; use std::time::Duration; use tokio::process::Command; use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::mpsc; /// Send a desktop notification with optional engine icon -async fn send_notification(title: &str, body: &str, show_engine_icon: bool, engine: crate::config::TranscriptionEngine) { +async fn send_notification( + title: &str, + body: &str, + show_engine_icon: bool, + engine: crate::config::TranscriptionEngine, +) { let title = if show_engine_icon { format!("{} {}", crate::output::engine_icon(engine), title) } else { @@ -31,12 +39,7 @@ async fn send_notification(title: &str, body: &str, show_engine_icon: bool, engi }; let _ = Command::new("notify-send") - .args([ - "--app-name=Voxtype", - "--expire-time=2000", - &title, - body, - ]) + .args(["--app-name=Voxtype", "--expire-time=2000", &title, body]) .stdout(Stdio::null()) .stderr(Stdio::null()) .status() @@ -307,6 +310,18 @@ fn cleanup_model_override() { /// Result type for transcription task type TranscriptionResult = std::result::Result; +/// Commands sent to the persistent streaming transcriber task +enum StreamingCommand { + /// Feed audio samples to the transcriber + Audio(Vec), + /// Flush remaining audio (silence padding) and send final text + Flush, + /// Reset transcriber state for a new utterance (no text output) + Reset, + /// Shut down the task entirely + Shutdown, +} + /// Main daemon that orchestrates all components pub struct Daemon { config: Config, @@ -319,9 +334,29 @@ pub struct Daemon { // Model manager for multi-model support model_manager: Option, // Background task for loading model on-demand - model_load_task: Option, crate::error::TranscribeError>>>, + model_load_task: Option< + tokio::task::JoinHandle< + 
std::result::Result, crate::error::TranscribeError>, + >, + >, // Background task for transcription (allows cancel during transcription) transcription_task: Option>, + // Background tasks for eager chunk transcriptions (chunk_index, task) + eager_chunk_tasks: Vec<( + usize, + tokio::task::JoinHandle>, + )>, + // Streaming transcription: channels for chunk→transcriber→text communication + streaming_chunk_tx: Option>, + streaming_text_rx: Option>, + // Handle for the streaming transcriber blocking task + streaming_task: Option>, + // Chunk size in samples for streaming transcription + streaming_chunk_size: usize, + // Cached output chain and driver index for streaming sessions. + // Avoids recreating the chain and re-probing is_available() per delta. + streaming_output_chain: Option>>, + streaming_output_index: Option, } impl Daemon { @@ -382,6 +417,13 @@ impl Daemon { model_manager: None, model_load_task: None, transcription_task: None, + eager_chunk_tasks: Vec::new(), + streaming_chunk_tx: None, + streaming_text_rx: None, + streaming_task: None, + streaming_chunk_size: 8960, // Default, updated when streaming transcriber is initialized + streaming_output_chain: None, + streaming_output_index: None, } } @@ -483,6 +525,393 @@ impl Daemon { } } + /// Check if streaming transcription should be used for the current config + fn should_use_streaming(&self) -> bool { + if self.config.engine != crate::config::TranscriptionEngine::Parakeet { + return false; + } + if let Some(ref parakeet) = self.config.parakeet { + // Detect model type from config or filesystem + let model_type = parakeet.model_type.unwrap_or_else(|| { + // Auto-detect from model files on disk + #[cfg(feature = "parakeet")] + { + match crate::transcribe::parakeet::resolve_model_path(¶keet.model) { + Ok(path) => crate::transcribe::parakeet::detect_model_type(&path), + Err(_) => { + // Can't resolve path; fall back to checking explicit streaming flag + if parakeet.streaming == Some(true) { + 
crate::config::ParakeetModelType::Nemotron + } else { + crate::config::ParakeetModelType::Tdt + } + } + } + } + #[cfg(not(feature = "parakeet"))] + { + crate::config::ParakeetModelType::Tdt + } + }); + parakeet.streaming_enabled(model_type) + } else { + false + } + } + + /// Initialize the persistent streaming transcriber at startup. + /// Loads the model once and spawns a long-lived blocking task. + /// Call `begin_streaming_session()` to start each recording, not this. + fn init_streaming_transcriber( + &mut self, + ) -> std::result::Result<(), crate::error::TranscribeError> { + let config = self.config.clone(); + let mut streaming = crate::transcribe::create_streaming_transcriber(&config)?; + + let chunk_size = streaming.chunk_size(); + let (cmd_tx, mut cmd_rx) = mpsc::channel::(32); + let (text_tx, text_rx) = mpsc::channel::(32); + + let task = tokio::task::spawn_blocking(move || { + while let Some(cmd) = cmd_rx.blocking_recv() { + match cmd { + StreamingCommand::Audio(chunk) => { + match streaming.transcribe_chunk(&chunk) { + Ok(delta) if !delta.is_empty() => { + if text_tx.blocking_send(delta).is_err() { + tracing::debug!("Streaming text receiver dropped"); + break; + } + } + Ok(_) => {} // No new text yet + Err(e) => { + tracing::error!("Streaming chunk transcription failed: {}", e); + } + } + } + StreamingCommand::Flush => { + match streaming.flush() { + Ok(delta) if !delta.is_empty() => { + let _ = text_tx.blocking_send(delta); + } + Ok(_) => {} + Err(e) => { + tracing::error!("Streaming flush failed: {}", e); + } + } + tracing::debug!( + "Streaming flush complete, transcript: {:?}", + streaming.get_transcript() + ); + // Send sentinel so stop_streaming() knows flush is done + let _ = text_tx.blocking_send("\0".to_string()); + } + StreamingCommand::Reset => { + streaming.reset(); + tracing::debug!("Streaming transcriber reset for new utterance"); + } + StreamingCommand::Shutdown => { + tracing::debug!("Streaming transcriber shutting down"); + break; + } + 
} + } + }); + + self.streaming_chunk_tx = Some(cmd_tx); + self.streaming_text_rx = Some(text_rx); + self.streaming_task = Some(task); + self.streaming_chunk_size = chunk_size; + + tracing::info!( + "Streaming transcriber ready (chunk_size={} samples, {:.0}ms)", + chunk_size, + chunk_size as f64 / 16.0 + ); + Ok(()) + } + + /// Begin a new streaming session (reset state for new recording). + /// The model is already loaded; this just resets the transcriber state. + async fn begin_streaming_session(&mut self) { + if let Some(ref tx) = self.streaming_chunk_tx { + let _ = tx.send(StreamingCommand::Reset).await; + } + + // Pre-create and probe the output chain once for the entire streaming session + let chain = output::create_output_chain(&self.config.output); + let index = output::probe_output_chain(&chain).await; + if let Some(idx) = index { + tracing::debug!("Cached streaming output driver: {} (index {})", chain[idx].name(), idx); + } else { + tracing::warn!("No output driver available at streaming session start"); + } + self.streaming_output_chain = Some(chain); + self.streaming_output_index = index; + } + + /// Clear the cached output chain. Returns the chain if caller needs it for a final output. + fn take_streaming_output_cache(&mut self) -> (Option>>, Option) { + (self.streaming_output_chain.take(), self.streaming_output_index.take()) + } + + /// Stop streaming transcription: flush remaining audio, collect final text, then reset. + /// Waits for all in-flight chunks to be processed plus the flush silence chunks. + /// Returns any text produced after the main loop stopped polling text_rx. + async fn stop_streaming(&mut self) -> String { + let mut final_text = String::new(); + + // Send flush command — this queues behind any in-flight audio chunks, + // so the blocking task will process all pending audio before flushing. 
+ if let Some(ref tx) = self.streaming_chunk_tx { + let _ = tx.send(StreamingCommand::Flush).await; + } + + // Collect all text until we receive the flush-done sentinel ("\0"). + // This includes text from in-flight audio chunks AND the flush itself. + if let Some(ref mut rx) = self.streaming_text_rx { + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + loop { + match tokio::time::timeout_at(deadline, rx.recv()).await { + Ok(Some(delta)) if delta == "\0" => { + // Sentinel: flush is done, all text collected + tracing::debug!("Received flush-done sentinel"); + break; + } + Ok(Some(delta)) if !delta.is_empty() => { + final_text.push_str(&delta); + } + Ok(Some(_)) => {} // empty delta, keep waiting + Ok(None) => { + tracing::debug!("Streaming text channel closed during flush"); + break; + } + Err(_) => { + tracing::warn!("Timed out waiting for streaming flush to complete"); + break; + } + } + } + } + + // Reset for next recording + if let Some(ref tx) = self.streaming_chunk_tx { + let _ = tx.send(StreamingCommand::Reset).await; + } + + final_text + } + + /// Cancel streaming transcription without flushing (reset for next recording) + async fn cancel_streaming(&mut self) { + if let Some(ref tx) = self.streaming_chunk_tx { + let _ = tx.send(StreamingCommand::Reset).await; + } + // Clear cached output chain since the streaming session is ending + self.streaming_output_chain = None; + self.streaming_output_index = None; + } + + /// Spawn a transcription task for a single chunk (eager processing) + fn spawn_chunk_transcription( + &mut self, + chunk_index: usize, + chunk_audio: Vec, + transcriber: Arc, + ) { + tracing::debug!( + "Spawning eager transcription for chunk {} ({:.1}s)", + chunk_index, + chunk_audio.len() as f32 / 16000.0 + ); + + let task = tokio::task::spawn_blocking(move || transcriber.transcribe(&chunk_audio)); + + self.eager_chunk_tasks.push((chunk_index, task)); + } + + /// Check for any ready chunks in accumulated audio and spawn 
transcription tasks + /// Returns the number of new chunks spawned + fn process_eager_chunks( + &mut self, + accumulated_audio: &[f32], + chunks_sent: &mut usize, + tasks_in_flight: &mut usize, + transcriber: &Arc, + ) -> usize { + let eager_config = EagerConfig::from_whisper_config(&self.config.whisper); + let complete_chunks = eager::count_complete_chunks(accumulated_audio.len(), &eager_config); + + let mut spawned = 0; + while *chunks_sent < complete_chunks { + if let Some(chunk_audio) = + eager::extract_chunk(accumulated_audio, *chunks_sent, &eager_config) + { + self.spawn_chunk_transcription(*chunks_sent, chunk_audio, transcriber.clone()); + *chunks_sent += 1; + *tasks_in_flight += 1; + spawned += 1; + } else { + break; + } + } + + spawned + } + + /// Poll for completed chunk transcription tasks and collect results + /// Returns any completed results + async fn poll_chunk_tasks(&mut self) -> Vec { + let mut completed = Vec::new(); + let mut remaining_tasks = Vec::new(); + + for (chunk_index, task) in self.eager_chunk_tasks.drain(..) 
{ + if task.is_finished() { + // Task is finished, await will complete immediately + match task.await { + Ok(Ok(text)) => { + tracing::debug!("Chunk {} completed: {:?}", chunk_index, text); + completed.push(ChunkResult { text, chunk_index }); + } + Ok(Err(e)) => { + tracing::warn!("Chunk {} transcription failed: {}", chunk_index, e); + // Add empty result to maintain ordering + completed.push(ChunkResult { + text: String::new(), + chunk_index, + }); + } + Err(e) => { + tracing::warn!("Chunk {} task panicked: {}", chunk_index, e); + completed.push(ChunkResult { + text: String::new(), + chunk_index, + }); + } + } + } else { + remaining_tasks.push((chunk_index, task)); + } + } + + self.eager_chunk_tasks = remaining_tasks; + completed + } + + /// Wait for all remaining chunk tasks to complete + async fn wait_for_chunk_tasks(&mut self) -> Vec { + let mut results = Vec::new(); + + for (chunk_index, task) in self.eager_chunk_tasks.drain(..) { + match task.await { + Ok(Ok(text)) => { + tracing::debug!("Chunk {} completed (waited): {:?}", chunk_index, text); + results.push(ChunkResult { text, chunk_index }); + } + Ok(Err(e)) => { + tracing::warn!("Chunk {} transcription failed: {}", chunk_index, e); + results.push(ChunkResult { + text: String::new(), + chunk_index, + }); + } + Err(e) => { + if e.is_cancelled() { + tracing::debug!("Chunk {} task was cancelled", chunk_index); + } else { + tracing::warn!("Chunk {} task panicked: {}", chunk_index, e); + } + results.push(ChunkResult { + text: String::new(), + chunk_index, + }); + } + } + } + + results + } + + /// Finish eager recording: wait for all chunks, transcribe tail, combine results + async fn finish_eager_recording( + &mut self, + state: &mut State, + transcriber: Arc, + ) -> Option { + // Extract state data + let (accumulated_audio, mut chunk_results) = match state { + State::EagerRecording { + accumulated_audio, + chunk_results, + .. 
+ } => (accumulated_audio.clone(), chunk_results.clone()), + _ => return None, + }; + + let audio_duration = accumulated_audio.len() as f32 / 16000.0; + tracing::info!( + "Finishing eager recording: {:.1}s of audio, {} chunks already transcribed", + audio_duration, + chunk_results.len() + ); + + // Wait for any in-flight chunk tasks + let mut waited_results = self.wait_for_chunk_tasks().await; + chunk_results.append(&mut waited_results); + + // Transcribe the tail (audio after last complete chunk) + let eager_config = EagerConfig::from_whisper_config(&self.config.whisper); + let chunks_sent = chunk_results + .iter() + .map(|r| r.chunk_index) + .max() + .map(|i| i + 1) + .unwrap_or(0); + let tail_start = chunks_sent * eager_config.stride_samples(); + + if tail_start < accumulated_audio.len() { + let tail_audio = accumulated_audio[tail_start..].to_vec(); + let tail_duration = tail_audio.len() as f32 / 16000.0; + + if tail_duration >= 0.3 { + tracing::debug!( + "Transcribing tail audio: {:.1}s (from sample {})", + tail_duration, + tail_start + ); + + let tail_transcriber = transcriber.clone(); + match tokio::task::spawn_blocking(move || tail_transcriber.transcribe(&tail_audio)) + .await + { + Ok(Ok(text)) => { + tracing::debug!("Tail transcription: {:?}", text); + chunk_results.push(ChunkResult { + text, + chunk_index: chunks_sent, + }); + } + Ok(Err(e)) => { + tracing::warn!("Tail transcription failed: {}", e); + } + Err(e) => { + tracing::warn!("Tail transcription task panicked: {}", e); + } + } + } + } + + // Combine all chunk results + let combined = eager::combine_chunk_results(chunk_results); + tracing::info!("Combined eager transcription: {:?}", combined); + + if combined.is_empty() { + None + } else { + Some(combined) + } + } + /// Start transcription task (non-blocking, stores JoinHandle for later completion) /// Returns true if transcription was started, false if skipped (too short) async fn start_transcription_task( @@ -499,7 +928,13 @@ impl Daemon { // 
Send notification if enabled if self.config.output.notification.on_recording_stop { - send_notification("Recording Stopped", "Transcribing...", self.config.output.notification.show_engine_icon, self.config.engine).await; + send_notification( + "Recording Stopped", + "Transcribing...", + self.config.output.notification.show_engine_icon, + self.config.engine, + ) + .await; } // Stop recording and get samples @@ -583,9 +1018,7 @@ impl Daemon { // Apply post-processing command (profile overrides default) let final_text = if let Some(profile) = active_profile { if let Some(ref cmd) = profile.post_process_command { - let timeout_ms = profile - .post_process_timeout_ms - .unwrap_or(30000); + let timeout_ms = profile.post_process_timeout_ms.unwrap_or(30000); let profile_config = crate::config::PostProcessConfig { command: cmd.clone(), timeout_ms, @@ -652,18 +1085,15 @@ impl Daemon { }; let file_mode = &self.config.output.file_mode; - match write_transcription_to_file(&output_path, &final_text, file_mode).await + match write_transcription_to_file(&output_path, &final_text, file_mode) + .await { Ok(()) => { let mode_str = match file_mode { FileMode::Overwrite => "wrote", FileMode::Append => "appended", }; - tracing::info!( - "{} transcription to {:?}", - mode_str, - output_path - ); + tracing::info!("{} transcription to {:?}", mode_str, output_path); } Err(e) => { tracing::error!( @@ -720,7 +1150,8 @@ impl Daemon { &final_text, self.config.output.notification.show_engine_icon, self.config.engine, - ).await; + ) + .await; } *state = State::Idle; @@ -801,7 +1232,10 @@ impl Daemon { let mut hotkey_listener = if self.config.hotkey.enabled { tracing::info!("Hotkey: {}", self.config.hotkey.key); let secondary_model = self.config.whisper.secondary_model.clone(); - Some(hotkey::create_listener(&self.config.hotkey, secondary_model)?) + Some(hotkey::create_listener( + &self.config.hotkey, + secondary_model, + )?) 
} else { tracing::info!( "Built-in hotkey disabled, use 'voxtype record' commands or compositor keybindings" @@ -837,8 +1271,15 @@ impl Daemon { } } crate::config::TranscriptionEngine::Parakeet => { - // Parakeet uses its own model loading - transcriber_preloaded = Some(Arc::from(crate::transcribe::create_transcriber(&self.config)?)); + if self.should_use_streaming() { + // Pre-load streaming transcriber (Nemotron) — no batch transcriber needed + self.init_streaming_transcriber()?; + } else { + // Parakeet batch mode uses its own model loading + transcriber_preloaded = Some(Arc::from( + crate::transcribe::create_transcriber(&self.config)?, + )); + } } } tracing::info!("Model loaded, ready for voice input"); @@ -869,6 +1310,9 @@ impl Daemon { // Audio capture (created fresh for each recording) let mut audio_capture: Option> = None; + // Audio receiver for streaming mode (receives chunks from audio capture) + let mut audio_rx: Option>> = None; + // Recording timeout let max_duration = Duration::from_secs(self.config.audio.max_duration_secs as u64); @@ -958,23 +1402,53 @@ impl Daemon { match audio::create_capture(&self.config.audio) { Ok(mut capture) => { tracing::debug!("Audio capture created, starting..."); - if let Err(e) = capture.start().await { - tracing::error!("Failed to start audio: {}", e); - continue; - } - tracing::debug!("Audio capture started successfully"); - audio_capture = Some(capture); - state = State::Recording { - started_at: std::time::Instant::now(), - model_override: model_override.clone(), - }; - self.update_state("recording"); - self.play_feedback(SoundEvent::RecordingStart); - - // Run pre-recording hook (e.g., enter compositor submap for cancel) - if let Some(cmd) = &self.config.output.pre_recording_command { - if let Err(e) = output::run_hook(cmd, "pre_recording").await { - tracing::warn!("{}", e); + match capture.start().await { + Ok(rx) => { + tracing::debug!("Audio capture started successfully"); + + // Check if streaming mode should be 
used + if self.should_use_streaming() && self.streaming_chunk_tx.is_some() { + tracing::info!("Using streaming transcription (Nemotron)"); + self.begin_streaming_session().await; + audio_rx = Some(rx); + audio_capture = Some(capture); + state = State::StreamingRecording { + started_at: std::time::Instant::now(), + model_override: model_override.clone(), + audio_buffer: Vec::new(), + text_output_so_far: String::new(), + }; + } else if self.config.whisper.eager_processing { + tracing::info!("Using eager input processing"); + audio_capture = Some(capture); + state = State::EagerRecording { + started_at: std::time::Instant::now(), + model_override: model_override.clone(), + accumulated_audio: Vec::new(), + chunks_sent: 0, + chunk_results: Vec::new(), + tasks_in_flight: 0, + }; + } else { + audio_capture = Some(capture); + state = State::Recording { + started_at: std::time::Instant::now(), + model_override: model_override.clone(), + }; + } + self.update_state("recording"); + self.play_feedback(SoundEvent::RecordingStart); + + // Run pre-recording hook (e.g., enter compositor submap for cancel) + if let Some(cmd) = &self.config.output.pre_recording_command { + if let Err(e) = output::run_hook(cmd, "pre_recording").await { + tracing::warn!("{}", e); + } + } + } + Err(e) => { + tracing::error!("Failed to start audio: {}", e); + continue; } } } @@ -1006,6 +1480,123 @@ impl Daemon { &mut audio_capture, transcriber, ).await; + } else if state.is_streaming_recording() { + // Handle streaming recording stop + let duration = state.recording_duration().unwrap_or_default(); + let text_so_far = match &state { + State::StreamingRecording { text_output_so_far, .. 
} => text_output_so_far.clone(), + _ => String::new(), + }; + tracing::info!( + "Streaming recording stopped ({:.1}s), already typed {} chars", + duration.as_secs_f32(), + text_so_far.len() + ); + + self.play_feedback(SoundEvent::RecordingStop); + + // Stop audio capture + audio_rx = None; + if let Some(mut capture) = audio_capture.take() { + let _ = capture.stop().await; + } + + // Flush the streaming transcriber and get any remaining text + // Drain any buffered audio first + if let State::StreamingRecording { audio_buffer, .. } = &state { + if !audio_buffer.is_empty() { + let remaining = audio_buffer.clone(); + if let Some(ref tx) = self.streaming_chunk_tx { + let _ = tx.send(StreamingCommand::Audio(remaining)).await; + } + } + } + + // Flush streaming transcriber and output any remaining text + let flush_text = self.stop_streaming().await; + if !flush_text.is_empty() { + tracing::debug!("Flush produced text: {:?}", flush_text); + // Take the cached chain for the final output, then clear cache + let (chain, cached_idx) = self.take_streaming_output_cache(); + let chain = chain.unwrap_or_else(|| output::create_output_chain(&self.config.output)); + if let Some(idx) = cached_idx { + if let Err(e) = output::output_with_cached_index(&chain, &flush_text, idx).await { + tracing::error!("Failed to output streaming flush text: {}", e); + } + } else { + let output_options = output::OutputOptions { + pre_output_command: None, + post_output_command: None, + }; + if let Err(e) = output::output_with_fallback(&chain, &flush_text, output_options).await { + tracing::error!("Failed to output streaming flush text: {}", e); + } + } + } else { + // No flush text, just clear the cache + self.take_streaming_output_cache(); + } + + if self.config.output.notification.on_recording_stop { + send_notification("Streaming Complete", "Done", self.config.output.notification.show_engine_icon, self.config.engine).await; + } + + // Run post_output_command to reset compositor submap + if let 
Some(cmd) = &self.config.output.post_output_command { + if let Err(e) = output::run_hook(cmd, "post_output").await { + tracing::warn!("{}", e); + } + } + + self.reset_to_idle(&mut state).await; + } else if state.is_eager_recording() { + // Handle eager recording stop - extract model_override first + let model_override = match &state { + State::EagerRecording { model_override, .. } => model_override.clone(), + _ => None, + }; + + let duration = state.recording_duration().unwrap_or_default(); + tracing::info!("Eager recording stopped ({:.1}s)", duration.as_secs_f32()); + + self.play_feedback(SoundEvent::RecordingStop); + + if self.config.output.notification.on_recording_stop { + send_notification("Recording Stopped", "Transcribing...", self.config.output.notification.show_engine_icon, self.config.engine).await; + } + + // Stop audio capture and get remaining samples + if let Some(mut capture) = audio_capture.take() { + if let Ok(final_samples) = capture.stop().await { + // Add final samples to accumulated audio + if let State::EagerRecording { accumulated_audio, .. 
} = &mut state { + accumulated_audio.extend(final_samples); + } + } + } + + let transcriber = match self.get_transcriber_for_recording( + model_override.as_deref(), + &transcriber_preloaded, + ).await { + Ok(t) => t, + Err(()) => { + state = State::Idle; + self.update_state("idle"); + continue; + } + }; + + self.update_state("transcribing"); + + if let Some(text) = self.finish_eager_recording(&mut state, transcriber).await { + // Move to outputting state and handle via transcription result flow + state = State::Transcribing { audio: Vec::new() }; + self.handle_transcription_result(&mut state, Ok(Ok(text))).await; + } else { + tracing::debug!("Eager recording produced empty result"); + self.reset_to_idle(&mut state).await; + } } } @@ -1066,23 +1657,52 @@ impl Daemon { match audio::create_capture(&self.config.audio) { Ok(mut capture) => { - if let Err(e) = capture.start().await { - tracing::error!("Failed to start audio: {}", e); - self.play_feedback(SoundEvent::Error); - continue; - } - audio_capture = Some(capture); - state = State::Recording { - started_at: std::time::Instant::now(), - model_override: model_override.clone(), - }; - self.update_state("recording"); - self.play_feedback(SoundEvent::RecordingStart); - - // Run pre-recording hook (e.g., enter compositor submap for cancel) - if let Some(cmd) = &self.config.output.pre_recording_command { - if let Err(e) = output::run_hook(cmd, "pre_recording").await { - tracing::warn!("{}", e); + match capture.start().await { + Ok(rx) => { + // Check if streaming mode should be used + if self.should_use_streaming() && self.streaming_chunk_tx.is_some() { + tracing::info!("Using streaming transcription (Nemotron, toggle mode)"); + self.begin_streaming_session().await; + audio_rx = Some(rx); + audio_capture = Some(capture); + state = State::StreamingRecording { + started_at: std::time::Instant::now(), + model_override: model_override.clone(), + audio_buffer: Vec::new(), + text_output_so_far: String::new(), + }; + } else if 
self.config.whisper.eager_processing { + tracing::info!("Using eager input processing"); + audio_capture = Some(capture); + state = State::EagerRecording { + started_at: std::time::Instant::now(), + model_override: model_override.clone(), + accumulated_audio: Vec::new(), + chunks_sent: 0, + chunk_results: Vec::new(), + tasks_in_flight: 0, + }; + } else { + audio_capture = Some(capture); + state = State::Recording { + started_at: std::time::Instant::now(), + model_override: model_override.clone(), + }; + } + self.update_state("recording"); + self.play_feedback(SoundEvent::RecordingStart); + + // Run pre-recording hook (e.g., enter compositor submap for cancel) + if let Some(cmd) = &self.config.output.pre_recording_command { + if let Err(e) = output::run_hook(cmd, "pre_recording").await { + tracing::warn!("{}", e); + } + } + } + Err(e) => { + tracing::error!("Failed to start audio: {}", e); + self.play_feedback(SoundEvent::Error); + continue; } } } @@ -1091,6 +1711,64 @@ impl Daemon { self.play_feedback(SoundEvent::Error); } } + } else if state.is_streaming_recording() { + // Handle streaming recording stop (toggle mode) + let duration = state.recording_duration().unwrap_or_default(); + tracing::info!( + "Streaming recording stopped (toggle, {:.1}s)", + duration.as_secs_f32() + ); + + self.play_feedback(SoundEvent::RecordingStop); + + audio_rx = None; + if let Some(mut capture) = audio_capture.take() { + let _ = capture.stop().await; + } + + // Flush remaining audio buffer + if let State::StreamingRecording { audio_buffer, .. 
} = &state { + if !audio_buffer.is_empty() { + let remaining = audio_buffer.clone(); + if let Some(ref tx) = self.streaming_chunk_tx { + let _ = tx.send(StreamingCommand::Audio(remaining)).await; + } + } + } + + let flush_text = self.stop_streaming().await; + if !flush_text.is_empty() { + tracing::debug!("Flush produced text: {:?}", flush_text); + let (chain, cached_idx) = self.take_streaming_output_cache(); + let chain = chain.unwrap_or_else(|| output::create_output_chain(&self.config.output)); + if let Some(idx) = cached_idx { + if let Err(e) = output::output_with_cached_index(&chain, &flush_text, idx).await { + tracing::error!("Failed to output streaming flush text: {}", e); + } + } else { + let output_options = output::OutputOptions { + pre_output_command: None, + post_output_command: None, + }; + if let Err(e) = output::output_with_fallback(&chain, &flush_text, output_options).await { + tracing::error!("Failed to output streaming flush text: {}", e); + } + } + } else { + self.take_streaming_output_cache(); + } + + if self.config.output.notification.on_recording_stop { + send_notification("Streaming Complete", "Done", self.config.output.notification.show_engine_icon, self.config.engine).await; + } + + if let Some(cmd) = &self.config.output.post_output_command { + if let Err(e) = output::run_hook(cmd, "post_output").await { + tracing::warn!("{}", e); + } + } + + self.reset_to_idle(&mut state).await; } else if let State::Recording { model_override: current_model_override, .. } = &state { let transcriber = match self.get_transcriber_for_recording( current_model_override.as_deref(), @@ -1110,6 +1788,52 @@ impl Daemon { &mut audio_capture, transcriber, ).await; + } else if state.is_eager_recording() { + // Handle eager recording stop in toggle mode - extract model_override first + let model_override = match &state { + State::EagerRecording { model_override, .. 
} => model_override.clone(), + _ => None, + }; + + let duration = state.recording_duration().unwrap_or_default(); + tracing::info!("Eager recording stopped ({:.1}s)", duration.as_secs_f32()); + + self.play_feedback(SoundEvent::RecordingStop); + + if self.config.output.notification.on_recording_stop { + send_notification("Recording Stopped", "Transcribing...", self.config.output.notification.show_engine_icon, self.config.engine).await; + } + + // Stop audio capture and get remaining samples + if let Some(mut capture) = audio_capture.take() { + if let Ok(final_samples) = capture.stop().await { + if let State::EagerRecording { accumulated_audio, .. } = &mut state { + accumulated_audio.extend(final_samples); + } + } + } + + let transcriber = match self.get_transcriber_for_recording( + model_override.as_deref(), + &transcriber_preloaded, + ).await { + Ok(t) => t, + Err(()) => { + state = State::Idle; + self.update_state("idle"); + continue; + } + }; + + self.update_state("transcribing"); + + if let Some(text) = self.finish_eager_recording(&mut state, transcriber).await { + state = State::Transcribing { audio: Vec::new() }; + self.handle_transcription_result(&mut state, Ok(Ok(text))).await; + } else { + tracing::debug!("Eager recording produced empty result"); + self.reset_to_idle(&mut state).await; + } } } @@ -1126,15 +1850,24 @@ impl Daemon { tracing::info!("Recording cancelled via hotkey"); // Stop recording and discard audio + audio_rx = None; if let Some(mut capture) = audio_capture.take() { let _ = capture.stop().await; } + // Cancel streaming transcription if active + self.cancel_streaming().await; + // Cancel any pending model load task if let Some(task) = self.model_load_task.take() { task.abort(); } + // Cancel any pending eager chunk tasks + for (_, task) in self.eager_chunk_tasks.drain(..) 
{
+                task.abort();
+            }
+
             cleanup_output_mode_override();
             cleanup_model_override();
             cleanup_profile_override();
@@ -1184,6 +1917,82 @@ impl Daemon {
                 }
             }
 
+            // Streaming: receive audio chunks from capture and feed to transcriber
+            Some(audio_chunk) = async {
+                match &mut audio_rx {
+                    Some(rx) => rx.recv().await,
+                    None => std::future::pending().await,
+                }
+            }, if state.is_streaming_recording() => {
+                if let State::StreamingRecording { audio_buffer, .. } = &mut state {
+                    audio_buffer.extend(audio_chunk);
+
+                    let chunk_size = self.streaming_chunk_size;
+
+                    // Send full chunks to the streaming transcriber
+                    while audio_buffer.len() >= chunk_size {
+                        let chunk: Vec<f32> = audio_buffer.drain(..chunk_size).collect();
+                        if let Some(ref tx) = self.streaming_chunk_tx {
+                            if tx.send(StreamingCommand::Audio(chunk)).await.is_err() {
+                                tracing::warn!("Streaming transcriber channel closed");
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+
+            // Streaming: receive text deltas from transcriber and output them live
+            Some(text_delta) = async {
+                match &mut self.streaming_text_rx {
+                    Some(rx) => rx.recv().await,
+                    None => std::future::pending().await,
+                }
+            }, if state.is_streaming_recording() => {
+                if !text_delta.is_empty() {
+                    tracing::debug!("Streaming text delta: {:?}", text_delta);
+
+                    // Use cached output chain and driver index to avoid per-delta overhead
+                    let output_ok = if let (Some(ref chain), Some(cached_idx)) =
+                        (&self.streaming_output_chain, self.streaming_output_index)
+                    {
+                        match output::output_with_cached_index(chain, &text_delta, cached_idx).await {
+                            Ok(new_idx) => {
+                                // Update cached index if driver changed due to fallback
+                                if new_idx != cached_idx {
+                                    self.streaming_output_index = Some(new_idx);
+                                }
+                                true
+                            }
+                            Err(e) => {
+                                tracing::error!("Failed to output streaming text: {}", e);
+                                false
+                            }
+                        }
+                    } else {
+                        // No cached chain (shouldn't happen, but fall back gracefully)
+                        let output_chain = output::create_output_chain(&self.config.output);
+                        let output_options = 
output::OutputOptions { + pre_output_command: None, + post_output_command: None, + }; + match output::output_with_fallback(&output_chain, &text_delta, output_options).await { + Ok(()) => true, + Err(e) => { + tracing::error!("Failed to output streaming text: {}", e); + false + } + } + }; + + if output_ok { + if let State::StreamingRecording { text_output_so_far, .. } = &mut state { + text_output_so_far.push_str(&text_delta); + } + } + } + } + // Check for recording timeout and cancel requests _ = tokio::time::sleep(Duration::from_millis(100)), if state.is_recording() => { // Check for cancel request first @@ -1191,15 +2000,24 @@ impl Daemon { tracing::info!("Recording cancelled"); // Stop recording and discard audio + audio_rx = None; if let Some(mut capture) = audio_capture.take() { let _ = capture.stop().await; } + // Cancel streaming transcription if active + self.cancel_streaming().await; + // Cancel any pending model load task if let Some(task) = self.model_load_task.take() { task.abort(); } + // Cancel any pending eager chunk tasks + for (_, task) in self.eager_chunk_tasks.drain(..) { + task.abort(); + } + cleanup_output_mode_override(); cleanup_model_override(); cleanup_profile_override(); @@ -1230,9 +2048,19 @@ impl Daemon { ); // Stop recording + audio_rx = None; if let Some(mut capture) = audio_capture.take() { let _ = capture.stop().await; } + + // Cancel streaming transcription if active + self.cancel_streaming().await; + + // Cancel any pending eager chunk tasks + for (_, task) in self.eager_chunk_tasks.drain(..) 
{ + task.abort(); + } + cleanup_output_mode_override(); cleanup_model_override(); cleanup_profile_override(); @@ -1304,23 +2132,50 @@ impl Daemon { match audio::create_capture(&self.config.audio) { Ok(mut capture) => { - if let Err(e) = capture.start().await { - tracing::error!("Failed to start audio: {}", e); - } else { - audio_capture = Some(capture); - state = State::Recording { - started_at: std::time::Instant::now(), - model_override, - }; - self.update_state("recording"); - self.play_feedback(SoundEvent::RecordingStart); - - // Run pre-recording hook (e.g., enter compositor submap for cancel) - if let Some(cmd) = &self.config.output.pre_recording_command { - if let Err(e) = output::run_hook(cmd, "pre_recording").await { - tracing::warn!("{}", e); + match capture.start().await { + Ok(rx) => { + if self.should_use_streaming() && self.streaming_chunk_tx.is_some() { + tracing::info!("Using streaming transcription (Nemotron, external trigger)"); + self.begin_streaming_session().await; + audio_rx = Some(rx); + audio_capture = Some(capture); + state = State::StreamingRecording { + started_at: std::time::Instant::now(), + model_override, + audio_buffer: Vec::new(), + text_output_so_far: String::new(), + }; + } else if self.config.whisper.eager_processing { + tracing::info!("Using eager input processing"); + audio_capture = Some(capture); + state = State::EagerRecording { + started_at: std::time::Instant::now(), + model_override, + accumulated_audio: Vec::new(), + chunks_sent: 0, + chunk_results: Vec::new(), + tasks_in_flight: 0, + }; + } else { + audio_capture = Some(capture); + state = State::Recording { + started_at: std::time::Instant::now(), + model_override, + }; + } + self.update_state("recording"); + self.play_feedback(SoundEvent::RecordingStart); + + // Run pre-recording hook (e.g., enter compositor submap for cancel) + if let Some(cmd) = &self.config.output.pre_recording_command { + if let Err(e) = output::run_hook(cmd, "pre_recording").await { + 
tracing::warn!("{}", e); + } } } + Err(e) => { + tracing::error!("Failed to start audio: {}", e); + } } } Err(e) => { @@ -1334,7 +2189,63 @@ impl Daemon { // Handle SIGUSR2 - stop recording (for compositor keybindings) _ = sigusr2.recv() => { tracing::debug!("Received SIGUSR2 (stop recording)"); - if let State::Recording { model_override, .. } = &state { + if state.is_streaming_recording() { + // Handle streaming recording stop via external trigger + let duration = state.recording_duration().unwrap_or_default(); + tracing::info!( + "Streaming recording stopped (external, {:.1}s)", + duration.as_secs_f32() + ); + + self.play_feedback(SoundEvent::RecordingStop); + + audio_rx = None; + if let Some(mut capture) = audio_capture.take() { + let _ = capture.stop().await; + } + + if let State::StreamingRecording { audio_buffer, .. } = &state { + if !audio_buffer.is_empty() { + let remaining = audio_buffer.clone(); + if let Some(ref tx) = self.streaming_chunk_tx { + let _ = tx.send(StreamingCommand::Audio(remaining)).await; + } + } + } + + let flush_text = self.stop_streaming().await; + if !flush_text.is_empty() { + tracing::debug!("Flush produced text: {:?}", flush_text); + let (chain, cached_idx) = self.take_streaming_output_cache(); + let chain = chain.unwrap_or_else(|| output::create_output_chain(&self.config.output)); + if let Some(idx) = cached_idx { + if let Err(e) = output::output_with_cached_index(&chain, &flush_text, idx).await { + tracing::error!("Failed to output streaming flush text: {}", e); + } + } else { + let output_options = output::OutputOptions { + pre_output_command: None, + post_output_command: None, + }; + if let Err(e) = output::output_with_fallback(&chain, &flush_text, output_options).await { + tracing::error!("Failed to output streaming flush text: {}", e); + } + } + } else { + self.take_streaming_output_cache(); + } + if self.config.output.notification.on_recording_stop { + send_notification("Streaming Complete", "Done", 
self.config.output.notification.show_engine_icon, self.config.engine).await; + } + + if let Some(cmd) = &self.config.output.post_output_command { + if let Err(e) = output::run_hook(cmd, "post_output").await { + tracing::warn!("{}", e); + } + } + + self.reset_to_idle(&mut state).await; + } else if let State::Recording { model_override, .. } = &state { let transcriber = match self.get_transcriber_for_recording( model_override.as_deref(), &transcriber_preloaded, @@ -1352,6 +2263,52 @@ impl Daemon { &mut audio_capture, transcriber, ).await; + } else if state.is_eager_recording() { + // Handle eager recording stop via external trigger - extract model_override first + let model_override = match &state { + State::EagerRecording { model_override, .. } => model_override.clone(), + _ => None, + }; + + let duration = state.recording_duration().unwrap_or_default(); + tracing::info!("Eager recording stopped ({:.1}s)", duration.as_secs_f32()); + + self.play_feedback(SoundEvent::RecordingStop); + + if self.config.output.notification.on_recording_stop { + send_notification("Recording Stopped", "Transcribing...", self.config.output.notification.show_engine_icon, self.config.engine).await; + } + + // Stop audio capture and get remaining samples + if let Some(mut capture) = audio_capture.take() { + if let Ok(final_samples) = capture.stop().await { + if let State::EagerRecording { accumulated_audio, .. 
} = &mut state { + accumulated_audio.extend(final_samples); + } + } + } + + let transcriber = match self.get_transcriber_for_recording( + model_override.as_deref(), + &transcriber_preloaded, + ).await { + Ok(t) => t, + Err(()) => { + state = State::Idle; + self.update_state("idle"); + continue; + } + }; + + self.update_state("transcribing"); + + if let Some(text) = self.finish_eager_recording(&mut state, transcriber).await { + state = State::Transcribing { audio: Vec::new() }; + self.handle_transcription_result(&mut state, Ok(Ok(text))).await; + } else { + tracing::debug!("Eager recording produced empty result"); + self.reset_to_idle(&mut state).await; + } } } @@ -1436,6 +2393,20 @@ impl Daemon { task.abort(); } + // Abort any pending eager chunk tasks + for (_, task) in self.eager_chunk_tasks.drain(..) { + task.abort(); + } + + // Shut down the streaming transcription task + if let Some(tx) = self.streaming_chunk_tx.take() { + let _ = tx.send(StreamingCommand::Shutdown).await; + } + if let Some(task) = self.streaming_task.take() { + let _ = task.await; + } + self.streaming_text_rx = None; + // Remove state file on shutdown if let Some(ref path) = self.state_file_path { cleanup_state_file(path); @@ -1635,7 +2606,7 @@ mod tests { // Should not panic }); } - + fn test_pidlock_acquisition_succeeds() { with_test_runtime_dir(|dir| { let lock_path = dir.join("voxtype.lock"); diff --git a/src/eager.rs b/src/eager.rs new file mode 100644 index 00000000..ad1a93e3 --- /dev/null +++ b/src/eager.rs @@ -0,0 +1,385 @@ +//! Eager input processing module +//! +//! Handles chunking audio during recording and processing chunks in parallel +//! with continued recording. This reduces perceived latency on slower machines. +//! +//! The basic approach: +//! 1. During recording, split audio into fixed-size chunks with small overlaps +//! 2. As each chunk is ready, spawn a transcription task for it +//! 3. Continue recording while transcription runs in parallel +//! 4. 
At the end, combine all chunk results, deduplicating at boundaries + +use crate::state::ChunkResult; + +/// Configuration for eager processing +#[derive(Debug, Clone)] +pub struct EagerConfig { + /// Duration of each chunk in seconds + pub chunk_secs: f32, + /// Overlap between adjacent chunks in seconds + pub overlap_secs: f32, + /// Sample rate (assumed 16kHz for whisper) + pub sample_rate: u32, +} + +impl EagerConfig { + /// Create config from whisper config settings + pub fn from_whisper_config(config: &crate::config::WhisperConfig) -> Self { + Self { + chunk_secs: config.eager_chunk_secs, + overlap_secs: config.eager_overlap_secs, + sample_rate: 16000, // Whisper expects 16kHz + } + } + + /// Get chunk size in samples + pub fn chunk_samples(&self) -> usize { + (self.chunk_secs * self.sample_rate as f32) as usize + } + + /// Get overlap size in samples + pub fn overlap_samples(&self) -> usize { + (self.overlap_secs * self.sample_rate as f32) as usize + } + + /// Get the stride between chunk starts (chunk - overlap) + pub fn stride_samples(&self) -> usize { + self.chunk_samples().saturating_sub(self.overlap_samples()) + } +} + +/// Extract a chunk from accumulated audio for transcription. +/// Returns None if there isn't enough audio for the requested chunk yet. 
+///
+/// # Arguments
+/// * `accumulated` - All audio samples collected so far
+/// * `chunk_index` - Which chunk to extract (0-based)
+/// * `config` - Eager processing configuration
+///
+/// # Returns
+/// * `Some(Vec<f32>)` - The audio chunk to transcribe
+/// * `None` - Not enough audio yet for this chunk
+pub fn extract_chunk(
+    accumulated: &[f32],
+    chunk_index: usize,
+    config: &EagerConfig,
+) -> Option<Vec<f32>> {
+    let chunk_size = config.chunk_samples();
+    let stride = config.stride_samples();
+
+    // Calculate chunk boundaries
+    let start = chunk_index * stride;
+    let end = start + chunk_size;
+
+    // Check if we have enough samples
+    if end > accumulated.len() {
+        return None;
+    }
+
+    Some(accumulated[start..end].to_vec())
+}
+
+/// Check how many complete chunks are available in the accumulated audio.
+/// A chunk is "complete" when we have enough samples to extract it plus
+/// the overlap for the next chunk (so we don't cut off mid-word).
+///
+/// # Arguments
+/// * `accumulated_len` - Number of samples accumulated so far
+/// * `config` - Eager processing configuration
+///
+/// # Returns
+/// Number of complete chunks available
+pub fn count_complete_chunks(accumulated_len: usize, config: &EagerConfig) -> usize {
+    let stride = config.stride_samples();
+    let chunk_size = config.chunk_samples();
+
+    if accumulated_len < chunk_size {
+        return 0;
+    }
+
+    // Number of chunks where we have the full chunk + overlap for boundary handling
+    let available_after_first = accumulated_len.saturating_sub(chunk_size);
+    1 + available_after_first / stride
+}
+
+/// Combine transcription results from multiple chunks, handling duplicates
+/// at chunk boundaries.
+///
+/// # Arguments
+/// * `results` - Vector of chunk results (may be in any order)
+///
+/// # Returns
+/// Combined transcription text with duplicates at boundaries removed
+pub fn combine_chunk_results(mut results: Vec<ChunkResult>) -> String {
+    if results.is_empty() {
+        return String::new();
+    }
+
+    // Sort by chunk index to ensure correct order
+    results.sort_by_key(|r| r.chunk_index);
+
+    if results.len() == 1 {
+        return results[0].text.clone();
+    }
+
+    let mut combined = String::new();
+
+    for (i, result) in results.iter().enumerate() {
+        if i == 0 {
+            // First chunk: use full text
+            combined = result.text.clone();
+        } else {
+            // Subsequent chunks: deduplicate at boundary
+            let new_text = deduplicate_boundary(&combined, &result.text);
+            if !new_text.is_empty() {
+                if !combined.is_empty() && !combined.ends_with(' ') && !new_text.starts_with(' ') {
+                    combined.push(' ');
+                }
+                combined.push_str(&new_text);
+            }
+        }
+    }
+
+    combined.trim().to_string()
+}
+
+/// Remove duplicate text at the boundary between previous and new transcription.
+///
+/// This uses a simple approach: look for the longest suffix of `previous` that
+/// matches a prefix of `new_text`, and return `new_text` with that prefix removed.
+///
+/// # Arguments
+/// * `previous` - Text transcribed so far (from earlier chunks)
+/// * `new_text` - Text from the new chunk
+///
+/// # Returns
+/// The portion of `new_text` that isn't a duplicate of `previous`
+fn deduplicate_boundary(previous: &str, new_text: &str) -> String {
+    let previous_words: Vec<&str> = previous.split_whitespace().collect();
+    let new_words: Vec<&str> = new_text.split_whitespace().collect();
+
+    if previous_words.is_empty() || new_words.is_empty() {
+        return new_text.to_string();
+    }
+
+    // Look for overlap: find the longest suffix of previous that matches
+    // a prefix of new_text
+    let max_overlap = previous_words.len().min(new_words.len());
+
+    let mut best_overlap = 0;
+    for overlap_len in 1..=max_overlap {
+        let prev_suffix = &previous_words[previous_words.len() - overlap_len..];
+        let new_prefix = &new_words[..overlap_len];
+
+        // Case-insensitive comparison for robustness
+        if prev_suffix
+            .iter()
+            .zip(new_prefix.iter())
+            .all(|(a, b)| a.eq_ignore_ascii_case(b))
+        {
+            best_overlap = overlap_len;
+        }
+    }
+
+    if best_overlap > 0 {
+        // Remove the overlapping prefix from new_text
+        new_words[best_overlap..].join(" ")
+    } else {
+        new_text.to_string()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn test_config() -> EagerConfig {
+        EagerConfig {
+            chunk_secs: 5.0,
+            overlap_secs: 0.5,
+            sample_rate: 16000,
+        }
+    }
+
+    #[test]
+    fn test_chunk_samples() {
+        let config = test_config();
+        assert_eq!(config.chunk_samples(), 80000); // 5 seconds * 16000 Hz
+        assert_eq!(config.overlap_samples(), 8000); // 0.5 seconds * 16000 Hz
+        assert_eq!(config.stride_samples(), 72000); // chunk - overlap
+    }
+
+    #[test]
+    fn test_count_complete_chunks_empty() {
+        let config = test_config();
+        assert_eq!(count_complete_chunks(0, &config), 0);
+    }
+
+    #[test]
+    fn test_count_complete_chunks_less_than_one() {
+        let config = test_config();
+        // Less than one chunk
+        assert_eq!(count_complete_chunks(40000, &config), 0);
+    }
+
+    #[test]
+    fn test_count_complete_chunks_one() {
+        let config = test_config();
+        // Exactly one chunk
+        assert_eq!(count_complete_chunks(80000, &config), 1);
+    }
+
+    #[test]
+    fn test_count_complete_chunks_multiple() {
+        let config = test_config();
+        // First chunk (80000) + stride for second (72000) = 152000
+        assert_eq!(count_complete_chunks(152000, &config), 2);
+        // First chunk + 2 strides = 224000
+        assert_eq!(count_complete_chunks(224000, &config), 3);
+    }
+
+    #[test]
+    fn test_extract_chunk_insufficient_data() {
+        let config = test_config();
+        let audio = vec![0.0; 40000]; // Less than one chunk
+        assert!(extract_chunk(&audio, 0, &config).is_none());
+    }
+
+    #[test]
+    fn test_extract_chunk_first() {
+        let config = test_config();
+        let audio: Vec<f32> = (0..100000).map(|i| i as f32).collect();
+        let chunk = extract_chunk(&audio, 0, &config).unwrap();
+        assert_eq!(chunk.len(), 80000);
+        assert_eq!(chunk[0], 0.0);
+    }
+
+    #[test]
+    fn test_extract_chunk_second() {
+        let config = test_config();
+        let audio: Vec<f32> = (0..200000).map(|i| i as f32).collect();
+        let chunk = extract_chunk(&audio, 1, &config).unwrap();
+        assert_eq!(chunk.len(), 80000);
+        // Second chunk starts at stride (72000)
+        assert_eq!(chunk[0], 72000.0);
+    }
+
+    #[test]
+    fn test_deduplicate_boundary_no_overlap() {
+        let result = deduplicate_boundary("hello world", "foo bar");
+        assert_eq!(result, "foo bar");
+    }
+
+    #[test]
+    fn test_deduplicate_boundary_single_word_overlap() {
+        let result = deduplicate_boundary("hello world", "world foo bar");
+        assert_eq!(result, "foo bar");
+    }
+
+    #[test]
+    fn test_deduplicate_boundary_multi_word_overlap() {
+        let result = deduplicate_boundary("hello world foo", "world foo bar baz");
+        assert_eq!(result, "bar baz");
+    }
+
+    #[test]
+    fn test_deduplicate_boundary_case_insensitive() {
+        let result = deduplicate_boundary("Hello World", "world foo");
+        assert_eq!(result, "foo");
+    }
+
+    #[test]
+    fn test_deduplicate_boundary_empty_previous() {
+        let result = deduplicate_boundary("", "hello world");
+        assert_eq!(result, "hello world");
+    }
+
+    #[test]
+    fn test_deduplicate_boundary_empty_new() {
+        let result = deduplicate_boundary("hello world", "");
+        assert_eq!(result, "");
+    }
+
+    #[test]
+    fn test_combine_chunk_results_empty() {
+        let results: Vec<ChunkResult> = vec![];
+        assert_eq!(combine_chunk_results(results), "");
+    }
+
+    #[test]
+    fn test_combine_chunk_results_single() {
+        let results = vec![ChunkResult {
+            text: "hello world".to_string(),
+            chunk_index: 0,
+        }];
+        assert_eq!(combine_chunk_results(results), "hello world");
+    }
+
+    #[test]
+    fn test_combine_chunk_results_multiple_no_overlap() {
+        let results = vec![
+            ChunkResult {
+                text: "hello world".to_string(),
+                chunk_index: 0,
+            },
+            ChunkResult {
+                text: "foo bar".to_string(),
+                chunk_index: 1,
+            },
+        ];
+        assert_eq!(combine_chunk_results(results), "hello world foo bar");
+    }
+
+    #[test]
+    fn test_combine_chunk_results_with_overlap() {
+        let results = vec![
+            ChunkResult {
+                text: "hello world foo".to_string(),
+                chunk_index: 0,
+            },
+            ChunkResult {
+                text: "foo bar baz".to_string(),
+                chunk_index: 1,
+            },
+        ];
+        assert_eq!(combine_chunk_results(results), "hello world foo bar baz");
+    }
+
+    #[test]
+    fn test_combine_chunk_results_out_of_order() {
+        // Results can arrive out of order; they should be sorted by chunk_index
+        let results = vec![
+            ChunkResult {
+                text: "bar baz".to_string(),
+                chunk_index: 1,
+            },
+            ChunkResult {
+                text: "hello world bar".to_string(),
+                chunk_index: 0,
+            },
+        ];
+        assert_eq!(combine_chunk_results(results), "hello world bar baz");
+    }
+
+    #[test]
+    fn test_combine_chunk_results_three_chunks() {
+        let results = vec![
+            ChunkResult {
+                text: "one two three".to_string(),
+                chunk_index: 0,
+            },
+            ChunkResult {
+                text: "three four five".to_string(),
+                chunk_index: 1,
+            },
+            ChunkResult {
+                text: "five six seven".to_string(),
+                chunk_index: 2,
+            },
+        ];
+        assert_eq!(
+            combine_chunk_results(results),
+            "one two three four five six seven"
+        );
+    }
+}
diff --git a/src/error.rs b/src/error.rs
index 75be313b..08910492 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -108,6 +108,9 @@ pub enum OutputError {
     #[error("wtype not found in PATH. Install via your package manager.")]
     WtypeNotFound,
 
+    #[error("eitype not found in PATH. Install via: cargo install eitype")]
+    EitypeNotFound,
+
     #[error("wl-copy not found in PATH. Install wl-clipboard via your package manager.")]
     WlCopyNotFound,
 
diff --git a/src/hotkey/evdev_listener.rs b/src/hotkey/evdev_listener.rs
index fdb2ed4c..3803ab5b 100644
--- a/src/hotkey/evdev_listener.rs
+++ b/src/hotkey/evdev_listener.rs
@@ -544,8 +544,24 @@ fn evdev_listener_loop(
 
 /// Parse a key name string to evdev Key
 fn parse_key_name(name: &str) -> Result<Key, HotkeyError> {
+    let trimmed = name.trim();
+
+    // Try parsing as a prefixed numeric keycode (e.g. "wev_234", "evtest_226")
+    if let Some(key) = parse_prefixed_keycode(trimmed)? {
+        return Ok(key);
+    }
+
+    // Bare numeric values are ambiguous — require a prefix
+    if trimmed.parse::<u16>().is_ok() || trimmed.starts_with("0x") || trimmed.starts_with("0X") {
+        return Err(HotkeyError::UnknownKey(format!(
+            "{}. Bare numeric keycodes are ambiguous (wev/xev and evtest use different numbering).\n \
+            Use a prefix: WEV_234, X11_234, XEV_234 (XKB keycode, offset by 8) or EVTEST_226 (kernel keycode)",
+            name
+        )));
+    }
+
     // Normalize: uppercase and replace - or space with _
-    let normalized: String = name
+    let normalized: String = trimmed
         .chars()
         .map(|c| match c {
             '-' | ' ' => '_',
@@ -627,11 +643,15 @@ fn parse_key_name(name: &str) -> Result<Key, HotkeyError> {
         "KEY_PLAYPAUSE" => Key::KEY_PLAYPAUSE,
         "KEY_NEXTSONG" => Key::KEY_NEXTSONG,
         "KEY_PREVIOUSSONG" => Key::KEY_PREVIOUSSONG,
+        "KEY_RECORD" => Key::KEY_RECORD,
+        "KEY_REWIND" => Key::KEY_REWIND,
+        "KEY_FASTFORWARD" => Key::KEY_FASTFORWARD,
+        "KEY_MEDIA" => Key::KEY_MEDIA,
 
         // If not found, return error with suggestions
         _ => {
             return Err(HotkeyError::UnknownKey(format!(
-                "{}. Try: SCROLLLOCK, PAUSE, F13-F24, or run 'evtest' to find key names",
+                "{}. Try: SCROLLLOCK, PAUSE, MEDIA, F13-F24, a numeric keycode (e.g. 226), or run 'evtest' to find key names",
                 name
             )));
         }
@@ -640,6 +660,66 @@ fn parse_key_name(name: &str) -> Result<Key, HotkeyError> {
     Ok(key)
 }
 
+/// XKB keycodes are offset by 8 from Linux kernel keycodes
+const XKB_OFFSET: u16 = 8;
+
+/// Try to parse a prefixed numeric keycode string.
+///
+/// Supported prefixes:
+/// - `wev_`, `x11_`, `xev_` — XKB keycode (subtract 8 to get kernel keycode)
+/// - `evtest_` — raw kernel keycode (used directly)
+///
+/// Returns `Ok(None)` if the string doesn't match any prefix pattern.
+/// Returns `Ok(Some(key))` on successful parse.
+/// Returns `Err` if the prefix is recognized but the number is invalid.
+fn parse_prefixed_keycode(s: &str) -> Result<Option<Key>, HotkeyError> {
+    let normalized = s.to_ascii_uppercase();
+
+    let (number_str, is_xkb) =
+        if let Some(n) = normalized.strip_prefix("WEV_") {
+            (n, true)
+        } else if let Some(n) = normalized.strip_prefix("X11_") {
+            (n, true)
+        } else if let Some(n) = normalized.strip_prefix("XEV_") {
+            (n, true)
+        } else if let Some(n) = normalized.strip_prefix("EVTEST_") {
+            (n, false)
+        } else {
+            return Ok(None);
+        };
+
+    let code: u16 = if let Some(hex) = number_str.strip_prefix("0X") {
+        u16::from_str_radix(hex, 16)
+    } else {
+        number_str.parse()
+    }
+    .map_err(|_| {
+        HotkeyError::UnknownKey(format!(
+            "{}. The value after the prefix must be a decimal or 0x-prefixed hex number",
+            s
+        ))
+    })?;
+
+    let kernel_code = if is_xkb {
+        code.checked_sub(XKB_OFFSET).ok_or_else(|| {
+            HotkeyError::UnknownKey(format!(
+                "{}. XKB keycode must be >= {} (the XKB offset)",
+                s, XKB_OFFSET
+            ))
+        })?
+ } else { + code + }; + + tracing::debug!( + "Parsed numeric keycode '{}' as kernel keycode {}", + s, + kernel_code + ); + + Ok(Some(Key::new(kernel_code))) +} + #[cfg(test)] mod tests { use super::*; @@ -657,6 +737,50 @@ mod tests { assert_eq!(parse_key_name("LALT").unwrap(), Key::KEY_LEFTALT); } + #[test] + fn test_parse_media_keys() { + assert_eq!(parse_key_name("MEDIA").unwrap(), Key::KEY_MEDIA); + assert_eq!(parse_key_name("KEY_MEDIA").unwrap(), Key::KEY_MEDIA); + assert_eq!(parse_key_name("RECORD").unwrap(), Key::KEY_RECORD); + assert_eq!(parse_key_name("FASTFORWARD").unwrap(), Key::KEY_FASTFORWARD); + assert_eq!(parse_key_name("REWIND").unwrap(), Key::KEY_REWIND); + } + + #[test] + fn test_parse_wev_keycode() { + // wev shows XKB keycode 234 for KEY_MEDIA (kernel 226 + 8) + assert_eq!(parse_key_name("wev_234").unwrap(), Key::KEY_MEDIA); + assert_eq!(parse_key_name("WEV_234").unwrap(), Key::KEY_MEDIA); + assert_eq!(parse_key_name("x11_234").unwrap(), Key::KEY_MEDIA); + assert_eq!(parse_key_name("xev_234").unwrap(), Key::KEY_MEDIA); + } + + #[test] + fn test_parse_evtest_keycode() { + // evtest shows raw kernel keycode 226 for KEY_MEDIA + assert_eq!(parse_key_name("evtest_226").unwrap(), Key::KEY_MEDIA); + assert_eq!(parse_key_name("EVTEST_226").unwrap(), Key::KEY_MEDIA); + assert_eq!(parse_key_name("evtest_70").unwrap(), Key::KEY_SCROLLLOCK); + // hex format + assert_eq!(parse_key_name("evtest_0xe2").unwrap(), Key::KEY_MEDIA); + assert_eq!(parse_key_name("EVTEST_0xE2").unwrap(), Key::KEY_MEDIA); + } + + #[test] + fn test_parse_wev_keycode_hex() { + // XKB keycode 0xEA = 234 decimal, minus 8 = 226 = KEY_MEDIA + assert_eq!(parse_key_name("wev_0xEA").unwrap(), Key::KEY_MEDIA); + assert_eq!(parse_key_name("WEV_0xea").unwrap(), Key::KEY_MEDIA); + } + + #[test] + fn test_bare_numeric_keycode_rejected() { + // Bare numbers should be rejected as ambiguous + assert!(parse_key_name("226").is_err()); + assert!(parse_key_name("234").is_err()); + 
assert!(parse_key_name("0x226").is_err()); + } + #[test] fn test_parse_key_name_error() { assert!(parse_key_name("INVALID_KEY_NAME").is_err()); diff --git a/src/lib.rs b/src/lib.rs index 474f31f8..fa5c615a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,6 +73,7 @@ pub mod cli; pub mod config; pub mod cpu; pub mod daemon; +pub mod eager; pub mod error; pub mod hotkey; pub mod model_manager; diff --git a/src/main.rs b/src/main.rs index f4ba7b4c..9c77a6bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -124,6 +124,15 @@ async fn main() -> anyhow::Result<()> { if let Some(prompt) = cli.initial_prompt { config.whisper.initial_prompt = Some(prompt); } + if cli.eager_processing { + config.whisper.eager_processing = true; + } + if let Some(secs) = cli.eager_chunk_secs { + config.whisper.eager_chunk_secs = secs; + } + if let Some(secs) = cli.eager_overlap_secs { + config.whisper.eager_overlap_secs = secs; + } if let Some(ref driver_str) = cli.driver { match parse_driver_order(driver_str) { Ok(drivers) => { diff --git a/src/output/eitype.rs b/src/output/eitype.rs new file mode 100644 index 00000000..9344079d --- /dev/null +++ b/src/output/eitype.rs @@ -0,0 +1,231 @@ +//! eitype-based text output +//! +//! Uses eitype to simulate keyboard input via the Emulated Input (EI) protocol. +//! This works on compositors that support libei, including GNOME/Mutter and KDE, +//! which do not support the virtual-keyboard protocol used by wtype. +//! +//! Requires: +//! - eitype installed +//! 
- Compositor with EI protocol support (GNOME, KDE, Sway with libei)
+
+use super::TextOutput;
+use crate::error::OutputError;
+use std::process::Stdio;
+use tokio::process::Command;
+
+/// eitype-based text output
+pub struct EitypeOutput {
+    /// Whether to send Enter key after output
+    auto_submit: bool,
+    /// Delay between key events in milliseconds
+    type_delay_ms: u32,
+    /// Delay before typing starts (ms)
+    pre_type_delay_ms: u32,
+    /// Convert newlines to Shift+Enter (for apps where Enter submits)
+    shift_enter_newlines: bool,
+}
+
+impl EitypeOutput {
+    /// Create a new eitype output
+    pub fn new(
+        auto_submit: bool,
+        type_delay_ms: u32,
+        pre_type_delay_ms: u32,
+        shift_enter_newlines: bool,
+    ) -> Self {
+        Self {
+            auto_submit,
+            type_delay_ms,
+            pre_type_delay_ms,
+            shift_enter_newlines,
+        }
+    }
+
+    /// Type a string of text using eitype
+    async fn type_text(&self, text: &str) -> Result<(), OutputError> {
+        if text.is_empty() {
+            return Ok(());
+        }
+
+        // eitype doesn't have a pre-type delay flag, so sleep if needed
+        if self.pre_type_delay_ms > 0 {
+            tokio::time::sleep(std::time::Duration::from_millis(self.pre_type_delay_ms as u64))
+                .await;
+        }
+
+        let mut cmd = Command::new("eitype");
+        let mut debug_args = vec!["eitype".to_string()];
+
+        // Add inter-keystroke delay if configured
+        if self.type_delay_ms > 0 {
+            cmd.arg("-d").arg(self.type_delay_ms.to_string());
+            debug_args.push(format!("-d {}", self.type_delay_ms));
+        }
+
+        debug_args.push(format!(
+            "\"{}\"",
+            text.chars().take(20).collect::<String>()
+        ));
+        tracing::debug!("Running: {}", debug_args.join(" "));
+
+        let output = cmd
+            .arg(text)
+            .stdout(Stdio::null())
+            .stderr(Stdio::piped())
+            .output()
+            .await
+            .map_err(|e| {
+                if e.kind() == std::io::ErrorKind::NotFound {
+                    OutputError::EitypeNotFound
+                } else {
+                    OutputError::InjectionFailed(e.to_string())
+                }
+            })?;
+
+        if !output.status.success() {
+            let stderr = String::from_utf8_lossy(&output.stderr);
+            return 
Err(OutputError::InjectionFailed(format!( + "eitype failed: {}", + stderr + ))); + } + + Ok(()) + } + + /// Send Shift+Enter key combination using eitype + async fn send_shift_enter(&self) -> Result<(), OutputError> { + let output = Command::new("eitype") + .args(["-M", "shift", "-k", "return"]) + .stdout(Stdio::null()) + .stderr(Stdio::piped()) + .output() + .await + .map_err(|e| { + OutputError::InjectionFailed(format!("eitype Shift+Enter failed: {}", e)) + })?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!("Failed to send Shift+Enter: {}", stderr); + } + + Ok(()) + } + + /// Send Enter key using eitype + async fn send_enter(&self) -> Result<(), OutputError> { + let output = Command::new("eitype") + .args(["-k", "return"]) + .stdout(Stdio::null()) + .stderr(Stdio::piped()) + .output() + .await + .map_err(|e| OutputError::InjectionFailed(format!("eitype Enter failed: {}", e)))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!("Failed to send Enter key: {}", stderr); + } + + Ok(()) + } + + /// Output text with newlines converted to Shift+Enter + async fn output_with_shift_enter_newlines(&self, text: &str) -> Result<(), OutputError> { + let segments: Vec<&str> = text.split('\n').collect(); + + for (i, segment) in segments.iter().enumerate() { + // Type the text segment + if !segment.is_empty() { + self.type_text(segment).await?; + } + + // Send Shift+Enter between segments (not after the last one) + if i < segments.len() - 1 { + self.send_shift_enter().await?; + } + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl TextOutput for EitypeOutput { + async fn output(&self, text: &str) -> Result<(), OutputError> { + if text.is_empty() { + return Ok(()); + } + + // If shift_enter_newlines is enabled, process text with Shift+Enter for newlines + if self.shift_enter_newlines && text.contains('\n') { + 
self.output_with_shift_enter_newlines(text).await?; + } else { + self.type_text(text).await?; + } + + // Send Enter key if auto_submit is configured + if self.auto_submit { + self.send_enter().await?; + } + + Ok(()) + } + + async fn is_available(&self) -> bool { + // Check if eitype exists in PATH + Command::new("which") + .arg("eitype") + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .await + .map(|s| s.success()) + .unwrap_or(false) + } + + fn name(&self) -> &'static str { + "eitype" + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_new() { + let output = EitypeOutput::new(false, 0, 0, false); + assert!(!output.auto_submit); + assert_eq!(output.type_delay_ms, 0); + assert_eq!(output.pre_type_delay_ms, 0); + assert!(!output.shift_enter_newlines); + } + + #[test] + fn test_new_with_enter() { + let output = EitypeOutput::new(true, 0, 0, false); + assert!(output.auto_submit); + } + + #[test] + fn test_new_with_type_delay() { + let output = EitypeOutput::new(false, 50, 0, false); + assert_eq!(output.type_delay_ms, 50); + assert_eq!(output.pre_type_delay_ms, 0); + } + + #[test] + fn test_new_with_pre_type_delay() { + let output = EitypeOutput::new(false, 0, 200, false); + assert_eq!(output.type_delay_ms, 0); + assert_eq!(output.pre_type_delay_ms, 200); + } + + #[test] + fn test_new_with_shift_enter_newlines() { + let output = EitypeOutput::new(false, 0, 0, true); + assert!(output.shift_enter_newlines); + } +} diff --git a/src/output/mod.rs b/src/output/mod.rs index e1a44029..751dd556 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -3,16 +3,18 @@ //! Provides text output via keyboard simulation or clipboard. //! //! Fallback chain for `mode = "type"`: -//! 1. wtype - Wayland-native, best Unicode/CJK support, no daemon needed -//! 2. dotool - Works on X11/Wayland/TTY, supports keyboard layouts, no daemon needed -//! 3. ydotool - Works on X11/Wayland/TTY, requires daemon -//! 4. 
clipboard (wl-copy) - Wayland clipboard fallback -//! 5. xclip - X11 clipboard fallback +//! 1. wtype - Wayland-native via virtual-keyboard protocol, best Unicode/CJK support, no daemon needed +//! 2. eitype - Wayland via libei/EI protocol, works on GNOME/KDE (no virtual-keyboard support) +//! 3. dotool - Works on X11/Wayland/TTY, supports keyboard layouts, no daemon needed +//! 4. ydotool - Works on X11/Wayland/TTY, requires daemon +//! 5. clipboard (wl-copy) - Wayland clipboard fallback +//! 6. xclip - X11 clipboard fallback //! //! Paste mode (clipboard + Ctrl+V) helps with system with non US keyboard layouts. pub mod clipboard; pub mod dotool; +pub mod eitype; pub mod paste; pub mod post_process; pub mod wtype; @@ -142,6 +144,7 @@ pub trait TextOutput: Send + Sync { /// Default driver order for type mode const DEFAULT_DRIVER_ORDER: &[OutputDriver] = &[ OutputDriver::Wtype, + OutputDriver::Eitype, OutputDriver::Dotool, OutputDriver::Ydotool, OutputDriver::Clipboard, @@ -165,6 +168,12 @@ fn create_driver_output( pre_type_delay_ms, config.shift_enter_newlines, )), + OutputDriver::Eitype => Box::new(eitype::EitypeOutput::new( + config.auto_submit, + config.type_delay_ms, + pre_type_delay_ms, + config.shift_enter_newlines, + )), OutputDriver::Dotool => Box::new(dotool::DotoolOutput::new( config.type_delay_ms, pre_type_delay_ms, @@ -272,6 +281,72 @@ pub fn create_output_chain_with_override( chain } +/// Probe the output chain once and return the index of the first available driver. +/// +/// This allows caching the result across multiple output calls within a streaming +/// session, avoiding repeated `is_available()` subprocess spawns per delta. 
+pub async fn probe_output_chain(chain: &[Box<dyn TextOutput>]) -> Option<usize> { + for (i, output) in chain.iter().enumerate() { + if output.is_available().await { + tracing::debug!("Probed output chain: {} (index {}) is available", output.name(), i); + return Some(i); + } + tracing::debug!("{} not available during probe, trying next", output.name()); + } + None +} + +/// Output text using a cached driver index, skipping `is_available()` probes. +/// +/// If the cached driver fails, falls back to a full probe of the remaining chain. +/// Returns the (possibly updated) driver index on success. +pub async fn output_with_cached_index( + chain: &[Box<dyn TextOutput>], + text: &str, + cached_index: usize, +) -> Result<usize, OutputError> { + let normalized_text = normalize_quotes(text); + + // Try the cached driver directly + if cached_index < chain.len() { + match chain[cached_index].output(&normalized_text).await { + Ok(()) => { + tracing::debug!("Text output via cached {} (index {})", chain[cached_index].name(), cached_index); + return Ok(cached_index); + } + Err(e) => { + tracing::warn!( + "Cached output {} (index {}) failed: {}, falling back to probe", + chain[cached_index].name(), + cached_index, + e + ); + } + } + } + + // Cached driver failed or index out of bounds; full probe fallback + for (i, output) in chain.iter().enumerate() { + if i == cached_index { + continue; // Already tried this one + } + if !output.is_available().await { + continue; + } + match output.output(&normalized_text).await { + Ok(()) => { + tracing::debug!("Text output via fallback {} (index {})", output.name(), i); + return Ok(i); + } + Err(e) => { + tracing::warn!("{} failed: {}, trying next", output.name(), e); + } + } + } + + Err(OutputError::AllMethodsFailed) +} + /// Run a shell command (for pre/post hooks) pub async fn run_hook(command: &str, hook_name: &str) -> Result<(), String> { tracing::debug!("Running {} hook: {}", hook_name, command); @@ -352,6 +427,118 @@ pub async fn output_with_fallback( #[cfg(test)] mod tests { use super::*; + 
use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + /// Mock output driver for testing caching behavior + struct MockTextOutput { + name: &'static str, + available: bool, + output_count: Arc<AtomicUsize>, + available_count: Arc<AtomicUsize>, + } + + impl MockTextOutput { + fn new(name: &'static str, available: bool) -> Self { + Self { + name, + available, + output_count: Arc::new(AtomicUsize::new(0)), + available_count: Arc::new(AtomicUsize::new(0)), + } + } + + fn output_calls(&self) -> usize { + self.output_count.load(Ordering::Relaxed) + } + + fn available_calls(&self) -> usize { + self.available_count.load(Ordering::Relaxed) + } + } + + #[async_trait::async_trait] + impl TextOutput for MockTextOutput { + async fn output(&self, _text: &str) -> Result<(), OutputError> { + self.output_count.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + + async fn is_available(&self) -> bool { + self.available_count.fetch_add(1, Ordering::Relaxed); + self.available + } + + fn name(&self) -> &'static str { + self.name + } + } + + #[tokio::test] + async fn test_probe_output_chain_finds_first_available() { + let chain: Vec<Box<dyn TextOutput>> = vec![ + Box::new(MockTextOutput::new("unavailable", false)), + Box::new(MockTextOutput::new("available", true)), + Box::new(MockTextOutput::new("also_available", true)), + ]; + let idx = probe_output_chain(&chain).await; + assert_eq!(idx, Some(1)); + } + + #[tokio::test] + async fn test_probe_output_chain_none_available() { + let chain: Vec<Box<dyn TextOutput>> = vec![ + Box::new(MockTextOutput::new("a", false)), + Box::new(MockTextOutput::new("b", false)), + ]; + let idx = probe_output_chain(&chain).await; + assert_eq!(idx, None); + } + + #[tokio::test] + async fn test_cached_index_skips_is_available() { + let mock = MockTextOutput::new("cached", true); + let available_counter = mock.available_count.clone(); + let output_counter = mock.output_count.clone(); + + let chain: Vec<Box<dyn TextOutput>> = vec![Box::new(mock)]; + + // Output 15 deltas using cached index + for _ in 0..15 { + let result = 
output_with_cached_index(&chain, "hello", 0).await; + assert!(result.is_ok()); + } + + // is_available should never have been called + assert_eq!(available_counter.load(Ordering::Relaxed), 0); + // output should have been called 15 times + assert_eq!(output_counter.load(Ordering::Relaxed), 15); + } + + #[tokio::test] + async fn test_streaming_output_latency_baseline() { + let chain: Vec> = vec![ + Box::new(MockTextOutput::new("mock_driver", true)), + ]; + + let idx = probe_output_chain(&chain).await.unwrap(); + let start = std::time::Instant::now(); + + // Simulate 15 streaming deltas + for i in 0..15 { + let text = format!("delta {}", i); + let result = output_with_cached_index(&chain, &text, idx).await; + assert!(result.is_ok()); + } + + let elapsed = start.elapsed(); + // 15 mock deltas should complete well under 50ms + assert!( + elapsed.as_millis() < 50, + "15 cached deltas took {}ms, expected <50ms", + elapsed.as_millis() + ); + } #[test] fn test_normalize_quotes_no_change() { diff --git a/src/setup/mod.rs b/src/setup/mod.rs index 4c21c047..f1fa0da7 100644 --- a/src/setup/mod.rs +++ b/src/setup/mod.rs @@ -54,6 +54,7 @@ pub struct OutputToolStatus { pub struct OutputChainStatus { pub display_server: DisplayServer, pub wtype: OutputToolStatus, + pub eitype: OutputToolStatus, pub ydotool: OutputToolStatus, pub ydotool_daemon: bool, pub wl_copy: OutputToolStatus, @@ -178,6 +179,16 @@ pub async fn detect_output_chain() -> OutputChainStatus { None }; + // Check eitype + let eitype_path = get_command_path("eitype").await; + let eitype_installed = eitype_path.is_some(); + let eitype_available = eitype_installed && display_server == DisplayServer::Wayland; + let eitype_note = if eitype_installed && display_server != DisplayServer::Wayland { + Some("Wayland only".to_string()) + } else { + None + }; + // Check ydotool let ydotool_path = get_command_path("ydotool").await; let ydotool_installed = ydotool_path.is_some(); @@ -216,6 +227,8 @@ pub async fn 
detect_output_chain() -> OutputChainStatus { // Determine primary method let primary_method = if wtype_available { Some("wtype".to_string()) + } else if eitype_available { + Some("eitype".to_string()) } else if ydotool_available { Some("ydotool".to_string()) } else if wl_copy_available || xclip_available { @@ -233,6 +246,13 @@ pub async fn detect_output_chain() -> OutputChainStatus { path: wtype_path, note: wtype_note, }, + eitype: OutputToolStatus { + name: "eitype", + installed: eitype_installed, + available: eitype_available, + path: eitype_path, + note: eitype_note, + }, ydotool: OutputToolStatus { name: "ydotool", installed: ydotool_installed, @@ -283,6 +303,12 @@ pub fn print_output_chain_status(status: &OutputChainStatus) { status.display_server == DisplayServer::Wayland, ); + // eitype + print_tool_status( + &status.eitype, + status.display_server == DisplayServer::Wayland, + ); + // ydotool if status.ydotool.installed { let daemon_status = if status.ydotool_daemon { @@ -323,6 +349,7 @@ pub fn print_output_chain_status(status: &OutputChainStatus) { if let Some(ref method) = status.primary_method { let method_desc = match method.as_str() { "wtype" => "wtype (CJK supported)", + "eitype" => "eitype (libei, GNOME/KDE native)", "ydotool" => "ydotool (CJK not supported)", "clipboard" => "clipboard (requires manual paste)", _ => method.as_str(), @@ -330,7 +357,7 @@ pub fn print_output_chain_status(status: &OutputChainStatus) { println!(" \x1b[32m→\x1b[0m Text will be typed via {}", method_desc); } else { println!(" \x1b[31m→\x1b[0m No text output method available!"); - println!(" Install wtype (Wayland) or ydotool (X11) for typing support"); + println!(" Install wtype (Wayland), eitype (GNOME/KDE), or ydotool (X11) for typing support"); } } @@ -667,6 +694,7 @@ pub async fn run_checks(config: &Config) -> anyhow::Result<()> { print_failure("No text output method available"); if output_status.display_server == DisplayServer::Wayland { println!(" Install wtype: sudo 
pacman -S wtype"); + println!(" Or eitype: cargo install eitype"); } else { println!(" Install ydotool: sudo pacman -S ydotool"); } @@ -675,6 +703,7 @@ pub async fn run_checks(config: &Config) -> anyhow::Result<()> { print_warning("Only clipboard mode available - typing won't work"); if output_status.display_server == DisplayServer::Wayland { println!(" Install wtype: sudo pacman -S wtype"); + println!(" Or eitype: cargo install eitype"); } else { println!(" Install ydotool: sudo pacman -S ydotool"); } @@ -703,34 +732,21 @@ pub async fn run_checks(config: &Config) -> anyhow::Result<()> { // Check Parakeet models (experimental) println!("\nParakeet Models (EXPERIMENTAL):"); - // Find available Parakeet models + // Find available Parakeet models by checking known model names let mut parakeet_models: Vec<(String, u64)> = Vec::new(); - if let Ok(entries) = std::fs::read_dir(&models_dir) { - for entry in entries.flatten() { - let path = entry.path(); - if path.is_dir() { - let name = entry.file_name().to_string_lossy().to_string(); - if name.contains("parakeet") { - // Check if it has the required ONNX files - let encoder_path = path.join("encoder-model.onnx"); - let has_encoder = encoder_path.exists(); - let has_decoder = path.join("decoder_joint-model.onnx").exists() - || path.join("model.onnx").exists(); - if has_encoder || has_decoder { - // Get total size of model files - let size = std::fs::read_dir(&path) - .map(|entries| { - entries - .flatten() - .filter_map(|e| e.metadata().ok()) - .map(|m| m.len()) - .sum() - }) - .unwrap_or(0); - parakeet_models.push((name, size)); - } - } - } + for known_model in model::valid_parakeet_model_names() { + let path = models_dir.join(known_model); + if path.is_dir() && model::validate_parakeet_model(&path).is_ok() { + let size = std::fs::read_dir(&path) + .map(|entries| { + entries + .flatten() + .filter_map(|e| e.metadata().ok()) + .map(|m| m.len()) + .sum() + }) + .unwrap_or(0); + parakeet_models.push((known_model.to_string(), 
size)); } } diff --git a/src/setup/model.rs b/src/setup/model.rs index af58181d..32c58dd8 100644 --- a/src/setup/model.rs +++ b/src/setup/model.rs @@ -94,6 +94,8 @@ struct ParakeetModelInfo { description: &'static str, files: &'static [(&'static str, u64)], // (filename, expected_size_bytes) huggingface_repo: &'static str, + /// Subdirectory within the HuggingFace repo (None = files are in repo root) + huggingface_path: Option<&'static str>, } const PARAKEET_MODELS: &[ParakeetModelInfo] = &[ @@ -109,6 +111,7 @@ const PARAKEET_MODELS: &[ParakeetModelInfo] = &[ ("config.json", 97), ], huggingface_repo: "istupakov/parakeet-tdt-0.6b-v3-onnx", + huggingface_path: None, }, ParakeetModelInfo { name: "parakeet-tdt-0.6b-v3-int8", @@ -121,6 +124,44 @@ const PARAKEET_MODELS: &[ParakeetModelInfo] = &[ ("config.json", 97), ], huggingface_repo: "istupakov/parakeet-tdt-0.6b-v3-onnx", + huggingface_path: None, + }, + ParakeetModelInfo { + name: "nemotron-speech-streaming-en-0.6b", + size_mb: 2570, + description: "Nemotron streaming, live text during recording", + files: &[ + ("encoder.onnx", 44_252_467), + ("encoder.onnx.data", 2_621_440_000), + ("decoder_joint.onnx", 37_539_691), + ("tokenizer.model", 257_151), + ], + huggingface_repo: "altunenes/parakeet-rs", + huggingface_path: Some("nemotron-speech-streaming-en-0.6b"), + }, + ParakeetModelInfo { + name: "nemotron-speech-streaming-en-0.6b-int8", + size_mb: 670, + description: "Nemotron streaming, quantized int8 (faster CPU)", + files: &[ + ("encoder.onnx", 0), + ("decoder_joint.onnx", 0), + ("tokenizer.model", 257_151), + ], + huggingface_repo: "lokkju/nemotron-speech-streaming-en-0.6b-int8", + huggingface_path: None, + }, + ParakeetModelInfo { + name: "nemotron-speech-streaming-en-0.6b-int4", + size_mb: 400, + description: "Nemotron streaming, quantized int4 (smallest/fastest CPU)", + files: &[ + ("encoder.onnx", 0), + ("decoder_joint.onnx", 0), + ("tokenizer.model", 257_151), + ], + huggingface_repo: 
"lokkju/nemotron-speech-streaming-en-0.6b-int4", + huggingface_path: None, }, ]; @@ -611,26 +652,34 @@ pub fn validate_parakeet_model(path: &Path) -> anyhow::Result<()> { anyhow::bail!("Model directory does not exist: {:?}", path); } - // Check for TDT structure: encoder + decoder + vocab - let has_encoder = path.join("encoder-model.onnx").exists() + // Check for TDT/CTC structure: encoder-model + decoder_joint-model + vocab.txt + let has_tdt_encoder = path.join("encoder-model.onnx").exists() || path.join("encoder-model.onnx.data").exists() || path.join("encoder-model.int8.onnx").exists(); - let has_decoder = path.join("decoder_joint-model.onnx").exists() + let has_tdt_decoder = path.join("decoder_joint-model.onnx").exists() || path.join("decoder_joint-model.int8.onnx").exists(); let has_vocab = path.join("vocab.txt").exists(); - if has_encoder && has_decoder && has_vocab { + // Check for Nemotron structure: encoder + decoder_joint + tokenizer.model + let has_nemotron_encoder = + path.join("encoder.onnx").exists() || path.join("encoder.onnx.data").exists(); + let has_nemotron_decoder = path.join("decoder_joint.onnx").exists(); + let has_tokenizer = path.join("tokenizer.model").exists(); + + if (has_tdt_encoder && has_tdt_decoder && has_vocab) + || (has_nemotron_encoder && has_nemotron_decoder && has_tokenizer) + { Ok(()) } else { let mut missing = Vec::new(); - if !has_encoder { + if !has_tdt_encoder && !has_nemotron_encoder { missing.push("encoder model"); } - if !has_decoder { + if !has_tdt_decoder && !has_nemotron_decoder { missing.push("decoder model"); } - if !has_vocab { - missing.push("vocab.txt"); + if !has_vocab && !has_tokenizer { + missing.push("tokenizer (vocab.txt or tokenizer.model)"); } anyhow::bail!("Incomplete Parakeet model, missing: {}", missing.join(", ")) } @@ -664,10 +713,16 @@ fn download_parakeet_model_by_info(model: &ParakeetModelInfo) -> anyhow::Result< continue; } - let url = format!( - "https://huggingface.co/{}/resolve/main/{}", - 
model.huggingface_repo, filename - ); + let url = match model.huggingface_path { + Some(path) => format!( + "https://huggingface.co/{}/resolve/main/{}/{}", + model.huggingface_repo, path, filename + ), + None => format!( + "https://huggingface.co/{}/resolve/main/{}", + model.huggingface_repo, filename + ), + }; println!("Downloading {}...", filename); @@ -1025,6 +1080,9 @@ language = "en" let model_names: Vec<&str> = PARAKEET_MODELS.iter().map(|m| m.name).collect(); assert!(model_names.contains(&"parakeet-tdt-0.6b-v3")); assert!(model_names.contains(&"parakeet-tdt-0.6b-v3-int8")); + assert!(model_names.contains(&"nemotron-speech-streaming-en-0.6b")); + assert!(model_names.contains(&"nemotron-speech-streaming-en-0.6b-int8")); + assert!(model_names.contains(&"nemotron-speech-streaming-en-0.6b-int4")); } #[test] @@ -1050,10 +1108,13 @@ language = "en" "Model {} should have file definitions", model.name ); - // All TDT models should have vocab.txt + // All models should have a tokenizer: vocab.txt (TDT/CTC) or tokenizer.model (Nemotron) + let has_tokenizer = model.files.iter().any(|(f, _)| { + *f == "vocab.txt" || *f == "tokenizer.model" || *f == "tokenizer.json" + }); assert!( - model.files.iter().any(|(f, _)| *f == "vocab.txt"), - "Model {} should have vocab.txt", + has_tokenizer, + "Model {} should have a tokenizer file (vocab.txt, tokenizer.model, or tokenizer.json)", model.name ); } @@ -1064,6 +1125,9 @@ language = "en" // Valid Parakeet models assert!(is_parakeet_model("parakeet-tdt-0.6b-v3")); assert!(is_parakeet_model("parakeet-tdt-0.6b-v3-int8")); + assert!(is_parakeet_model("nemotron-speech-streaming-en-0.6b")); + assert!(is_parakeet_model("nemotron-speech-streaming-en-0.6b-int8")); + assert!(is_parakeet_model("nemotron-speech-streaming-en-0.6b-int4")); // Invalid models assert!(!is_parakeet_model("base.en")); diff --git a/src/state.rs b/src/state.rs index 4d2f8b6e..d349573b 100644 --- a/src/state.rs +++ b/src/state.rs @@ -8,6 +8,15 @@ use std::time::Instant; 
/// Audio samples collected during recording (f32, mono, 16kHz) pub type AudioBuffer = Vec<f32>; +/// Result from transcribing a single chunk during eager processing +#[derive(Debug, Clone)] +pub struct ChunkResult { + /// Transcribed text from this chunk + pub text: String, + /// Which chunk this result corresponds to (0-indexed) + pub chunk_index: usize, +} + /// Application state #[derive(Debug, Clone)] pub enum State { @@ -22,6 +31,35 @@ model_override: Option<String>, }, + /// Hotkey held, recording audio with eager chunk processing + EagerRecording { + /// When recording started + started_at: Instant, + /// Optional model override for this recording + model_override: Option<String>, + /// Accumulated audio samples during recording + accumulated_audio: AudioBuffer, + /// Number of chunks already sent for transcription + chunks_sent: usize, + /// Results received from completed chunk transcriptions + chunk_results: Vec<ChunkResult>, + /// Number of transcription tasks currently in flight + tasks_in_flight: usize, + }, + + /// Hotkey held, recording audio with streaming transcription (Nemotron) + /// Text is typed live during recording as audio chunks are processed + StreamingRecording { + /// When recording started + started_at: Instant, + /// Optional model override for this recording + model_override: Option<String>, + /// Audio samples buffered but not yet a full chunk + audio_buffer: Vec<f32>, + /// Text already typed to the output + text_output_so_far: String, + }, + /// Hotkey released, transcribing audio Transcribing { /// Recorded audio samples @@ -46,15 +84,50 @@ impl State { matches!(self, State::Idle) } - /// Check if in recording state + /// Check if in recording state (normal, eager, or streaming) pub fn is_recording(&self) -> bool { - matches!(self, State::Recording { .. }) + matches!( + self, + State::Recording { .. } + | State::EagerRecording { .. } + | State::StreamingRecording { .. 
} + ) + } + + /// Check if in streaming recording state specifically + pub fn is_streaming_recording(&self) -> bool { + matches!(self, State::StreamingRecording { .. }) } - /// Get recording duration if currently recording + /// Check if in eager recording state specifically + pub fn is_eager_recording(&self) -> bool { + matches!(self, State::EagerRecording { .. }) + } + + /// Get recording duration if currently recording (normal, eager, or streaming) pub fn recording_duration(&self) -> Option { match self { - State::Recording { started_at, .. } => Some(started_at.elapsed()), + State::Recording { started_at, .. } + | State::EagerRecording { started_at, .. } + | State::StreamingRecording { started_at, .. } => Some(started_at.elapsed()), + _ => None, + } + } + + /// Get the number of chunks sent for transcription (eager mode only) + pub fn eager_chunks_sent(&self) -> Option { + match self { + State::EagerRecording { chunks_sent, .. } => Some(*chunks_sent), + _ => None, + } + } + + /// Get the number of transcription tasks currently in flight (eager mode only) + pub fn eager_tasks_in_flight(&self) -> Option { + match self { + State::EagerRecording { + tasks_in_flight, .. + } => Some(*tasks_in_flight), _ => None, } } @@ -73,6 +146,32 @@ impl std::fmt::Display for State { State::Recording { started_at, .. } => { write!(f, "Recording ({:.1}s)", started_at.elapsed().as_secs_f32()) } + State::EagerRecording { + started_at, + chunks_sent, + tasks_in_flight, + .. + } => { + write!( + f, + "Recording ({:.1}s, {} chunks, {} pending)", + started_at.elapsed().as_secs_f32(), + chunks_sent, + tasks_in_flight + ) + } + State::StreamingRecording { + started_at, + text_output_so_far, + .. 
+ } => { + write!( + f, + "Streaming ({:.1}s, {} chars typed)", + started_at.elapsed().as_secs_f32(), + text_output_so_far.len() + ) + } State::Transcribing { audio } => { let duration = audio.len() as f32 / 16000.0; write!(f, "Transcribing ({:.1}s of audio)", duration) @@ -128,4 +227,78 @@ mod tests { }; assert!(format!("{}", state).starts_with("Recording")); } + + #[test] + fn test_eager_recording_state() { + let state = State::EagerRecording { + started_at: Instant::now(), + model_override: None, + accumulated_audio: vec![], + chunks_sent: 2, + chunk_results: vec![], + tasks_in_flight: 1, + }; + assert!(state.is_recording()); + assert!(state.is_eager_recording()); + assert!(!state.is_idle()); + assert!(state.recording_duration().is_some()); + assert_eq!(state.eager_chunks_sent(), Some(2)); + assert_eq!(state.eager_tasks_in_flight(), Some(1)); + } + + #[test] + fn test_regular_recording_not_eager() { + let state = State::Recording { + started_at: Instant::now(), + model_override: None, + }; + assert!(state.is_recording()); + assert!(!state.is_eager_recording()); + assert_eq!(state.eager_chunks_sent(), None); + assert_eq!(state.eager_tasks_in_flight(), None); + } + + #[test] + fn test_streaming_recording_state() { + let state = State::StreamingRecording { + started_at: Instant::now(), + model_override: None, + audio_buffer: vec![], + text_output_so_far: "hello ".to_string(), + }; + assert!(state.is_recording()); + assert!(state.is_streaming_recording()); + assert!(!state.is_idle()); + assert!(!state.is_eager_recording()); + assert!(state.recording_duration().is_some()); + } + + #[test] + fn test_streaming_recording_display() { + let state = State::StreamingRecording { + started_at: Instant::now(), + model_override: None, + audio_buffer: vec![], + text_output_so_far: "hello world".to_string(), + }; + let display = format!("{}", state); + assert!(display.contains("Streaming")); + assert!(display.contains("11 chars typed")); + } + + #[test] + fn 
test_eager_recording_display() { + let state = State::EagerRecording { + started_at: Instant::now(), + model_override: None, + accumulated_audio: vec![], + chunks_sent: 3, + chunk_results: vec![], + tasks_in_flight: 2, + }; + let display = format!("{}", state); + assert!(display.contains("Recording")); + assert!(display.contains("3 chunks")); + assert!(display.contains("2 pending")); + } } diff --git a/src/transcribe/mod.rs b/src/transcribe/mod.rs index eea5c828..a7b3caa6 100644 --- a/src/transcribe/mod.rs +++ b/src/transcribe/mod.rs @@ -39,6 +39,28 @@ pub trait Transcriber: Send + Sync { } } +/// Trait for streaming speech-to-text (real-time transcription during recording) +/// +/// Unlike `Transcriber`, this maintains internal state and produces incremental +/// text output as audio chunks are fed in. Used by Nemotron streaming models. +pub trait StreamingTranscriber: Send { + /// Feed a chunk of audio and get incremental text back + /// Returns the new text produced by this chunk (delta, not cumulative) + fn transcribe_chunk(&mut self, chunk: &[f32]) -> Result<String, TranscribeError>; + + /// Flush remaining audio by feeding silence chunks to drain the decoder + fn flush(&mut self) -> Result<String, TranscribeError>; + + /// Reset state for a new utterance + fn reset(&mut self); + + /// Get the full accumulated transcript so far + fn get_transcript(&self) -> String; + + /// Chunk size in samples expected by this model + fn chunk_size(&self) -> usize; +} + /// Factory function to create transcriber based on configured engine pub fn create_transcriber(config: &Config) -> Result<Box<dyn Transcriber>, TranscribeError> { match config.engine { @@ -50,7 +72,9 @@ "Parakeet engine selected but [parakeet] config section is missing".to_string(), ) })?; - Ok(Box::new(parakeet::ParakeetTranscriber::new(parakeet_config)?)) + Ok(Box::new(parakeet::ParakeetTranscriber::new( + parakeet_config, + )?)) } #[cfg(not(feature = "parakeet"))] TranscriptionEngine::Parakeet => 
Err(TranscribeError::InitFailed( @@ -60,6 +84,29 @@ pub fn create_transcriber(config: &Config) -> Result, Trans } } +/// Factory function to create a streaming transcriber (currently only Nemotron) +#[cfg(feature = "parakeet")] +pub fn create_streaming_transcriber( + config: &Config, +) -> Result, TranscribeError> { + let parakeet_config = config.parakeet.as_ref().ok_or_else(|| { + TranscribeError::InitFailed( + "Streaming transcription requires [parakeet] config section".to_string(), + ) + })?; + parakeet::create_nemotron_streaming(parakeet_config) +} + +/// Factory function to create a streaming transcriber (stub when parakeet feature is disabled) +#[cfg(not(feature = "parakeet"))] +pub fn create_streaming_transcriber( + _config: &Config, +) -> Result, TranscribeError> { + Err(TranscribeError::InitFailed( + "Streaming transcription requires voxtype compiled with --features parakeet".to_string(), + )) +} + /// Factory function to create Whisper transcriber (local or remote) pub fn create_whisper_transcriber( config: &WhisperConfig, @@ -76,7 +123,10 @@ pub fn create_transcriber_with_config_path( // Apply GPU selection from VOXTYPE_VULKAN_DEVICE environment variable // This sets VK_LOADER_DRIVERS_SELECT to filter Vulkan drivers if let Some(vendor) = gpu::apply_gpu_selection() { - tracing::info!("GPU selection: {} (via VOXTYPE_VULKAN_DEVICE)", vendor.display_name()); + tracing::info!( + "GPU selection: {} (via VOXTYPE_VULKAN_DEVICE)", + vendor.display_name() + ); } match config.effective_mode() { diff --git a/src/transcribe/parakeet.rs b/src/transcribe/parakeet.rs index a742a178..257253ab 100644 --- a/src/transcribe/parakeet.rs +++ b/src/transcribe/parakeet.rs @@ -7,21 +7,29 @@ //! - CTC (Connectionist Temporal Classification): faster, character-level output //! 
- TDT (Token-Duration-Transducer): recommended, proper punctuation and word boundaries -use super::Transcriber; +use super::{StreamingTranscriber, Transcriber}; use crate::config::{ParakeetConfig, ParakeetModelType}; use crate::error::TranscribeError; -#[cfg(any(feature = "parakeet-cuda", feature = "parakeet-rocm", feature = "parakeet-tensorrt"))] +#[cfg(any( + feature = "parakeet-cuda", + feature = "parakeet-rocm", + feature = "parakeet-tensorrt" +))] use parakeet_rs::ExecutionProvider; -use parakeet_rs::{ExecutionConfig, Parakeet, ParakeetTDT, Transcriber as ParakeetTranscriberTrait}; +use parakeet_rs::{ + ExecutionConfig, Nemotron, Parakeet, ParakeetTDT, Transcriber as ParakeetTranscriberTrait, +}; use std::path::PathBuf; use std::sync::Mutex; -/// Internal enum to hold either CTC or TDT model instance +/// Internal enum to hold CTC, TDT, or Nemotron model instance enum ParakeetModel { /// CTC model (character-level, faster) Ctc(Mutex), /// TDT model (token-level, better quality output) Tdt(Mutex), + /// Nemotron model (streaming transducer) + Nemotron(Mutex), } /// Parakeet-based transcriber using ONNX Runtime @@ -54,19 +62,26 @@ impl ParakeetTranscriber { let model = match model_type { ParakeetModelType::Ctc => { - let parakeet = Parakeet::from_pretrained(&model_path, exec_config) - .map_err(|e| { + let parakeet = + Parakeet::from_pretrained(&model_path, exec_config).map_err(|e| { TranscribeError::InitFailed(format!("Parakeet CTC init failed: {}", e)) })?; ParakeetModel::Ctc(Mutex::new(parakeet)) } ParakeetModelType::Tdt => { - let parakeet = ParakeetTDT::from_pretrained(&model_path, exec_config) - .map_err(|e| { + let parakeet = + ParakeetTDT::from_pretrained(&model_path, exec_config).map_err(|e| { TranscribeError::InitFailed(format!("Parakeet TDT init failed: {}", e)) })?; ParakeetModel::Tdt(Mutex::new(parakeet)) } + ParakeetModelType::Nemotron => { + let nemotron = + Nemotron::from_pretrained(&model_path, exec_config).map_err(|e| { + 
TranscribeError::InitFailed(format!("Nemotron init failed: {}", e)) + })?; + ParakeetModel::Nemotron(Mutex::new(nemotron)) + } }; tracing::info!( @@ -82,7 +97,9 @@ impl ParakeetTranscriber { impl Transcriber for ParakeetTranscriber { fn transcribe(&self, samples: &[f32]) -> Result { if samples.is_empty() { - return Err(TranscribeError::AudioFormat("Empty audio buffer".to_string())); + return Err(TranscribeError::AudioFormat( + "Empty audio buffer".to_string(), + )); } let duration_secs = samples.len() as f32 / 16000.0; @@ -98,7 +115,10 @@ impl Transcriber for ParakeetTranscriber { let text = match &self.model { ParakeetModel::Ctc(parakeet) => { let mut parakeet = parakeet.lock().map_err(|e| { - TranscribeError::InferenceFailed(format!("Failed to lock Parakeet mutex: {}", e)) + TranscribeError::InferenceFailed(format!( + "Failed to lock Parakeet mutex: {}", + e + )) })?; let result = parakeet @@ -109,14 +129,20 @@ impl Transcriber for ParakeetTranscriber { None, // default timestamp mode ) .map_err(|e| { - TranscribeError::InferenceFailed(format!("Parakeet CTC inference failed: {}", e)) + TranscribeError::InferenceFailed(format!( + "Parakeet CTC inference failed: {}", + e + )) })?; result.text.trim().to_string() } ParakeetModel::Tdt(parakeet) => { let mut parakeet = parakeet.lock().map_err(|e| { - TranscribeError::InferenceFailed(format!("Failed to lock Parakeet mutex: {}", e)) + TranscribeError::InferenceFailed(format!( + "Failed to lock Parakeet mutex: {}", + e + )) })?; let result = parakeet @@ -127,11 +153,29 @@ impl Transcriber for ParakeetTranscriber { None, // default timestamp mode ) .map_err(|e| { - TranscribeError::InferenceFailed(format!("Parakeet TDT inference failed: {}", e)) + TranscribeError::InferenceFailed(format!( + "Parakeet TDT inference failed: {}", + e + )) })?; result.text.trim().to_string() } + ParakeetModel::Nemotron(nemotron) => { + let mut nemotron = nemotron.lock().map_err(|e| { + TranscribeError::InferenceFailed(format!( + "Failed to 
lock Nemotron mutex: {}", + e + )) + })?; + + nemotron.reset(); + let text = nemotron.transcribe_audio(samples).map_err(|e| { + TranscribeError::InferenceFailed(format!("Nemotron inference failed: {}", e)) + })?; + + text.trim().to_string() + } }; tracing::info!( @@ -149,6 +193,73 @@ impl Transcriber for ParakeetTranscriber { } } +/// Nemotron streaming transcriber for real-time output during recording +pub struct NemotronStreamingTranscriber { + model: Nemotron, +} + +impl NemotronStreamingTranscriber { + pub fn new(config: &ParakeetConfig) -> Result { + let model_path = resolve_model_path(&config.model)?; + + tracing::info!("Loading Nemotron streaming model from {:?}", model_path); + let start = std::time::Instant::now(); + + let exec_config = build_execution_config(); + + let model = Nemotron::from_pretrained(&model_path, exec_config) + .map_err(|e| TranscribeError::InitFailed(format!("Nemotron init failed: {}", e)))?; + + tracing::info!( + "Nemotron streaming model loaded in {:.2}s", + start.elapsed().as_secs_f32() + ); + + Ok(Self { model }) + } +} + +impl StreamingTranscriber for NemotronStreamingTranscriber { + fn transcribe_chunk(&mut self, chunk: &[f32]) -> Result { + // parakeet-rs transcribe_chunk() already returns only the new tokens (delta), + // not the full cumulative transcript. Return it directly. 
+ self.model.transcribe_chunk(chunk).map_err(|e| { + TranscribeError::InferenceFailed(format!("Nemotron chunk inference failed: {}", e)) + }) + } + + fn flush(&mut self) -> Result { + // Feed 3 silence chunks to drain the decoder + let silence = vec![0.0f32; self.chunk_size()]; + let mut flushed = String::new(); + for _ in 0..3 { + let delta = self.transcribe_chunk(&silence)?; + flushed.push_str(&delta); + } + Ok(flushed) + } + + fn reset(&mut self) { + self.model.reset(); + } + + fn get_transcript(&self) -> String { + self.model.get_transcript() + } + + fn chunk_size(&self) -> usize { + // 560ms at 16kHz = 8960 samples + 8960 + } +} + +/// Factory function to create a Nemotron streaming transcriber +pub fn create_nemotron_streaming( + config: &ParakeetConfig, +) -> Result, TranscribeError> { + Ok(Box::new(NemotronStreamingTranscriber::new(config)?)) +} + /// Build execution config based on compile-time feature flags fn build_execution_config() -> Option { #[cfg(feature = "parakeet-cuda")] @@ -165,11 +276,15 @@ fn build_execution_config() -> Option { #[cfg(feature = "parakeet-rocm")] { - tracing::info!("Configuring ROCm execution provider for AMD GPU acceleration"); - return Some(ExecutionConfig::new().with_execution_provider(ExecutionProvider::ROCm)); + tracing::info!("Configuring MIGraphX execution provider for AMD GPU acceleration"); + return Some(ExecutionConfig::new().with_execution_provider(ExecutionProvider::MIGraphX)); } - #[cfg(not(any(feature = "parakeet-cuda", feature = "parakeet-tensorrt", feature = "parakeet-rocm")))] + #[cfg(not(any( + feature = "parakeet-cuda", + feature = "parakeet-tensorrt", + feature = "parakeet-rocm" + )))] { None } @@ -177,12 +292,27 @@ fn build_execution_config() -> Option { /// Auto-detect model type from directory structure /// +/// Nemotron models have: encoder.onnx, encoder.onnx.data, decoder_joint.onnx, tokenizer.model /// TDT models have: encoder-model.onnx, decoder_joint-model.onnx, vocab.txt /// CTC models have: 
model.onnx (or model_int8.onnx), tokenizer.json -fn detect_model_type(path: &PathBuf) -> ParakeetModelType { - // Check for TDT model structure - let has_encoder = path.join("encoder-model.onnx").exists() - || path.join("encoder-model.onnx.data").exists(); +pub fn detect_model_type(path: &PathBuf) -> ParakeetModelType { + // Check for Nemotron model structure (must come before TDT since both have encoder/decoder) + // Nemotron uses non-hyphenated names: encoder.onnx (not encoder-model.onnx) + let has_nemotron_encoder = + path.join("encoder.onnx").exists() || path.join("encoder.onnx.data").exists(); + let has_nemotron_decoder = path.join("decoder_joint.onnx").exists(); + let has_sentencepiece = path.join("tokenizer.model").exists(); + + if has_nemotron_encoder && has_nemotron_decoder && has_sentencepiece { + tracing::debug!( + "Auto-detected Nemotron model (found encoder.onnx + decoder_joint.onnx + tokenizer.model)" + ); + return ParakeetModelType::Nemotron; + } + + // Check for TDT model structure (hyphenated names: encoder-model.onnx) + let has_encoder = + path.join("encoder-model.onnx").exists() || path.join("encoder-model.onnx.data").exists(); let has_decoder = path.join("decoder_joint-model.onnx").exists(); if has_encoder && has_decoder { @@ -191,8 +321,7 @@ fn detect_model_type(path: &PathBuf) -> ParakeetModelType { } // Check for CTC model structure - let has_ctc_model = path.join("model.onnx").exists() - || path.join("model_int8.onnx").exists(); + let has_ctc_model = path.join("model.onnx").exists() || path.join("model_int8.onnx").exists(); let has_tokenizer = path.join("tokenizer.json").exists(); if has_ctc_model && has_tokenizer { @@ -210,7 +339,7 @@ fn detect_model_type(path: &PathBuf) -> ParakeetModelType { } /// Resolve model name to directory path -fn resolve_model_path(model: &str) -> Result { +pub fn resolve_model_path(model: &str) -> Result { // If it's already an absolute path, use it directly let path = PathBuf::from(model); if path.is_absolute() 
&& path.exists() { @@ -239,8 +368,10 @@ fn resolve_model_path(model: &str) -> Result { Err(TranscribeError::ModelNotFound(format!( "Parakeet model '{}' not found. Looked in:\n - {}\n - {}\n - {}\n\n\ + Run: voxtype setup model\n\n\ Download TDT (recommended): https://huggingface.co/istupakov/parakeet-tdt-0.6b-v3-onnx\n\ - Download CTC: https://huggingface.co/nvidia/parakeet-ctc-0.6b", + Download CTC: https://huggingface.co/nvidia/parakeet-ctc-0.6b\n\ + Download Nemotron (streaming): https://huggingface.co/altunenes/parakeet-rs/tree/main/nemotron-speech-streaming-en-0.6b", model, model_path.display(), cwd_path.display(), @@ -307,6 +438,49 @@ mod tests { assert_eq!(detected, ParakeetModelType::Ctc); } + #[test] + fn test_detect_model_type_nemotron() { + let temp_dir = TempDir::new().unwrap(); + let model_path = temp_dir.path().to_path_buf(); + + // Create Nemotron model structure (non-hyphenated encoder.onnx) + fs::write(model_path.join("encoder.onnx"), b"dummy").unwrap(); + fs::write(model_path.join("encoder.onnx.data"), b"dummy").unwrap(); + fs::write(model_path.join("decoder_joint.onnx"), b"dummy").unwrap(); + fs::write(model_path.join("tokenizer.model"), b"dummy").unwrap(); + + let detected = detect_model_type(&model_path); + assert_eq!(detected, ParakeetModelType::Nemotron); + } + + #[test] + fn test_detect_model_type_nemotron_without_data_file() { + let temp_dir = TempDir::new().unwrap(); + let model_path = temp_dir.path().to_path_buf(); + + // Nemotron with encoder.onnx (no .data file) + decoder + tokenizer.model + fs::write(model_path.join("encoder.onnx"), b"dummy").unwrap(); + fs::write(model_path.join("decoder_joint.onnx"), b"dummy").unwrap(); + fs::write(model_path.join("tokenizer.model"), b"dummy").unwrap(); + + let detected = detect_model_type(&model_path); + assert_eq!(detected, ParakeetModelType::Nemotron); + } + + #[test] + fn test_detect_model_type_tdt_not_confused_with_nemotron() { + let temp_dir = TempDir::new().unwrap(); + let model_path = 
temp_dir.path().to_path_buf(); + + // TDT uses hyphenated names - should NOT match Nemotron + fs::write(model_path.join("encoder-model.onnx"), b"dummy").unwrap(); + fs::write(model_path.join("decoder_joint-model.onnx"), b"dummy").unwrap(); + fs::write(model_path.join("vocab.txt"), b"dummy").unwrap(); + + let detected = detect_model_type(&model_path); + assert_eq!(detected, ParakeetModelType::Tdt); + } + #[test] fn test_detect_model_type_defaults_to_tdt_when_ambiguous() { let temp_dir = TempDir::new().unwrap();