Skip to content

Commit 1faefeb

Browse files
committed
fix(stream-plugin): trust server-side avatar go-live readiness
1 parent 8e5e11c commit 1faefeb

2 files changed

Lines changed: 228 additions & 6 deletions

File tree

src/actions/legacyCompat.test.ts

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,160 @@ describe('legacyCompatibilityActions', () => {
193193
},
194194
);
195195

196+
it(
197+
'trusts server-reported avatar ready phases from stream/start without a second hard failure gate',
198+
{ timeout: 10000 },
199+
async () => {
200+
setEnv('STREAM555_BASE_URL', 'https://stream.example');
201+
setEnv('STREAM555_AGENT_TOKEN', 'static-token');
202+
203+
const stopCalls: string[] = [];
204+
const service = {
205+
getCurrentSessionId: () => 'session-1',
206+
getBoundSessionId: () => 'session-1',
207+
getConfig: () => ({
208+
baseUrl: 'https://stream.example',
209+
agentToken: 'static-token',
210+
defaultSessionId: 'session-1',
211+
}),
212+
stopStream: async (sessionId?: string) => {
213+
stopCalls.push(sessionId ?? '');
214+
return { stopped: true, wasActive: true };
215+
},
216+
} as unknown as StreamControlService;
217+
const runtime = {
218+
getService: (name: string) => (name === 'stream555' ? service : undefined),
219+
} as IAgentRuntime;
220+
221+
const requestedUrls: string[] = [];
222+
globalThis.fetch = (async (input: unknown) => {
223+
requestedUrls.push(String(input));
224+
return buildJsonResponse(201, {
225+
status: 'started',
226+
phase: 'outputs_pending',
227+
sessionId: 'session-1',
228+
active: true,
229+
cfSessionId: 'cf-session-1',
230+
publisher: 'capture_service_rtmps',
231+
statusReason: 'ingest connected, waiting for outputs: twitch(pending), kick(pending)',
232+
requiredOutputsReady: false,
233+
cloudflare: { isConnected: false, state: 'unknown' },
234+
platforms: {
235+
twitch: { enabled: true, status: 'idle' },
236+
kick: { enabled: true, status: 'idle' },
237+
},
238+
});
239+
}) as typeof fetch;
240+
241+
const callbackPayloads: Array<Record<string, unknown>> = [];
242+
const action = findAction('STREAM555_GO_LIVE');
243+
const ok = await action.handler(
244+
runtime,
245+
{},
246+
undefined,
247+
{
248+
sessionId: 'session-1',
249+
inputType: 'avatar',
250+
layoutMode: 'camera-full',
251+
},
252+
(payload) => {
253+
callbackPayloads.push(payload as Record<string, unknown>);
254+
},
255+
);
256+
257+
assert.equal(ok, true);
258+
assert.equal(requestedUrls.length, 1);
259+
assert.match(requestedUrls[0] ?? '', /\/stream\/start$/);
260+
assert.equal(stopCalls.length, 0);
261+
262+
const lastPayload = callbackPayloads.at(-1) as {
263+
content?: { success?: boolean; data?: Record<string, unknown> };
264+
};
265+
assert.equal(lastPayload.content?.success, true);
266+
assert.equal(lastPayload.content?.data?.phase, 'outputs_pending');
267+
assert.equal(lastPayload.content?.data?.cfSessionId, 'cf-session-1');
268+
assert.equal(lastPayload.content?.data?.requiredOutputsReady, false);
269+
assert.equal(lastPayload.content?.data?.statusReason, 'ingest connected, waiting for outputs: twitch(pending), kick(pending)');
270+
},
271+
);
272+
273+
it(
274+
'accepts ready avatar phases observed during status polling even if Cloudflare lagged the boolean flag',
275+
{ timeout: 10000 },
276+
async () => {
277+
setEnv('STREAM555_BASE_URL', 'https://stream.example');
278+
setEnv('STREAM555_AGENT_TOKEN', 'static-token');
279+
280+
const stopCalls: string[] = [];
281+
const service = {
282+
getCurrentSessionId: () => 'session-1',
283+
getBoundSessionId: () => 'session-1',
284+
getConfig: () => ({
285+
baseUrl: 'https://stream.example',
286+
agentToken: 'static-token',
287+
defaultSessionId: 'session-1',
288+
}),
289+
stopStream: async (sessionId?: string) => {
290+
stopCalls.push(sessionId ?? '');
291+
return { stopped: true, wasActive: true };
292+
},
293+
} as unknown as StreamControlService;
294+
const runtime = {
295+
getService: (name: string) => (name === 'stream555' ? service : undefined),
296+
} as IAgentRuntime;
297+
298+
const requestedUrls: string[] = [];
299+
const responses = [
300+
buildJsonResponse(201, { status: 'started', cfSessionId: 'cf-session-1' }),
301+
buildJsonResponse(200, {
302+
sessionId: 'session-1',
303+
active: true,
304+
phase: 'outputs_pending',
305+
cfSessionId: 'cf-session-1',
306+
publisher: 'capture_service_rtmps',
307+
statusReason: 'ingest connected, waiting for outputs: twitch(pending)',
308+
requiredOutputsReady: false,
309+
cloudflare: { isConnected: false, state: 'unknown' },
310+
platforms: { twitch: { enabled: true, status: 'idle' } },
311+
}),
312+
];
313+
globalThis.fetch = (async (input: unknown) => {
314+
requestedUrls.push(String(input));
315+
const next = responses.shift();
316+
assert.ok(next, 'unexpected fetch');
317+
return next;
318+
}) as typeof fetch;
319+
320+
const callbackPayloads: Array<Record<string, unknown>> = [];
321+
const action = findAction('STREAM555_GO_LIVE');
322+
const ok = await action.handler(
323+
runtime,
324+
{},
325+
undefined,
326+
{
327+
sessionId: 'session-1',
328+
inputType: 'avatar',
329+
layoutMode: 'camera-full',
330+
},
331+
(payload) => {
332+
callbackPayloads.push(payload as Record<string, unknown>);
333+
},
334+
);
335+
336+
assert.equal(ok, true);
337+
assert.equal(stopCalls.length, 0);
338+
assert.match(requestedUrls[0] ?? '', /\/stream\/start$/);
339+
assert.match(requestedUrls[1] ?? '', /\/stream\/status$/);
340+
341+
const lastPayload = callbackPayloads.at(-1) as {
342+
content?: { success?: boolean; data?: Record<string, unknown> };
343+
};
344+
assert.equal(lastPayload.content?.success, true);
345+
assert.equal(lastPayload.content?.data?.phase, 'outputs_pending');
346+
assert.equal(lastPayload.content?.data?.requiredOutputsReady, false);
347+
},
348+
);
349+
196350
it(
197351
'prefers fresh API-key exchange over a stale configured bearer for go-live',
198352
{ timeout: 10000 },

src/actions/legacyCompat.ts

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,12 @@ interface StreamReadinessSnapshot {
6767
active: boolean;
6868
jobId?: string;
6969
cfSessionId?: string;
70+
phase?: string;
71+
publisher?: string;
7072
cloudflareConnected: boolean;
7173
cloudflareState?: string;
74+
requiredOutputsReady?: boolean;
75+
statusReason?: string;
7276
platforms: Record<string, unknown>;
7377
raw?: JsonObject;
7478
}
@@ -588,13 +592,65 @@ async function fetchStreamReadinessSnapshot(
588592
active: Boolean(response.data?.active),
589593
jobId: getStringField(response.data, 'jobId'),
590594
cfSessionId: getStringField(response.data, 'cfSessionId'),
595+
phase: getStringField(response.data, 'phase'),
596+
publisher: getStringField(response.data, 'publisher'),
591597
cloudflareConnected: Boolean(cloudflare?.isConnected),
592598
cloudflareState: getStringField(cloudflare, 'state'),
599+
requiredOutputsReady:
600+
typeof response.data?.requiredOutputsReady === 'boolean'
601+
? response.data.requiredOutputsReady
602+
: undefined,
603+
statusReason: getStringField(response.data, 'statusReason'),
593604
platforms,
594605
raw: response.data,
595606
};
596607
}
597608

609+
function extractReadinessSnapshot(
610+
sessionId: string,
611+
data: JsonObject | undefined,
612+
): StreamReadinessSnapshot | undefined {
613+
if (!data) return undefined;
614+
const cloudflare = getObject(data.cloudflare);
615+
const platforms = getObject(data.platforms) ?? {};
616+
return {
617+
sessionId,
618+
active: Boolean(data.active),
619+
jobId: getStringField(data, 'jobId'),
620+
cfSessionId: getStringField(data, 'cfSessionId'),
621+
phase: getStringField(data, 'phase'),
622+
publisher: getStringField(data, 'publisher'),
623+
cloudflareConnected: Boolean(cloudflare?.isConnected),
624+
cloudflareState: getStringField(cloudflare, 'state'),
625+
requiredOutputsReady:
626+
typeof data.requiredOutputsReady === 'boolean' ? data.requiredOutputsReady : undefined,
627+
statusReason: getStringField(data, 'statusReason'),
628+
platforms,
629+
raw: data,
630+
};
631+
}
632+
633+
function isReadyPhase(phase: string | undefined): boolean {
634+
const normalized = typeof phase === 'string' ? phase.trim().toLowerCase() : '';
635+
return (
636+
normalized === 'live' ||
637+
normalized === 'ingest_connected' ||
638+
normalized === 'outputs_pending'
639+
);
640+
}
641+
642+
function isStreamReadinessSatisfied(
643+
snapshot: StreamReadinessSnapshot | undefined,
644+
): boolean {
645+
if (!snapshot?.active || !snapshot.cfSessionId) {
646+
return false;
647+
}
648+
if (snapshot.cloudflareConnected) {
649+
return true;
650+
}
651+
return isReadyPhase(snapshot.phase);
652+
}
653+
598654
async function waitForStreamReadiness(
599655
baseUrl: string,
600656
headers: Record<string, string>,
@@ -607,11 +663,7 @@ async function waitForStreamReadiness(
607663

608664
while (Date.now() <= deadline) {
609665
lastSnapshot = await fetchStreamReadinessSnapshot(baseUrl, headers, sessionId);
610-
if (
611-
lastSnapshot.active &&
612-
lastSnapshot.cfSessionId &&
613-
lastSnapshot.cloudflareConnected
614-
) {
666+
if (isStreamReadinessSatisfied(lastSnapshot)) {
615667
return { ready: true, lastSnapshot };
616668
}
617669
if (Date.now() >= deadline) break;
@@ -786,10 +838,15 @@ const goLiveAction: Action = {
786838
},
787839
);
788840
let initialSnapshot: StreamReadinessSnapshot | undefined;
841+
let immediateReadySnapshot: StreamReadinessSnapshot | undefined;
789842
const responseCfSessionId = getStringField(startResponse.data, 'cfSessionId');
843+
if (startResponse.ok) {
844+
immediateReadySnapshot = extractReadinessSnapshot(sessionId, startResponse.data);
845+
}
790846
if (!startResponse.ok) {
791847
if (startResponse.status === 409) {
792848
initialSnapshot = await fetchStreamReadinessSnapshot(baseUrl, headers, sessionId);
849+
immediateReadySnapshot = initialSnapshot;
793850
}
794851
if (!(startResponse.status === 409 && (responseCfSessionId || initialSnapshot?.cfSessionId))) {
795852
throw new Error(
@@ -798,7 +855,9 @@ const goLiveAction: Action = {
798855
}
799856
}
800857

801-
const readiness = await waitForStreamReadiness(baseUrl, headers, sessionId);
858+
const readiness = isStreamReadinessSatisfied(immediateReadySnapshot)
859+
? { ready: true, lastSnapshot: immediateReadySnapshot }
860+
: await waitForStreamReadiness(baseUrl, headers, sessionId);
802861
if (!readiness.ready) {
803862
try {
804863
await service.stopStream(sessionId);
@@ -832,8 +891,17 @@ const goLiveAction: Action = {
832891
readiness.lastSnapshot?.cfSessionId ??
833892
responseCfSessionId ??
834893
initialSnapshot?.cfSessionId,
894+
phase: readiness.lastSnapshot?.phase ?? getStringField(startResponse.data, 'phase'),
895+
publisher: readiness.lastSnapshot?.publisher ?? getStringField(startResponse.data, 'publisher'),
835896
cloudflareConnected: readiness.lastSnapshot?.cloudflareConnected ?? false,
836897
cloudflareState: readiness.lastSnapshot?.cloudflareState,
898+
requiredOutputsReady:
899+
readiness.lastSnapshot?.requiredOutputsReady ??
900+
(typeof startResponse.data?.requiredOutputsReady === 'boolean'
901+
? startResponse.data.requiredOutputsReady
902+
: undefined),
903+
statusReason:
904+
readiness.lastSnapshot?.statusReason ?? getStringField(startResponse.data, 'statusReason'),
837905
destinationSync,
838906
platformStatuses: readiness.lastSnapshot?.platforms,
839907
streamStatus: readiness.lastSnapshot?.raw,

0 commit comments

Comments
 (0)