Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 237 additions & 0 deletions src/ingest-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/**
* Worker pools for shared-nothing parallel ingest / clone
* (PUSHWORK_PARALLEL_INGEST=shard).
*
* Each worker owns a full Repo (own Wasm, own storage writes, own socket) and
* reports only URLs/heads (push) or which files it wrote (clone); the main
* thread never materializes the file documents. New files / leaves are split
* round-robin across a bounded pool, and a per-worker failure falls back to
* main-thread handling rather than failing the whole operation.
*
* The worker scripts run as compiled CommonJS sitting next to this module
* (dist/workers/...), so a build must exist even when the engine runs from
* source via tsx.
*/
import * as os from "os";
import * as path from "path";
import { Worker } from "worker_threads";
import {
parseAutomergeUrl,
stringifyAutomergeUrl,
type AutomergeUrl,
type UrlHeads,
} from "@automerge/automerge-repo";
import type { Backend } from "./config.js";
import { log } from "./log.js";
import { isInArtifactDir } from "./shapes/index.js";
import type {
ShardIngestData,
ShardIngestReport,
ShardIngestTask,
} from "./workers/shard-ingest-worker.js";
import type {
ShardCloneData,
ShardCloneReport,
} from "./workers/shard-clone-worker.js";

// Module-scoped logger created with the "ingest-pool" tag; called below with
// printf-style format strings.
const dlog = log("ingest-pool");

// Upper bound on the number of workers in a pool.
// Higher worker counts exhaust file descriptors during the online upload
// storm (EMFILE). Overridable for experiments via PUSHWORK_WORKERS.
// An unset, empty, non-numeric or zero PUSHWORK_WORKERS falls back to 8 (the
// `||`); a fractional value is floored; the result is clamped to at least 1.
// The value is read once, when this module is loaded.
const SHARD_WORKER_CAP = Math.max(
1,
Math.floor(Number(process.env.PUSHWORK_WORKERS) || 8),
);

// Item floor used by both shouldShardIngest and shouldShardClone.
// Below this many items, spinning up workers (each ~1s of Wasm + Repo init)
// doesn't pay for itself versus the main-thread path.
const SHARD_MIN_ITEMS = 8;

// Auto-dispatch thresholds for ingest: shard automatically only when the CRDT
// build is heavy enough to beat per-worker overhead. Tiny files are
// overhead-bound (a wash), so a small-file tree stays on the main thread even
// past the item floor.
// AUTO_AVG_BYTES: average bytes per file (8 KiB) at or above which ingest shards.
const AUTO_AVG_BYTES = 8 * 1024;
// AUTO_TOTAL_BYTES: total bytes across all files (4 MiB) at or above which
// ingest shards, regardless of the average.
const AUTO_TOTAL_BYTES = 4 * 1024 * 1024;

/** Operator override for the pools: forced on, forced off, or no override. */
type ExplicitMode = "on" | "off" | null;

/**
 * Reads the PUSHWORK_PARALLEL_INGEST environment variable on every call.
 *
 * @returns `"on"` for `shard` or `2`, `"off"` for `off` or `0`, and `null`
 *   for anything else (including unset), which leaves the adaptive policy in
 *   charge.
 */
function explicitMode(): ExplicitMode {
  switch (process.env.PUSHWORK_PARALLEL_INGEST) {
    case "shard":
    case "2":
      return "on";
    case "off":
    case "0":
      return "off";
    default:
      return null;
  }
}

/**
 * Decides whether an ingest of `files` should go through the worker pool.
 *
 * The item floor is checked first: fewer than SHARD_MIN_ITEMS files never
 * shard, even when sharding is explicitly forced on. Past the floor an
 * explicit on/off override decides; with no override, sharding is enabled only
 * for CPU-bound trees (large total bytes or large average file), since the
 * per-char op-tree build is what the workers parallelize.
 *
 * @param files - file contents; only the entry count and byte lengths are read.
 * @returns `true` when the caller should use the sharded ingest path.
 */
export function shouldShardIngest(files: Map<string, Uint8Array>): boolean {
  const fileCount = files.size;
  if (fileCount < SHARD_MIN_ITEMS) return false;

  const forced = explicitMode();
  if (forced === "on") return true;
  if (forced === "off") return false;

  let totalBytes = 0;
  files.forEach((bytes) => {
    totalBytes += bytes.length;
  });
  if (totalBytes >= AUTO_TOTAL_BYTES) return true;
  return totalBytes / fileCount >= AUTO_AVG_BYTES;
}

/**
 * Decides whether a clone/materialize of `leafCount` files should go through
 * the worker pool.
 *
 * Parallel download/read wins broadly (the serial path pays a per-file
 * `repo.find`), so anything at or past the item floor shards unless the
 * explicit mode is "off". Below the floor the answer is always `false`.
 *
 * @param leafCount - number of file leaves to materialize.
 * @returns `true` when the caller should use the sharded clone path.
 */
export function shouldShardClone(leafCount: number): boolean {
  const belowFloor = leafCount < SHARD_MIN_ITEMS;
  return !belowFloor && explicitMode() !== "off";
}

/**
 * Picks the pool size for `itemCount` work items.
 *
 * The size is the smallest of SHARD_WORKER_CAP, the reported CPU count minus
 * one, and the item count itself, and is never less than 1 (so an item count
 * of 0 still yields 1).
 */
function workerCount(itemCount: number): number {
  const cpuTotal = os.cpus()?.length ?? 1;
  const spareCores = Math.max(1, cpuTotal - 1);
  const bounded = Math.min(SHARD_WORKER_CAP, spareCores, itemCount);
  return Math.max(1, bounded);
}

/**
 * Resolves a worker script file name to its on-disk location.
 *
 * Worker scripts are compiled next to this module: dist/ingest-pool.js +
 * dist/workers/<name>.js (or dist-bench/src/... under the bench build), so the
 * path is this module's directory + "workers" + `name`.
 */
function workerPath(name: string): string {
  const workersDir = path.join(__dirname, "workers");
  return path.join(workersDir, name);
}

/**
 * Deals `items` round-robin into `buckets` arrays: item `i` goes to bucket
 * `i % buckets`. Always returns `buckets` arrays, some possibly empty; the
 * input array is not modified.
 */
function partition<T>(items: T[], buckets: number): T[][] {
  const shards = Array.from({ length: buckets }, (): T[] => []);
  return items.reduce((acc, item, index) => {
    acc[index % buckets].push(item);
    return acc;
  }, shards);
}

/**
 * Rebuilds `url` as a URL carrying the given `heads`: the document id is taken
 * from the parsed input URL and re-stringified together with `heads`.
 */
function pinFromHeads(url: AutomergeUrl, heads: string[]): AutomergeUrl {
  const parsed = parseAutomergeUrl(url);
  return stringifyAutomergeUrl({
    documentId: parsed.documentId,
    heads: heads as UrlHeads,
  });
}

/**
 * Spawn a worker and settle with the outcome of its single report message.
 *
 * The worker is terminated eagerly on report (the report is sent strictly
 * after the worker's storage flush / shutdown, so nothing of value remains —
 * and a leftover sync timer would otherwise keep the worker's event loop alive
 * long after shutdown).
 *
 * @param scriptPath - path of the worker script to run.
 * @param workerData - payload handed to the worker as its `workerData`.
 * @returns a promise that resolves with the first message the worker posts.
 *   It rejects when the worker raises an `error`, when its message cannot be
 *   deserialized (`messageerror`), or when it exits before reporting.
 */
function runWorker<TData, TReport>(
  scriptPath: string,
  workerData: TData,
): Promise<TReport> {
  return new Promise<TReport>((resolve, reject) => {
    const worker = new Worker(scriptPath, { workerData });
    let settled = false;

    worker.once("message", (msg: TReport) => {
      if (settled) return;
      settled = true;
      void worker.terminate();
      resolve(msg);
    });
    // A report that fails to deserialize never arrives as "message". Without
    // this handler the promise would stay pending for as long as the worker
    // stays alive, hanging the whole pool; reject and stop the worker instead.
    worker.once("messageerror", (err) => {
      if (settled) return;
      settled = true;
      void worker.terminate();
      reject(err);
    });
    worker.once("error", (err) => {
      if (settled) return;
      settled = true;
      reject(err);
    });
    worker.once("exit", (code) => {
      if (settled) return;
      settled = true;
      reject(new Error(`worker exited with code ${code}`));
    });
  });
}

/** Inputs for `shardIngest`. */
export type ShardIngestOpts = {
// Forwarded unchanged to every worker as `root`.
root: string;
// Forwarded unchanged to every worker as `backend`.
backend: Backend;
// Forwarded unchanged to every worker as `online`; also logged.
online: boolean;
// Files to ingest: posix path → file bytes. Each entry becomes one worker task.
files: Map<string, Uint8Array>;
// Passed to `isInArtifactDir` to decide each task's `isArtifact` flag.
artifactDirs: readonly string[];
};

/**
 * Create file documents for `files` across the worker pool.
 *
 * Tasks are split round-robin over `workerCount(tasks.length)` workers. A
 * worker that rejects has its whole shard reported as failed rather than
 * failing the operation. Any task a worker's report does not mention at all is
 * also reported as failed, so no input path is silently dropped.
 *
 * @param opts - root/backend/online are forwarded to each worker; `files` are
 *   the documents to create; `artifactDirs` decides which paths are artifacts.
 * @returns `created`: the URLs the workers created (artifact paths pinned to
 *   their reported heads) keyed by posix path; `failed`: the paths a worker
 *   could not create or did not report (caller materializes those on the main
 *   thread).
 */
export async function shardIngest(
  opts: ShardIngestOpts,
): Promise<{ created: Map<string, AutomergeUrl>; failed: string[] }> {
  const created = new Map<string, AutomergeUrl>();
  const failed: string[] = [];

  // workerCount() never returns less than 1, so without this guard an empty
  // input would still pay for a full worker (Wasm + Repo init) to do nothing.
  if (opts.files.size === 0) return { created, failed };

  const tasks: ShardIngestTask[] = Array.from(opts.files, ([relPath, bytes]) => ({
    relPath,
    bytes,
    isArtifact: isInArtifactDir(relPath, opts.artifactDirs),
  }));
  const n = workerCount(tasks.length);
  dlog("shardIngest files=%d workers=%d online=%s", tasks.length, n, opts.online);

  const reports = await Promise.all(
    partition(tasks, n).map((shard) =>
      runWorker<ShardIngestData, ShardIngestReport>(
        workerPath("shard-ingest-worker.js"),
        { root: opts.root, backend: opts.backend, online: opts.online, tasks: shard },
      ).catch((err): ShardIngestReport => {
        // Per-worker failure: mark the whole shard failed so the caller can
        // redo it on the main thread.
        dlog("ingest worker failed, falling back to main: %s", err);
        return {
          results: shard.map((t) => ({
            relPath: t.relPath,
            ok: false,
            error: String(err),
          })),
        };
      }),
    ),
  );

  const reported = new Set<string>();
  for (const report of reports) {
    for (const r of report.results) {
      reported.add(r.relPath);
      if (r.ok) {
        created.set(r.relPath, r.isArtifact ? pinFromHeads(r.url, r.heads) : r.url);
      } else {
        failed.push(r.relPath);
      }
    }
  }

  // A report that omits one of its tasks would otherwise leave that path in
  // neither list, and the file would vanish from the resulting tree. Treat
  // every unreported task as failed so the main-thread fallback picks it up.
  for (const { relPath } of tasks) {
    if (!reported.has(relPath)) failed.push(relPath);
  }

  dlog("shardIngest created=%d failed=%d", created.size, failed.length);
  return { created, failed };
}

/** Inputs for `shardClone`. */
export type ShardCloneOpts = {
// Forwarded unchanged to every worker as `root`.
root: string;
// Forwarded unchanged to every worker as `backend`.
backend: Backend;
// Forwarded unchanged to every worker as `online`; also logged.
online: boolean;
// Leaves to write: posix path → file-doc URL.
leaves: Map<string, AutomergeUrl>;
};

/**
 * Write the `leaves` (posix path → file-doc URL) to disk across the worker
 * pool.
 *
 * Leaves are split round-robin over `workerCount(entries.length)` workers. A
 * worker that rejects has its whole shard reported as failed rather than
 * failing the operation. Any leaf a worker's report lists as neither written
 * nor failed is also reported as failed, so no leaf is silently skipped.
 *
 * @param opts - root/backend/online are forwarded to each worker; `leaves` are
 *   the files to write.
 * @returns `written`: the paths workers wrote; `failed`: the paths they could
 *   not write or did not report (caller materializes those on the main
 *   thread).
 */
export async function shardClone(
  opts: ShardCloneOpts,
): Promise<{ written: Set<string>; failed: string[] }> {
  const written = new Set<string>();
  const failed: string[] = [];

  // workerCount() never returns less than 1, so without this guard an empty
  // input would still pay for a full worker (Wasm + Repo init) to do nothing.
  if (opts.leaves.size === 0) return { written, failed };

  const entries: [string, AutomergeUrl][] = Array.from(opts.leaves);
  const n = workerCount(entries.length);
  dlog("shardClone leaves=%d workers=%d online=%s", entries.length, n, opts.online);

  const reports = await Promise.all(
    partition(entries, n).map((shard) =>
      runWorker<ShardCloneData, ShardCloneReport>(
        workerPath("shard-clone-worker.js"),
        { root: opts.root, backend: opts.backend, online: opts.online, leaves: shard },
      ).catch((err): ShardCloneReport => {
        // Per-worker failure: mark the whole shard failed so the caller can
        // redo it on the main thread.
        dlog("clone worker failed, falling back to main: %s", err);
        return { written: [], failed: shard.map(([relPath]) => relPath) };
      }),
    ),
  );

  const accounted = new Set<string>();
  for (const report of reports) {
    for (const p of report.written) {
      written.add(p);
      accounted.add(p);
    }
    for (const p of report.failed) {
      failed.push(p);
      accounted.add(p);
    }
  }

  // A report that omits one of its leaves would otherwise leave that path in
  // neither list, and the file would never reach disk. Treat every unreported
  // leaf as failed so the main-thread fallback picks it up.
  for (const [relPath] of entries) {
    if (!accounted.has(relPath)) failed.push(relPath);
  }

  dlog("shardClone written=%d failed=%d", written.size, failed.length);
  return { written, failed };
}
114 changes: 111 additions & 3 deletions src/pushwork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ import {
} from "./config.js";
import { loadIgnore } from "./ignore.js";
import { byteEq, walkDir, writeFileAtomic } from "./fs-tree.js";
import {
shardClone,
shardIngest,
shouldShardClone,
shouldShardIngest,
} from "./ingest-pool.js";
import { log } from "./log.js";
import { openRepo, waitForSync } from "./repo.js";
import {
Expand Down Expand Up @@ -116,7 +122,9 @@ export async function init(opts: InitOpts): Promise<AutomergeUrl> {
dlog("init walked %d files", fsFiles.size);

const title = path.basename(root) || undefined;
const tree = await pushFiles(repo, fsFiles, undefined, artifactDirs);
const tree = shouldShardIngest(fsFiles)
? await ingestSharded(repo, root, opts.backend, online, fsFiles, artifactDirs)
: await pushFiles(repo, fsFiles, undefined, artifactDirs);
const folderUrl = await shape.encode({ repo, tree, title });
dlog("init encoded folder=%s title=%s", folderUrl, title);
const folderHandle = await repo.find<unknown>(folderUrl);
Expand Down Expand Up @@ -196,7 +204,7 @@ export async function clone(opts: CloneOpts): Promise<void> {
});

const tree = await shape.decode({ repo, root: folderHandle });
await materializeTree(repo, root, tree);
await materializeTree(repo, root, tree, { backend: opts.backend, online });

await writeConfig(root, {
version: CONFIG_VERSION,
Expand Down Expand Up @@ -832,6 +840,49 @@ async function pushFiles(
return root;
}

/**
 * Sharded counterpart of `pushFiles` for the all-new case (init).
 *
 * The file documents are built by `shardIngest`; this thread only places the
 * returned URLs into a fresh tree. Every path the pool reports as failed is
 * created here with `repo.create` instead, and artifact paths created this way
 * are pinned via `pinUrl`. A failed path with no entry in `fsFiles` is skipped.
 *
 * @returns the directory tree holding one leaf URL per ingested file.
 */
async function ingestSharded(
  repo: Repo,
  root: string,
  backend: Backend,
  online: boolean,
  fsFiles: Map<string, Uint8Array>,
  artifactDirs: readonly string[],
): Promise<VfsNode> {
  const outcome = await shardIngest({
    root,
    backend,
    online,
    files: fsFiles,
    artifactDirs,
  });

  // Posix path → non-empty path segments, as expected by setFileAt.
  const toSegments = (posixPath: string): string[] =>
    posixPath.split("/").filter(Boolean);

  const tree = newDir();
  outcome.created.forEach((url, posixPath) => {
    setFileAt(tree, toSegments(posixPath), url);
  });

  // Main-thread fallback for whatever the pool could not create.
  for (const posixPath of outcome.failed) {
    const bytes = fsFiles.get(posixPath);
    if (bytes) {
      const isArtifact = isInArtifactDir(posixPath, artifactDirs);
      const handle = repo.create<UnixFileEntry>(
        makeFileEntry(posixPath, bytes, isArtifact),
      );
      const url = isArtifact ? pinUrl(handle) : handle.url;
      setFileAt(tree, toSegments(posixPath), url);
    }
  }

  dlog(
    "ingestSharded created=%d main-fallback=%d",
    outcome.created.size,
    outcome.failed.length,
  );
  return tree;
}

/**
* Re-pin every artifact leaf in the folder doc to its file doc's current
* heads. Bare (non-artifact) URLs are left as-is since they already track
Expand Down Expand Up @@ -885,9 +936,20 @@ async function materializeTree(
repo: Repo,
root: string,
tree: VfsNode,
shardCtx?: { backend: Backend; online: boolean },
): Promise<void> {
const leaves = flattenLeaves(tree);

// Clone path: fan the per-file download + write out to the worker pool.
// Only the caller that knows the working tree is freshly materialized
// (clone) passes shardCtx, so writing every leaf is correct here.
if (shardCtx && shouldShardClone(leaves.size)) {
await materializeSharded(repo, root, leaves, shardCtx);
return;
}

const desired = new Map<string, Uint8Array>();
for (const [posixPath, fileUrl] of flattenLeaves(tree)) {
for (const [posixPath, fileUrl] of leaves) {
const handle = await repo.find<UnixFileEntry>(fileUrl);
desired.set(posixPath, contentToBytes(handle.doc().content));
}
Expand Down Expand Up @@ -916,6 +978,52 @@ async function materializeTree(
dlog("materialize done: %d written, %d removed", written, removed);
}

/**
 * Materialize `leaves` (posix path → file-doc URL) under `root` using the
 * clone worker pool.
 *
 * Runs three steps in order: fan the writes out through `shardClone`; write
 * any leaf the pool reports as failed from this thread; then delete every file
 * found by `walkDir` that is not one of `leaves`, pruning directories left
 * empty. Unlink errors are ignored as "already gone".
 */
async function materializeSharded(
  repo: Repo,
  root: string,
  leaves: Map<string, AutomergeUrl>,
  shardCtx: { backend: Backend; online: boolean },
): Promise<void> {
  const outcome = await shardClone({
    root,
    backend: shardCtx.backend,
    online: shardCtx.online,
    leaves,
  });

  // Main-thread fallback for any leaf a worker could not write.
  for (const posixPath of outcome.failed) {
    const url = leaves.get(posixPath);
    if (url) {
      const handle = await repo.find<UnixFileEntry>(url);
      const target = path.join(root, fromPosix(posixPath));
      await writeFileAtomic(target, contentToBytes(handle.doc().content));
    }
  }

  // Remove anything on disk the tree no longer references.
  const ignore = await loadIgnore(root);
  const onDisk = await walkDir(root, ignore);
  let removed = 0;
  for (const posixPath of onDisk.keys()) {
    if (!leaves.has(posixPath)) {
      const nativeRel = fromPosix(posixPath);
      try {
        await fs.unlink(path.join(root, nativeRel));
        removed += 1;
      } catch {
        // already gone
      }
      // Pruning runs whether or not the unlink succeeded.
      await pruneEmptyDirs(root, path.dirname(nativeRel));
    }
  }

  dlog(
    "materialize (shard) written=%d main-fallback=%d removed=%d",
    outcome.written.size,
    outcome.failed.length,
    removed,
  );
}

const fromPosix = (p: string) => p.split("/").join(path.sep);

async function pruneEmptyDirs(root: string, relDir: string): Promise<void> {
Expand Down
Loading
Loading