Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getPullConsumerFor(ci) method to get a by Consumer with a ConsumerInfo (bypassing additional lookup) #736

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions jetstream/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {
} from "../nats-base-client/core.ts";
import {
ApiPagedRequest,
ConsumerInfo,
ExternalStream,
MsgDeleteRequest,
MsgRequest,
Expand Down Expand Up @@ -110,6 +111,13 @@ export class ConsumersImpl implements Consumers {
return Promise.resolve();
}

getPullConsumerFor(ci: ConsumerInfo): Consumer {
if (ci.config.deliver_subject !== undefined) {
throw new Error("push consumer not supported");
}
return new PullConsumerImpl(this.api, ci);
}

async get(
stream: string,
name: string | Partial<OrderedConsumerOptions> = {},
Expand Down Expand Up @@ -196,6 +204,11 @@ export class StreamImpl implements Stream {
});
}

getConsumerFromInfo(ci: ConsumerInfo): Consumer {
return new ConsumersImpl(new ConsumerAPIImpl(this.api.nc, this.api.opts))
.getPullConsumerFor(ci);
}

getConsumer(
name?: string | Partial<OrderedConsumerOptions>,
): Promise<Consumer> {
Expand Down
96 changes: 95 additions & 1 deletion jetstream/tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
assertRejects,
assertStringIncludes,
} from "https://deno.land/[email protected]/assert/mod.ts";
import { deferred, nanos } from "../../nats-base-client/mod.ts";
import { deferred, delay, nanos } from "../../nats-base-client/mod.ts";
import {
AckPolicy,
Consumer,
Expand Down Expand Up @@ -551,3 +551,97 @@ Deno.test("consumers - inboxPrefix is respected", async () => {
await done;
await cleanup(ns, nc);
});

Deno.test("consumers - getPullConsumerFor", async () => {
const { ns, nc } = await setup(jetstreamServerConf(), { inboxPrefix: "x" });
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });

const js = nc.jetstream();
await js.publish("hello");

const ci = await jsm.consumers.add("messages", {
durable_name: "c",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
});

let c = 0;
nc.subscribe("$JS.API.CONSUMER.INFO.>", {
callback: (_) => {
c++;
},
});

const consumer = js.consumers.getPullConsumerFor(ci);
const iter = await consumer.consume({ bind: true });

for await (const _m of iter) {
break;
}

await nc.flush();
assertEquals(c, 0);

await cleanup(ns, nc);
});

Deno.test("consumers - getPullConsumerFor non existing misses heartbeats", async () => {
const { ns, nc } = await setup(jetstreamServerConf(), { inboxPrefix: "x" });
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });

const js = nc.jetstream();
await js.publish("hello");

const ci = await jsm.consumers.add("messages", {
durable_name: "c",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
inactive_threshold: nanos(2000),
});

const consumer = js.consumers.getPullConsumerFor(ci);
await consumer.delete();

await delay(1000);
await assertRejects(
() => {
return jsm.consumers.info("messages", "c");
},
Error,
"consumer not found",
);

let c = 0;
nc.subscribe("$JS.API.CONSUMER.INFO.>", {
callback: (_) => {
c++;
},
});

const iter = await consumer.consume({ bind: true, idle_heartbeat: 1000 });
(async () => {
const status = await iter.status();
for await (const s of status) {
if (s.type === ConsumerEvents.HeartbeatsMissed) {
if (s.data === 2) {
iter.stop();
}
}
}
})().catch();

for await (const _m of iter) {
break;
}

await nc.flush();
assertEquals(c, 0);

await cleanup(ns, nc);
});
11 changes: 11 additions & 0 deletions jetstream/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,15 @@ export interface Consumers {
stream: string,
name?: string | Partial<OrderedConsumerOptions>,
): Promise<Consumer>;

/**
* Returns a Consumer based on the ConsumerInfo specified.
* Note this method can throw, and it doesn't validate that the
* underlying consumer exists. When using a consumer obtained
* by this method it is important to check for ConsumerEvents#HeartbeatsMissed
* @param info
*/
getPullConsumerFor(info: ConsumerInfo): Consumer;
}

export interface ConsumerOpts {
Expand Down Expand Up @@ -885,6 +894,8 @@ export interface Stream {
name?: string | Partial<OrderedConsumerOptions>,
): Promise<Consumer>;

getConsumerFromInfo(ci: ConsumerInfo): Consumer;

getMessage(query: MsgRequest): Promise<StoredMsg>;

deleteMessage(seq: number, erase?: boolean): Promise<boolean>;
Expand Down
Loading