diff --git a/jetstream/consumer.ts b/jetstream/consumer.ts index 2bb1049a..50d50ad8 100644 --- a/jetstream/consumer.ts +++ b/jetstream/consumer.ts @@ -977,6 +977,7 @@ export class OrderedPullConsumerImpl implements Consumer { iter: OrderedConsumerMessages | null; type: PullConsumerType; startSeq: number; + maxInitialReset: number; constructor( api: ConsumerAPI, @@ -998,6 +999,7 @@ export class OrderedPullConsumerImpl implements Consumer { this.iter = null; this.type = PullConsumerType.Unset; this.consumerOpts = opts; + this.maxInitialReset = 30; // to support a random start sequence we need to update the cursor this.startSeq = this.consumerOpts.opt_start_seq || 0; @@ -1067,6 +1069,7 @@ export class OrderedPullConsumerImpl implements Consumer { } async resetConsumer(seq = 0): Promise { + const isNew = this.serial === 0; // try to delete the consumer this.consumer?.delete().catch(() => {}); seq = seq === 0 ? 1 : seq; @@ -1096,7 +1099,7 @@ export class OrderedPullConsumerImpl implements Consumer { } } - if (seq === 0 && i >= 30) { + if (isNew && i >= this.maxInitialReset) { // consumer was never created, so we can fail this throw err; } else { @@ -1294,7 +1297,7 @@ export class OrderedPullConsumerImpl implements Consumer { async info(cached?: boolean): Promise { if (this.currentConsumer == null) { - this.currentConsumer = await this.resetConsumer(this.serial); + this.currentConsumer = await this.resetConsumer(this.startSeq); return Promise.resolve(this.currentConsumer); } if (cached && this.currentConsumer) { diff --git a/jetstream/tests/consumers_ordered_test.ts b/jetstream/tests/consumers_ordered_test.ts index 27bca953..a2e914f3 100644 --- a/jetstream/tests/consumers_ordered_test.ts +++ b/jetstream/tests/consumers_ordered_test.ts @@ -1052,3 +1052,24 @@ Deno.test("ordered consumers - next reset", async () => { await cleanup(ns, nc); }); + +Deno.test("ordered consumers - initial creation fails, consumer fails", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await nc.jetstreamManager(); + + await jsm.streams.add({ name: "A", subjects: ["a"] }); + const js = nc.jetstream(); + + const c = await js.consumers.get("A") as OrderedPullConsumerImpl; + await jsm.streams.delete("A"); + c.maxInitialReset = 3; + await assertRejects( + () => { + return c.consume(); + }, + Error, + "stream not found", + ); + + await cleanup(ns, nc); +});