Skip to content

Commit

Permalink
Merge pull request #34 from jsr-core/performance-fix
Browse files Browse the repository at this point in the history
feat: performance fix
  • Loading branch information
lambdalisue authored Aug 19, 2024
2 parents 227e7e5 + 2ce4940 commit d397736
Show file tree
Hide file tree
Showing 20 changed files with 469 additions and 110 deletions.
57 changes: 57 additions & 0 deletions _raw_semaphore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* @internal
*/
export class RawSemaphore {
#resolves: (() => void)[] = [];
#value: number;
#size: number;

/**
* Creates a new semaphore with the specified limit.
*
* @param size The maximum number of times the semaphore can be acquired before blocking.
* @throws {RangeError} if the size is not a positive safe integer.
*/
constructor(size: number) {
if (size <= 0 || !Number.isSafeInteger(size)) {
throw new RangeError(
`size must be a positive safe integer, got ${size}`,
);
}
this.#value = size;
this.#size = size;
}

/**
* Returns true if the semaphore is currently locked.
*/
get locked(): boolean {
return this.#value === 0;
}

/**
* Acquires the semaphore, blocking until the semaphore is available.
*/
acquire(): Promise<void> {
if (this.#value > 0) {
this.#value -= 1;
return Promise.resolve();
} else {
const { promise, resolve } = Promise.withResolvers<void>();
this.#resolves.push(resolve);
return promise;
}
}

/**
* Releases the semaphore, allowing the next waiting operation to proceed.
*/
release(): void {
const resolve = this.#resolves.shift();
if (resolve) {
resolve();
} else if (this.#value < this.#size) {
this.#value += 1;
}
}
}
29 changes: 14 additions & 15 deletions barrier.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { Notify } from "./notify.ts";

/**
* A synchronization primitive that allows multiple tasks to wait until all of
* them have reached a certain point of execution before continuing.
Expand All @@ -26,8 +24,8 @@ import { Notify } from "./notify.ts";
* ```
*/
export class Barrier {
#notify = new Notify();
#rest: number;
#waiter: PromiseWithResolvers<void> = Promise.withResolvers();
#value: number;

/**
* Creates a new `Barrier` that blocks until `size` threads have called `wait`.
Expand All @@ -41,23 +39,24 @@ export class Barrier {
`size must be a positive safe integer, got ${size}`,
);
}
this.#rest = size;
this.#value = size;
}

/**
* Wait for all threads to reach the barrier.
* Blocks until all threads reach the barrier.
*/
async wait({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
signal?.throwIfAborted();
this.#rest -= 1;
if (this.#rest === 0) {
await Promise.all([
this.#notify.notified({ signal }),
this.#notify.notifyAll(),
]);
} else {
await this.#notify.notified({ signal });
wait({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
if (signal?.aborted) {
return Promise.reject(signal.reason);
}
const { promise, resolve, reject } = this.#waiter;
const abort = () => reject(signal!.reason);
signal?.addEventListener("abort", abort, { once: true });
this.#value -= 1;
if (this.#value === 0) {
resolve();
}
return promise.finally(() => signal?.removeEventListener("abort", abort));
}
}
23 changes: 23 additions & 0 deletions barrier_bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Barrier as Barrier100 } from "jsr:@core/asyncutil@~1.0.0/barrier";
import { Barrier } from "./barrier.ts";

const length = 1_000;

Deno.bench({
name: "current",
fn: async () => {
const barrier = new Barrier(length);
await Promise.all(Array.from({ length }).map(() => barrier.wait()));
},
group: "Barrier#wait",
baseline: true,
});

Deno.bench({
name: "v1.0.0",
fn: async () => {
const barrier = new Barrier100(length);
await Promise.all(Array.from({ length }).map(() => barrier.wait()));
},
group: "Barrier#wait",
});
1 change: 1 addition & 0 deletions deno.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
],
"exclude": [
"**/*_test.ts",
"**/*_bench.ts",
".*"
]
},
Expand Down
7 changes: 7 additions & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 9 additions & 5 deletions lock.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Mutex } from "./mutex.ts";
import { RawSemaphore } from "./_raw_semaphore.ts";

/**
* A mutual exclusion lock that provides safe concurrent access to a shared value.
Expand All @@ -16,7 +16,7 @@ import { Mutex } from "./mutex.ts";
* ```
*/
export class Lock<T> {
#mu = new Mutex();
#sem = new RawSemaphore(1);
#value: T;

/**
Expand All @@ -32,7 +32,7 @@ export class Lock<T> {
* Returns true if the lock is currently locked, false otherwise.
*/
get locked(): boolean {
return this.#mu.locked;
return this.#sem.locked;
}

/**
Expand All @@ -43,7 +43,11 @@ export class Lock<T> {
* @returns A Promise that resolves with the result of the function.
*/
async lock<R>(fn: (value: T) => R | PromiseLike<R>): Promise<R> {
using _lock = await this.#mu.acquire();
return await fn(this.#value);
await this.#sem.acquire();
try {
return await fn(this.#value);
} finally {
this.#sem.release();
}
}
}
23 changes: 23 additions & 0 deletions lock_bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Lock as Lock100 } from "jsr:@core/asyncutil@~1.0.0/lock";
import { Lock } from "./lock.ts";

const length = 1_000;

Deno.bench({
name: "current",
fn: async () => {
const lock = new Lock(0);
await Promise.all(Array.from({ length }).map(() => lock.lock(() => {})));
},
group: "Lock#lock",
baseline: true,
});

Deno.bench({
name: "v1.0.0",
fn: async () => {
const lock = new Lock100(0);
await Promise.all(Array.from({ length }).map(() => lock.lock(() => {})));
},
group: "Lock#lock",
});
22 changes: 8 additions & 14 deletions mutex.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { RawSemaphore } from "./_raw_semaphore.ts";

/**
* A mutex (mutual exclusion) is a synchronization primitive that grants
* exclusive access to a shared resource.
Expand Down Expand Up @@ -26,33 +28,25 @@
* ```
*/
export class Mutex {
#waiters: Set<Promise<void>> = new Set();
#sem: RawSemaphore = new RawSemaphore(1);

/**
* Returns true if the mutex is locked, false otherwise.
*/
get locked(): boolean {
return this.#waiters.size > 0;
return this.#sem.locked;
}

/**
* Acquire the mutex and return a promise with disposable that releases the mutex when disposed.
*
* @returns A Promise with Disposable that releases the mutex when disposed.
*/
acquire(): Promise<Disposable> & Disposable {
const waiters = [...this.#waiters];
const { promise, resolve } = Promise.withResolvers<void>();
this.#waiters.add(promise);
const disposable = {
acquire(): Promise<Disposable> {
return this.#sem.acquire().then(() => ({
[Symbol.dispose]: () => {
resolve();
this.#waiters.delete(promise);
this.#sem.release();
},
};
return Object.assign(
Promise.all(waiters).then(() => disposable),
disposable,
);
}));
}
}
33 changes: 33 additions & 0 deletions mutex_bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Mutex as Mutex100 } from "jsr:@core/asyncutil@~1.0.0/mutex";
import { Mutex } from "./mutex.ts";

const length = 1_000;

Deno.bench({
name: "current",
fn: async () => {
const mutex = new Mutex();
await Promise.all(
Array.from({ length }).map(async () => {
const lock = await mutex.acquire();
lock[Symbol.dispose]();
}),
);
},
group: "Mutex#wait",
baseline: true,
});

Deno.bench({
name: "v1.0.0",
fn: async () => {
const mutex = new Mutex100();
await Promise.all(
Array.from({ length }).map(async () => {
const lock = await mutex.acquire();
lock[Symbol.dispose]();
}),
);
},
group: "Mutex#wait",
});
38 changes: 16 additions & 22 deletions notify.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import { iter } from "@core/iterutil/iter";
import { take } from "@core/iterutil/take";

/**
* Async notifier that allows one or more "waiters" to wait for a notification.
*
Expand All @@ -23,13 +20,13 @@ import { take } from "@core/iterutil/take";
* ```
*/
export class Notify {
#waiters: Set<PromiseWithResolvers<void>> = new Set();
#waiters: PromiseWithResolvers<void>[] = [];

/**
* Returns the number of waiters that are waiting for notification.
*/
get waiterCount(): number {
return this.#waiters.size;
return this.#waiters.length;
}

/**
Expand All @@ -43,40 +40,37 @@ export class Notify {
if (n <= 0 || !Number.isSafeInteger(n)) {
throw new RangeError(`n must be a positive safe integer, got ${n}`);
}
const it = iter(this.#waiters);
for (const waiter of take(it, n)) {
waiter.resolve();
}
this.#waiters = new Set(it);
this.#waiters.splice(0, n).forEach(({ resolve }) => resolve());
}

/**
* Notifies all waiters that are waiting for notification. Resolves each of the notified waiters.
*/
notifyAll(): void {
for (const waiter of this.#waiters) {
waiter.resolve();
}
this.#waiters = new Set();
this.#waiters.forEach(({ resolve }) => resolve());
this.#waiters = [];
}

/**
* Asynchronously waits for notification. The caller's execution is suspended until
* the `notify` method is called. The method returns a Promise that resolves when the caller is notified.
* Optionally takes an AbortSignal to abort the waiting if the signal is aborted.
*/
async notified({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
notified({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
if (signal?.aborted) {
throw signal.reason;
return Promise.reject(signal.reason);
}
const waiter = Promise.withResolvers<void>();
const abort = () => {
this.#waiters.delete(waiter);
waiter.reject(signal!.reason);
const waiter = this.#waiters.shift();
if (waiter) {
waiter.reject(signal!.reason);
}
};
signal?.addEventListener("abort", abort, { once: true });
this.#waiters.add(waiter);
await waiter.promise;
signal?.removeEventListener("abort", abort);
const waiter = Promise.withResolvers<void>();
this.#waiters.push(waiter);
return waiter.promise.finally(() => {
signal?.removeEventListener("abort", abort);
});
}
}
Loading

0 comments on commit d397736

Please sign in to comment.