Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ permissions:
env:
# Bump to invalidate every cache entry without source surgery (e.g., after a
# known-bad cache or an Xcode toolchain upgrade we want to flush manually).
CACHE_SALT: v2-vmlx-5b84387
CACHE_SALT: v3-pr-cold-deriveddata
# Pin Xcode so cache keys are stable across runner image bumps. When you
# need to upgrade, change here AND in setup-xcode below.
XCODE_VERSION: "26.4.1"
Expand Down
32 changes: 12 additions & 20 deletions Packages/OsaurusCore/Managers/TTSService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -157,25 +157,9 @@ public final class TTSService: ObservableObject {
let voice = TTSConfigurationStore.load().voice
initTask = Task { [weak self] in
do {
// Route through the downloader explicitly so we get progress callbacks.
// When models are already cached this returns nearly instantly.
_ = try await PocketTtsResourceDownloader.ensureModels(
directory: nil,
progressHandler: { progress in
Task { @MainActor in
guard let self else { return }
let fraction: Double?
switch progress.phase {
case .downloading:
fraction = progress.fractionCompleted
case .listing, .compiling:
fraction = nil
}
self.modelState = .downloading(fraction: fraction)
}
}
)

// Let FluidAudio pick its default language pack so this call
// stays compatible across the workspace-pinned and package
// resolved PocketTTS APIs.
let mgr = PocketTtsManager(defaultVoice: voice)
try await mgr.initialize()
await MainActor.run {
Expand Down Expand Up @@ -216,9 +200,17 @@ public final class TTSService: ObservableObject {
.appendingPathComponent("fluidaudio", isDirectory: true)
.appendingPathComponent("Models", isDirectory: true)
.appendingPathComponent("pocket-tts", isDirectory: true)
let candidateDirs = [
repoDir,
repoDir
.appendingPathComponent("v2", isDirectory: true)
.appendingPathComponent("english", isDirectory: true),
]
let required = ModelNames.PocketTTS.requiredModels
let fm = FileManager.default
return required.allSatisfy { fm.fileExists(atPath: repoDir.appendingPathComponent($0).path) }
return candidateDirs.contains { directory in
required.allSatisfy { fm.fileExists(atPath: directory.appendingPathComponent($0).path) }
}
}

// MARK: - Playback
Expand Down
43 changes: 42 additions & 1 deletion Packages/OsaurusCore/Tests/Tool/ToolRegistryTimeoutTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// hanging the agent loop indefinitely.
//

import Dispatch
import Foundation
import Testing

Expand Down Expand Up @@ -36,6 +37,25 @@ struct ToolRegistryTimeoutTests {
}
}

/// Tool body that ignores cooperative Swift cancellation without burning a
/// Swift concurrency executor thread. This mirrors process / blocking I/O
/// classes where returning a timeout envelope must not wait for the losing
/// branch to drain.
private struct BlockingSleepTool: OsaurusTool {
let name: String = "test_blocking_sleep"
let description: String = "Test fixture: completes later than the timeout."
let parameters: JSONValue? = .object(["type": .string("object")])

func execute(argumentsJSON: String) async throws -> String {
await withCheckedContinuation { continuation in
DispatchQueue.global().asyncAfter(deadline: .now() + 5.0) {
continuation.resume()
}
}
return ToolEnvelope.success(tool: name, text: "did not time out")
}
}

/// Tool body that completes well within the test timeout. Used as a
/// happy-path control to confirm the timeout race doesn't fire
/// spuriously on fast tools.
Expand Down Expand Up @@ -69,7 +89,6 @@ struct ToolRegistryTimeoutTests {
#expect(parsed?["kind"] as? String == "timeout")
#expect(parsed?["tool"] as? String == tool.name)
#expect(parsed?["retryable"] as? Bool == true)

// Wall-clock budget: the envelope shape above proves the timeout
// branch won. Keep the elapsed assertion tied to the fixture's
// slow-body duration so loaded CI has room for scheduler latency,
Expand All @@ -82,6 +101,28 @@ struct ToolRegistryTimeoutTests {
)
}

@Test
func blockingToolReturnsTimeoutWithoutWaitingForBodyToDrain() async throws {
let tool = BlockingSleepTool()
let started = Date()
let result = try await ToolRegistry.runToolBody(
tool,
argumentsJSON: "{}",
timeoutSeconds: 0.1
)
let elapsed = Date().timeIntervalSince(started)

#expect(ToolEnvelope.isError(result))
let data = result.data(using: .utf8)!
let parsed = try JSONSerialization.jsonObject(with: data) as? [String: Any]
#expect(parsed?["kind"] as? String == "timeout")
#expect(parsed?["tool"] as? String == tool.name)
// The body ignores cancellation and finishes after 5s. Keeping
// this below 4s proves the timeout path returned without draining
// the blocked body while leaving room for busy CI runners.
#expect(elapsed < 4.8, "took \(elapsed)s — timeout waited for the blocked body")
}

@Test
func fastToolReturnsItsOwnResultBeforeTimeoutFires() async throws {
let tool = FastEchoTool()
Expand Down
143 changes: 96 additions & 47 deletions Packages/OsaurusCore/Tools/ToolRegistry.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,74 @@
// Central registry for chat tools. Provides OpenAI tool specs and execution by name.
//

import Foundation
import Combine
import Dispatch
import Foundation

private let toolRegistryTimeoutQueue = DispatchQueue(
label: "ai.osaurus.tool-registry.timeout",
qos: .userInitiated
)

private final class ToolBodyTimeoutRaceState: @unchecked Sendable {
private let lock = NSLock()
private var continuation: CheckedContinuation<String, Never>?
private var bodyTask: Task<Void, Never>?
private var timeoutTimer: DispatchSourceTimer?
private var cancelBodyWhenSet = false
private var cancelTimeoutWhenSet = false

init(continuation: CheckedContinuation<String, Never>) {
self.continuation = continuation
}

func setBodyTask(_ body: Task<Void, Never>) {
lock.lock()
bodyTask = body
let shouldCancelBody = cancelBodyWhenSet
lock.unlock()

if shouldCancelBody { body.cancel() }
}

func setTimeoutTimer(_ timeout: DispatchSourceTimer) {
lock.lock()
timeoutTimer = timeout
let shouldCancelTimeout = cancelTimeoutWhenSet
lock.unlock()

if shouldCancelTimeout {
timeout.setEventHandler {}
timeout.cancel()
}
}

func finish(with result: String, cancelBody: Bool, cancelTimeout: Bool) {
lock.lock()
guard let continuation else {
lock.unlock()
return
}
self.continuation = nil

let bodyToCancel = cancelBody ? bodyTask : nil
let timeoutToCancel = timeoutTimer
if cancelBody, bodyTask == nil {
cancelBodyWhenSet = true
}
if cancelTimeout, timeoutTimer == nil {
cancelTimeoutWhenSet = true
}
bodyTask = nil
timeoutTimer = nil
lock.unlock()

bodyToCancel?.cancel()
timeoutToCancel?.setEventHandler {}
timeoutToCancel?.cancel()
continuation.resume(returning: result)
}
}

@MainActor
final class ToolRegistry: ObservableObject {
Expand Down Expand Up @@ -338,7 +404,7 @@ final class ToolRegistry: ObservableObject {
/// fall through unchanged: parsing is best-effort, and tool bodies
/// keep their richer `requireXxx` helpers as the second line of
/// defence.
private nonisolated static func preflight(
nonisolated private static func preflight(
argumentsJSON: String,
schema: JSONValue?,
toolName: String
Expand Down Expand Up @@ -392,14 +458,13 @@ final class ToolRegistry: ObservableObject {
/// tests can drive it with a small `timeoutSeconds` value without
/// waiting for the full 120s production budget.
///
/// Each branch of the race converts thrown errors (including
/// `CancellationError` from the loser when we `cancelAll`) into a
/// structured `ToolEnvelope` *inside* its child task. That keeps
/// `withTaskGroup` non-throwing and prevents the cancelled sibling's
/// post-return throw from reaching the caller as the function's
/// error — historically the slow-tool case rethrew CancellationError
/// and stalled while the group drained.
internal nonisolated static func runToolBody(
/// The body and timeout run as unstructured tasks rather than a task
/// group. That is intentional: task-group scope exit drains cancelled
/// children, so a non-cooperative tool body can still delay the timeout
/// response until it returns. The race state resumes the caller once and
/// cancels the loser without waiting for that loser to observe
/// cancellation.
nonisolated static func runToolBody(
_ tool: OsaurusTool,
argumentsJSON: String,
timeoutSeconds: TimeInterval
Expand All @@ -412,49 +477,33 @@ final class ToolRegistry: ObservableObject {
tool: toolName,
retryable: true
)
// Sentinel returned by the cancelled loser branch so the
// consumer loop knows to ignore it. Cannot collide with any
// legitimate envelope because real envelopes are JSON.
let cancelledSentinel = "__osaurus_runToolBody_cancelled__"
return await withCheckedContinuation { continuation in
let race = ToolBodyTimeoutRaceState(continuation: continuation)
let timeoutTimer = DispatchSource.makeTimerSource(queue: toolRegistryTimeoutQueue)
let nanos = Int((max(0, timeoutSeconds) * 1_000_000_000).rounded(.up))
timeoutTimer.schedule(deadline: .now() + .nanoseconds(nanos))
timeoutTimer.setEventHandler {
race.finish(with: timeoutEnvelope, cancelBody: true, cancelTimeout: false)
}
race.setTimeoutTimer(timeoutTimer)
timeoutTimer.resume()

return await withTaskGroup(of: String.self) { group in
group.addTask {
let bodyTask = Task {
do {
return try await tool.execute(argumentsJSON: argumentsJSON)
let result = try await tool.execute(argumentsJSON: argumentsJSON)
race.finish(with: result, cancelBody: false, cancelTimeout: true)
} catch is CancellationError {
return cancelledSentinel
// A cooperative loser should not overwrite the timeout
// envelope. If cancellation happened before the timeout
// fired, the timeout timer remains responsible for the
// structured result.
return
} catch {
return ToolEnvelope.fromError(error, tool: toolName)
let result = ToolEnvelope.fromError(error, tool: toolName)
race.finish(with: result, cancelBody: false, cancelTimeout: true)
}
}
group.addTask {
let nanos = UInt64(timeoutSeconds * 1_000_000_000)
do {
try await Task.sleep(nanoseconds: nanos)
} catch {
// Cancelled because the body finished first — yield
// the sentinel so the caller's first non-sentinel
// result wins.
return cancelledSentinel
}
return timeoutEnvelope
}

// The first non-sentinel result is the winner; cancel the
// sibling and let `withTaskGroup` auto-drain on closure
// return. The drain is safe because every child branch
// converts its own errors into envelope strings — there
// are no uncaught throws to surface.
for await result in group {
if result == cancelledSentinel { continue }
group.cancelAll()
return result
}
return ToolEnvelope.failure(
kind: .executionError,
message: "Tool '\(toolName)' produced no result.",
tool: toolName
)
race.setBodyTask(bodyTask)
}
}

Expand Down
Loading