Skip to content

Commit 0b3e796

Browse files
Merge pull request #83 from rosemary21/feature/export-streams-csv
feature/export-streams-csv
2 parents 8fcdce6 + 079b0c6 commit 0b3e796

File tree

3 files changed

+311
-1
lines changed

3 files changed

+311
-1
lines changed

src/api/v1/streams.test.ts

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,17 @@ import app from "../../index";
33
import { StreamRepository } from "../../repositories/streamRepository";
44
import { Stream } from "../../db/schema";
55

6+
const TEST_SECRET = "test-jwt-secret-that-is-at-least-32-chars!!";
7+
8+
// Inject JWT_SECRET so the requireAuth middleware has something to check against
9+
beforeAll(() => {
10+
process.env.JWT_SECRET = TEST_SECRET;
11+
});
12+
13+
afterAll(() => {
14+
delete process.env.JWT_SECRET;
15+
});
16+
617
describe("Stream API Routes", () => {
718
beforeAll(() => {
819
process.env.API_KEYS = "test-1234";
@@ -265,3 +276,212 @@ describe("Stream API Routes", () => {
265276
});
266277
});
267278
});
279+
280+
// ---------------------------------------------------------------------------
281+
// CSV Export endpoint
282+
// ---------------------------------------------------------------------------
283+
284+
const makeExportStream = (overrides: Record<string, unknown> = {}) => ({
285+
id: "aaaaaaaa-1111-1111-1111-000000000001",
286+
payer: "0xPayerAddress",
287+
recipient: "0xRecipientAddress",
288+
status: "active",
289+
ratePerSecond: "0.001",
290+
startTime: new Date("2024-01-01T00:00:00Z"),
291+
endTime: null,
292+
totalAmount: "3600.0",
293+
lastSettledAt: new Date("2024-01-01T00:00:00Z"),
294+
createdAt: new Date("2024-06-01T12:00:00Z"),
295+
updatedAt: new Date("2024-06-01T12:00:00Z"),
296+
...overrides,
297+
});
298+
299+
describe("GET /api/v1/streams/export.csv", () => {
300+
describe("authentication", () => {
301+
it("should return 401 when no Authorization header is present", async () => {
302+
const response = await request(app).get("/api/v1/streams/export.csv");
303+
expect(response.status).toBe(401);
304+
expect(response.body.error).toBe("Unauthorized");
305+
});
306+
307+
it("should return 401 when the token is wrong", async () => {
308+
const response = await request(app)
309+
.get("/api/v1/streams/export.csv")
310+
.set("Authorization", "Bearer wrong-token");
311+
expect(response.status).toBe(401);
312+
expect(response.body.error).toBe("Unauthorized");
313+
});
314+
315+
it("should return 401 when Authorization header is malformed (no Bearer prefix)", async () => {
316+
const response = await request(app)
317+
.get("/api/v1/streams/export.csv")
318+
.set("Authorization", TEST_SECRET);
319+
expect(response.status).toBe(401);
320+
expect(response.body.error).toBe("Unauthorized");
321+
});
322+
});
323+
324+
describe("response format", () => {
325+
it("should return Content-Type text/csv for a valid request", async () => {
326+
const spy = jest
327+
.spyOn(StreamRepository.prototype, "findForExport")
328+
.mockResolvedValue({ rows: [], nextCursor: null });
329+
330+
const response = await request(app)
331+
.get("/api/v1/streams/export.csv")
332+
.set("Authorization", `Bearer ${TEST_SECRET}`);
333+
334+
expect(response.status).toBe(200);
335+
expect(response.headers["content-type"]).toMatch(/text\/csv/);
336+
spy.mockRestore();
337+
});
338+
339+
it("should set Content-Disposition to attachment with a .csv filename", async () => {
340+
const spy = jest
341+
.spyOn(StreamRepository.prototype, "findForExport")
342+
.mockResolvedValue({ rows: [], nextCursor: null });
343+
344+
const response = await request(app)
345+
.get("/api/v1/streams/export.csv")
346+
.set("Authorization", `Bearer ${TEST_SECRET}`);
347+
348+
expect(response.headers["content-disposition"]).toMatch(/attachment/);
349+
expect(response.headers["content-disposition"]).toMatch(/\.csv/);
350+
spy.mockRestore();
351+
});
352+
353+
it("should include a header row in the CSV", async () => {
354+
const spy = jest
355+
.spyOn(StreamRepository.prototype, "findForExport")
356+
.mockResolvedValue({ rows: [], nextCursor: null });
357+
358+
const response = await request(app)
359+
.get("/api/v1/streams/export.csv")
360+
.set("Authorization", `Bearer ${TEST_SECRET}`);
361+
362+
const lines = response.text.split("\r\n").filter(Boolean);
363+
expect(lines[0]).toBe(
364+
"id,payer,recipient,status,ratePerSecond,startTime,endTime,totalAmount,lastSettledAt,createdAt,updatedAt"
365+
);
366+
spy.mockRestore();
367+
});
368+
});
369+
370+
describe("row count and content", () => {
371+
it("should return only the header row when there are no streams", async () => {
372+
const spy = jest
373+
.spyOn(StreamRepository.prototype, "findForExport")
374+
.mockResolvedValue({ rows: [], nextCursor: null });
375+
376+
const response = await request(app)
377+
.get("/api/v1/streams/export.csv")
378+
.set("Authorization", `Bearer ${TEST_SECRET}`);
379+
380+
const lines = response.text.split("\r\n").filter(Boolean);
381+
expect(lines).toHaveLength(1); // header only
382+
spy.mockRestore();
383+
});
384+
385+
it("should contain one data row per stream in the fixture", async () => {
386+
const fixtures = [makeExportStream(), makeExportStream({ id: "bbbbbbbb-2222-2222-2222-000000000002" })];
387+
const spy = jest
388+
.spyOn(StreamRepository.prototype, "findForExport")
389+
.mockResolvedValue({ rows: fixtures as never[], nextCursor: null });
390+
391+
const response = await request(app)
392+
.get("/api/v1/streams/export.csv")
393+
.set("Authorization", `Bearer ${TEST_SECRET}`);
394+
395+
const lines = response.text.split("\r\n").filter(Boolean);
396+
// 1 header + 2 data rows
397+
expect(lines).toHaveLength(3);
398+
expect(lines[1]).toContain(fixtures[0].id);
399+
expect(lines[2]).toContain(fixtures[1].id);
400+
spy.mockRestore();
401+
});
402+
403+
it("should accumulate rows across multiple cursor pages", async () => {
404+
const page1 = [makeExportStream({ id: "id-page1" })];
405+
const page2 = [makeExportStream({ id: "id-page2" })];
406+
const cursor = { createdAt: new Date(), id: "id-page1" };
407+
408+
const spy = jest
409+
.spyOn(StreamRepository.prototype, "findForExport")
410+
.mockResolvedValueOnce({ rows: page1 as never[], nextCursor: cursor })
411+
.mockResolvedValueOnce({ rows: page2 as never[], nextCursor: null });
412+
413+
const response = await request(app)
414+
.get("/api/v1/streams/export.csv")
415+
.set("Authorization", `Bearer ${TEST_SECRET}`);
416+
417+
const lines = response.text.split("\r\n").filter(Boolean);
418+
expect(lines).toHaveLength(3); // header + 2 rows across 2 pages
419+
expect(response.text).toContain("id-page1");
420+
expect(response.text).toContain("id-page2");
421+
spy.mockRestore();
422+
});
423+
});
424+
425+
describe("CSV content correctness", () => {
426+
it("should properly escape fields containing commas", async () => {
427+
const streamWithComma = makeExportStream({ payer: "address,with,commas" });
428+
const spy = jest
429+
.spyOn(StreamRepository.prototype, "findForExport")
430+
.mockResolvedValue({ rows: [streamWithComma as never], nextCursor: null });
431+
432+
const response = await request(app)
433+
.get("/api/v1/streams/export.csv")
434+
.set("Authorization", `Bearer ${TEST_SECRET}`);
435+
436+
expect(response.text).toContain('"address,with,commas"');
437+
spy.mockRestore();
438+
});
439+
440+
it("should properly escape fields containing double quotes", async () => {
441+
const streamWithQuote = makeExportStream({ recipient: 'has"quote' });
442+
const spy = jest
443+
.spyOn(StreamRepository.prototype, "findForExport")
444+
.mockResolvedValue({ rows: [streamWithQuote as never], nextCursor: null });
445+
446+
const response = await request(app)
447+
.get("/api/v1/streams/export.csv")
448+
.set("Authorization", `Bearer ${TEST_SECRET}`);
449+
450+
expect(response.text).toContain('"has""quote"');
451+
spy.mockRestore();
452+
});
453+
454+
it("should output empty string for null endTime", async () => {
455+
const spy = jest
456+
.spyOn(StreamRepository.prototype, "findForExport")
457+
.mockResolvedValue({ rows: [makeExportStream({ endTime: null }) as never], nextCursor: null });
458+
459+
const response = await request(app)
460+
.get("/api/v1/streams/export.csv")
461+
.set("Authorization", `Bearer ${TEST_SECRET}`);
462+
463+
// The endTime column (6th, 0-indexed) should be empty between commas
464+
const dataLine = response.text.split("\r\n")[1];
465+
const cols = dataLine.split(",");
466+
expect(cols[6]).toBe(""); // endTime column
467+
spy.mockRestore();
468+
});
469+
});
470+
471+
describe("query filter pass-through", () => {
472+
it("should forward payer, recipient, and status filters to the repository", async () => {
473+
const spy = jest
474+
.spyOn(StreamRepository.prototype, "findForExport")
475+
.mockResolvedValue({ rows: [], nextCursor: null });
476+
477+
await request(app)
478+
.get("/api/v1/streams/export.csv?payer=0xA&recipient=0xB&status=paused")
479+
.set("Authorization", `Bearer ${TEST_SECRET}`);
480+
481+
expect(spy).toHaveBeenCalledWith(
482+
expect.objectContaining({ payer: "0xA", recipient: "0xB", status: "paused" })
483+
);
484+
spy.mockRestore();
485+
});
486+
});
487+
});

src/repositories/streamRepository.test.ts

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,4 +146,77 @@ describe("StreamRepository", () => {
146146
expect(countQuery.where).toHaveBeenCalled();
147147
});
148148
});
149+
150+
describe("findForExport", () => {
151+
const makeStream = (overrides: Partial<{ id: string; createdAt: Date }> = {}) => ({
152+
id: overrides.id ?? "aaaaaaaa-0000-0000-0000-000000000001",
153+
payer: "0xPayer",
154+
recipient: "0xRecipient",
155+
status: "active" as const,
156+
ratePerSecond: "1.0",
157+
startTime: new Date("2024-01-01T00:00:00Z"),
158+
endTime: null,
159+
totalAmount: "3600.0",
160+
lastSettledAt: new Date("2024-01-01T00:00:00Z"),
161+
createdAt: overrides.createdAt ?? new Date("2024-06-01T00:00:00Z"),
162+
updatedAt: new Date("2024-06-01T00:00:00Z"),
163+
});
164+
165+
it("should return all rows with null nextCursor when count <= batchSize", async () => {
166+
const mockRows = [makeStream({ id: "a" }), makeStream({ id: "b" })];
167+
(db.select as jest.Mock).mockReturnValue(createMockQuery(mockRows));
168+
169+
const result = await repository.findForExport({ batchSize: 500 });
170+
171+
expect(result.rows).toHaveLength(2);
172+
expect(result.nextCursor).toBeNull();
173+
});
174+
175+
it("should return a page and nextCursor when more rows than batchSize exist", async () => {
176+
// Simulate batchSize=2 but DB returns 3 rows (batchSize+1)
177+
const t = new Date("2024-06-01T00:00:00Z");
178+
const r1 = makeStream({ id: "id-1", createdAt: t });
179+
const r2 = makeStream({ id: "id-2", createdAt: t });
180+
const r3 = makeStream({ id: "id-3", createdAt: t }); // sentinel
181+
(db.select as jest.Mock).mockReturnValue(createMockQuery([r1, r2, r3]));
182+
183+
const result = await repository.findForExport({ batchSize: 2 });
184+
185+
expect(result.rows).toHaveLength(2);
186+
expect(result.rows[0].id).toBe("id-1");
187+
expect(result.rows[1].id).toBe("id-2");
188+
expect(result.nextCursor).toEqual({ createdAt: t, id: "id-2" });
189+
});
190+
191+
it("should apply payer/recipient/status filters", async () => {
192+
(db.select as jest.Mock).mockReturnValue(createMockQuery([]));
193+
194+
await repository.findForExport({ payer: "0xA", recipient: "0xB", status: "paused" });
195+
196+
// select was called — no assertion on internals, just that it resolves without error
197+
expect(db.select).toHaveBeenCalled();
198+
});
199+
200+
it("should return empty rows with null nextCursor for empty result set", async () => {
201+
(db.select as jest.Mock).mockReturnValue(createMockQuery([]));
202+
203+
const result = await repository.findForExport({});
204+
205+
expect(result.rows).toHaveLength(0);
206+
expect(result.nextCursor).toBeNull();
207+
});
208+
209+
it("should pass cursor arguments when provided", async () => {
210+
const cursorDate = new Date("2024-05-01T00:00:00Z");
211+
(db.select as jest.Mock).mockReturnValue(createMockQuery([]));
212+
213+
const result = await repository.findForExport({
214+
cursorCreatedAt: cursorDate,
215+
cursorId: "cursor-id",
216+
});
217+
218+
expect(result.rows).toHaveLength(0);
219+
expect(result.nextCursor).toBeNull();
220+
});
221+
});
149222
});

src/repositories/streamRepository.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { eq, and, desc, sql } from "drizzle-orm";
1+
import { eq, and, or, desc, lt, sql } from "drizzle-orm";
22
import { db } from "../db/index";
33
import { streams, Stream, NewStream } from "../db/schema";
44

@@ -18,6 +18,23 @@ export interface UpdateStreamParams {
1818
updatedAt?: Date;
1919
}
2020

21+
export interface ExportParams {
22+
payer?: string;
23+
recipient?: string;
24+
status?: "active" | "paused" | "cancelled" | "completed";
25+
/** Exclusive cursor: resume after this (createdAt, id) pair (both must be set together). */
26+
cursorCreatedAt?: Date;
27+
cursorId?: string;
28+
/** Number of rows per DB fetch (default 500). */
29+
batchSize?: number;
30+
}
31+
32+
export interface ExportBatch {
33+
rows: Stream[];
34+
/** null when this is the last page. */
35+
nextCursor: { createdAt: Date; id: string } | null;
36+
}
37+
2138
export class StreamRepository {
2239
async findById(id: string, includeDeleted = false): Promise<(Stream & { accruedEstimate: string }) | null> {
2340
const conditions = [eq(streams.id, id)];

0 commit comments

Comments
 (0)