Skip to content

Commit

Permalink
feat!: api for incremental uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
Gozala committed Apr 5, 2023
1 parent 05e2590 commit 6dd6b3c
Show file tree
Hide file tree
Showing 4 changed files with 3,028 additions and 2,646 deletions.
17 changes: 16 additions & 1 deletion packages/upload-client/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export async function uploadCAR(conf, car, options = {}) {
* @param {import('./types').UploadOptions} [options]
* @returns {Promise<import('./types').AnyLink>}
*/
async function uploadBlockStream(conf, blocks, options = {}) {
export async function uploadBlockStream(conf, blocks, options = {}) {
/** @type {import('./types').CARLink[]} */
const shards = []
/** @type {import('./types').AnyLink?} */
Expand All @@ -131,3 +131,18 @@ async function uploadBlockStream(conf, blocks, options = {}) {
await Upload.add(conf, root, shards, options)
return root
}

/**
* @param {import('./types').InvocationConfig} conf
* @param {(writer: ReturnType<UnixFS.createDirectoryWriter>) => Promise<void>} task
* @param {import('./types').UploadOptions} [options]
*/
export const uploadWith = async (conf, task, options = {}) => {
const channel = UnixFS.createUploadChannel()
const writer = UnixFS.createDirectoryWriter(channel)
const result = uploadBlockStream(conf, channel.readable, options)
await task(writer)
await writer.close()
await channel.writer.close()
return result
}
126 changes: 126 additions & 0 deletions packages/upload-client/src/unixfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,129 @@ async function collect(collectable) {
)
return chunks
}

/**
* @typedef {{
* readable: ReadableStream<import('@ipld/unixfs').Block>
* writable: WritableStream<import('@ipld/unixfs').Block>
* writer: import('@ipld/unixfs').View
* }} UploadChannel
*/

/**
* Create a new upload channel that can be used to write UnixFS files and
* directories.
*
* @param {QueuingStrategy} [strategy]
* @returns {UploadChannel}
*/
export const createUploadChannel = (strategy = queuingStrategy) => {
const { readable, writable } = new TransformStream({}, strategy)
const writer = UnixFS.createWriter({ writable, settings })
return { readable, writable, writer }
}

/**
* @param {object} options
* @param {import('@ipld/unixfs').View} options.writer
*/
export const createDirectoryWriter = (options) => new DirectoryWriter(options)

class FileWriter {
/**
* @param {object} options
* @param {import('@ipld/unixfs').View} options.writer
*/
constructor({ writer }) {
this.writer = UnixFS.createFileWriter(writer)
}
/**
* @param {Uint8Array} chunk
*/
write(chunk) {
return this.writer.write(chunk)
}
close() {
if (this.result) {
return this.result
} else {
return (this.result = this.writer.close())
}
}
}

class DirectoryWriter {
/**
* @param {object} options
* @param {import('@ipld/unixfs').View} options.writer
*/
constructor({ writer }) {
this.writer = writer
/** @type {Map<string, DirectoryWriter|FileWriter>} */
this.entries = new Map()
}

/**
* @param {string} path
*/
createDirectory(path) {
/** @type {DirectoryWriter} */
let directory = this
const at = []
for (const name of path.split('/')) {
if (name !== '' && name !== '.') {
at.push(name)
let writer = directory.entries.get(name)
if (writer == null) {
writer = new DirectoryWriter(this)
directory.entries.set(name, writer)
}

if (!(writer instanceof DirectoryWriter)) {
throw new Error(
`Can not create directory at ${at.join(
'/'
)}, because there is a file with the same name`
)
}

directory = writer
}
}
return directory
}

/**
* @param {string} path
*/
createFile(path) {
const parts = path.split('/')
const name = /** @type {string} */ (parts.pop())
let directory = this.createDirectory(parts.join('/'))

if (directory.entries.has(name)) {
throw new Error(
`Can not create a file at "${path}" because there is already a file or directory with the same name"`
)
}

const writer = new FileWriter(this)
directory.entries.set(name, writer)
return writer
}

async close() {
const writer =
this.entries.size <= SHARD_THRESHOLD
? UnixFS.createDirectoryWriter(this.writer)
: UnixFS.createShardedDirectoryWriter(this.writer)

const promises = [...this.entries].map(async ([name, entry]) => {
const link = await entry.close()
writer.set(name, link)
})

await Promise.all(promises)
return await writer.close()
}
}
120 changes: 119 additions & 1 deletion packages/upload-client/test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import * as CBOR from '@ucanto/transport/cbor'
import * as Signer from '@ucanto/principal/ed25519'
import * as StoreCapabilities from '@web3-storage/capabilities/store'
import * as UploadCapabilities from '@web3-storage/capabilities/upload'
import { uploadFile, uploadDirectory, uploadCAR } from '../src/index.js'
import {
uploadFile,
uploadDirectory,
uploadCAR,
uploadWith,
} from '../src/index.js'
import { serviceSigner } from './fixtures.js'
import { randomBlock, randomBytes } from './helpers/random.js'
import { toCAR } from './helpers/car.js'
Expand Down Expand Up @@ -458,3 +463,116 @@ describe('uploadCAR', () => {
assert.equal(carCIDs.length, 2)
})
})

describe('incremental uploader', () => {
it('incremental upload', async () => {
const space = await Signer.generate()
const agent = await Signer.generate()
const files = [
new File([await randomBytes(128)], '1.txt'),
new File([await randomBytes(32)], '2.txt'),
]

/** @type {import('../src/types').CARLink?} */
let carCID = null

const proofs = await Promise.all([
StoreCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
UploadCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
])

/** @type {Omit<import('../src/types.js').StoreAddUploadRequiredResponse, 'link'>} */
const res = {
status: 'upload',
headers: { 'x-test': 'true' },
url: 'http://localhost:9200',
with: space.did(),
}

const service = mockService({
store: {
add: provide(StoreCapabilities.add, ({ capability, invocation }) => {
assert.equal(invocation.issuer.did(), agent.did())
assert.equal(invocation.capabilities.length, 1)
const invCap = invocation.capabilities[0]
assert.equal(invCap.can, StoreCapabilities.add.can)
assert.equal(invCap.with, space.did())
return {
...res,
link: /** @type {import('../src/types').CARLink} */ (
capability.nb.link
),
}
}),
},
upload: {
add: provide(UploadCapabilities.add, ({ invocation }) => {
assert.equal(invocation.issuer.did(), agent.did())
assert.equal(invocation.capabilities.length, 1)
const invCap = invocation.capabilities[0]
assert.equal(invCap.can, UploadCapabilities.add.can)
assert.equal(invCap.with, space.did())
assert.equal(invCap.nb?.shards?.length, 1)
assert.equal(String(invCap.nb?.shards?.[0]), carCID?.toString())
if (!invCap.nb) throw new Error('nb must be present')
return invCap.nb
}),
},
})

const server = Server.create({
id: serviceSigner,
service,
decoder: CAR,
encoder: CBOR,
})
const connection = Client.connect({
id: serviceSigner,
encoder: CAR,
decoder: CBOR,
channel: server,
})

const dataCID = await uploadWith(
{
issuer: agent,
with: space.did(),
proofs,
audience: serviceSigner,
},
async (writer) => {
console.log('writing')
const onetxt = writer.createFile(files[0].name)
onetxt.write(new Uint8Array(await files[0].arrayBuffer()))
await onetxt.close()

const twotxt = writer.createFile(files[1].name)
await twotxt.write(new Uint8Array(await files[1].arrayBuffer()))
},
{
connection,
onShardStored: (meta) => {
carCID = meta.cid
},
}
)

assert(service.store.add.called)
assert.equal(service.store.add.callCount, 1)
assert(service.upload.add.called)
assert.equal(service.upload.add.callCount, 1)

assert(carCID)
assert(dataCID)
})
})
Loading

0 comments on commit 6dd6b3c

Please sign in to comment.