Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions crates/cactus/src/stt/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ unsafe extern "C" fn token_trampoline<F: FnMut(&str) -> bool>(

let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let chunk = unsafe { CStr::from_ptr(token) }.to_string_lossy();
if chunk.starts_with("<|") && chunk.ends_with("|>") {
return;
}
let on_token = unsafe { &mut *state.on_token.get() };
if !on_token(&chunk) {
state.stopped.set(true);
Expand Down
115 changes: 114 additions & 1 deletion crates/listener2-core/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ async fn spawn_batch_task(
AdapterKind::Hyprnote => {
spawn_batch_task_with_adapter::<HyprnoteAdapter>(args, myself).await
}
AdapterKind::Cactus => spawn_batch_task_with_adapter::<CactusAdapter>(args, myself).await,
AdapterKind::Cactus => spawn_cactus_batch_task(args, myself).await,
}
}

Expand Down Expand Up @@ -509,6 +509,119 @@ async fn spawn_argmax_streaming_batch_task(
Ok((rx_task, shutdown_tx))
}

async fn spawn_cactus_batch_task(
args: BatchArgs,
myself: ActorRef<BatchMsg>,
) -> Result<
(
tokio::task::JoinHandle<()>,
tokio::sync::oneshot::Sender<()>,
),
ActorProcessingErr,
> {
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();

let rx_task = tokio::spawn(async move {
tracing::info!("cactus batch task: starting direct HTTP batch");
let start_notifier = args.start_notifier.clone();

let stream_result = CactusAdapter::transcribe_file_streaming(
&args.base_url,
&args.api_key,
&args.listen_params,
&args.file_path,
)
.await;

let mut stream = match stream_result {
Ok(s) => {
notify_start_result(&start_notifier, Ok(()));
s
}
Err(e) => {
let raw_error = format!("{:?}", e);
let error = format_user_friendly_error(&raw_error);
tracing::error!("cactus batch task: failed to start: {:?}", e);
notify_start_result(&start_notifier, Err(error.clone()));
let _ = myself.send_message(BatchMsg::StreamStartFailed(error));
return;
}
};

let response_timeout = Duration::from_secs(BATCH_STREAM_TIMEOUT_SECS);
let mut response_count = 0;
let mut ended_cleanly = false;

loop {
tokio::select! {
_ = &mut shutdown_rx => {
tracing::info!("cactus batch task: shutdown");
ended_cleanly = true;
break;
}
result = tokio::time::timeout(response_timeout, StreamExt::next(&mut stream)) => {
match result {
Ok(Some(Ok(event))) => {
response_count += 1;

let is_from_finalize = matches!(
&event.response,
StreamResponse::TranscriptResponse { from_finalize, .. } if *from_finalize
);

tracing::info!(
"cactus batch: response #{}{}",
response_count,
if is_from_finalize { " (from_finalize)" } else { "" }
);

if let Err(e) = myself.send_message(BatchMsg::StreamResponse {
response: Box::new(event.response),
percentage: event.percentage,
}) {
tracing::error!("failed to send cactus batch response: {:?}", e);
}

if is_from_finalize {
ended_cleanly = true;
break;
}
}
Ok(Some(Err(e))) => {
let raw_error = format!("{:?}", e);
let error = format_user_friendly_error(&raw_error);
tracing::error!("cactus batch error: {:?}", e);
let _ = myself.send_message(BatchMsg::StreamError(error));
break;
}
Ok(None) => {
tracing::info!("cactus batch completed (total: {})", response_count);
ended_cleanly = true;
break;
}
Err(elapsed) => {
tracing::warn!(timeout = ?elapsed, "cactus batch timeout");
let _ = myself.send_message(BatchMsg::StreamError(
format_user_friendly_error("timeout waiting for response"),
));
break;
}
}
}
}
}

if ended_cleanly {
if let Err(e) = myself.send_message(BatchMsg::StreamEnded) {
tracing::error!("failed to send cactus batch ended message: {:?}", e);
}
}
tracing::info!("cactus batch task exited");
});

Ok((rx_task, shutdown_tx))
}

async fn spawn_batch_task_with_adapter<A: RealtimeSttAdapter>(
args: BatchArgs,
myself: ActorRef<BatchMsg>,
Expand Down
16 changes: 4 additions & 12 deletions crates/owhisper-client/src/adapter/argmax/batch.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::time::Duration;

use futures_util::{Stream, StreamExt};
use futures_util::StreamExt;
use hypr_audio_utils::{Source, f32_to_i16_bytes, resample_audio, source_from_path};
use owhisper_interface::batch::Response as BatchResponse;
use owhisper_interface::stream::StreamResponse;
Expand All @@ -11,7 +10,9 @@ use tokio_stream::StreamExt as TokioStreamExt;

use crate::ListenClientBuilder;
use crate::adapter::deepgram_compat::build_batch_url;
use crate::adapter::{BatchFuture, BatchSttAdapter, ClientWithMiddleware};
use crate::adapter::{
BatchFuture, BatchSttAdapter, ClientWithMiddleware, StreamingBatchEvent, StreamingBatchStream,
};
use crate::error::Error;

use super::{ArgmaxAdapter, keywords::ArgmaxKeywordStrategy, language::ArgmaxLanguageStrategy};
Expand Down Expand Up @@ -151,15 +152,6 @@ impl StreamingBatchConfig {
}
}

/// One item of a `StreamingBatchStream`: a stream response paired with a percentage value.
#[derive(Debug, Clone)]
pub struct StreamingBatchEvent {
/// The stream response carried by this event.
pub response: StreamResponse,
/// Percentage reported alongside the response.
/// NOTE(review): presumably transcription progress; confirm the scale (0–1 vs 0–100)
/// against the code that produces these events.
pub percentage: f64,
}

/// Pinned, boxed, `Send` stream whose items are `Result<StreamingBatchEvent, Error>`.
pub type StreamingBatchStream =
Pin<Box<dyn Stream<Item = Result<StreamingBatchEvent, Error>> + Send>>;

impl ArgmaxAdapter {
pub async fn transcribe_file_streaming<P: AsRef<Path>>(
api_base: &str,
Expand Down
2 changes: 1 addition & 1 deletion crates/owhisper-client/src/adapter/argmax/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub(crate) mod language;
mod live;

#[cfg(feature = "argmax")]
pub use batch::{StreamingBatchConfig, StreamingBatchEvent, StreamingBatchStream};
pub use batch::StreamingBatchConfig;

pub use language::PARAKEET_V3_LANGS;

Expand Down
Loading
Loading