Skip to content

Commit

Permalink
Add test for reading after the stream end
Browse files Browse the repository at this point in the history
  • Loading branch information
kalabukdima committed May 16, 2023
1 parent 0e73a5f commit 6bc5c78
Showing 1 changed file with 30 additions and 1 deletion.
31 changes: 30 additions & 1 deletion test/client.e2e.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Offset, ProximaStreamClient, StreamRegistryClient, StreamEndpoint } from "../src"
import { strict as assert } from "assert";
import { firstValueFrom, take, toArray } from "rxjs";
import { EMPTY, Observable, firstValueFrom, isEmpty, lastValueFrom, take, timeout, toArray } from "rxjs";

const streamTests = [{
stream: "proxima.eth-main.blocks.1_0",
Expand All @@ -10,6 +10,23 @@ const streamTests = [{
timestamp: 1438473600000,
}];

async function getStats(registry: StreamRegistryClient, name: string): Promise<StreamEndpoint | undefined> {
const streams = await registry.getStreams();
for (const stream of streams) {
if (stream.name == name) {
return Object.values(stream.endpoints)[0];
}
}
}

async function emitsNothingFor<T>(observable: Observable<T>, time: number): Promise<boolean> {
return lastValueFrom(
observable
.pipe(timeout({each: time, with: () => EMPTY}))
.pipe(isEmpty())
);
}

describe("StreamRegistryClient", () => {
it("should get all streams", async () => {
const registry = new StreamRegistryClient();
Expand Down Expand Up @@ -113,4 +130,16 @@ describe("ProximaStreamClient", () => {
expect(lastEvents).toHaveLength(0);
});
}

it("should return zero events if fetching from last offset", async () => {
const stream = "proxima.exchange-rates.0_1";
const client = new ProximaStreamClient({registry});
const stats = await getStats(registry, stream);
expect(stats).not.toBeUndefined();
const pauseable = await client.streamEvents(stream, stats!.stats.end!);
const subscription = pauseable.observable.subscribe({error: e => fail(e)});
const empty = await emitsNothingFor(pauseable.observable, 1000);
expect(empty).toBeTruthy();
subscription.unsubscribe();
});
});

0 comments on commit 6bc5c78

Please sign in to comment.