Skip to content

Commit 9986234

Browse files
authored
Fix EMFILE too many open files, added maxConcurrency option (#8)
1 parent a7b6f2b commit 9986234

10 files changed

+146
-48
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
## next
22

3+
- Added `maxConcurrency` option to limit concurrent FS operations, preventing "too many open files" errors (#8)
34
- Fixed Node.js warnings such as "Warning: Closing file descriptor # on garbage collection", which is deprecated in Node.js 22 and will result in an error being thrown in the future
45

56
## 0.1.4 (2024-10-30)

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ await repo.dispose();
2727

2828
- `gitdir`: string - path to the git repo
2929
- `options` – optional settings:
30+
- `maxConcurrency` – limit the number of file system operations (default: 50)
3031
- `cruftPacks` – defines how [cruft packs](https://git-scm.com/docs/cruft-packs) are processed:
3132
- `'include'` or `true` (default) - process all packs
3233
- `'exclude'` or `false` - exclude cruft packs from processing

src/index.ts

+20-16
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,20 @@ import { createPackedObjectIndex } from './packed-object-index.js';
66
import { createFilesMethods } from './files-methods.js';
77
import { createCommitMethods } from './commits.js';
88
import { createStatMethod } from './stat.js';
9+
import { promiseAllThreaded } from './utils/threads.js';
910
import { GitReaderOptions, NormalizedGitReaderOptions, CruftPackMode } from './types';
1011

1112
export * from './types.js';
1213
export * from './parse-object.js';
1314
export { isGitDir, resolveGitDir };
1415

15-
export async function createGitReader(gitdir: string, options?: GitReaderOptions) {
16+
export async function createGitReader(gitdir: string, options?: Partial<GitReaderOptions>) {
1617
const startInitTime = Date.now();
1718
const normalizedOptions = normalizeOptions(options);
1819
const resolvedGitDir = await resolveGitDir(gitdir);
1920
const [refIndex, looseObjectIndex, packedObjectIndex] = await Promise.all([
20-
createRefIndex(resolvedGitDir),
21-
createLooseObjectIndex(resolvedGitDir),
21+
createRefIndex(resolvedGitDir, normalizedOptions),
22+
createLooseObjectIndex(resolvedGitDir, normalizedOptions),
2223
createPackedObjectIndex(resolvedGitDir, normalizedOptions)
2324
]);
2425
const { readObjectHeaderByHash, readObjectByHash, readObjectHeaderByOid, readObjectByOid } =
@@ -38,27 +39,30 @@ export async function createGitReader(gitdir: string, options?: GitReaderOptions
3839
async dispose() {
3940
await Promise.all([looseObjectIndex.dispose(), packedObjectIndex.dispose()]);
4041
},
41-
stat: createStatMethod({
42-
gitdir: resolvedGitDir,
43-
refIndex,
44-
looseObjectIndex,
45-
packedObjectIndex
46-
}),
42+
stat: createStatMethod(
43+
resolvedGitDir,
44+
{ refIndex, looseObjectIndex, packedObjectIndex },
45+
normalizedOptions
46+
),
4747

4848
initTime: Date.now() - startInitTime
4949
};
5050
}
5151

52-
function normalizeOptions(options?: GitReaderOptions): NormalizedGitReaderOptions {
53-
if (!options || options.cruftPacks === undefined) {
54-
return { cruftPacks: 'include' };
55-
}
52+
function normalizeOptions(options?: Partial<GitReaderOptions>): NormalizedGitReaderOptions {
53+
const { cruftPacks = true, maxConcurrency } = options || {};
54+
const maxConcurrencyNormalized = Number.isFinite(maxConcurrency)
55+
? (maxConcurrency as number)
56+
: 50;
5657

5758
return {
59+
maxConcurrency: maxConcurrencyNormalized,
60+
performConcurrent: (queue, action) =>
61+
promiseAllThreaded(maxConcurrencyNormalized, queue, action),
5862
cruftPacks:
59-
typeof options.cruftPacks === 'string'
60-
? validateCruftPackMode(options.cruftPacks)
61-
: options.cruftPacks // expands true/false aliases
63+
typeof cruftPacks === 'string'
64+
? validateCruftPackMode(cruftPacks)
65+
: cruftPacks // expands true/false aliases
6266
? 'include'
6367
: 'exclude'
6468
};

src/loose-object-index.ts

+13-11
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
GitObject,
66
InternalGitObjectContent,
77
InternalGitObjectHeader,
8+
NormalizedGitReaderOptions,
89
ObjectsTypeStat,
910
PackedObjectType
1011
} from './types.js';
@@ -14,20 +15,21 @@ import { createObjectsTypeStat, objectsStatFromTypes } from './utils/stat.js';
1415
type LooseObjectMap = Map<string, string>;
1516
type LooseObjectMapEntry = [oid: string, relpath: string];
1617

17-
async function createLooseObjectMap(gitdir: string): Promise<LooseObjectMap> {
18+
async function createLooseObjectMap(
19+
gitdir: string,
20+
{ performConcurrent }: NormalizedGitReaderOptions
21+
): Promise<LooseObjectMap> {
1822
const objectsPath = pathJoin(gitdir, 'objects');
1923
const looseDirs = (await fsPromises.readdir(objectsPath)).filter((p) =>
2024
/^[0-9a-f]{2}$/.test(p)
2125
);
2226

23-
const objectDirs = await Promise.all(
24-
looseDirs.map((dir) =>
25-
fsPromises
26-
.readdir(pathJoin(objectsPath, dir))
27-
.then((files) =>
28-
files.map((file): LooseObjectMapEntry => [dir + file, `objects/${dir}/${file}`])
29-
)
30-
)
27+
const objectDirs = await performConcurrent(looseDirs, (dir) =>
28+
fsPromises
29+
.readdir(pathJoin(objectsPath, dir))
30+
.then((files) =>
31+
files.map((file): LooseObjectMapEntry => [dir + file, `objects/${dir}/${file}`])
32+
)
3133
);
3234

3335
return new Map(objectDirs.flat().sort(([a], [b]) => (a < b ? -1 : 1)));
@@ -77,8 +79,8 @@ function parseLooseObject(buffer: Buffer): InternalGitObjectContent {
7779
};
7880
}
7981

80-
export async function createLooseObjectIndex(gitdir: string) {
81-
const looseObjectMap = await createLooseObjectMap(gitdir);
82+
export async function createLooseObjectIndex(gitdir: string, options: NormalizedGitReaderOptions) {
83+
const looseObjectMap = await createLooseObjectMap(gitdir, options);
8284
const { fanoutTable, binaryNames, names } = indexObjectNames([...looseObjectMap.keys()]);
8385

8486
const getOidFromHash = (hash: Buffer) => {

src/packed-object-index.ts

+3-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const PACKDIR = 'objects/pack';
1919
*/
2020
export async function createPackedObjectIndex(
2121
gitdir: string,
22-
{ cruftPacks }: NormalizedGitReaderOptions
22+
{ cruftPacks, performConcurrent }: NormalizedGitReaderOptions
2323
) {
2424
function readObjectHeaderByHash(
2525
hash: Buffer,
@@ -75,10 +75,8 @@ export async function createPackedObjectIndex(
7575
: !cruftPackFilenames.includes(filename);
7676
});
7777

78-
const packFiles = await Promise.all(
79-
packFilenames.map((filename) =>
80-
readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash)
81-
)
78+
const packFiles = await performConcurrent(packFilenames, async (filename) =>
79+
readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash)
8280
);
8381

8482
return {

src/resolve-ref.ts

+9-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { promises as fsPromises, existsSync } from 'fs';
22
import { join as pathJoin, basename, sep as pathSep } from 'path';
33
import { scanFs } from '@discoveryjs/scan-fs';
4+
import { NormalizedGitReaderOptions } from './types.js';
45

56
type Ref = {
67
name: string;
@@ -49,7 +50,10 @@ function isOid(value: unknown) {
4950
return typeof value === 'string' && value.length === 40 && /^[0-9a-f]{40}$/.test(value);
5051
}
5152

52-
export async function createRefIndex(gitdir: string) {
53+
export async function createRefIndex(
54+
gitdir: string,
55+
{ performConcurrent }: NormalizedGitReaderOptions
56+
) {
5357
const refResolver = await createRefResolver(gitdir);
5458

5559
// expand a ref into a full form
@@ -136,8 +140,8 @@ export async function createRefIndex(gitdir: string) {
136140
let cachedRefsWithOid = listRefsWithOidCache.get(prefix);
137141

138142
if (cachedRefsWithOid === undefined) {
139-
const oids = await Promise.all(
140-
cachedRefs.map((name) => refResolver.resolveOid(prefix + name))
143+
const oids = await performConcurrent(cachedRefs, (name) =>
144+
refResolver.resolveOid(prefix + name)
141145
);
142146

143147
cachedRefsWithOid = cachedRefs.map((name, index) => ({
@@ -210,8 +214,8 @@ export async function createRefIndex(gitdir: string) {
210214

211215
async stat() {
212216
const remotes = listRemotes();
213-
const branchesByRemote = await Promise.all(
214-
remotes.map((remote) => listRemoteBranches(remote))
217+
const branchesByRemote = await performConcurrent(remotes, (remote) =>
218+
listRemoteBranches(remote)
215219
);
216220

217221
return {

src/stat.ts

+11-10
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@ import { sumObjectsStat } from './utils/stat.js';
55
import { createRefIndex } from './resolve-ref.js';
66
import { createLooseObjectIndex } from './loose-object-index.js';
77
import { createPackedObjectIndex } from './packed-object-index.js';
8+
import { NormalizedGitReaderOptions } from './types.js';
89

9-
export function createStatMethod({
10-
gitdir,
11-
refIndex,
12-
looseObjectIndex,
13-
packedObjectIndex
14-
}: {
15-
gitdir: string;
10+
type CreateStatMethodInput = {
1611
refIndex: Awaited<ReturnType<typeof createRefIndex>>;
1712
looseObjectIndex: Awaited<ReturnType<typeof createLooseObjectIndex>>;
1813
packedObjectIndex: Awaited<ReturnType<typeof createPackedObjectIndex>>;
19-
}) {
14+
};
15+
16+
export function createStatMethod(
17+
gitdir: string,
18+
{ refIndex, looseObjectIndex, packedObjectIndex }: CreateStatMethodInput,
19+
{ performConcurrent }: NormalizedGitReaderOptions
20+
) {
2021
return async function () {
2122
const [refs, looseObjects, packedObjects, { files }] = await Promise.all([
2223
refIndex.stat(),
@@ -25,8 +26,8 @@ export function createStatMethod({
2526
scanFs(gitdir)
2627
]);
2728

28-
const fileStats = await Promise.all(
29-
files.map((file) => fsPromises.stat(path.join(gitdir, file.path)))
29+
const fileStats = await performConcurrent(files, (file) =>
30+
fsPromises.stat(path.join(gitdir, file.path))
3031
);
3132

3233
const objectsTypes = looseObjects.objects.types.map((entry) => ({ ...entry }));

src/types.ts

+12-1
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,20 @@ export interface GitReaderOptions {
131131
*
132132
* @default 'include'
133133
*/
134-
cruftPacks?: CruftPackMode | boolean;
134+
cruftPacks: CruftPackMode | boolean;
135+
136+
/**
137+
* Maximum number of concurrent file system operations.
138+
* @default 50
139+
*/
140+
maxConcurrency: number;
135141
}
136142

137143
export interface NormalizedGitReaderOptions {
138144
cruftPacks: CruftPackMode;
145+
maxConcurrency: number;
146+
performConcurrent: <T, R>(
147+
queue: T[],
148+
action: (item: T, itemIdx: number) => Promise<R>
149+
) => Promise<R[]>;
139150
}

src/utils/threads.ts

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Run async tasks in queue with a maximum number of threads.
3+
* Works like Promise.all, but with a maximum number of threads.
4+
* - The order of the results is guaranteed to be the same as the order of the input queue.
5+
* - If any task fails, the whole queue is rejected.
6+
* - If the queue is empty, the result is an empty array.
7+
* - If the queue has only one task, the result is an array with one element.
8+
*
9+
* @example
10+
* // Before
11+
* const packFiles = await Promise.all(
12+
* packFilenames.map((filename) =>
13+
* readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash)
14+
* )
15+
* );
16+
*
17+
* // After
18+
* const packFiles = await promiseAllThreaded(50, packFilenames, async (filename) =>
19+
* readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash)
20+
* );
21+
*/
22+
export async function promiseAllThreaded<T, R>(
23+
maxThreadCount: number,
24+
queue: T[],
25+
asyncFn: (task: T, taskIdx: number) => Promise<R>
26+
): Promise<R[]> {
27+
const result = Array(queue.length);
28+
let taskProcessed = 0;
29+
let queueSnapshot = [...queue];
30+
const thread = async () => {
31+
while (taskProcessed < queueSnapshot.length) {
32+
const taskIdx = taskProcessed++;
33+
const task = queueSnapshot[taskIdx];
34+
result[taskIdx] = await asyncFn(task, taskIdx);
35+
}
36+
};
37+
38+
await Promise.all(
39+
Array.from({ length: Math.min(maxThreadCount, queueSnapshot.length) }, () => thread())
40+
).catch((err) => {
41+
// remove all pending tasks
42+
queueSnapshot = [];
43+
throw err;
44+
});
45+
46+
return result;
47+
}

test/utils.ts

+29
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,38 @@
11
import assert from 'assert';
22
import { readEncodedOffset, BufferCursor } from '../src/utils/buffer.js';
3+
import { promiseAllThreaded } from '../src/utils/threads.js';
34

45
it('readEncodedOffset', () => {
56
const buffer = Buffer.from([142, 254, 254, 254, 254, 254, 254, 127]);
67
const cursor = new BufferCursor(buffer);
78

89
assert.strictEqual(readEncodedOffset(cursor), Number.MAX_SAFE_INTEGER);
910
});
11+
12+
it('promiseAllThreaded', async () => {
13+
const maxThreadCount = 2;
14+
const queue = [1, 2, 3, 4, 5];
15+
const asyncFn = async (task: number) => task * 2;
16+
17+
const result = await promiseAllThreaded(maxThreadCount, queue, asyncFn);
18+
19+
assert.deepStrictEqual(result, [2, 4, 6, 8, 10]);
20+
});
21+
22+
it('promiseAllThreaded with error', async () => {
23+
const maxThreadCount = 2;
24+
const queue = [1, 2, 3, 4, 5];
25+
const asyncFn = async (task: number) => {
26+
if (task === 3) {
27+
throw new Error('Task failed');
28+
}
29+
return task * 2;
30+
};
31+
32+
try {
33+
await promiseAllThreaded(maxThreadCount, queue, asyncFn);
34+
assert.fail('Expected an error');
35+
} catch (err) {
36+
assert.strictEqual(err.message, 'Task failed');
37+
}
38+
});

0 commit comments

Comments
 (0)