Skip to content

Commit

Permalink
Make collection dispatcher spans finer-grained
Browse files Browse the repository at this point in the history
  • Loading branch information
dahlia committed Nov 27, 2024
1 parent 61d94af commit be4ca8c
Showing 1 changed file with 80 additions and 71 deletions.
151 changes: 80 additions & 71 deletions src/federation/handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { getLogger } from "@logtape/logtape";
import type { Span, TracerProvider } from "@opentelemetry/api";
import type { TracerProvider } from "@opentelemetry/api";
import { SpanKind, SpanStatusCode, trace } from "@opentelemetry/api";
import { accepts } from "@std/http/negotiation";
import metadata from "../deno.json" with { type: "json" };
Expand All @@ -10,7 +10,6 @@ import { detachSignature, verifyJsonLd } from "../sig/ld.ts";
import { doesActorOwnKey } from "../sig/owner.ts";
import { verifyObject } from "../sig/proof.ts";
import type { Recipient } from "../vocab/actor.ts";
import { getTypeId } from "../vocab/type.ts";
import {
Activity,
CryptographicKey,
Expand Down Expand Up @@ -197,46 +196,13 @@ export interface CollectionHandlerParameters<
onNotAcceptable(request: Request): Response | Promise<Response>;
}

export function handleCollection<
export async function handleCollection<
TItem extends URL | Object | Link | Recipient,
TContext extends RequestContext<TContextData>,
TContextData,
TFilter,
>(
request: Request,
params: CollectionHandlerParameters<TItem, TContext, TContextData, TFilter>,
): Promise<Response> {
const name = params.name.trim().replace(/\s+/g, "_");
const tracerProvider = params.tracerProvider ?? trace.getTracerProvider();
const tracer = tracerProvider.getTracer(metadata.name, metadata.version);
const url = new URL(request.url);
const cursor = url.searchParams.get("cursor");
return tracer.startActiveSpan(
cursor == null
? `activitypub.dispatch_collection ${name}`
: `activitypub.dispatch_collection_page ${name}`,
{ kind: SpanKind.SERVER },
async (span) => {
try {
return await handleCollectionInternal(request, cursor, params, span);
} catch (e) {
span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) });
throw e;
} finally {
span.end();
}
},
);
}

async function handleCollectionInternal<
TItem extends URL | Object | Link | Recipient,
TContext extends RequestContext<TContextData>,
TContextData,
TFilter,
>(
request: Request,
cursor: string | null,
{
name,
identifier,
Expand All @@ -245,12 +211,17 @@ async function handleCollectionInternal<
filterPredicate,
context,
collectionCallbacks,
tracerProvider,
onUnauthorized,
onNotFound,
onNotAcceptable,
}: CollectionHandlerParameters<TItem, TContext, TContextData, TFilter>,
span: Span,
): Promise<Response> {
const spanName = name.trim().replace(/\s+/g, "_");
tracerProvider = tracerProvider ?? trace.getTracerProvider();
const tracer = tracerProvider.getTracer(metadata.name, metadata.version);
const url = new URL(request.url);
const cursor = url.searchParams.get("cursor");
if (collectionCallbacks == null) return await onNotFound(request);
let collection: OrderedCollection | OrderedCollectionPage;
const baseUri = uriGetter(identifier);
Expand All @@ -260,29 +231,50 @@ async function handleCollectionInternal<
identifier,
);
const totalItems = await collectionCallbacks.counter?.(context, identifier);
if (totalItems != null) {
span.setAttribute(
"activitypub.collection.total_items",
Number(totalItems),
);
}
if (firstCursor == null) {
const page = await collectionCallbacks.dispatcher(
context,
identifier,
null,
filter,
const itemsOrResponse = await tracer.startActiveSpan(
`activitypub.dispatch_collection ${spanName}`,
{
kind: SpanKind.SERVER,
attributes: {
"activitypub.collection.id": baseUri.href,
"activitypub.collection.type": OrderedCollection.typeId.href,
},
},
async (span) => {
if (totalItems != null) {
span.setAttribute(
"activitypub.collection.total_items",
Number(totalItems),
);
}
try {
const page = await collectionCallbacks.dispatcher(
context,
identifier,
null,
filter,
);
if (page == null) {
span.setStatus({ code: SpanStatusCode.ERROR });
return await onNotFound(request);
}
const { items } = page;
span.setAttribute("fedify.collection.items", items.length);
return items;
} catch (e) {
span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) });
throw e;
} finally {
span.end();
}
},
);
if (page == null) {
span.setStatus({ code: SpanStatusCode.ERROR });
return await onNotFound(request);
}
const { items } = page;
span.setAttribute("fedify.collection.items", items.length);
if (itemsOrResponse instanceof Response) return itemsOrResponse;
collection = new OrderedCollection({
id: baseUri,
totalItems: totalItems == null ? null : Number(totalItems),
items: filterCollectionItems(items, name, filterPredicate),
items: filterCollectionItems(itemsOrResponse, name, filterPredicate),
});
} else {
const lastCursor = await collectionCallbacks.lastCursor?.(
Expand All @@ -304,21 +296,42 @@ async function handleCollectionInternal<
});
}
} else {
span.setAttribute("fedify.collection.cursor", cursor);
const uri = new URL(baseUri);
uri.searchParams.set("cursor", cursor);
const page = await collectionCallbacks.dispatcher(
context,
identifier,
cursor,
filter,
const pageOrResponse = await tracer.startActiveSpan(
`activitypub.dispatch_collection_page ${name}`,
{
kind: SpanKind.SERVER,
attributes: {
"activitypub.collection.id": uri.href,
"activitypub.collection.type": OrderedCollectionPage.typeId.href,
"fedify.collection.cursor": cursor,
},
},
async (span) => {
try {
const page = await collectionCallbacks.dispatcher(
context,
identifier,
cursor,
filter,
);
if (page == null) {
span.setStatus({ code: SpanStatusCode.ERROR });
return await onNotFound(request);
}
span.setAttribute("fedify.collection.items", page.items.length);
return page;
} catch (e) {
span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) });
throw e;
} finally {
span.end();
}
},
);
if (page == null) {
span.setStatus({ code: SpanStatusCode.ERROR });
return await onNotFound(request);
}
const { items, prevCursor, nextCursor } = page;
span.setAttribute("fedify.collection.items", items.length);
if (pageOrResponse instanceof Response) return pageOrResponse;
const { items, prevCursor, nextCursor } = pageOrResponse;
let prev = null;
if (prevCursor != null) {
prev = new URL(context.url);
Expand Down Expand Up @@ -354,10 +367,6 @@ async function handleCollectionInternal<
return await onUnauthorized(request);
}
}
if (collection.id != null) {
span.setAttribute("activitypub.collection.id", collection.id.href);
}
span.setAttribute("activitypub.collection.type", getTypeId(collection).href);
const jsonLd = await collection.toJsonLd(context);
return new Response(JSON.stringify(jsonLd), {
headers: {
Expand Down

0 comments on commit be4ca8c

Please sign in to comment.