Skip to content

Commit

Permalink
convert RPC Client response dataStream to IsomorphicNodeReadable stream
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Sep 28, 2023
1 parent 47af280 commit 54cd6e1
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 28 deletions.
3 changes: 2 additions & 1 deletion packages/agent/src/rpc-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { DwnRpc, DwnRpcRequest, DwnRpcResponse } from './types/agent.js';
import { randomUuid } from '@web5/crypto/utils';

import { createJsonRpcRequest, parseJson } from './json-rpc.js';
import { webReadableToIsomorphicNodeReadable } from './utils.js';

/**
* Client used to communicate with Dwn Servers
Expand Down Expand Up @@ -88,7 +89,7 @@ class HttpDwnRpcClient implements DwnRpc {
throw new Error(`failed to parse json rpc response. dwn url: ${request.dwnUrl}`);
}

dataStream = resp.body;
dataStream = resp.body !== null ? webReadableToIsomorphicNodeReadable(resp.body) : resp.body;
dwnRpcResponse = jsonRpcResponse;
} else {
// TODO: wonder if i need to try/catch this?
Expand Down
46 changes: 19 additions & 27 deletions packages/agent/tests/dwn-manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ describe('DwnManager', () => {

it('handles RecordsWrite and RecordRead with large payload', async () => {
// Create test data to write.
const dataBytes = Array(10_000).fill('d').join('');
const dataBytes = Convert.string(Array(60_000).fill('d').join('')).toUint8Array();

// Attempt to process the RecordsWrite
let writeResponse = await testAgent.agent.dwnManager.processRequest({
Expand Down Expand Up @@ -439,18 +439,17 @@ describe('DwnManager', () => {


const readResponse = await testAgent.agent.dwnManager.processRequest({
author : identity.did,
target : identity.did,
messageType: 'RecordsRead',
messageOptions: { recordId: writeMessage.recordId }
author : identity.did,
target : identity.did,
messageType : 'RecordsRead',
messageOptions : { recordId: writeMessage.recordId }
});
expect(readResponse.reply.status.code).to.equal(200);
const reply = readResponse.reply as RecordsReadReply;
expect(reply.record).to.not.be.undefined;
expect(reply.record!.data).to.not.be.undefined;
const value = await DataStream.toBytes(reply.record!.data);
const data = new TextDecoder().decode(value);
expect(data).to.eq(dataBytes);
const data = await DataStream.toBytes(reply.record!.data);
expect(data).to.eql(dataBytes);

});
});
Expand Down Expand Up @@ -591,19 +590,15 @@ describe('DwnManager', () => {
const readReply = response.reply as RecordsReadReply;
expect(readReply.record).to.exist;

const record = readReply.record as unknown as RecordsWriteMessage & { data: ReadableStream };
expect(record.recordId).to.equal(message.recordId);

expect(record.data).to.exist;
expect(record.data instanceof ReadableStream).to.be.true;

const { value } = await record.data.getReader().read();
expect(dataBytes).to.eql(value);
expect(readReply.record!.recordId).to.equal(message.recordId);
expect(readReply.record!.data).to.exist;
const data = await DataStream.toBytes(readReply.record!.data);
expect(data).to.eql(dataBytes);
});

it('handles RecordsWrite and RecordRead with large payload', async () => {
// Create test data to write.
const dataBytes = Array(10_000).fill('d').join('');
const largeDataBytes = Convert.string(Array(60_000).fill('d').join('')).toUint8Array();

// Attempt to process the RecordsWrite
let writeResponse = await testAgent.agent.dwnManager.sendRequest({
Expand All @@ -613,7 +608,7 @@ describe('DwnManager', () => {
messageOptions : {
dataFormat: 'text/plain'
},
dataStream: new Blob([dataBytes])
dataStream: new Blob([ largeDataBytes ])
});

// Verify the response.
Expand All @@ -632,20 +627,17 @@ describe('DwnManager', () => {


const readResponse = await testAgent.agent.dwnManager.sendRequest({
author : identity.did,
target : identity.did,
messageType: 'RecordsRead',
messageOptions: { recordId: writeMessage.recordId }
author : identity.did,
target : identity.did,
messageType : 'RecordsRead',
messageOptions : { recordId: writeMessage.recordId }
});
expect(readResponse.reply.status.code).to.equal(200);
const reply = readResponse.reply as RecordsReadReply;
expect(reply.record).to.not.be.undefined;
expect(reply.record!.data).to.not.be.undefined;
const record = reply.record as unknown as RecordsWriteMessage & { data: ReadableStream };
const { value } = await record.data.getReader().read();
const data = new TextDecoder().decode(value);
expect(data).to.eq(dataBytes);

const data = await DataStream.toBytes(reply.record!.data);
expect(data).to.eql(largeDataBytes);
});

it('throws an error when DwnRequest fails validation', async () => {
Expand Down

0 comments on commit 54cd6e1

Please sign in to comment.