feat!: api for incremental uploads #742

Status: Closed · wants to merge 2 commits
15 changes: 15 additions & 0 deletions packages/upload-client/src/index.js
@@ -131,3 +131,18 @@ async function uploadBlockStream(conf, blocks, options = {}) {
await Upload.add(conf, root, shards, options)
return root
}

/**
* @param {import('./types').InvocationConfig} conf
 * @param {(writer: ReturnType<typeof UnixFS.createDirectoryWriter>) => Promise<void>} task
* @param {import('./types').UploadOptions} [options]
*/
export const uploadWith = async (conf, task, options = {}) => {
  const channel = UnixFS.createUploadChannel()
  const writer = UnixFS.createDirectoryWriter(channel)
  // Begin consuming the block stream immediately so shards are stored
  // while the task is still producing blocks.
  const result = uploadBlockStream(conf, channel.readable, options)
  await task(writer)
  await writer.close()
  await channel.writer.close()
  return result
}
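
A hypothetical consumer of `uploadWith` might look like the sketch below. The invocation config and `connection` are assumed to be set up as in the tests further down; the task callback receives the root directory writer and can create files incrementally:

// Hypothetical usage sketch — agent, space, proofs and connection setup
// is assumed (see the test below for a complete example).
const root = await uploadWith(
  { issuer: agent, with: space.did(), proofs, audience: serviceSigner },
  async (writer) => {
    const file = writer.createFile('hello.txt')
    await file.write(new TextEncoder().encode('hello world'))
    await file.close()
  },
  { connection }
)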
126 changes: 126 additions & 0 deletions packages/upload-client/src/unixfs.js
@@ -174,3 +174,129 @@ async function collect(collectable) {
)
return chunks
}

/**
* @typedef {{
* readable: ReadableStream<import('@ipld/unixfs').Block>
* writable: WritableStream<import('@ipld/unixfs').Block>
* writer: import('@ipld/unixfs').View
* }} UploadChannel
*/

/**
* Create a new upload channel that can be used to write UnixFS files and
* directories.
*
* @param {QueuingStrategy} [strategy]
* @returns {UploadChannel}
*/
export const createUploadChannel = (strategy = queuingStrategy) => {
const { readable, writable } = new TransformStream({}, strategy)
const writer = UnixFS.createWriter({ writable, settings })
return { readable, writable, writer }
}

/**
* @param {object} options
* @param {import('@ipld/unixfs').View} options.writer
*/
export const createDirectoryWriter = (options) => new DirectoryWriter(options)

class FileWriter {
/**
* @param {object} options
* @param {import('@ipld/unixfs').View} options.writer
*/
constructor({ writer }) {
this.writer = UnixFS.createFileWriter(writer)
}
/**
* @param {Uint8Array} chunk
*/
write(chunk) {
return this.writer.write(chunk)
}
  close() {
    // Memoize the link so close() is idempotent — a file may be closed
    // explicitly by the caller and again when its directory closes.
    if (this.result) {
      return this.result
    } else {
      return (this.result = this.writer.close())
    }
  }
}

class DirectoryWriter {
/**
* @param {object} options
* @param {import('@ipld/unixfs').View} options.writer
*/
constructor({ writer }) {
this.writer = writer
/** @type {Map<string, DirectoryWriter|FileWriter>} */
this.entries = new Map()
}

/**
* @param {string} path
*/
createDirectory(path) {
/** @type {DirectoryWriter} */
let directory = this
const at = []
for (const name of path.split('/')) {
if (name !== '' && name !== '.') {
at.push(name)
let writer = directory.entries.get(name)
if (writer == null) {
          writer = new DirectoryWriter({ writer: this.writer })
directory.entries.set(name, writer)
}

        if (!(writer instanceof DirectoryWriter)) {
          throw new Error(
            `Cannot create directory at ${at.join(
              '/'
            )} because a file with the same name already exists`
          )
        }

directory = writer
}
}
return directory
}

/**
* @param {string} path
*/
  createFile(path) {
    const parts = path.split('/')
    const name = /** @type {string} */ (parts.pop())
    const directory = this.createDirectory(parts.join('/'))

    if (directory.entries.has(name)) {
      throw new Error(
        `Cannot create a file at "${path}" because there is already a file or directory with the same name`
      )
    }

    const writer = new FileWriter({ writer: this.writer })
    directory.entries.set(name, writer)
    return writer
  }

  async close() {
    // Directories above the shard threshold are written as sharded (HAMT)
    // directories, otherwise as plain directory nodes.
    const writer =
      this.entries.size <= SHARD_THRESHOLD
        ? UnixFS.createDirectoryWriter(this.writer)
        : UnixFS.createShardedDirectoryWriter(this.writer)

const promises = [...this.entries].map(async ([name, entry]) => {
const link = await entry.close()
writer.set(name, link)
})

await Promise.all(promises)
return await writer.close()
}
}
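
Taken together, the channel and the directory writer compose like the sketch below. This is a hypothetical illustration (it assumes the channel's readable side is consumed elsewhere, e.g. by `uploadBlockStream`): nested paths are created implicitly, empty and `.` segments are skipped, and closing the root builds the directory nodes bottom-up:

// Hypothetical illustration of the channel + DirectoryWriter API.
const channel = createUploadChannel()
const root = createDirectoryWriter({ writer: channel.writer })
const file = root.createFile('docs/guides/intro.md') // creates both levels
await file.write(new TextEncoder().encode('# Intro'))
const link = await root.close() // closes open files, builds directories
await channel.writer.close() // flush the remaining blocks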
120 changes: 119 additions & 1 deletion packages/upload-client/test/index.test.js
@@ -7,7 +7,12 @@ import * as CBOR from '@ucanto/transport/cbor'
import * as Signer from '@ucanto/principal/ed25519'
import * as StoreCapabilities from '@web3-storage/capabilities/store'
import * as UploadCapabilities from '@web3-storage/capabilities/upload'
import {
  uploadFile,
  uploadDirectory,
  uploadCAR,
  uploadWith,
} from '../src/index.js'
import { serviceSigner } from './fixtures.js'
import { randomBlock, randomBytes } from './helpers/random.js'
import { toCAR } from './helpers/car.js'
@@ -458,3 +463,116 @@ describe('uploadCAR', () => {
assert.equal(carCIDs.length, 2)
})
})

describe('incremental uploader', () => {
it('incremental upload', async () => {
const space = await Signer.generate()
const agent = await Signer.generate()
const files = [
new File([await randomBytes(128)], '1.txt'),
new File([await randomBytes(32)], '2.txt'),
]

/** @type {import('../src/types').CARLink?} */
let carCID = null

const proofs = await Promise.all([
StoreCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
UploadCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
])

/** @type {Omit<import('../src/types.js').StoreAddUploadRequiredResponse, 'link'>} */
const res = {
status: 'upload',
headers: { 'x-test': 'true' },
url: 'http://localhost:9200',
with: space.did(),
}

const service = mockService({
store: {
add: provide(StoreCapabilities.add, ({ capability, invocation }) => {
assert.equal(invocation.issuer.did(), agent.did())
assert.equal(invocation.capabilities.length, 1)
const invCap = invocation.capabilities[0]
assert.equal(invCap.can, StoreCapabilities.add.can)
assert.equal(invCap.with, space.did())
return {
...res,
link: /** @type {import('../src/types').CARLink} */ (
capability.nb.link
),
}
}),
},
upload: {
add: provide(UploadCapabilities.add, ({ invocation }) => {
assert.equal(invocation.issuer.did(), agent.did())
assert.equal(invocation.capabilities.length, 1)
const invCap = invocation.capabilities[0]
assert.equal(invCap.can, UploadCapabilities.add.can)
assert.equal(invCap.with, space.did())
assert.equal(invCap.nb?.shards?.length, 1)
assert.equal(String(invCap.nb?.shards?.[0]), carCID?.toString())
if (!invCap.nb) throw new Error('nb must be present')
return invCap.nb
}),
},
})

const server = Server.create({
id: serviceSigner,
service,
decoder: CAR,
encoder: CBOR,
})
const connection = Client.connect({
id: serviceSigner,
encoder: CAR,
decoder: CBOR,
channel: server,
})

const dataCID = await uploadWith(
{
issuer: agent,
with: space.did(),
proofs,
audience: serviceSigner,
},
      async (writer) => {
        const onetxt = writer.createFile(files[0].name)
        await onetxt.write(new Uint8Array(await files[0].arrayBuffer()))
        await onetxt.close()

        // 2.txt is left open on purpose — closing the root directory in
        // uploadWith closes any files that are still open.
        const twotxt = writer.createFile(files[1].name)
        await twotxt.write(new Uint8Array(await files[1].arrayBuffer()))
      },
{
connection,
onShardStored: (meta) => {
carCID = meta.cid
},
}
)

assert(service.store.add.called)
assert.equal(service.store.add.callCount, 1)
assert(service.upload.add.called)
assert.equal(service.upload.add.callCount, 1)

assert(carCID)
assert(dataCID)
})
})
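
For inputs that span multiple shards, a consumer might collect every shard CID via `onShardStored` — a hypothetical sketch, with the invocation config and `connection` set up as in the test above:

// Hypothetical sketch: collecting shard CIDs during an incremental upload.
/** @type {import('../src/types').CARLink[]} */
const shards = []
const root = await uploadWith(
  { issuer: agent, with: space.did(), proofs, audience: serviceSigner },
  async (writer) => {
    for (const file of files) {
      const fw = writer.createFile(file.name)
      await fw.write(new Uint8Array(await file.arrayBuffer()))
      await fw.close()
    }
  },
  { connection, onShardStored: (meta) => shards.push(meta.cid) }
)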