feat: improve pg write queue #1878

Open
wants to merge 2 commits into
base: develop
18 changes: 15 additions & 3 deletions src/datastore/helpers.ts
@@ -1343,14 +1343,26 @@ export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities {
  */
 export class PgWriteQueue {
   readonly queue: PQueue;
+  private tasks: Promise<unknown>[];
   constructor() {
     const concurrency = Math.max(1, getUintEnvOrDefault('STACKS_BLOCK_DATA_INSERT_CONCURRENCY', 4));
     this.queue = new PQueue({ concurrency, autoStart: true });
+    this.tasks = [];
   }
   enqueue(task: Parameters<PQueue['add']>[0]): void {
-    void this.queue.add(task);
+    const p = this.queue.add(task);
+    p.catch(e => logger.error(e, 'PgWriteQueue task failed'));
Member

Is this catch needed, or would it be handled later in the throw firstRejected.reason code?

If the error is logged here, it will lack a meaningful stack trace, whereas the code below preserves the async call stack.

Contributor Author

If there's no catch, the process will be terminated by an unhandledRejection.

The error is logged here because throw firstRejected.reason only throws the first one; without this catch, the other errors would not be logged anywhere.
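The point about `.catch` can be illustrated with a small sketch that uses plain promises instead of p-queue. The names `track`, `logged`, and `demoTrackedRejections` are hypothetical and not part of the PR. It assumes the standard promise semantics: attaching a handler marks the promise as handled (which, in recent Node.js versions, avoids the default termination on an unhandled rejection), while the original promise still rejects for anyone who awaits it later.

```typescript
// Hypothetical names for illustration only; not taken from the PR.
const logged: unknown[] = [];

function track<T>(p: Promise<T>): Promise<T> {
  // Attaching a handler marks `p` as handled, so no unhandledRejection is raised.
  // Every failure is recorded here, not just the first one.
  p.catch(e => {
    logged.push(e);
  });
  // Return the ORIGINAL promise: it still rejects for later awaiters.
  return p;
}

async function demoTrackedRejections(): Promise<string[]> {
  const tasks = [
    track(Promise.resolve('ok')),
    track(Promise.reject(new Error('first'))),
    track(Promise.reject(new Error('second'))),
  ];
  // allSettled never rejects; it reports the outcome of every task.
  const results = await Promise.allSettled(tasks);
  return results.map(r => r.status);
}
```

In this sketch both failures end up in `logged`, while `Promise.allSettled` still sees the two rejections, so a later `throw firstRejected.reason` would keep working.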

Member

Ah okay, would it make sense to use the p-queue error event instead? https://www.npmjs.com/package/p-queue#error

Contributor Author

This is what we saw using a node built from this PR.

image

Member

@zone117x, Mar 6, 2024

I see, we can manually capture the errors ourselves with .catch to keep the process from being terminated by an unhandledRejection. But could we instead use the built-in p-queue error handler, e.g.

queue.on('error', error => {
	console.error(error);
});

?

+    this.tasks.push(p);
   }
-  done(): Promise<void> {
-    return this.queue.onIdle();
+  async done(): Promise<void> {
+    // https://medium.com/@alkor_shikyaro/transactions-and-promises-in-node-js-ca5a3aeb6b74
Collaborator

I don't believe the problem described in this post applies to the tasks handled by this queue for a couple of reasons:

  1. PQueue's onIdle already behaves like allSettled

    Returns a promise that settles when the queue becomes empty, and all promises have completed

  2. The API only enqueues work to this queue inside of a pre-existing SQL transaction (all the block processing code is contained in a single sqlWriteTransaction). If there's ever an exception thrown inside, the transaction aborts and rolls back all the pending writes. We've done extensive tests on this and it's worked correctly for a long time.

Contributor Author

  1. That's what I thought, but it was wrong; here's a demonstration: https://replit.com/@bestmike007/TroubledEarnestSandboxes#index.ts
  2. onIdle does not throw an exception; it simply waits for all promises to complete.

What I'm seeing is an unhandledRejection, after which the process restarted.
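The difference between "wait until everything settles" and "surface the failure to the caller" can be sketched without p-queue. `TrackedTasks` and `demoDone` below are hypothetical stand-ins, not the PR's `PgWriteQueue`: tasks start immediately and there is no concurrency limit. The sketch only assumes standard `Promise.allSettled` behaviour.

```typescript
// Hypothetical stand-in for the PR's queue; runs tasks immediately.
class TrackedTasks {
  private tasks: Promise<unknown>[] = [];

  enqueue(task: () => Promise<unknown>): void {
    const p = task();
    // Mark the promise as handled; the failure is surfaced in done().
    p.catch(() => {});
    this.tasks.push(p);
  }

  async done(): Promise<void> {
    // Wait for every task, including ones that are still running
    // after another task has already failed.
    const results = await Promise.allSettled(this.tasks);
    this.tasks = [];
    const firstRejected = results.find(
      (r): r is PromiseRejectedResult => r.status === 'rejected'
    );
    if (firstRejected !== undefined) {
      throw firstRejected.reason;
    }
  }
}

async function demoDone(): Promise<{ slowFinished: boolean; message: string }> {
  const tracked = new TrackedTasks();
  let slowFinished = false;
  // Fails right away.
  tracked.enqueue(async () => {
    throw new Error('insert failed');
  });
  // Completes a few microtask ticks later, simulating a slower write.
  tracked.enqueue(async () => {
    for (let i = 0; i < 5; i++) {
      await Promise.resolve();
    }
    slowFinished = true;
  });
  try {
    await tracked.done();
    return { slowFinished, message: 'no error' };
  } catch (e) {
    return { slowFinished, message: (e as Error).message };
  }
}
```

Under these assumptions, `done()` rejects with the first failure only after the slower task has finished, so the caller sees the error and no task is left running in the background.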

+    const results = await Promise.allSettled(this.tasks);
+    this.tasks = [];
+    const firstRejected = results.find(v => v.status === 'rejected') as
+      | PromiseRejectedResult
+      | undefined;
+    if (firstRejected != null) {
+      throw firstRejected.reason;
+    }
   }
 }