Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 50 additions & 10 deletions packages/api/src/infrastructure/email/ReviewFeedbackTaskSpec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export interface ReviewFeedbackSignal {
prNumber: number;
newComments: PrFeedbackComment[];
newDecisions: PrReviewDecision[];
commitCursor: () => void;
commitCursor: () => Promise<void>;
}

export interface ReviewFeedbackTaskSpecOptions {
Expand All @@ -45,6 +45,48 @@ export function createReviewFeedbackTaskSpec(opts: ReviewFeedbackTaskSpecOptions
const commentCursors = new Map<string, number>();
const reviewCursors = new Map<string, number>();

/**
* Advance cursor: persist to store + update in-memory map.
*
* Two policies (matching blast radius of each failure mode):
* - persistFirst (echo-skip): no delivery happened → persist first, skip memory on failure → safe retry
* - memoryFirst (post-delivery): notification sent → advance memory first → prevent duplicate spam
*/
async function advanceCursor(
taskId: string,
prKey: string,
cursors: { comment: number; decision: number },
policy: 'persistFirst' | 'memoryFirst',
): Promise<void> {
const patch = {
review: {
lastCommentCursor: cursors.comment,
lastDecisionCursor: cursors.decision,
...(policy === 'memoryFirst' ? { lastNotifiedAt: Date.now() } : {}),
},
};
const setMemory = () => {
commentCursors.set(prKey, cursors.comment);
reviewCursors.set(prKey, cursors.decision);
};

if (policy === 'memoryFirst') {
setMemory();
try {
await opts.taskStore.patchAutomationState(taskId, patch);
} catch (e) {
opts.log.warn(`[review-feedback] cursor persist failed for ${prKey}, restart may replay`, e);
}
} else {
try {
await opts.taskStore.patchAutomationState(taskId, patch);
setMemory();
} catch (e) {
opts.log.warn(`[review-feedback] echo-skip persist failed for ${prKey}, will retry next tick`, e);
}
}
}

return {
id: 'review-feedback',
profile: 'poller',
Expand All @@ -71,8 +113,9 @@ export function createReviewFeedbackTaskSpec(opts: ReviewFeedbackTaskSpecOptions
opts.fetchReviews(repoFullName, prNumber),
]);

const commentCursor = commentCursors.get(prKey) ?? 0;
const reviewCursor = reviewCursors.get(prKey) ?? 0;
// #406: Seed from persisted automationState.review on first access (survives restart)
const commentCursor = commentCursors.get(prKey) ?? task.automationState?.review?.lastCommentCursor ?? 0;
const reviewCursor = reviewCursors.get(prKey) ?? task.automationState?.review?.lastDecisionCursor ?? 0;

const allNewComments = comments.filter((c) => c.id > commentCursor);
const allNewReviews = reviews.filter((r) => r.id > reviewCursor);
Expand All @@ -89,8 +132,7 @@ export function createReviewFeedbackTaskSpec(opts: ReviewFeedbackTaskSpecOptions
const allSkipped = newComments.length === 0 && newDecisions.length === 0;
const hadNewItems = allNewComments.length > 0 || allNewReviews.length > 0;
if (hadNewItems && allSkipped) {
commentCursors.set(prKey, maxCommentId);
reviewCursors.set(prKey, maxReviewId);
await advanceCursor(task.id, prKey, { comment: maxCommentId, decision: maxReviewId }, 'persistFirst');
continue;
}

Expand All @@ -103,10 +145,8 @@ export function createReviewFeedbackTaskSpec(opts: ReviewFeedbackTaskSpecOptions
prNumber,
newComments,
newDecisions,
commitCursor: () => {
commentCursors.set(prKey, maxCommentId);
reviewCursors.set(prKey, maxReviewId);
},
commitCursor: () =>
advanceCursor(task.id, prKey, { comment: maxCommentId, decision: maxReviewId }, 'memoryFirst'),
},
// #320 KD-15: unified subject_key format
subjectKey: task.subjectKey!,
Expand Down Expand Up @@ -144,7 +184,7 @@ export function createReviewFeedbackTaskSpec(opts: ReviewFeedbackTaskSpecOptions

if (routeResult.kind !== 'notified') return;

signal.commitCursor();
await signal.commitCursor();

if (opts.invokeTrigger) {
try {
Expand Down
238 changes: 236 additions & 2 deletions packages/api/test/scheduler/review-feedback-spec.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,25 @@ function mockTask(pr, overrides = {}) {
}

function mockTaskStore(tasks) {
return { listByKind: async () => tasks };
const patchCalls = [];
return {
listByKind: async () => tasks,
patchAutomationState: async (taskId, patch) => {
patchCalls.push({ taskId, patch });
const task = tasks.find((t) => t.id === taskId);
if (!task) return null;
// Return merged copy — do NOT mutate shared mock objects
return {
...task,
automationState: {
...task.automationState,
...patch,
review: patch.review ? { ...task.automationState?.review, ...patch.review } : task.automationState?.review,
},
};
},
_patchCalls: patchCalls,
};
}

function stubRouter(kind = 'notified') {
Expand Down Expand Up @@ -136,7 +154,7 @@ describe('ReviewFeedbackTaskSpec', () => {
const r1 = await spec.admission.gate({ taskId: spec.id, lastRunAt: null, tickCount: 1 });
assert.equal(r1.run, true);
// Simulate execute → commitCursor
r1.workItems[0].signal.commitCursor();
await r1.workItems[0].signal.commitCursor();

// Second gate: same comment, should be filtered out
const r2 = await spec.admission.gate({ taskId: spec.id, lastRunAt: null, tickCount: 2 });
Expand Down Expand Up @@ -561,6 +579,222 @@ describe('ReviewFeedbackTaskSpec', () => {
assert.equal(policy.suggestedSkill, undefined);
});

// ── #406: restart cursor persistence ──

it('gate seeds cursor from automationState.review on fresh instance (#406)', async () => {
const { createReviewFeedbackTaskSpec } = await import('../../dist/infrastructure/email/ReviewFeedbackTaskSpec.js');
const { router } = stubRouter();
// Task has persisted cursor at comment=5, decision=3
const taskWithCursors = mockTask(
{ repoFullName: 'owner/repo', prNumber: 42, catId: 'opus', threadId: 'th-1', userId: 'u-1' },
{
automationState: {
review: { lastCommentCursor: 5, lastDecisionCursor: 3 },
},
},
);
const spec = createReviewFeedbackTaskSpec({
taskStore: mockTaskStore([taskWithCursors]),
// Comments with id <= 5 should be skipped (below persisted cursor)
fetchComments: async () => [
{ id: 3, author: 'old', body: 'old comment', createdAt: '2026-01-01', commentType: 'conversation' },
{ id: 5, author: 'old', body: 'last seen', createdAt: '2026-01-01', commentType: 'conversation' },
{ id: 8, author: 'alice', body: 'new comment', createdAt: '2026-01-02', commentType: 'conversation' },
],
fetchReviews: async () => [
{ id: 2, author: 'old', state: 'COMMENTED', body: 'old', submittedAt: '2026-01-01' },
{ id: 3, author: 'old', state: 'APPROVED', body: 'old', submittedAt: '2026-01-01' },
],
reviewFeedbackRouter: router,
log: noopLog,
});
const result = await spec.admission.gate({ taskId: spec.id, lastRunAt: null, tickCount: 1 });
assert.equal(result.run, true);
// Only comment id=8 is new (above cursor 5)
assert.equal(result.workItems[0].signal.newComments.length, 1);
assert.equal(result.workItems[0].signal.newComments[0].id, 8);
// Reviews id=2,3 are at/below cursor 3 → none new
assert.equal(result.workItems[0].signal.newDecisions.length, 0);
});

it('gate excludes done tasks — no work items, no fetch (#406 regression)', async () => {
const { createReviewFeedbackTaskSpec } = await import('../../dist/infrastructure/email/ReviewFeedbackTaskSpec.js');
const { router } = stubRouter();
let fetchCalled = false;
const doneTask = mockTask(
{ repoFullName: 'owner/repo', prNumber: 99, catId: 'opus', threadId: 'th-done', userId: 'u-1' },
{ status: 'done' },
);
const spec = createReviewFeedbackTaskSpec({
taskStore: mockTaskStore([doneTask]),
fetchComments: async () => {
fetchCalled = true;
return [{ id: 1, author: 'alice', body: 'new', createdAt: '2026-01-01', commentType: 'conversation' }];
},
fetchReviews: async () => [],
reviewFeedbackRouter: router,
log: noopLog,
});
const result = await spec.admission.gate({ taskId: spec.id, lastRunAt: null, tickCount: 1 });
assert.equal(result.run, false, 'done task must be excluded from gate');
assert.equal(fetchCalled, false, 'should not even fetch comments for done tasks');
});

it('gate returns run:false when all items below persisted cursor (#406)', async () => {
const { createReviewFeedbackTaskSpec } = await import('../../dist/infrastructure/email/ReviewFeedbackTaskSpec.js');
const { router } = stubRouter();
const taskWithCursors = mockTask(
{ repoFullName: 'owner/repo', prNumber: 42, catId: 'opus', threadId: 'th-1', userId: 'u-1' },
{
automationState: {
review: { lastCommentCursor: 10, lastDecisionCursor: 5 },
},
},
);
const spec = createReviewFeedbackTaskSpec({
taskStore: mockTaskStore([taskWithCursors]),
fetchComments: async () => [
{ id: 3, author: 'old', body: 'old', createdAt: '2026-01-01', commentType: 'conversation' },
{ id: 8, author: 'old', body: 'old', createdAt: '2026-01-01', commentType: 'conversation' },
],
fetchReviews: async () => [{ id: 2, author: 'old', state: 'APPROVED', body: '', submittedAt: '2026-01-01' }],
reviewFeedbackRouter: router,
log: noopLog,
});
const result = await spec.admission.gate({ taskId: spec.id, lastRunAt: null, tickCount: 1 });
assert.equal(result.run, false, 'no new items above persisted cursor');
});

it('commitCursor persists to automationState.review via patchAutomationState (#406)', async () => {
const { createReviewFeedbackTaskSpec } = await import('../../dist/infrastructure/email/ReviewFeedbackTaskSpec.js');
const store = mockTaskStore([mockTaskItem]);
const spec = createReviewFeedbackTaskSpec({
taskStore: store,
fetchComments: async () => [
{ id: 7, author: 'alice', body: 'new', createdAt: '2026-01-01', commentType: 'conversation' },
],
fetchReviews: async () => [{ id: 4, author: 'bob', state: 'APPROVED', body: 'LGTM', submittedAt: '2026-01-01' }],
reviewFeedbackRouter: {
async route() {
return { kind: 'notified', threadId: 'th-1', catId: 'opus', messageId: 'm1', content: 'fb' };
},
},
log: noopLog,
});
const gateResult = await spec.admission.gate({ taskId: spec.id, lastRunAt: null, tickCount: 1 });
assert.equal(gateResult.run, true);
await spec.run.execute(gateResult.workItems[0].signal, 'pr:owner/repo#42', {});

// Verify patchAutomationState was called with correct cursor values
assert.equal(store._patchCalls.length, 1);
const call = store._patchCalls[0];
assert.equal(call.taskId, mockTaskItem.id);
assert.equal(call.patch.review.lastCommentCursor, 7);
assert.equal(call.patch.review.lastDecisionCursor, 4);
assert.equal(typeof call.patch.review.lastNotifiedAt, 'number');
});

it('echo-skip path also persists cursor to automationState (#406)', async () => {
const { createReviewFeedbackTaskSpec } = await import('../../dist/infrastructure/email/ReviewFeedbackTaskSpec.js');
const store = mockTaskStore([mockTaskItem]);
const spec = createReviewFeedbackTaskSpec({
taskStore: store,
fetchComments: async () => [
{ id: 10, author: 'self', body: '@codex review', createdAt: '2026-01-01', commentType: 'conversation' },
],
fetchReviews: async () => [],
reviewFeedbackRouter: {
async route() {
return { kind: 'skipped', reason: 'test' };
},
},
log: noopLog,
isEchoComment: (c) => c.author === 'self',
});
const result = await spec.admission.gate({ taskId: spec.id, lastRunAt: null, tickCount: 1 });
assert.equal(result.run, false, 'all echo → skip');

// Echo-skip should still persist cursor
assert.equal(store._patchCalls.length, 1);
assert.equal(store._patchCalls[0].patch.review.lastCommentCursor, 10);
});

it('echo-skip persist failure logs warning and allows retry next tick (#406 P2)', async () => {
const { createReviewFeedbackTaskSpec } = await import('../../dist/infrastructure/email/ReviewFeedbackTaskSpec.js');
const warnings = [];
const failingStore = {
listByKind: async () => [mockTaskItem],
patchAutomationState: async () => {
throw new Error('Redis unavailable');
},
_patchCalls: [],
};
const spec = createReviewFeedbackTaskSpec({
taskStore: failingStore,
fetchComments: async () => [
{ id: 10, author: 'self', body: '@codex review', createdAt: '2026-01-01', commentType: 'conversation' },
],
fetchReviews: async () => [],
reviewFeedbackRouter: {
async route() {
return { kind: 'skipped', reason: 'test' };
},
},
log: { ...noopLog, warn: (...args) => warnings.push(args) },
isEchoComment: (c) => c.author === 'self',
});

// First gate: persist fails, warn logged, memory NOT advanced
const r1 = await spec.admission.gate({ taskId: spec.id, lastRunAt: null, tickCount: 1 });
assert.equal(r1.run, false, 'echo-skip still returns run:false');
assert.ok(warnings.length > 0, 'should log warning on persist failure');
assert.ok(warnings[0][0].includes('echo-skip persist failed'), 'warning message identifies echo-skip');

// Second gate: same echo comment retried (memory cursor was NOT advanced)
warnings.length = 0;
const r2 = await spec.admission.gate({ taskId: spec.id, lastRunAt: null, tickCount: 2 });
assert.equal(r2.run, false, 'echo comment still filtered on retry');
assert.ok(warnings.length > 0, 'retry also attempts persist and logs');
});

it('commitCursor persist failure after delivery still advances memory cursor (no duplicate spam)', async () => {
const { createReviewFeedbackTaskSpec } = await import('../../dist/infrastructure/email/ReviewFeedbackTaskSpec.js');
const warnings = [];
const failingStore = {
listByKind: async () => [mockTaskItem],
patchAutomationState: async () => {
throw new Error('Redis unavailable');
},
_patchCalls: [],
};
const spec = createReviewFeedbackTaskSpec({
taskStore: failingStore,
fetchComments: async () => [
{ id: 7, author: 'alice', body: 'new review', createdAt: '2026-01-01', commentType: 'conversation' },
],
fetchReviews: async () => [],
reviewFeedbackRouter: {
async route() {
return { kind: 'notified', threadId: 'th-1', catId: 'opus', messageId: 'm1', content: 'fb' };
},
},
log: { ...noopLog, warn: (...args) => warnings.push(args) },
});

// Gate + execute: delivery succeeds, persist fails
const gateResult = await spec.admission.gate({ taskId: spec.id, lastRunAt: null, tickCount: 1 });
assert.equal(gateResult.run, true);
await spec.run.execute(gateResult.workItems[0].signal, 'pr:owner/repo#42', {});

// Persist failed → warn logged
assert.ok(warnings.length > 0, 'should log warning on persist failure');
assert.ok(warnings[0][0].includes('cursor persist failed'), 'warning identifies cursor persist');

// But memory cursor advanced → next gate does NOT re-deliver (no duplicate spam)
const r2 = await spec.admission.gate({ taskId: spec.id, lastRunAt: null, tickCount: 2 });
assert.equal(r2.run, false, 'memory cursor prevents duplicate delivery');
});

it('non-authoritative bot comment is NOT filtered (Rule B negative)', async () => {
const { createReviewFeedbackTaskSpec } = await import('../../dist/infrastructure/email/ReviewFeedbackTaskSpec.js');
const { router } = stubRouter();
Expand Down
Loading
Loading