Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/pink-gorillas-breathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"kilo-code": patch
---

fix: resolve AbortSignal memory leak in CLI (MaxListenersExceededWarning)
5 changes: 5 additions & 0 deletions .changeset/pretty-memes-lose.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"kilo-code": patch
---

Split autocomplete suggestion in current line and next lines in most cases
107 changes: 107 additions & 0 deletions cli/src/state/atoms/__tests__/keyboard.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import { textBufferStringAtom, textBufferStateAtom } from "../textBuffer.js"
import { keyboardHandlerAtom, submissionCallbackAtom, submitInputAtom } from "../keyboard.js"
import { pendingApprovalAtom } from "../approval.js"
import { historyDataAtom, historyModeAtom, historyIndexAtom as _historyIndexAtom } from "../history.js"
import { chatMessagesAtom } from "../extension.js"
import { extensionServiceAtom, isServiceReadyAtom } from "../service.js"
import type { Key } from "../../../types/keyboard.js"
import type { CommandSuggestion, ArgumentSuggestion, FileMentionSuggestion } from "../../../services/autocomplete.js"
import type { Command } from "../../../commands/core/types.js"
import type { ExtensionChatMessage } from "../../../types/messages.js"
import type { ExtensionService } from "../../../services/extension.js"

describe("keypress atoms", () => {
let store: ReturnType<typeof createStore>
Expand Down Expand Up @@ -981,4 +984,108 @@ describe("keypress atoms", () => {
expect(store.get(textBufferStringAtom)).toBe("")
})
})

describe("global hotkeys", () => {
beforeEach(() => {
// Mock the extension service to prevent "ExtensionService not available" error
const mockService: Partial<ExtensionService> = {
initialize: vi.fn(),
dispose: vi.fn(),
on: vi.fn(),
off: vi.fn(),
sendWebviewMessage: vi.fn().mockResolvedValue(undefined),
isReady: vi.fn().mockReturnValue(true),
}
store.set(extensionServiceAtom, mockService as ExtensionService)
store.set(isServiceReadyAtom, true)
})

it("should cancel task when ESC is pressed while streaming", async () => {
// Set up streaming state by adding a partial message
// isStreamingAtom returns true when the last message is partial
const streamingMessage: ExtensionChatMessage = {
ts: Date.now(),
type: "say",
say: "text",
text: "Processing...",
partial: true, // This makes isStreamingAtom return true
}
store.set(chatMessagesAtom, [streamingMessage])

// Type some text first
const chars = ["h", "e", "l", "l", "o"]
for (const char of chars) {
const key: Key = {
name: char,
sequence: char,
ctrl: false,
meta: false,
shift: false,
paste: false,
}
store.set(keyboardHandlerAtom, key)
}

// Verify we have text in the buffer
expect(store.get(textBufferStringAtom)).toBe("hello")

// Press ESC while streaming
const escapeKey: Key = {
name: "escape",
sequence: "\x1b",
ctrl: false,
meta: false,
shift: false,
paste: false,
}
await store.set(keyboardHandlerAtom, escapeKey)

// When streaming, ESC should cancel the task and NOT clear the buffer
// (because it returns early from handleGlobalHotkeys)
expect(store.get(textBufferStringAtom)).toBe("hello")
})

it("should clear buffer when ESC is pressed while NOT streaming", async () => {
// Set up non-streaming state by adding a complete message
const completeMessage: ExtensionChatMessage = {
ts: Date.now(),
type: "say",
say: "text",
text: "Done",
partial: false, // This makes isStreamingAtom return false
}
store.set(chatMessagesAtom, [completeMessage])

// Type some text
const chars = ["h", "e", "l", "l", "o"]
for (const char of chars) {
const key: Key = {
name: char,
sequence: char,
ctrl: false,
meta: false,
shift: false,
paste: false,
}
store.set(keyboardHandlerAtom, key)
}

// Verify we have text in the buffer
expect(store.get(textBufferStringAtom)).toBe("hello")

// Press ESC while NOT streaming
const escapeKey: Key = {
name: "escape",
sequence: "\x1b",
ctrl: false,
meta: false,
shift: false,
paste: false,
}
await store.set(keyboardHandlerAtom, escapeKey)

// When not streaming, ESC should clear the buffer (normal behavior)
expect(store.get(textBufferStringAtom)).toBe("")
})
})
})
11 changes: 11 additions & 0 deletions cli/src/state/atoms/keyboard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -803,10 +803,21 @@ function handleGlobalHotkeys(get: Getter, set: Setter, key: Key): boolean {
const isStreaming = get(isStreamingAtom)
if (isStreaming) {
set(cancelTaskAtom)
return true
}
// If not streaming, don't consume the key
}
break
case "escape": {
// ESC cancels the task when streaming (same as Ctrl+X)
const isStreaming = get(isStreamingAtom)
if (isStreaming) {
set(cancelTaskAtom)
return true
}
// If not streaming, don't consume the key - let mode-specific handlers deal with it
break
}
case "r":
if (key.ctrl) {
const hasResumeTask = get(hasResumeTaskAtom)
Expand Down
2 changes: 1 addition & 1 deletion cli/src/state/hooks/useHotkeys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export function useHotkeys(): UseHotkeysReturn {

// Priority 3: Streaming state - show cancel
if (isStreaming) {
return [{ keys: `${modifierKey}+X`, description: "to cancel" }]
return [{ keys: `Esc/${modifierKey}+X`, description: "to cancel" }]
}

// Priority 4: Followup suggestions visible
Expand Down
74 changes: 50 additions & 24 deletions src/core/task/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2566,26 +2566,38 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
>()
// kilocode_change end

let streamAbortSignal: AbortSignal | undefined
let streamAbortListener: (() => void) | undefined
let streamAbortPromise: Promise<never> | undefined

try {
const iterator = stream[Symbol.asyncIterator]()

const ensureAbortPromise = (): void => {
if (streamAbortPromise || !this.currentRequestAbortController) {
return
}

streamAbortSignal = this.currentRequestAbortController.signal
streamAbortPromise = new Promise<never>((_, reject) => {
if (streamAbortSignal!.aborted) {
reject(new Error("Request cancelled by user"))
} else {
streamAbortListener = () => reject(new Error("Request cancelled by user"))
streamAbortSignal!.addEventListener("abort", streamAbortListener)
}
})
}

// Helper to race iterator.next() with abort signal
const nextChunkWithAbort = async () => {
const nextPromise = iterator.next()

// If we have an abort controller, race it with the next chunk
if (this.currentRequestAbortController) {
const abortPromise = new Promise<never>((_, reject) => {
const signal = this.currentRequestAbortController!.signal
if (signal.aborted) {
reject(new Error("Request cancelled by user"))
} else {
signal.addEventListener("abort", () => {
reject(new Error("Request cancelled by user"))
})
}
})
return await Promise.race([nextPromise, abortPromise])
// If we have an abort controller, race it with the next chunk.
// Reuse a single abort promise/listener across all chunks to avoid accumulating listeners.
ensureAbortPromise()
if (streamAbortPromise) {
return await Promise.race([nextPromise, streamAbortPromise])
}

// No abort controller, just return the next chunk normally
Expand Down Expand Up @@ -3176,6 +3188,9 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
}
} finally {
this.isStreaming = false
if (streamAbortSignal && streamAbortListener) {
streamAbortSignal.removeEventListener("abort", streamAbortListener)
}
// Clean up the abort controller when streaming completes
this.currentRequestAbortController = undefined
}
Expand Down Expand Up @@ -4004,10 +4019,24 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
)
const iterator = stream[Symbol.asyncIterator]()

// Set up abort handling - when the signal is aborted, clean up the controller reference
abortSignal.addEventListener("abort", () => {
// Set up abort handling - store listener reference for cleanup
// to avoid accumulating listeners on the AbortSignal
const abortCleanupListener = () => {
console.log(`[Task#${this.taskId}.${this.instanceId}] AbortSignal triggered for current request`)
this.currentRequestAbortController = undefined
}
abortSignal.addEventListener("abort", abortCleanupListener)

// Create a single abort promise/listener for racing with first chunk
// to avoid accumulating listeners per attempt
let firstChunkAbortListener: (() => void) | undefined
const abortPromise = new Promise<never>((_, reject) => {
if (abortSignal.aborted) {
reject(new Error("Request cancelled by user"))
} else {
firstChunkAbortListener = () => reject(new Error("Request cancelled by user"))
abortSignal.addEventListener("abort", firstChunkAbortListener)
}
})

try {
Expand All @@ -4016,15 +4045,6 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {

// Race between the first chunk and the abort signal
const firstChunkPromise = iterator.next()
const abortPromise = new Promise<never>((_, reject) => {
if (abortSignal.aborted) {
reject(new Error("Request cancelled by user"))
} else {
abortSignal.addEventListener("abort", () => {
reject(new Error("Request cancelled by user"))
})
}
})

const firstChunk = await Promise.race([firstChunkPromise, abortPromise])
yield firstChunk.value
Expand Down Expand Up @@ -4130,6 +4150,12 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
Task.lastGlobalApiRequestTime = performance.now()
}
// kilocode_change end

// Clean up abort listeners to prevent memory leaks
abortSignal.removeEventListener("abort", abortCleanupListener)
if (firstChunkAbortListener) {
abortSignal.removeEventListener("abort", firstChunkAbortListener)
}
}

// Shared exponential backoff for retries (first-chunk and mid-stream)
Expand Down
Loading