Skip to content

Commit

Permalink
Allow sendActivity/forwardActivity to reject
Browse files Browse the repository at this point in the history
on fail to enqueue

Fix #192
  • Loading branch information
dahlia committed Nov 27, 2024
1 parent 5c16bd7 commit 49db883
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ To be released.

- Added `getTypeId()` function.

- `Context.sendActivity()` and `InboxContext.forwardActivity()` methods now
reject when they fail to enqueue the task. [[#192]]

- Fedify now supports OpenTelemetry for tracing. [[#170]]

- Added `Context.tracerProvider` property.
Expand Down Expand Up @@ -103,6 +106,7 @@ To be released.
[#173]: https://github.com/dahlia/fedify/issues/173
[#183]: https://github.com/dahlia/fedify/pull/183
[#186]: https://github.com/dahlia/fedify/pull/186
[#192]: https://github.com/dahlia/fedify/issues/192


Version 1.2.8
Expand Down
44 changes: 40 additions & 4 deletions src/federation/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
"#{attempt}); retry...:\n{error}",
{ ...logData, error },
);
this.outboxQueue?.enqueue(
await this.outboxQueue?.enqueue(
{
...message,
attempt: message.attempt + 1,
Expand Down Expand Up @@ -690,7 +690,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
recipient: message.identifier,
},
);
this.inboxQueue?.enqueue(
await this.inboxQueue?.enqueue(
{
...message,
attempt: message.attempt + 1,
Expand Down Expand Up @@ -1975,6 +1975,7 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
keyJwkPairs.push({ keyId: keyId.href, privateKey: privateKeyJwk });
}
if (!this.manuallyStartQueue) this.#startQueue(contextData);
const promises: Promise<void>[] = [];
for (const inbox in inboxes) {
const message: OutboxMessage = {
type: "outbox",
Expand All @@ -1995,7 +1996,24 @@ export class FederationImpl<TContextData> implements Federation<TContextData> {
),
},
};
this.outboxQueue.enqueue(message);
promises.push(this.outboxQueue.enqueue(message));
}
const results = await Promise.allSettled(promises);
const errors = results
.filter((r) => r.status === "rejected")
.map((r) => r.reason);
if (errors.length > 0) {
logger.error(
"Failed to enqueue activity {activityId} to send later: {errors}",
{ activityId: activity.id!.href, errors },
);
if (errors.length > 1) {
throw new AggregateError(
errors,
`Failed to enqueue activity ${activityId} to send later.`,
);
}
throw errors[0];
}
}

Expand Down Expand Up @@ -3216,6 +3234,7 @@ export class InboxContextImpl<TContextData> extends ContextImpl<TContextData>
const privateKeyJwk = await exportJwk(privateKey);
keyJwkPairs.push({ keyId: keyId.href, privateKey: privateKeyJwk });
}
const promises: Promise<void>[] = [];
for (const inbox in inboxes) {
const message: OutboxMessage = {
type: "outbox",
Expand All @@ -3230,7 +3249,24 @@ export class InboxContextImpl<TContextData> extends ContextImpl<TContextData>
attempt: 0,
headers: {},
};
this.federation.outboxQueue.enqueue(message);
promises.push(this.federation.outboxQueue.enqueue(message));
}
const results = await Promise.allSettled(promises);
const errors: unknown[] = results
.filter((r) => r.status === "rejected")
.map((r) => r.reason);
if (errors.length > 0) {
logger.error(
"Failed to enqueue activity {activityId} to forward later:\n{errors}",
{ activityId: this.activityId, errors },
);
if (errors.length > 1) {
throw new AggregateError(
errors,
`Failed to enqueue activity ${this.activityId} to forward later.`,
);
}
throw errors[0];
}
}
}
Expand Down

0 comments on commit 49db883

Please sign in to comment.