diff --git a/package.json b/package.json index 12237de..4c10ee7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@questdb/nodejs-client", - "version": "4.0.0", + "version": "3.0.0", "description": "QuestDB Node.js Client", "scripts": { "test": "vitest", diff --git a/src/sender.ts b/src/sender.ts index 5c20384..cdcab05 100644 --- a/src/sender.ts +++ b/src/sender.ts @@ -81,7 +81,7 @@ const RETRIABLE_STATUS_CODES = [500, 503, 504, 507, 509, 523, 524, 529, 599]; *

*

* It is recommended that the Sender is created by using one of the static factory methods, - * Sender.fromConfig(configString, extraOptions) or Sender.fromEnv(extraOptions)). + * Sender.fromConfig(configString, extraOptions) or Sender.fromEnv(extraOptions). * If the Sender is created via its constructor, at least the SenderOptions configuration object should be * initialized from a configuration string to make sure that the parameters are validated.
* Detailed description of the Sender's configuration options can be found in @@ -407,7 +407,7 @@ class Sender { "info", `Authenticating with ${(connectOptions as tls.ConnectionOptions).host}:${(connectOptions as tls.ConnectionOptions).port}`, ); - await this.socket.write(`${this.jwk.kid}\n`, (err) => { + await this.socket.write(`${this.jwk.kid}\n`, (err: Error) => { if (err) { reject(err); } @@ -570,7 +570,7 @@ class Sender { throw new Error("Sender is not connected"); } return new Promise((resolve, reject) => { - this.socket.write(dataToSend, (err) => { // Use the copied dataToSend + this.socket.write(dataToSend, (err: Error) => { // Use the copied dataToSend if (err) { reject(err); } else { @@ -614,7 +614,7 @@ class Sender { * If the last row is not finished it stays in the sender's buffer. * This operation is added to a queue and executed sequentially. * - * @return {Promise} Resolves to true when there was data in the buffer to send and it was sent successfully. + * @return {Promise} Resolves to true when there was data in the buffer to send, and it was sent successfully. */ async flush(): Promise { // Add to the promise chain to ensure sequential execution @@ -626,7 +626,7 @@ class Sender { } return this._executeFlush(); }) - .catch((err) => { + .catch((err: Error) => { // Log or handle error. If _executeFlush throws, it will be caught here. // The error should have already been logged by _executeFlush. // We re-throw to ensure the promise chain reflects the failure. @@ -893,7 +893,7 @@ async function authenticate( return new Promise((resolve, reject) => { sender.socket.write( `${Buffer.from(signature).toString("base64")}\n`, - (err) => { + (err: Error) => { if (err) { reject(err); } else { diff --git a/test/_utils_/mockhttp.ts b/test/_utils_/mockhttp.ts index 0d83640..8487bc0 100644 --- a/test/_utils_/mockhttp.ts +++ b/test/_utils_/mockhttp.ts @@ -2,9 +2,15 @@ import http from "node:http"; import https from "node:https"; class MockHttp { - server; - mockConfig; - numOfRequests; + server: http.Server | https.Server; + mockConfig: { + responseDelays?: number[], + responseCodes?: number[], + username?: string, + password?: string, + token?: string, + }; + numOfRequests: number; constructor() { this.reset(); @@ -15,7 +21,7 @@ class MockHttp { this.numOfRequests = 0; } - async start(listenPort, secure = false, options?: Record) { + async start(listenPort: number, secure: boolean = false, options?: Record) { const serverCreator = secure ? https.createServer : http.createServer; // @ts-expect-error - Testing different options, so typing is not important this.server = serverCreator(options, (req, res) => { diff --git a/test/options.test.ts b/test/options.test.ts index 9ef1c2f..12b867a 100644 --- a/test/options.test.ts +++ b/test/options.test.ts @@ -376,11 +376,9 @@ describe("Configuration string parser suite", function () { expect(() => SenderOptions.fromConfig("")).toThrow( "Configuration string is missing", ); - // @ts-expect-error - Testing invalid input expect(() => SenderOptions.fromConfig(null)).toThrow( "Configuration string is missing", ); - // @ts-expect-error - Testing invalid input expect(() => SenderOptions.fromConfig(undefined)).toThrow( "Configuration string is missing", ); diff --git a/test/sender.test.ts b/test/sender.test.ts index ef5f825..c925d7c 100644 --- a/test/sender.test.ts +++ b/test/sender.test.ts @@ -1,4 +1,4 @@ -import { Sender } from "../src/index"; +import { Sender } from "../src"; import { describe, it, expect, beforeAll, afterAll } from "vitest"; import { DEFAULT_BUFFER_SIZE, DEFAULT_MAX_BUFFER_SIZE } from "../src/sender"; import { readFileSync } from "fs"; @@ -8,6 +8,8 @@ import { GenericContainer } from "testcontainers"; import http from "http"; import { Agent } from "undici"; import { SenderOptions } from "../src/options"; +import { fail } from "node:assert"; +import { log } from "../src/logging"; const HTTP_OK = 200; @@ -31,7 +33,7 @@ const AUTH: SenderOptions["auth"] = { token: PRIVATE_KEY, }; -async function sleep(ms) { +async function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); } @@ -48,6 +50,7 @@ describe("Sender configuration options suite", function () { it("throws exception if the username or the token is missing when TCP transport is used", async function () { try { await Sender.fromConfig("tcp::addr=hostname;username=bobo;").close(); + fail("Expected error is not thrown"); } catch (err) { expect(err.message).toBe( "TCP transport requires a username and a private key for authentication, please, specify the 'username' and 'token' config options", @@ -56,6 +59,7 @@ describe("Sender configuration options suite", function () { try { await Sender.fromConfig("tcp::addr=hostname;token=bobo_token;").close(); + fail("Expected error is not thrown"); } catch (err) { expect(err.message).toBe( "TCP transport requires a username and a private key for authentication, please, specify the 'username' and 'token' config options", @@ -68,6 +72,7 @@ describe("Sender configuration options suite", function () { await Sender.fromConfig( "tcps::addr=hostname;username=bobo;tls_roots=bla;", ).close(); + fail("Expected error is not thrown"); } catch (err) { expect(err.message).toBe( "'tls_roots' and 'tls_roots_password' options are not supported, please, use the 'tls_ca' option or the NODE_EXTRA_CA_CERTS environment variable instead", @@ -78,6 +83,7 @@ describe("Sender configuration options suite", function () { await Sender.fromConfig( "tcps::addr=hostname;token=bobo_token;tls_roots_password=bla;", ).close(); + fail("Expected error is not thrown"); } catch (err) { expect(err.message).toBe( "'tls_roots' and 'tls_roots_password' options are not supported, please, use the 'tls_ca' option or the NODE_EXTRA_CA_CERTS environment variable instead", @@ -86,10 +92,11 @@ describe("Sender configuration options suite", function () { }); it("throws exception if connect() is called when http transport is used", async function () { - let sender; + let sender: Sender; try { sender = Sender.fromConfig("http::addr=hostname"); await sender.connect(); + fail("Expected error is not thrown"); } catch (err) { expect(err.message).toBe( "'connect()' should be called only if the sender connects via TCP", @@ -104,6 +111,7 @@ describe("Sender options test suite", function () { try { // @ts-expect-error - Testing invalid options await new Sender().close(); + fail("Expected error is not thrown"); } catch (err) { expect(err.message).toBe("The 'protocol' option is mandatory"); } @@ -111,8 +119,8 @@ describe("Sender options test suite", function () { it("fails if options are null", async function () { try { - // @ts-expect-error - Testing invalid options await new Sender(null).close(); + fail("Expected error is not thrown"); } catch (err) { expect(err.message).toBe("The 'protocol' option is mandatory"); } @@ -120,8 +128,8 @@ describe("Sender options test suite", function () { it("fails if options are undefined", async function () { try { - // @ts-expect-error - Testing invalid options await new Sender(undefined).close(); + fail("Expected error is not thrown"); } catch (err) { expect(err.message).toBe("The 'protocol' option is mandatory"); } @@ -131,6 +139,7 @@ describe("Sender options test suite", function () { try { // @ts-expect-error - Testing invalid options await new Sender({}).close(); + fail("Expected error is not thrown"); } catch (err) { expect(err.message).toBe("The 'protocol' option is mandatory"); } @@ -140,6 +149,7 @@ describe("Sender options test suite", function () { try { // @ts-expect-error - Testing invalid options await new Sender({ host: "host" }).close(); + fail("Expected error is not thrown"); } catch (err) { expect(err.message).toBe("The 'protocol' option is mandatory"); } @@ -148,6 +158,7 @@ describe("Sender options test suite", function () { it("fails if protocol option is invalid", async function () { try { await new Sender({ protocol: "abcd" }).close(); + fail("Expected error is not thrown"); } catch (err) { expect(err.message).toBe("Invalid protocol: 'abcd'"); } @@ -284,6 +295,7 @@ describe("Sender options test suite", function () { max_buf_size: 8192, init_buf_size: 16384, }).close(); + fail('Expected error is not thrown'); } catch (err) { expect(err.message).toBe( "Max buffer size is 8192 bytes, requested buffer size: 16384", @@ -324,7 +336,7 @@ describe("Sender options test suite", function () { it("uses default logger if log function is not set", async function () { const sender = new Sender({ protocol: "http", host: "host" }); - expect(JSON.stringify(sender.log)).toEqual(JSON.stringify(sender.log)); + expect(sender.log).toBe(log); await sender.close(); }); @@ -335,13 +347,13 @@ describe("Sender options test suite", function () { host: "host", log: testFunc, }); - expect(JSON.stringify(sender.log)).toEqual(JSON.stringify(sender.log)); + expect(sender.log).toBe(testFunc); await sender.close(); }); it("uses default logger if log is set to null", async function () { const sender = new Sender({ protocol: "http", host: "host", log: null }); - expect(JSON.stringify(sender.log)).toEqual(JSON.stringify(sender.log)); + expect(sender.log).toBe(log); await sender.close(); }); @@ -351,14 +363,14 @@ describe("Sender options test suite", function () { host: "host", log: undefined, }); - expect(JSON.stringify(sender.log)).toEqual(JSON.stringify(sender.log)); + expect(sender.log).toBe(log); await sender.close(); }); it("uses default logger if log is not a function", async function () { // @ts-expect-error - Testing invalid options const sender = new Sender({ protocol: "http", host: "host", log: "" }); - expect(JSON.stringify(sender.log)).toEqual(JSON.stringify(sender.log)); + expect(sender.log).toBe(log); await sender.close(); }); }); @@ -373,7 +385,7 @@ describe("Sender auth config checks suite", function () { token: "privateKey", }, }).close(); - // fail('it should not be able to create the sender'); + fail("it should not be able to create the sender"); } catch (err) { expect(err.message).toBe( "Missing username, please, specify the 'keyId' property of the 'auth' config option. " + @@ -392,7 +404,7 @@ describe("Sender auth config checks suite", function () { token: "privateKey", }, }).close(); - // fail('it should not be able to create the sender'); + fail("it should not be able to create the sender"); } catch (err) { expect(err.message).toBe( "Missing username, please, specify the 'keyId' property of the 'auth' config option. " + @@ -412,7 +424,7 @@ describe("Sender auth config checks suite", function () { token: "privateKey", }, }).close(); - // fail('it should not be able to create the sender'); + fail("it should not be able to create the sender"); } catch (err) { expect(err.message).toBe( "Please, specify the 'keyId' property of the 'auth' config option as a string. " + @@ -430,7 +442,7 @@ describe("Sender auth config checks suite", function () { keyId: "username", }, }).close(); - // fail('it should not be able to create the sender'); + fail("it should not be able to create the sender"); } catch (err) { expect(err.message).toBe( "Missing private key, please, specify the 'token' property of the 'auth' config option. " + @@ -449,7 +461,7 @@ describe("Sender auth config checks suite", function () { token: "", }, }).close(); - // fail('it should not be able to create the sender'); + fail("it should not be able to create the sender"); } catch (err) { expect(err.message).toBe( "Missing private key, please, specify the 'token' property of the 'auth' config option. " + @@ -469,7 +481,7 @@ describe("Sender auth config checks suite", function () { token: true, }, }).close(); - // fail('it should not be able to create the sender'); + fail("it should not be able to create the sender"); } catch (err) { expect(err.message).toBe( "Please, specify the 'token' property of the 'auth' config option as a string. " + @@ -480,7 +492,7 @@ describe("Sender auth config checks suite", function () { }); describe("Sender HTTP suite", function () { - async function sendData(sender) { + async function sendData(sender: Sender) { await sender .table("test") .symbol("location", "us") @@ -529,7 +541,7 @@ describe("Sender HTTP suite", function () { expect(sender.agent[symbols[6]]).toEqual({ pipelining: 3 }); await sender.close(); - agent.destroy(); + await agent.destroy(); }); it("can ingest via HTTPS", async function () { @@ -556,7 +568,7 @@ describe("Sender HTTP suite", function () { await sendData(senderVerifyOff); expect(mockHttps.numOfRequests).toEqual(2); await senderVerifyOff.close(); - }); + }, 20000); it("can ingest via HTTP with basic auth", async function () { mockHttp.reset({ username: "user1", password: "pwd" }); @@ -649,10 +661,12 @@ describe("Sender HTTP suite", function () { }); it("fails when retry timeout expires", async function () { - // artificial delay (responseDelays) is same as retry timeout - // should result in the request failing on the second try + // TODO: artificial delay (responseDelays) is the same as retry timeout, + // This should result in the request failing on the second try. + // However, with undici transport sometimes we reach the third request too. + // Investigate why, probably because of pipelining? mockHttp.reset({ - responseCodes: [204, 500, 503], + responseCodes: [204, 500, 500], responseDelays: [1000, 1000, 1000], }); @@ -660,7 +674,7 @@ describe("Sender HTTP suite", function () { `http::addr=${PROXY_HOST}:${MOCK_HTTP_PORT};retry_timeout=1000`, ); await expect(sendData(sender)).rejects.toThrowError( - "HTTP request failed, statusCode=503, error=Request failed" + "HTTP request failed, statusCode=500, error=Request failed" ); await sender.close(); }); @@ -718,7 +732,7 @@ describe("Sender HTTP suite", function () { for (const sender of senders) { await sender.close(); } - agent.destroy(); + await agent.destroy(); }); }); @@ -748,7 +762,7 @@ describe("Sender connection suite", function () { return sender; } - async function sendData(sender) { + async function sendData(sender: Sender) { await sender .table("test") .symbol("location", "us") @@ -758,14 +772,14 @@ describe("Sender connection suite", function () { } async function assertSentData( - proxy, - authenticated, - expected, + proxy: MockProxy, + authenticated: boolean, + expected: string, timeout = 60000, ) { const interval = 100; const num = timeout / interval; - let actual; + let actual: string; for (let i = 0; i < num; i++) { const dataSentToRemote = proxy.getDataSentToRemote().join("").split("\n"); if (authenticated) { @@ -816,8 +830,6 @@ describe("Sender connection suite", function () { protocol: "tcp", port: PROXY_PORT, host: PROXY_HOST, - // @ts-expect-error invalid options - ca: readFileSync("test/certs/ca/ca.crt"), jwk: JWK, }); const connected = await sender.connect(); @@ -878,7 +890,6 @@ describe("Sender connection suite", function () { it("can connect unauthenticated and send data to server via secure connection", async function () { const proxy = await createProxy(false, proxyOptions); - // @ts-expect-error invalid options const sender = await createSender(null, true); await sendData(sender); await assertSentData( @@ -894,7 +905,7 @@ describe("Sender connection suite", function () { const sender = new Sender({ protocol: "tcp" }); try { await sender.connect(); - // fail('it should not be able to connect'); + fail("it should not be able to connect"); } catch (err) { expect(err.message).toBe("Hostname is not set"); } @@ -906,7 +917,7 @@ describe("Sender connection suite", function () { try { await sender.table("test").symbol("location", "us").atNow(); await sender.flush(); - // fail('it should not be able to send data'); + fail("it should not be able to send data"); } catch (err) { expect(err.message).toBe("TCP send failed, error=Sender is not connected"); } @@ -918,7 +929,7 @@ describe("Sender connection suite", function () { const sender = await createSender(AUTH, true); try { await sender.connect(); - // fail('it should not be able to connect again'); + fail("it should not be able to connect again"); } catch (err) { expect(err.message).toBe("Sender connected already"); } @@ -933,12 +944,11 @@ describe("Sender connection suite", function () { port: PROXY_PORT, host: PROXY_HOST, auth: AUTH, - // @ts-expect-error invalid options - ca: readFileSync("test/certs/ca/ca.crt"), + tls_ca: "test/certs/ca/ca.crt", }); try { await Promise.all([sender.connect(), sender.connect()]); - // fail('it should not be able to connect twice'); + fail("it should not be able to connect twice"); } catch (err) { expect(err.message).toBe("Sender connected already"); } @@ -953,7 +963,7 @@ describe("Sender connection suite", function () { ); try { await senderCertCheckFail.connect(); - // fail('it should not be able to connect'); + fail("it should not be able to connect"); } catch (err) { expect(err.message).toMatch( /^self[ -]signed certificate in certificate chain$/, @@ -991,9 +1001,8 @@ describe("Sender connection suite", function () { "Successfully connected to localhost:9088", /^Connection to .*1:9088 is closed$/, ]; - const log = (level, message) => { + const log = (level: "error" | "warn" | "info" | "debug", message: string) => { expect(level).toBe("info"); - // @ts-expect-error invalid options expect(message).toMatch(expectedMessages.shift()); }; const proxy = await createProxy(); @@ -1053,14 +1062,13 @@ describe("Client interop test suite", function () { sender.timestampColumn(column.name, column.value); break; default: - throw new Error("Unsupported column type"); + fail("Unsupported column type"); } } await sender.atNow(); - } catch { + } catch (e) { if (testCase.result.status !== "ERROR") { - // fail('Did not expect error: ' + e.message); - break; + fail("Did not expect error: " + e.message); } await sender.close(); continue; @@ -1078,11 +1086,10 @@ describe("Client interop test suite", function () { continue loopTestCase; } } - // fail('Line is not matching any of the expected results: ' + buffer.toString()); + fail("Line is not matching any of the expected results: " + buffer.toString()); } } else { - // fail('Expected error missing, instead we have a line: ' + buffer.toString()); - break; + fail("Expected error missing, instead we have a line: " + buffer.toString()); } await sender.close(); @@ -1104,6 +1111,7 @@ describe("Sender message builder test suite (anything not covered in client inte // @ts-expect-error - Testing invalid options .timestampColumn("timestampCol", 1658484765000000, "foobar") .atNow(); + fail("Expected error is not thrown"); } catch (err) { expect(err.message).toBe("Unknown timestamp unit: foobar"); } @@ -1782,9 +1790,9 @@ describe("Sender message builder test suite (anything not covered in client inte }); describe("Sender tests with containerized QuestDB instance", () => { - let container; + let container: any; - async function query(container, query) { + async function query(container: any, query: string) { const options = { hostname: container.getHost(), port: container.getMappedPort(QUESTDB_HTTP_PORT), @@ -1820,10 +1828,10 @@ describe("Sender tests with containerized QuestDB instance", () => { }); } - async function runSelect(container, select, expectedCount, timeout = 60000) { + async function runSelect(container: any, select: string, expectedCount: number, timeout = 60000) { const interval = 500; const num = timeout / interval; - let selectResult; + let selectResult: any; for (let i = 0; i < num; i++) { selectResult = await query(container, select); if (selectResult && selectResult.count >= expectedCount) { @@ -1836,7 +1844,11 @@ describe("Sender tests with containerized QuestDB instance", () => { ); } - function getFieldsString(schema) { + async function waitForTable(container: any, tableName: string, timeout = 30000) { + await runSelect(container, `tables() where table_name='${tableName}'`, 1, timeout); + } + + function getFieldsString(schema: any) { let fields = ""; for (const element of schema) { fields += `${element.name} ${element.type}, `; @@ -1851,8 +1863,8 @@ describe("Sender tests with containerized QuestDB instance", () => { const stream = await container.logs(); stream - .on("data", (line) => console.log(line)) - .on("err", (line) => console.error(line)) + .on("data", (line: string) => console.log(line)) + .on("err", (line: string) => console.error(line)) .on("end", () => console.log("Stream closed")); }, 3000000); @@ -1875,24 +1887,6 @@ describe("Sender tests with containerized QuestDB instance", () => { { name: "timestamp", type: "TIMESTAMP" }, ]; - // create table - const createTableResult = (await query( - container, - `CREATE TABLE ${tableName}(${getFieldsString(schema)}) TIMESTAMP (timestamp) PARTITION BY DAY BYPASS WAL;`, - )) as { - ddl: string; - }; - expect(createTableResult.ddl).toBe("OK"); - - // alter table - const alterTableResult = (await query( - container, - `ALTER TABLE ${tableName} SET PARAM maxUncommittedRows = 1;`, - )) as { - ddl: string; - }; - expect(alterTableResult.ddl).toBe("OK"); - // ingest via client await sender .table(tableName) @@ -1901,6 +1895,9 @@ describe("Sender tests with containerized QuestDB instance", () => { .at(1658484765000000000n, "ns"); await sender.flush(); + // wait for the table + await waitForTable(container, tableName) + // query table const select1Result = await runSelect(container, tableName, 1); expect(select1Result.query).toBe(tableName); @@ -1957,17 +1954,6 @@ describe("Sender tests with containerized QuestDB instance", () => { { name: "timestamp", type: "TIMESTAMP" }, ]; - // Ensure table exists and is clean - await query( - container, - `CREATE TABLE IF NOT EXISTS ${tableName}(${getFieldsString(schema)}) TIMESTAMP (timestamp) PARTITION BY DAY;`, - ); - await query(container, `TRUNCATE TABLE ${tableName}`); - await query( - container, - `ALTER TABLE ${tableName} SET PARAM maxUncommittedRows = 1;`, // Make data visible quickly - ); - const sender = Sender.fromConfig( `http::addr=${container.getHost()}:${container.getMappedPort(QUESTDB_HTTP_PORT)};auto_flush_interval=0;auto_flush_rows=1`, ); @@ -1979,7 +1965,8 @@ describe("Sender tests with containerized QuestDB instance", () => { .floatColumn("temperature", 17.1) .at(1658484765000000000n, "ns"); - // await sender.flush(); // Explicit flush before querying + // wait for the table + await waitForTable(container, tableName) // query table const select1Result = await runSelect(container, tableName, 1); @@ -2008,8 +1995,6 @@ describe("Sender tests with containerized QuestDB instance", () => { .floatColumn("temperature", 18.81) .at(1658484765001234000n, "ns"); - // await sender.flush(); // Explicit flush before querying - // query table const select2Result = await runSelect(container, tableName, 4); expect(select2Result.query).toBe(tableName); @@ -2038,17 +2023,6 @@ describe("Sender tests with containerized QuestDB instance", () => { { name: "timestamp", type: "TIMESTAMP" }, ]; - // Ensure table exists and is clean - await query( - container, - `CREATE TABLE IF NOT EXISTS ${tableName}(${getFieldsString(schema)}) TIMESTAMP (timestamp) PARTITION BY DAY;`, - ); - await query(container, `TRUNCATE TABLE ${tableName}`); - await query( - container, - `ALTER TABLE ${tableName} SET PARAM maxUncommittedRows = 1;`, // Make data visible quickly - ); - const sender = Sender.fromConfig( `http::addr=${container.getHost()}:${container.getMappedPort(QUESTDB_HTTP_PORT)};auto_flush_interval=1;auto_flush_rows=0`, ); @@ -2063,7 +2037,8 @@ describe("Sender tests with containerized QuestDB instance", () => { .floatColumn("temperature", 17.1) .at(1658484765000000000n, "ns"); - // await sender.flush(); // Explicit flush before querying + // wait for the table + await waitForTable(container, tableName) // query table const select1Result = await runSelect(container, tableName, 1); @@ -2095,8 +2070,6 @@ describe("Sender tests with containerized QuestDB instance", () => { .floatColumn("temperature", 18.81) .at(1658484765001234000n, "ns"); - // await sender.flush(); // Explicit flush before querying - // query table const select2Result = await runSelect(container, tableName, 4); expect(select2Result.query).toBe(tableName); @@ -2134,24 +2107,6 @@ describe("Sender tests with containerized QuestDB instance", () => { { name: "timestamp", type: "TIMESTAMP" }, ]; - // create table - const createTableResult = (await query( - container, - `CREATE TABLE ${tableName}(${getFieldsString(schema)}) TIMESTAMP (timestamp) PARTITION BY DAY;`, - )) as { - ddl: string; - }; - expect(createTableResult.ddl).toBe("OK"); - - // alter table - const alterTableResult = (await query( - container, - `ALTER TABLE ${tableName} SET PARAM maxUncommittedRows = 1;`, - )) as { - ddl: string; - }; - expect(alterTableResult.ddl).toBe("OK"); - // ingest via client const numOfRows = 100; for (let i = 0; i < numOfRows; i++) { @@ -2164,6 +2119,9 @@ describe("Sender tests with containerized QuestDB instance", () => { await sender.flush(); } + // wait for the table + await waitForTable(container, tableName) + // query table const selectQuery = `${tableName} order by temperature`; const selectResult = await runSelect(container, selectQuery, numOfRows); @@ -2187,22 +2145,6 @@ describe("Sender tests with containerized QuestDB instance", () => { await sender.connect(); const tableName = "test_high_load_autoflush"; - const schema = [ - { name: "id", type: "LONG" }, - { name: "timestamp", type: "TIMESTAMP" }, - ]; - - // Create table if not exists, and truncate it for a clean test - await query( - container, - `CREATE TABLE IF NOT EXISTS ${tableName}(${getFieldsString(schema)}) TIMESTAMP (timestamp) PARTITION BY DAY;`, - ); - await query( - container, - `ALTER TABLE ${tableName} SET PARAM maxUncommittedRows = 1;`, // Make data visible quickly - ); - await query(container, `TRUNCATE TABLE ${tableName}`); - const numOfRows = 1000; const promises: Promise[] = []; @@ -2211,7 +2153,7 @@ describe("Sender tests with containerized QuestDB instance", () => { const p = sender .table(tableName) .intColumn("id", i) - .at(1658484765000000000n + BigInt(i), "ns"); // Unique timestamp for each row + .at(1658484765000000000n + BigInt(1000 * i), "ns"); // Unique timestamp for each row promises.push(p); } @@ -2222,17 +2164,17 @@ describe("Sender tests with containerized QuestDB instance", () => { // This will be queued correctly after any ongoing auto-flushes. await sender.flush(); + // Wait for the table + await waitForTable(container, tableName) + // Query table and verify count - const selectQuery = `${tableName}`; - // Increased timeout for runSelect as many small flushes might take longer to commit and be queryable - const selectResult = await runSelect(container, selectQuery, numOfRows, 20000); + const selectQuery = `SELECT id FROM ${tableName}`; + const selectResult = await runSelect(container, selectQuery, numOfRows); expect(selectResult.count).toBe(numOfRows); // Verify data integrity - const dataCheckResult = (await query(container, `SELECT id FROM ${tableName} ORDER BY id`)) as { dataset: [number][], count: number }; - expect(dataCheckResult.count).toBe(numOfRows); for (let i = 0; i < numOfRows; i++) { - expect(dataCheckResult.dataset[i][0]).toBe(i); + expect(selectResult.dataset[i][0]).toBe(i); } await sender.close(); diff --git a/test/testapp.ts b/test/testapp.ts index 2536b58..9aedd9c 100644 --- a/test/testapp.ts +++ b/test/testapp.ts @@ -1,7 +1,7 @@ import { readFileSync } from "node:fs"; import { Proxy } from "./_utils_/proxy"; -import { Sender } from "../src/index"; +import { Sender } from "../src"; import { SenderOptions } from "../src/options"; const PROXY_PORT = 9099;