Skip to content

Commit

Permalink
added subscription documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
andorsk committed Sep 28, 2023
1 parent 140523e commit 7052d47
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 47 deletions.
45 changes: 45 additions & 0 deletions packages/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,51 @@ The `delete` request object is composed as follows:
- **`message`** - _`object`_: The properties of the DWeb Node Message Descriptor that will be used to construct a valid DWeb Node message.
- **`recordId`** - _`string`_: the required record ID string that identifies the record being deleted.

### **`web5.dwn.subscription.create(request)`**

Method for subscribing to an event stream of a local or remote note.
Note: for remote servers, only a `WebSocket` connection is available.

You MUST request a permission using the `Permissions` interface with a `SubscriptionRequest`
before getting access to a subscription.

The `subscription` request object is composed as follows:

- **`target`** - _`DID string`_ (_optional_): The target DWN you want to sync with.
- **`filter`** - The filtered scope that you wish to get. It MUST be scoped to a permission that has been scoped in the initial Permission scope.
- **`callback`** - The callback function you wish to apply on the incoming event.

```javascript
const subscription = await web5.dwn.subscription.create(
target: "did:example:12345",
filter: {
"request": {
"filter": {
"type": "record",
"recordFilters": {
"protocolPath": "/my/protocol/path"
}
}
}
},
callback: (e: EventMessage) => {
console.log("I got a message!");
}
})
```

Note, the `subscription` object also returns an event stream and a socket connection.
You may use that directly if that is your preference.

#### **Request**

The `delete` request object is composed as follows:

- **`from`** - _`DID string`_ (_optional_): The DID of the DWeb Node the delete tombstone will be sent to.
- **`message`** - _`object`_: The properties of the DWeb Node Message Descriptor that will be used to construct a valid DWeb Node message.
- **`recordId`** - _`string`_: the required record ID string that identifies the record being deleted.


### **`web5.dwn.protocols.configure(request)`**

Method for configuring a protocol definition in the DWeb Node of the user's local DWeb Node, remote DWeb Nodes, or another party's DWeb Nodes (if permitted).
Expand Down
182 changes: 135 additions & 47 deletions packages/api/src/dwn-api.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import { DwnInterfaceName, DwnMethodName, EventStreamI } from '@tbd54566975/dwn-sdk-js';
import type { DwnResponse, Web5Agent } from '@web5/agent';
import type {
UnionMessageReply,
RecordsReadOptions,
EventMessage,
ProtocolsConfigureDescriptor,
ProtocolsConfigureMessage,
ProtocolsConfigureOptions,
ProtocolsQueryOptions,
RecordsDeleteOptions,
RecordsQueryOptions,
RecordsQueryReplyEntry,
RecordsReadOptions,
RecordsWriteMessage,
RecordsWriteOptions,
RecordsDeleteOptions,
ProtocolsQueryOptions,
RecordsQueryReplyEntry,
ProtocolsConfigureMessage,
ProtocolsConfigureOptions,
ProtocolsConfigureDescriptor,
SubscriptionRequestOptions,
SubscriptionRequestReply,
UnionMessageReply,
} from '@tbd54566975/dwn-sdk-js';

import { isEmptyObject } from '@web5/common';
import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js';

import { Record } from './record.js';
import { Protocol } from './protocol.js';
import { Record } from './record.js';
import { dataToBlob } from './utils.js';
import { isEmptyObject } from '@web5/common';

export type ProtocolsConfigureRequest = {
message: Omit<ProtocolsConfigureOptions, 'authorizationSignatureInput'>;
Expand Down Expand Up @@ -96,6 +98,20 @@ export type RecordsWriteResponse = {
record?: Record
};

export type SubscriptionRequestMessage = {
message: Omit<SubscriptionRequestOptions, 'authorizationSignatureInput'>;
}

export type SubscriptionRequestResponse = {
status: UnionMessageReply['status'];
protocol?: Record;
}

export type Subscription = {
stream: EventStreamI;
socket?: WebSocket;
}

/**
* TODO: Document class.
*/
Expand All @@ -108,6 +124,78 @@ export class DwnApi {
this.connectedDid = options.connectedDid;
}

get subscription() {
return {
/**
* Creates a subscription. Note: the appropriate Permissions over SubscriptionRequestPermission
* MUST be set beforehand for authorization to work.
* @param {string} target - The target for the subscription.
* @param {SubscriptionRequestMessage} request - The subscription request message.
* @param {(e: EventMessage) => Promise<void>} callback - The callback function to handle events.
* @returns {Promise<SubscriptionRequestReply>} A promise containing the subscription request reply.
*
* Example:
* {
* "target": "did:example:12345",
* "request": {
* "filter": {
* "type": "record",
* "recordFilters": {
* "protocolPath": "/my/protocol/path"
* }
* }
* }
* }
* Callback will run over the returned event type.
* Alternatively, you may request the actual pipe
*/
create: async (target, request, callback) : Promise<Subscription> => {
if (this.connectedDid === target) {
// Form a request object
const agentResponse = await this.agent.processDwnRequest({
target: this.connectedDid,
author: this.connectedDid,
messageOptions: request.message,
messageType: DwnInterfaceName.Subscriptions + DwnMethodName.Request
});

const { message, messageCid, reply: { status } } = agentResponse;
const response = { status };

if (status.code < 300) {
const metadata = { author: this.connectedDid, messageCid };
// response.subscription = new Subscription(this.agent, message as SubscriptionRequestMessage, metadata);
}
response.subscription?.emitter.on((event) => {
callback(event);
});
return response;
} else {
// Step 1: Get address via DID document (To be fixed: resolve DID document)
const addr = "127.0.0.1:9002";

// Step 2: Create WebSocket
const socket = new WebSocket(addr);

// Setup socket
socket.onmessage = (data) => {
// Parse message
const event = JSON.parse(data) as EventMessage;
// Run callback
callback(event);
};

socket.onopen = () => {
// Step 3: Send RPC request to endpoint
const request = JSON.stringify(dwnRequest)
socket.send(request);
};

}
}
};
}

/**
* TODO: Document namespace.
*/
Expand All @@ -118,13 +206,13 @@ export class DwnApi {
*/
configure: async (request: ProtocolsConfigureRequest): Promise<ProtocolsConfigureResponse> => {
const agentResponse = await this.agent.processDwnRequest({
target : this.connectedDid,
author : this.connectedDid,
messageOptions : request.message,
messageType : DwnInterfaceName.Protocols + DwnMethodName.Configure
target: this.connectedDid,
author: this.connectedDid,
messageOptions: request.message,
messageType: DwnInterfaceName.Protocols + DwnMethodName.Configure
});

const { message, messageCid, reply: { status }} = agentResponse;
const { message, messageCid, reply: { status } } = agentResponse;
const response: ProtocolsConfigureResponse = { status };

if (status.code < 300) {
Expand All @@ -140,10 +228,10 @@ export class DwnApi {
*/
query: async (request: ProtocolsQueryRequest): Promise<ProtocolsQueryResponse> => {
const agentRequest = {
author : this.connectedDid,
messageOptions : request.message,
messageType : DwnInterfaceName.Protocols + DwnMethodName.Query,
target : request.from || this.connectedDid
author: this.connectedDid,
messageOptions: request.message,
messageType: DwnInterfaceName.Protocols + DwnMethodName.Query,
target: request.from || this.connectedDid
};

let agentResponse: DwnResponse;
Expand Down Expand Up @@ -209,8 +297,8 @@ export class DwnApi {
}

return this.records.write({
data : request.data,
message : {
data: request.data,
message: {
...inheritedProperties,
...request.message,
},
Expand All @@ -222,10 +310,10 @@ export class DwnApi {
*/
delete: async (request: RecordsDeleteRequest): Promise<RecordsDeleteResponse> => {
const agentRequest = {
author : this.connectedDid,
messageOptions : request.message,
messageType : DwnInterfaceName.Records + DwnMethodName.Delete,
target : request.from || this.connectedDid
author: this.connectedDid,
messageOptions: request.message,
messageType: DwnInterfaceName.Records + DwnMethodName.Delete,
target: request.from || this.connectedDid
};

let agentResponse;
Expand Down Expand Up @@ -254,10 +342,10 @@ export class DwnApi {
*/
query: async (request: RecordsQueryRequest): Promise<RecordsQueryResponse> => {
const agentRequest = {
author : this.connectedDid,
messageOptions : request.message,
messageType : DwnInterfaceName.Records + DwnMethodName.Query,
target : request.from || this.connectedDid
author: this.connectedDid,
messageOptions: request.message,
messageType: DwnInterfaceName.Records + DwnMethodName.Query,
target: request.from || this.connectedDid
};

let agentResponse;
Expand All @@ -272,8 +360,8 @@ export class DwnApi {

const records = entries.map((entry: RecordsQueryReplyEntry) => {
const recordOptions = {
author : this.connectedDid,
target : this.connectedDid,
author: this.connectedDid,
target: this.connectedDid,
...entry as RecordsWriteMessage
};
const record = new Record(this.agent, recordOptions);
Expand All @@ -288,10 +376,10 @@ export class DwnApi {
*/
read: async (request: RecordsReadRequest): Promise<RecordsReadResponse> => {
const agentRequest = {
author : this.connectedDid,
messageOptions : request.message,
messageType : DwnInterfaceName.Records + DwnMethodName.Read,
target : request.from || this.connectedDid
author: this.connectedDid,
messageOptions: request.message,
messageType: DwnInterfaceName.Records + DwnMethodName.Read,
target: request.from || this.connectedDid
};

let agentResponse;
Expand All @@ -316,8 +404,8 @@ export class DwnApi {
let record: Record;
if (200 <= status.code && status.code <= 299) {
const recordOptions = {
author : this.connectedDid,
target : this.connectedDid,
author: this.connectedDid,
target: this.connectedDid,
...responseRecord,
};

Expand Down Expand Up @@ -347,12 +435,12 @@ export class DwnApi {
messageOptions.dataFormat = dataFormat;

const agentResponse = await this.agent.processDwnRequest({
author : this.connectedDid,
dataStream : dataBlob,
author: this.connectedDid,
dataStream: dataBlob,
messageOptions,
messageType : DwnInterfaceName.Records + DwnMethodName.Write,
store : request.store,
target : this.connectedDid
messageType: DwnInterfaceName.Records + DwnMethodName.Write,
store: request.store,
target: this.connectedDid
});

const { message, reply: { status } } = agentResponse;
Expand All @@ -361,9 +449,9 @@ export class DwnApi {
let record: Record;
if (200 <= status.code && status.code <= 299) {
const recordOptions = {
author : this.connectedDid,
encodedData : dataBlob,
target : this.connectedDid,
author: this.connectedDid,
encodedData: dataBlob,
target: this.connectedDid,
...responseMessage,
};

Expand Down
40 changes: 40 additions & 0 deletions packages/api/src/subscriptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import type { ProtocolsConfigure, SubscriptionRequest } from '@tbd54566975/dwn-sdk-js';

import type { Web5Agent } from '@web5/agent';

export type SubscriptionRequestMessage = SubscriptionRequest['message'];
type SubscriptionMetadata = {
author: string;
messageCid?: string;
};

export class Subscription {
private _agent: Web5Agent;
private _metadata: SubscriptionMetadata;
private _subscriptionRequestMessage: SubscriptionRequestMessage;

get definition() {
return this._subscriptionRequestMessage.descriptor.definition;
}

constructor(agent: Web5Agent, subscriptionRequestMessage: SubscriptionRequestMessage, metadata: SubscriptionMetadata) {
this._agent = agent;
this._metadata = metadata;
this._subscriptionRequestMessage = subscriptionRequestMessage;
}

toJSON() {
return this._subscriptionRequestMessage;
}

async send(target: string) {
const { reply } = await this._agent.sendDwnRequest({
author : this._metadata.author,
messageCid : this._metadata.messageCid,
messageType : 'subscriptionRequest',
target : target,
});

return { status: reply.status };
}
}

0 comments on commit 7052d47

Please sign in to comment.