Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix abort controller handling #36

Merged
merged 15 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions packages/sdk/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,16 @@ console.log(transactionHash);

The `Database` may also be run in `autoWait` mode, such that each mutating call will not resolve until it has finalized on the Tableland network. This is useful when working with D1 compatible libraries, or to avoid issues with nonce-reuse etc.

Additionally, all async method calls take an optional `AbortSignal` object, which may be used to cancel or otherwise abort an inflight query. Note that this will only abort queries (including wait status), not the actual mutation transaction itself.
Additionally, all async method calls take an optional `Signal` or `PollingController` object, which may be used to abort an inflight query. Note that this will only abort queries (including wait status), not the actual mutation transaction itself.

```typescript
const controller = new AbortController();
const signal = controller.signal;
import { helpers } from "@tableland/sdk";
const controller = helpers.createPollingController(60_000, 1500); // polling timeout and interval

const stmt = db.prepare("SELECT name, age FROM users WHERE age < ?1");

setTimeout(() => controller.abort(), 10);
const young = await stmt.bind(20).all({ signal });
const young = await stmt.bind(20).all({ controller });
/*
Error: The operation was aborted.
*/
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
},
"dependencies": {
"@async-generators/from-emitter": "^0.3.0",
"@tableland/evm": "^4.3.0",
"@tableland/evm": "^4.4.0",
"@tableland/sqlparser": "^1.3.0",
"ethers": "^5.7.2"
}
Expand Down
36 changes: 19 additions & 17 deletions packages/sdk/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import { wrapResult } from "./registry/utils.js";
import {
type Config,
type AutoWaitConfig,
type ChainName,
type PollingController,
type Signer,
checkWait,
extractBaseUrl,
type ChainName,
getBaseUrl,
type Signal,
type Signer,
normalize,
validateTableName,
} from "./helpers/index.js";
Expand Down Expand Up @@ -45,7 +45,7 @@ export class Database<D = unknown> {
*/
static readOnly(chainNameOrId: ChainName | number): Database {
console.warn(
"`Database.readOnly()` is a depricated method, use `new Database()`"
"`Database.readOnly()` is a deprecated method, use `new Database()`"
);
const baseUrl = getBaseUrl(chainNameOrId);
return new Database({ baseUrl });
Expand Down Expand Up @@ -83,17 +83,17 @@ export class Database<D = unknown> {
* in the sequence fails, then an error is returned for that specific
* statement, and it aborts or rolls back the entire sequence.
* @param statements A set of Statement objects to batch and submit.
* @param opts Additional options to control execution.
* @param controller An optional object used to control receipt polling behavior.
* @returns An array of run results.
*/
// Note: if we want this package to mirror the D1 package in a way that
// enables compatability with packages built to exend D1, then the return type
// enables compatability with packages built to extend D1, then the return type
// here will potentially affect if/how those packages work.
// D1-ORM is a good example: https://github.com/Interactions-as-a-Service/d1-orm/
async batch<T = D>(
statements: Statement[],
opts: Signal = {}
// reads returns an Array with legnth equal to the number of batched statements,
controller?: PollingController
// reads returns an Array with length equal to the number of batched statements,
// everything else a single result wrapped in an Array for backward compatability.
): Promise<Array<Result<T>>> {
try {
Expand Down Expand Up @@ -124,7 +124,7 @@ export class Database<D = unknown> {
// and return an Array of the query results.
if (type === "read") {
return await Promise.all(
statements.map(async (stmt) => await stmt.all<T>(undefined, opts))
statements.map(async (stmt) => await stmt.all<T>({ controller }))
);
}

Expand All @@ -135,7 +135,8 @@ export class Database<D = unknown> {
await execCreateMany(
this.config,
statements.map((stmt) => stmt.toString())
)
),
controller
);

// TODO: wrapping in an Array is required for back compat, consider changing this for next major
Expand All @@ -160,7 +161,8 @@ export class Database<D = unknown> {

const receipt = await checkWait(
this.config,
await execMutateMany(this.config, runnables)
await execMutateMany(this.config, runnables),
controller
);

// TODO: wrapping in an Array is required for back compat, consider changing this for next major
Expand All @@ -186,19 +188,19 @@ export class Database<D = unknown> {
* transaction. In the future, more "intelligent" transaction planning,
* splitting, and batching may be used.
* @param statementStrings A set of SQL statement strings separated by semi-colons.
* @param opts Additional options to control execution.
* @param controller An optional object used to control receipt polling behavior.
* @returns A single run result.
*/
async exec<T = D>(
statementStrings: string,
opts: Signal = {}
controller?: PollingController
): Promise<Result<T>> {
// TODO: Note that this method appears to be the wrong return type in practice.
try {
const { statements } = await normalize(statementStrings);
const count = statements.length;
const statement = this.prepare(statementStrings);
const result = await statement.run(opts);
const result = await statement.run({ controller });
// Adds a count property which isn't typed
result.meta.count = count;
return result;
Expand All @@ -213,9 +215,9 @@ export class Database<D = unknown> {
/**
* Export a (set of) tables to the SQLite binary format.
* Not implemented yet!
* @param _opts Additional options to control execution.
* @param controller An optional object used to control receipt polling behavior.
*/
async dump(_opts: Signal = {}): Promise<ArrayBuffer> {
async dump(_controller?: PollingController): Promise<ArrayBuffer> {
throw errorWithCause("DUMP_ERROR", new Error("not implemented yet"));
}
}
Expand Down Expand Up @@ -248,7 +250,7 @@ async function normalizedToRunnables(
// check if these tables are in the normalized table names
// if so, filter them out (i.e., they are not being mutated)
const filteredTables = norm.tables.filter(
(tableName) => !tableNames.includes(tableName)
(tableName: string) => !tableNames.includes(tableName)
);
// if the filtered tables are greater than 1, then there are two
// tables being mutated in a single statement, which is not allowed
Expand Down
140 changes: 107 additions & 33 deletions packages/sdk/src/helpers/await.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,130 @@
/**
* A type that can be awaited.
* @property T The type to await.
*/
export type Awaitable<T> = T | PromiseLike<T>;

/**
* A signal to abort a request.
*/
export interface Signal {
signal?: AbortSignal;
/**
* The {@link AbortSignal} to abort a request.
*/
signal: AbortSignal;
/**
* A function to abort a request.
*/
abort: () => void;
}

/**
* A polling interval to check for results.
*/
export interface Interval {
interval?: number;
/**
* The interval period to make new requests, in milliseconds.
*/
interval: number;
/**
* A function to cancel a polling interval.
*/
cancel: () => void;
}

/**
* A polling timeout to abort a request.
*/
export interface Timeout {
/**
* The timeout period in milliseconds.
*/
timeout: number;
}

export type SignalAndInterval = Signal & Interval;
/**
* A polling controller with a custom timeout & interval.
*/
export type PollingController = Signal & Interval & Timeout;

/**
* A waitable interface to check for results.
*/
export interface Wait<T = unknown> {
wait: (opts?: SignalAndInterval) => Promise<T>;
/**
* A function to check for results.
* @param controller A {@link PollingController} with the custom timeout & interval.
* @returns
*/
wait: (controller?: PollingController) => Promise<T>;
}

/**
* Results from an an asynchronous function.
*/
export interface AsyncData<T> {
done: boolean;
data?: T;
}

/**
* An asynchronous function to check for results.
*/
export type AsyncFunction<T> = () => Awaitable<AsyncData<T>>;

export function getAbortSignal(
signal?: AbortSignal,
maxTimeout: number = 60_000
): {
signal: AbortSignal;
timeoutId: ReturnType<typeof setTimeout> | undefined;
} {
let abortSignal: AbortSignal;
let timeoutId;
if (signal == null) {
const controller = new AbortController();
abortSignal = controller.signal;
// return the timeoutId so the caller can cleanup
timeoutId = setTimeout(function () {
/**
* Create a signal to abort a request.
* @returns A {@link Signal} to abort a request.
*/
export function createSignal(): Signal {
const controller = new AbortController();
return {
signal: controller.signal,
abort: () => {
controller.abort();
}, maxTimeout);
} else {
abortSignal = signal;
}
return { signal: abortSignal, timeoutId };
},
};
}

/**
* Create a polling controller with a custom timeout & interval.
* @param timeout The timeout period in milliseconds.
* @param interval The interval period to make new requests, in milliseconds.
* @returns A {@link PollingController} with the custom timeout & interval.
*/
export function createPollingController(
timeout: number = 60_000,
pollingInterval: number = 1500
): PollingController {
const controller = new AbortController();
const timeoutId = setTimeout(function () {
controller.abort();
}, timeout);
return {
signal: controller.signal,
abort: () => {
clearTimeout(timeoutId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: the only change I made here from Sander's original code was the addition of this clearTimeout(timeoutId) line, to make sure that it's cleared if an abort occurs.

controller.abort();
},
interval: pollingInterval,
cancel: () => {
clearTimeout(timeoutId);
},
timeout,
};
}

/**
* Create an asynchronous poller to check for results for a given function.
* @param fn An {@link AsyncFunction} to check for results.
* @param controller A {@link PollingController} with the custom timeout & interval.
* @returns Result from the awaited function's execution or resulting error.
*/
export async function getAsyncPoller<T = unknown>(
fn: AsyncFunction<T>,
interval: number = 1500,
signal?: AbortSignal
controller?: PollingController
): Promise<T> {
// in order to set a timeout other than 10 seconds you need to
// create and pass in an abort signal with a different timeout
const { signal: abortSignal, timeoutId } = getAbortSignal(signal, 10_000);
const control = controller ?? createPollingController();
const checkCondition = (
resolve: (value: T) => void,
reject: (reason?: any) => void
Expand All @@ -59,15 +133,15 @@ export async function getAsyncPoller<T = unknown>(
.then((result: AsyncData<T>) => {
if (result.done && result.data != null) {
// We don't want to call `AbortController.abort()` if the call succeeded
clearTimeout(timeoutId);
control.cancel();
return resolve(result.data);
}
if (abortSignal.aborted) {
if (control.signal.aborted) {
// We don't want to call `AbortController.abort()` if the call is already aborted
clearTimeout(timeoutId);
return reject(abortSignal.reason);
control.cancel();
return reject(control.signal.reason);
} else {
setTimeout(checkCondition, interval, resolve, reject);
setTimeout(checkCondition, control.interval, resolve, reject);
}
})
.catch((err) => {
Expand Down
Loading
Loading