Skip to content

Commit 5868624

Browse files
authored
Merge pull request #111 from autonomys/add-partial-downloads
feat: add partial downloads
2 parents 5add0b4 + 7f92a06 commit 5868624

File tree

8 files changed

+305
-36
lines changed

8 files changed

+305
-36
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
import {
  createFileChunkIpldNode,
  PBNode,
  cidToString,
  cidOfNode,
  createChunkedFileIpldNode,
  processFileToIPLDFormat,
  stringToCid,
  decodeNode,
  NODE_METADATA_SIZE,
} from '@autonomys/auto-dag-data'
import { dsnFetcher } from '../src/services/dsnFetcher.js'
import { jest } from '@jest/globals'
import { randomBytes } from 'crypto'
import { MemoryBlockstore } from 'blockstore-core'

// Tests for dsnFetcher.getPartial: chunk indices address DAG nodes in
// pre-order (index 0 = head node), and an index past the last node yields null.
describe('Partial downloads', () => {
  beforeEach(() => {
    jest.clearAllMocks()
  })
  afterEach(() => {
    jest.clearAllMocks()
  })

  it('should return the correct chunk', async () => {
    const nodeMapByCID: Record<string, PBNode> = {}
    const chunks = ['a', 'b', 'c', 'd', 'e']

    // Create the tree of nodes: one leaf per chunk plus a head node linking them
    const nodes = chunks.map((chunk) =>
      createFileChunkIpldNode(Buffer.from(chunk, 'utf-8')),
    )
    const headNode = createChunkedFileIpldNode(
      nodes.map((node) => cidOfNode(node)),
      5n,
      1,
    )

    // Create a map of the nodes by CID
    nodes.forEach((node) => {
      const cid = cidToString(cidOfNode(node))
      nodeMapByCID[cid] = node
    })
    nodeMapByCID[cidToString(cidOfNode(headNode))] = headNode

    const cid = cidToString(cidOfNode(headNode))

    // Serve nodes from the in-memory map instead of the network
    jest.spyOn(dsnFetcher, 'fetchNode').mockImplementation(async (cid) => {
      return nodeMapByCID[cid]
    })

    // Chunk 0 is the head node, which carries no payload, so an empty
    // buffer (not null) is returned
    const chunk = await dsnFetcher.getPartial(cid, 0)
    expect(chunk).toBeInstanceOf(Buffer)
    expect(chunk?.length).toBe(0)

    // Chunks 1-5 should return the correct chunk
    let i = 0
    while (i < chunks.length) {
      // i + 1 for offsetting the head node
      const chunk = await dsnFetcher.getPartial(cid, i + 1)
      expect(chunk).toBeDefined()
      expect(chunk?.toString('utf-8')).toBe(chunks[i])
      i++
    }

    // Chunk 6 is past the end of the file, so null is returned
    const chunk6 = await dsnFetcher.getPartial(cid, 6)
    expect(chunk6).toBeNull()
  })

  it('should work with a large file', async () => {
    const chunksCount = 30
    const maxLinkPerNode = 5
    const chunkSize = 50
    const totalSize = chunksCount * chunkSize

    const chunks = Array.from({ length: chunksCount }).map(() =>
      randomBytes(chunkSize),
    )

    const blockstore = new MemoryBlockstore()

    // Build a multi-level DAG (maxLinkPerNode = 5 forces inner nodes)
    const cid = cidToString(
      await processFileToIPLDFormat(
        blockstore,
        chunks,
        BigInt(totalSize),
        'dag-cbor',
        {
          maxLinkPerNode,
          maxNodeSize: chunkSize + NODE_METADATA_SIZE,
        },
      ),
    )

    jest.spyOn(dsnFetcher, 'fetchNode').mockImplementation(async (cid) => {
      return decodeNode(await blockstore.get(stringToCid(cid)))
    })

    // Walk every node index; inner (link-only) nodes yield empty buffers and
    // are skipped, leaf payloads must arrive in original chunk order
    let index = 0,
      received = 0
    while (received < chunks.length) {
      const chunk = await dsnFetcher.getPartial(cid, index)
      index++
      if (chunk && chunk?.length > 0) {
        expect(chunk.toString('hex')).toBe(chunks[received].toString('hex'))
        received++
      }
    }

    // One index past the last node: download is complete, expect null
    const finalResult = await dsnFetcher.getPartial(cid, index)
    expect(finalResult).toBeNull()
  })
})

services/file-retriever/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
"type": "module",
55
"dependencies": {
66
"@auto-files/rpc-apis": "workspace:*",
7-
"@autonomys/asynchronous": "^1.4.31",
7+
"@autonomys/asynchronous": "^1.4.36",
88
"@autonomys/auto-dag-data": "^1.4.31",
99
"@autonomys/auto-utils": "^1.4.31",
10-
"@autonomys/file-caching": "^1.4.31",
10+
"@autonomys/file-caching": "^1.4.36",
1111
"@autonomys/rpc": "^1.4.31",
1212
"@ipld/dag-pb": "^4.1.3",
1313
"@keyvhq/core": "^2.1.1",

services/file-retriever/src/http/controllers/file.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import { pipeline } from 'stream'
55
import { logger } from '../../drivers/logger.js'
66
import { asyncSafeHandler } from '../../utils/express.js'
77
import { uniqueHeaderValue } from '../../utils/http.js'
8+
import { HttpError } from '../middlewares/error.js'
9+
import { dsnFetcher } from '../../services/dsnFetcher.js'
10+
import { safeIPLDDecode } from '../../utils/dagData.js'
811

912
const fileRouter = Router()
1013

@@ -61,4 +64,53 @@ fileRouter.get(
6164
}),
6265
)
6366

67+
fileRouter.head(
68+
'/:cid',
69+
authMiddleware,
70+
asyncSafeHandler(async (req, res) => {
71+
const cid = req.params.cid
72+
73+
const file = await dsnFetcher.fetchNode(cid, [])
74+
if (file) {
75+
const metadata = safeIPLDDecode(file)
76+
const size = metadata?.size
77+
const isCompressed = metadata?.uploadOptions?.compression
78+
const isEncrypted = metadata?.uploadOptions?.encryption
79+
if (size) {
80+
res.set('Content-Length', size.toString())
81+
res.set('Content-Type', 'application/octet-stream')
82+
}
83+
if (isCompressed && !isEncrypted) {
84+
res.set('Content-Encoding', 'deflate')
85+
} else if (isEncrypted) {
86+
res.set('Content-Encoding', 'aes-256-gcm')
87+
}
88+
res.sendStatus(204)
89+
} else {
90+
res.sendStatus(404)
91+
}
92+
}),
93+
)
94+
95+
fileRouter.get(
96+
'/:cid/partial',
97+
authMiddleware,
98+
asyncSafeHandler(async (req, res) => {
99+
const cid = req.params.cid
100+
const chunk = parseInt(req.query.chunk as string)
101+
if (isNaN(chunk)) {
102+
throw new HttpError(400, 'Invalid chunk')
103+
}
104+
105+
const fileData = await dsnFetcher.getPartial(cid, chunk)
106+
if (fileData) {
107+
res.set('Content-Type', 'application/octet-stream')
108+
res.set('Content-Length', fileData.length.toString())
109+
res.send(fileData)
110+
} else {
111+
res.sendStatus(204)
112+
}
113+
}),
114+
)
115+
64116
export { fileRouter }

services/file-retriever/src/http/controllers/node.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ nodeRouter.get(
99
'/:cid',
1010
asyncSafeHandler(async (req, res) => {
1111
const cid = req.params.cid
12-
const node = await dsnFetcher.fetchNode(cid)
12+
const node = await dsnFetcher.fetchNode(cid, [])
1313

1414
res.json(node)
1515
}),
@@ -19,7 +19,7 @@ nodeRouter.get(
1919
'/:cid/ipld',
2020
asyncSafeHandler(async (req, res) => {
2121
const cid = req.params.cid
22-
const node = await dsnFetcher.fetchNode(cid)
22+
const node = await dsnFetcher.fetchNode(cid, [])
2323

2424
const ipldNode = safeIPLDDecode(node)
2525

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import {
2+
createFileCache,
3+
defaultMemoryAndSqliteConfig,
4+
ensureDirectoryExists,
5+
} from '@autonomys/file-caching'
6+
import path from 'path'
7+
import { config } from '../config.js'
8+
9+
export const cache = createFileCache(
10+
defaultMemoryAndSqliteConfig({
11+
dirname: ensureDirectoryExists(path.join(config.cacheDir, 'files')),
12+
cacheMaxSize: config.cacheMaxSize,
13+
cacheTtl: config.cacheTtl,
14+
}),
15+
)

services/file-retriever/src/services/dsnFetcher.ts

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@ import { logger } from '../drivers/logger.js'
1818
import axios from 'axios'
1919
import { objectMappingIndexer } from './objectMappingIndexer.js'
2020
import { fromEntries, promiseAll } from '../utils/array.js'
21-
import { weightedRequestConcurrencyController } from '@autonomys/asynchronous'
21+
import {
22+
streamToBuffer,
23+
weightedRequestConcurrencyController,
24+
} from '@autonomys/asynchronous'
2225
import { optimizeBatchFetch } from './batchOptimizer.js'
2326
import { ObjectMapping } from '@auto-files/models'
2427
import { withRetries } from '../utils/retries.js'
2528
import { Readable } from 'stream'
2629
import { ReadableStream } from 'stream/web'
30+
import { cache } from './cache.js'
2731

2832
const fetchNodesSchema = z.object({
2933
jsonrpc: z.string(),
@@ -272,16 +276,75 @@ const fetchFile = async (cid: string): Promise<FileResponse> => {
272276
}
273277
}
274278

275-
const fetchNode = async (cid: string) => {
276-
const [objectMapping] = await objectMappingIndexer.get_object_mappings({
277-
hashes: [getObjectMappingHash(cid)],
279+
const fetchNode = async (cid: string, siblings: string[]): Promise<PBNode> => {
280+
const isCached: boolean = await cache.has(cid)
281+
if (isCached) {
282+
const buffer = await cache.get(cid).then((e) => streamToBuffer(e!.data))
283+
return decodeNode(buffer)
284+
}
285+
286+
const nodeObjectMappingHash = getObjectMappingHash(cid)
287+
const hashes = siblings.map(getObjectMappingHash)
288+
const objectMappings = await objectMappingIndexer.get_object_mappings({
289+
hashes,
278290
})
279-
const head = await fetchObjects([objectMapping]).then((nodes) => nodes[0])
291+
const optimizedBatches = optimizeBatchFetch(objectMappings)
292+
const optimizedBatch = optimizedBatches.find((e) =>
293+
e.some((e) => e[0] === nodeObjectMappingHash),
294+
)
295+
if (!optimizedBatch) {
296+
throw new HttpError(
297+
500,
298+
'Internal server error: Optimizing batch did not include target node',
299+
)
300+
}
301+
302+
const head = await fetchObjects(optimizedBatch).then((nodes) => nodes[0])
280303
return head
281304
}
282305

306+
const getPartial = async (
307+
cid: string,
308+
chunk: number,
309+
): Promise<Buffer | null> => {
310+
let index = 0
311+
async function dfs(
312+
cid: string,
313+
siblings: string[] = [],
314+
): Promise<PBNode | undefined> {
315+
const node = await dsnFetcher.fetchNode(cid, siblings)
316+
if (index === chunk) {
317+
return node
318+
}
319+
index++
320+
for (const sibling of node.Links) {
321+
const result = await dfs(
322+
cidToString(sibling.Hash),
323+
node.Links.map((e) => cidToString(e.Hash)),
324+
)
325+
if (result) {
326+
return result
327+
}
328+
}
329+
}
330+
331+
const node = await dfs(cid)
332+
333+
// if the node is not found, the chunk is not present
334+
// and therefore the file has finished being downloaded
335+
if (!node) {
336+
return null
337+
}
338+
const ipldMetadata = safeIPLDDecode(node)
339+
if (!ipldMetadata) {
340+
throw new HttpError(400, 'Bad request: Not a valid auto-dag-data IPLD node')
341+
}
342+
return Buffer.from(ipldMetadata.data ?? [])
343+
}
344+
283345
export const dsnFetcher = {
284346
fetchFile,
285347
fetchNode,
286348
fetchObjects,
349+
getPartial,
287350
}

services/file-retriever/src/services/fileComposer.ts

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,8 @@
11
import { dsnFetcher } from './dsnFetcher.js'
22
import { forkStream } from '@autonomys/asynchronous'
33
import { logger } from '../drivers/logger.js'
4-
import {
5-
createFileCache,
6-
defaultMemoryAndSqliteConfig,
7-
ensureDirectoryExists,
8-
FileResponse,
9-
} from '@autonomys/file-caching'
10-
import path from 'path'
11-
import { config } from '../config.js'
12-
13-
const cache = createFileCache(
14-
defaultMemoryAndSqliteConfig({
15-
dirname: ensureDirectoryExists(path.join(config.cacheDir, 'files')),
16-
cacheMaxSize: config.cacheMaxSize,
17-
cacheTtl: config.cacheTtl,
18-
}),
19-
)
4+
import { FileResponse } from '@autonomys/file-caching'
5+
import { cache } from './cache.js'
206

217
const get = async (
228
cid: string,

0 commit comments

Comments
 (0)