diff --git a/.github/workflows/tests-ci.yml b/.github/workflows/tests-ci.yml index 1eee38166..583ded74b 100644 --- a/.github/workflows/tests-ci.yml +++ b/.github/workflows/tests-ci.yml @@ -63,6 +63,7 @@ jobs: run: npm run test:node --ws -- --color env: TEST_DWN_URLS: http://localhost:3000,http://localhost:3001 + MIN_SYNC_INTERVAL: 100 - name: Upload test coverage to Codecov uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3.1.4 @@ -119,3 +120,4 @@ jobs: run: npm run test:browser --ws -- --color env: TEST_DWN_URLS: http://localhost:3000,http://localhost:3001 + MIN_SYNC_INTERVAL: 100 diff --git a/packages/agent/src/sync-manager.ts b/packages/agent/src/sync-manager.ts index e8e55d270..28c23f16d 100644 --- a/packages/agent/src/sync-manager.ts +++ b/packages/agent/src/sync-manager.ts @@ -1,5 +1,6 @@ import type { BatchOperation } from 'level'; import type { + Event, EventsGetReply, GenericMessage, MessagesGetReply, @@ -16,10 +17,29 @@ import type { Web5ManagedAgent } from './types/agent.js'; import { webReadableToIsomorphicNodeReadable } from './utils.js'; +const checkNumber = (n?: string) => isNaN(parseInt(n || '')) ? undefined : parseInt(n || ''); +// arbitrary number for now, but we should enforce some sane minimum +// allow for environment to set a minimum +const MIN_SYNC_INTERVAL = checkNumber(process?.env.MIN_SYNC_INTERVAL) ?? 5000; + +type SyncDirection = 'pull' | 'push'; + +interface SyncOptions { + interval?: number + direction?: SyncDirection +} + export interface SyncManager { agent: Web5ManagedAgent; registerIdentity(options: { did: string }): Promise; - startSync(options: { interval: number }): Promise; + + // sync will run the sync operation once. + // if a direction is passed, it will only sync in that direction. + sync(direction?: SyncDirection): Promise; + + // startSync will run sync on an interval + // if a direction is provided, it will only sync in that direction. + startSync(options?: SyncOptions): Promise; stopSync(): void; } @@ -91,7 +111,7 @@ export class SyncManagerLevel implements SyncManager { await this._db.clear(); } - public async pull(): Promise { + private async pull(): Promise { const pullQueue = this.getPullQueue(); const pullJobs = await pullQueue.iterator().all(); @@ -219,7 +239,7 @@ export class SyncManagerLevel implements SyncManager { await pullQueue.batch(deleteOperations as any); } - public async push(): Promise { + private async push(): Promise { const pushQueue = this.getPushQueue(); const pushJobs = await pushQueue.iterator().all(); @@ -285,24 +305,8 @@ export class SyncManagerLevel implements SyncManager { await registeredIdentities.put(did, ''); } - public startSync(options: { - interval: number - }): Promise { - const { interval } = options; - - // interval 0 means start instantly and don't repeat. - if (interval === 0) { - return new Promise( async (resolve,reject) => { - try { - await this.sync(); - resolve(); - - } catch(error) { - reject(error); - } - }) - } - + public startSync(options: SyncOptions = {}): Promise { + const { interval = MIN_SYNC_INTERVAL, direction } = options; return new Promise((resolve, reject) => { if (this._syncIntervalId) { clearInterval(this._syncIntervalId); @@ -310,12 +314,12 @@ export class SyncManagerLevel implements SyncManager { this._syncIntervalId = setInterval(async () => { try { - await this.sync(); + await this.sync(direction); } catch (error) { this.stopSync(); reject(error); } - }, interval); + }, interval >= MIN_SYNC_INTERVAL ? interval : MIN_SYNC_INTERVAL); }); } @@ -326,10 +330,13 @@ export class SyncManagerLevel implements SyncManager { } } - private async sync(): Promise { - await this.enqueueOperations(); - await this.push(); - await this.pull(); + public async sync(direction?: SyncDirection): Promise { + await this.enqueueOperations(direction); + // enqueue operations handles the direction logic. + // we can just run both operations and only enqueued events will sync. + await Promise.all([ + this.push(), this.pull() + ]); } private createOperationKey(did: string, dwnUrl: string, watermark: string, messageCid: string): string { @@ -338,47 +345,47 @@ export class SyncManagerLevel implements SyncManager { private dbBatchOperationPut(did: string, dwnUrl: string, watermark: string, messageCid: string): DbBatchOperation { const key = this.createOperationKey(did, dwnUrl, watermark, messageCid); - return { type: 'put', key, value: '' } + return { type: 'put', key, value: '' }; } - async enqueueOperations(direction?: 'pull' | 'push') { + /** + * Enqueues the operations needed for sync based on the supplied direction. + * + * @param direction the optional direction in which you would like to enqueue sync events for. + * If no direction is supplied it will sync in both directions. + */ + async enqueueOperations(direction?: SyncDirection) { const syncPeerState = await this.getSyncPeerState(); - for (let syncState of syncPeerState) { - const localEvents = await this.getLocalDwnEvents({ - did: syncState.did, - watermark: syncState.pushWatermark, - }); - const remoteEvents = await this.getRemoteEvents({ - did: syncState.did, - dwnUrl: syncState.dwnUrl, - watermark: syncState.pullWatermark, - }); - const pullOperations: DbBatchOperation[] = []; - if (direction === undefined || direction === 'pull') { - remoteEvents.forEach(remoteEvent => { - if (localEvents.findIndex(localEvent => localEvent.messageCid === remoteEvent.messageCid) < 0) { - const operation = this.dbBatchOperationPut(syncState.did, syncState.dwnUrl, remoteEvent.watermark, remoteEvent.messageCid); - pullOperations.push(operation); - } + for (let syncState of syncPeerState) { + const batchPromises = []; + if (direction === undefined || direction === 'push') { + const localEventsPromise = this.getLocalDwnEvents({ + did : syncState.did, + watermark : syncState.pushWatermark, }); + batchPromises.push(this.batchOperations('push', localEventsPromise, syncState)); } - const pushOperations: DbBatchOperation[] = []; - if (direction === undefined || direction === 'push') { - localEvents.forEach(localEvent => { - if(remoteEvents.findIndex(remoteEvent => remoteEvent.messageCid === localEvent.messageCid) < 0) { - const operation = this.dbBatchOperationPut(syncState.did, syncState.dwnUrl, localEvent.watermark, localEvent.messageCid); - pushOperations.push(operation); - } + if(direction === undefined || direction === 'pull') { + const remoteEventsPromise = this.getRemoteEvents({ + did : syncState.did, + dwnUrl : syncState.dwnUrl, + watermark : syncState.pullWatermark, }); + batchPromises.push(this.batchOperations('pull', remoteEventsPromise, syncState)); } - - await this.getPullQueue().batch(pullOperations as any); - await this.getPushQueue().batch(pushOperations as any); + await Promise.all(batchPromises); } } + private async batchOperations(direction: SyncDirection, eventsPromise: Promise, syncState: SyncState): Promise { + const { did, dwnUrl } = syncState; + const operations: DbBatchOperation[] = []; + (await eventsPromise).forEach(e => operations.push(this.dbBatchOperationPut(did, dwnUrl, e.watermark, e.messageCid))); + return direction === 'pull' ? this.getPullQueue().batch(operations as any) : this.getPushQueue().batch(operations as any); + } + private async getLocalDwnEvents(options:{ did: string, watermark?: string }) { const { did, watermark } = options; let eventsReply = {} as EventsGetReply; @@ -394,6 +401,7 @@ export class SyncManagerLevel implements SyncManager { private async getRemoteEvents(options: { did: string, dwnUrl: string, watermark?: string }) { const { did, dwnUrl, watermark } = options; + let eventsReply = {} as EventsGetReply; const eventsGetMessage = await this.agent.dwnManager.createMessage({ @@ -419,7 +427,7 @@ export class SyncManagerLevel implements SyncManager { author: string, messageCid: string ): Promise { - let messagesGetResponse = await this.agent.dwnManager.processRequest({ + const messagesGetResponse = await this.agent.dwnManager.processRequest({ author : author, target : author, messageType : 'MessagesGet', @@ -540,16 +548,16 @@ export class SyncManagerLevel implements SyncManager { const wm = await watermarkStore.get(wmKey); const split = wm.split('~'); if (split.length !== 2) { - return {} + return {}; } let pull; let push; if (split[0] !== '0') { - pull = split[0] + pull = split[0]; } if (split[1] !== '0') { - push = split[1] + push = split[1]; } return { pull, push }; @@ -567,11 +575,11 @@ export class SyncManagerLevel implements SyncManager { const watermarkStore = this.getWatermarkStore(); if (pullWatermark === undefined) { - pullWatermark = '0' + pullWatermark = '0'; } if (pushWatermark === undefined) { - pushWatermark = '0' + pushWatermark = '0'; } await watermarkStore.put(wmKey, `${pullWatermark}~${pushWatermark}`); diff --git a/packages/agent/tests/chaos/chaos-monkey.spec.ts b/packages/agent/tests/chaos/chaos-monkey.spec.ts deleted file mode 100644 index 664095e9a..000000000 --- a/packages/agent/tests/chaos/chaos-monkey.spec.ts +++ /dev/null @@ -1,218 +0,0 @@ -import type { PortableDid } from '@web5/dids'; - -import { expect } from 'chai'; -import * as sinon from 'sinon'; - -import type { ManagedIdentity } from '../../src/identity-manager.js' - -import { testDwnUrls } from '../test-config.js' -import { TestAgent } from '../utils/test-agent.js'; -import { TestManagedAgent } from '../../src/test-managed-agent.js'; - -import { randomUuid } from '@web5/crypto/utils'; -import { DwnRequest, DwnResponse, ProcessDwnRequest, SendDwnRequest } from '../../src/index.js'; -import { DataStream, RecordsDeleteMessage, RecordsWrite, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; -import _Readable from 'readable-stream'; - -const checkChaos = (): boolean => { - return process.env.CHAOS_ENV === 'true' -} - -describe('Chaos Monkey', () => { - describe('Sync Manager', function () { - this.timeout(120_000); - const records:DwnResponse[] = []; - - const lean: string|undefined = process.env.SYNC_LEAN === 'pull' ? 'pull' : - process.env.SYNC_LEAN === 'push' ? 'push' : undefined; - - const DEFAULT_SYNC_ROUNDS = 10; - const DEFAULT_BATCH_ROUNDS = 10; - const DEFAULT_BATCH_COUNT = 5; - const rounds: number = !isNaN(parseInt(process.env.SYNC_ROUNDS || 'not-a-number')) ? parseInt(process.env.SYNC_ROUNDS!) : DEFAULT_SYNC_ROUNDS; - const batchRounds: number = !isNaN(parseInt(process.env.BATCH_ROUNDS || 'not-a-number')) ? parseInt(process.env.BATCH_ROUNDS!) : DEFAULT_BATCH_ROUNDS; - const batchCount: number = !isNaN(parseInt(process.env.BATCH_COUNT || 'not-a-number')) ? parseInt(process.env.BATCH_COUNT!) : DEFAULT_BATCH_COUNT; - - let alice: ManagedIdentity; - let bob: ManagedIdentity; - let carol: ManagedIdentity; - let dave: ManagedIdentity; - - let aliceDid: PortableDid; - let bobDid: PortableDid; - let carolDid: PortableDid; - let daveDid: PortableDid; - let testAgent: TestManagedAgent; - - - const testWriteMessage = (did:string, id: string): ProcessDwnRequest => { - return { - author : did, - target : did, - messageType : 'RecordsWrite', - messageOptions : { - schema : 'schema', - dataFormat : 'application/json' - }, - dataStream: new Blob([ `Hello, ${id}`]) - }; - } - - const testQueryMessage = (did:string): ProcessDwnRequest => { - return { - author : did, - target : did, - messageType : 'RecordsQuery', - messageOptions: { filter: { schema: 'schema', dataFormat: 'application/json' } }, - }; - } - - const testReadMessage = (did:string, recordId: string): SendDwnRequest => { - return { - author : did, - target : did, - messageType : 'RecordsRead', - messageOptions : { recordId } - }; - } - - before(async () => { - testAgent = await TestManagedAgent.create({ - agentClass : TestAgent, - agentStores : 'dwn' - }); - }); - - beforeEach(async () => { - records.splice(0, records.length); - await testAgent.clearStorage(); - await testAgent.createAgentDid(); - // Create a new Identity to author the DWN messages. - ({ did: aliceDid } = await testAgent.createIdentity({ testDwnUrls })); - alice = await testAgent.agent.identityManager.import({ - did : aliceDid, - identity : { name: 'Alice', did: aliceDid.did }, - kms : 'local' - }); - ({ did: bobDid } = await testAgent.createIdentity({ testDwnUrls })); - bob = await testAgent.agent.identityManager.import({ - did : bobDid, - identity : { name: 'Bob', did: bobDid.did }, - kms : 'local' - }); - ({ did: carolDid } = await testAgent.createIdentity({ testDwnUrls })); - carol = await testAgent.agent.identityManager.import({ - did : carolDid, - identity : { name: 'Carol', did: carolDid.did }, - kms : 'local' - }); - ({ did: daveDid} = await testAgent.createIdentity({ testDwnUrls })); - dave = await testAgent.agent.identityManager.import({ - did : daveDid, - identity : { name: 'Dave', did: daveDid.did }, - kms : 'local' - }); - - const { dwnManager } = testAgent.agent; - const startLoadMessages = Date.now(); - - const process = async (message: ProcessDwnRequest, random: number): Promise => { - - let randomMod = 2; - if (lean !== undefined) { - // create an uneven distribution - randomMod = 3; - } - - // throw in a record that both get every 11th record. - if (random % 11 === 0) return processBoth(message); - - const left = (message: ProcessDwnRequest) => { - return lean === undefined || lean === 'pull' ? dwnManager.processRequest(message as ProcessDwnRequest): dwnManager.sendRequest(message as SendDwnRequest); - } - - const right = (message: ProcessDwnRequest) => { - return lean === undefined || lean === 'pull' ? dwnManager.sendRequest(message as SendDwnRequest) : dwnManager.processRequest(message as ProcessDwnRequest); - } - - return random % randomMod === 0 ? left(message) : right(message); - }; - - - const processBoth = async (message: ProcessDwnRequest) => { - const localResponse = await dwnManager.processRequest({...message} as ProcessDwnRequest); - // copy the message, todo use createFrom?? - message = { - ...message, - messageOptions: { - ...message.messageOptions || {}, - ...(localResponse.message as RecordsDeleteMessage).descriptor - } - } - const remoteResponse = await dwnManager.sendRequest({...message} as SendDwnRequest) - expect(localResponse.messageCid).to.equal(remoteResponse.messageCid, `invalid remote and local messages`); - return remoteResponse; - } - - const randomMessage = () => { - const random = getRandomInt(0, 1234567890); - const message = testWriteMessage(alice.did, randomUuid()); - return process(message, random); - } - - const batch = (count: number) => Array(count).fill({}).map(randomMessage) - - for (const _ of Array(batchRounds).fill({})) { - records.push(...(await Promise.all(batch(batchCount)))) - } - - const endLoadMessages = Date.now(); - console.log(`loaded ${records.length} messages in ${endLoadMessages - startLoadMessages}ms`); - expect(records.every(r => r.reply.status.code === 202), `could not load messages successfully`).to.be.true; - }); - - afterEach(async () => { - await testAgent.clearStorage(); - }); - - after(async () => { - await testAgent.clearStorage(); - await testAgent.closeStorage(); - }); - - describe(`startSync() ${rounds} runs`, () => { - if (checkChaos()) { - for ( const _ of Array(rounds).fill({})) { - it('sync a lot of records', async () => { - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - - // get remote and local before sync; - const testQuery = testQueryMessage(alice.did); - let { reply } = await testAgent.agent.dwnManager.processRequest(testQuery); - let { reply: replyRemote } = await testAgent.agent.dwnManager.sendRequest(testQuery); - - const startSync = Date.now(); - await testAgent.agent.syncManager.startSync({ interval: 0 }); - const endSync = Date.now(); - - const remoteEntries = (replyRemote.entries || []).filter(e => (reply.entries || []).findIndex(le => (le as RecordsWriteMessage).recordId === (e as RecordsWriteMessage).recordId) < 0); - const localEntries = (reply.entries || []).filter(e => (replyRemote.entries || []).findIndex(re => (re as RecordsWriteMessage).recordId === (e as RecordsWriteMessage).recordId) < 0) - const commonItemsLength = (reply.entries!.length + replyRemote.entries!.length) - records.length; - - console.log(`sync time:\t\t${endSync-startSync} for ${records.length} records\nlocal records:\t\t${reply.entries!.length}/${localEntries.length} unique\nremote records:\t\t${replyRemote.entries!.length}/${remoteEntries.length} unique\ncommon records:\t\t${commonItemsLength}\n\n`) - expect(endSync-startSync).to.be.lt(60_000); - ({ reply } = await testAgent.agent.dwnManager.processRequest(testQuery)); - expect(reply.status.code).to.equal(200); - expect(reply.entries!.length).to.equal(records.length); - }).timeout(100_000); - } - } - }); - }); -}); - -function getRandomInt(min: number, max: number) { - return Math.floor(Math.random() * (Math.ceil(max - min)) + Math.ceil(min)); -} \ No newline at end of file diff --git a/packages/agent/tests/sync-manager.spec.ts b/packages/agent/tests/sync-manager.spec.ts index 7edd9cd86..59c9dc680 100644 --- a/packages/agent/tests/sync-manager.spec.ts +++ b/packages/agent/tests/sync-manager.spec.ts @@ -11,8 +11,8 @@ import { TestAgent, sleep } from './utils/test-agent.js'; import { SyncManagerLevel } from '../src/sync-manager.js'; import { TestManagedAgent } from '../src/test-managed-agent.js'; -import { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; import { ProcessDwnRequest } from '../src/index.js'; +import { RecordsQueryReply, RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; describe('SyncManagerLevel', () => { describe('get agent', () => { @@ -74,12 +74,12 @@ describe('SyncManagerLevel', () => { await testAgent.closeStorage(); }); - describe('startSync()', () => { + describe('sync()', () => { it('takes no action if no identities are registered', async () => { const didResolveSpy = sinon.spy(testAgent.agent.didResolver, 'resolve'); const sendDwnRequestSpy = sinon.spy(testAgent.agent.rpcClient, 'sendDwnRequest'); - await testAgent.agent.syncManager.startSync({ interval: 0 }); + await testAgent.agent.syncManager.sync(); // Verify DID resolution and DWN requests did not occur. expect(didResolveSpy.notCalled).to.be.true; @@ -89,259 +89,6 @@ describe('SyncManagerLevel', () => { sendDwnRequestSpy.restore(); }); - it('synchronizes records for 1 identity from remove DWN to local DWN', async () => { - // Write a test record to Alice's remote DWN. - let writeResponse = await testAgent.agent.dwnManager.sendRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsWrite', - messageOptions : { - dataFormat: 'text/plain' - }, - dataStream: new Blob(['Hello, world!']) - }); - - // Get the record ID of the test record. - const testRecordId = (writeResponse.message as RecordsWriteMessage).recordId; - - // Confirm the record does NOT exist on Alice's local DWN. - let queryResponse = await testAgent.agent.dwnManager.processRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordId } } - }); - let localDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(localDwnQueryReply.entries).to.have.length(0); // Record doesn't exist on local DWN. - - // Register Alice's DID to be synchronized. - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - - // Execute Sync to pull all records from Alice's remote DWN to Alice's local DWN. - await testAgent.agent.syncManager.startSync({ interval: 0 }); - - // Confirm the record now DOES exist on Alice's local DWN. - queryResponse = await testAgent.agent.dwnManager.processRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordId } } - }); - localDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. - }); - - - it('synchronizes records for multiple identities from remote DWN to local DWN', async () => { - // Create a second Identity to author the DWN messages. - const { did: bobDid } = await testAgent.createIdentity({ testDwnUrls }); - const bob = await testAgent.agent.identityManager.import({ - did : bobDid, - identity : { name: 'Bob', did: bobDid.did }, - kms : 'local' - }); - - // Write a test record to Alice's remote DWN. - let writeResponse = await testAgent.agent.dwnManager.sendRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsWrite', - messageOptions : { - dataFormat: 'text/plain' - }, - dataStream: new Blob(['Hello, Bob!']) - }); - - // Get the record ID of Alice's test record. - const testRecordIdAlice = (writeResponse.message as RecordsWriteMessage).recordId; - - // Write a test record to Bob's remote DWN. - writeResponse = await testAgent.agent.dwnManager.sendRequest({ - author : bob.did, - target : bob.did, - messageType : 'RecordsWrite', - messageOptions : { - dataFormat: 'text/plain' - }, - dataStream: new Blob(['Hello, Alice!']) - }); - - // Get the record ID of Bob's test record. - const testRecordIdBob = (writeResponse.message as RecordsWriteMessage).recordId; - - // Register Alice's DID to be synchronized. - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - - // Register Bob's DID to be synchronized. - await testAgent.agent.syncManager.registerIdentity({ - did: bob.did - }); - - // Execute Sync to pull all records from Alice's and Bob's remove DWNs to their local DWNs. - await testAgent.agent.syncManager.startSync({ interval: 0 }); - - // Confirm the Alice test record exist on Alice's local DWN. - let queryResponse = await testAgent.agent.dwnManager.processRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordIdAlice } } - }); - let localDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. - - // Confirm the Bob test record exist on Bob's local DWN. - queryResponse = await testAgent.agent.dwnManager.sendRequest({ - author : bob.did, - target : bob.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordIdBob } } - }); - localDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. - }).timeout(5000); - - it('takes no action if no identities are registered', async () => { - const didResolveSpy = sinon.spy(testAgent.agent.didResolver, 'resolve'); - const processRequestSpy = sinon.spy(testAgent.agent.dwnManager, 'processRequest'); - - await testAgent.agent.syncManager.startSync({ interval: 0 }); - - // Verify DID resolution and DWN requests did not occur. - expect(didResolveSpy.notCalled).to.be.true; - expect(processRequestSpy.notCalled).to.be.true; - - didResolveSpy.restore(); - processRequestSpy.restore(); - }); - - it('synchronizes records for 1 identity from local DWN to remote DWN', async () => { - // Write a record that we can use for this test. - let writeResponse = await testAgent.agent.dwnManager.processRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsWrite', - messageOptions : { - dataFormat: 'text/plain' - }, - dataStream: new Blob(['Hello, world!']) - }); - - // Get the record ID of the test record. - const testRecordId = (writeResponse.message as RecordsWriteMessage).recordId; - - // Confirm the record does NOT exist on Alice's remote DWN. - let queryResponse = await testAgent.agent.dwnManager.sendRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordId } } - }); - let remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(remoteDwnQueryReply.entries).to.have.length(0); // Record doesn't exist on remote DWN. - - // Register Alice's DID to be synchronized. - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - - // Execute Sync to push all records from Alice's local DWN to Alice's remote DWN. - await testAgent.agent.syncManager.startSync({ interval: 0 }); - - // Confirm the record now DOES exist on Alice's remote DWN. - queryResponse = await testAgent.agent.dwnManager.sendRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordId } } - }); - remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. - }); - - it('synchronizes records for multiple identities from local DWN to remote DWN', async () => { - // Create a second Identity to author the DWN messages. - const { did: bobDid } = await testAgent.createIdentity({ testDwnUrls }); - const bob = await testAgent.agent.identityManager.import({ - did : bobDid, - identity : { name: 'Bob', did: bobDid.did }, - kms : 'local' - }); - - // Write a test record to Alice's local DWN. - let writeResponse = await testAgent.agent.dwnManager.processRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsWrite', - messageOptions : { - dataFormat: 'text/plain' - }, - dataStream: new Blob(['Hello, Bob!']) - }); - - // Get the record ID of Alice's test record. - const testRecordIdAlice = (writeResponse.message as RecordsWriteMessage).recordId; - - // Write a test record to Bob's local DWN. - writeResponse = await testAgent.agent.dwnManager.processRequest({ - author : bob.did, - target : bob.did, - messageType : 'RecordsWrite', - messageOptions : { - dataFormat: 'text/plain' - }, - dataStream: new Blob(['Hello, Alice!']) - }); - - // Get the record ID of Bob's test record. - const testRecordIdBob = (writeResponse.message as RecordsWriteMessage).recordId; - - // Register Alice's DID to be synchronized. - await testAgent.agent.syncManager.registerIdentity({ - did: alice.did - }); - - // Register Bob's DID to be synchronized. - await testAgent.agent.syncManager.registerIdentity({ - did: bob.did - }); - - // Execute Sync to push all records from Alice's and Bob's local DWNs to their remote DWNs. - await testAgent.agent.syncManager.startSync({ interval: 0 }); - - // Confirm the Alice test record exist on Alice's remote DWN. - let queryResponse = await testAgent.agent.dwnManager.sendRequest({ - author : alice.did, - target : alice.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordIdAlice } } - }); - let remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. - - // Confirm the Bob test record exist on Bob's remote DWN. - queryResponse = await testAgent.agent.dwnManager.sendRequest({ - author : bob.did, - target : bob.did, - messageType : 'RecordsQuery', - messageOptions : { filter: { recordId: testRecordIdBob } } - }); - remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; - expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. - expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. - }).timeout(5000); - it('synchronizes data in both directions for a single identity', async () => { await testAgent.agent.syncManager.registerIdentity({ @@ -390,7 +137,7 @@ describe('SyncManagerLevel', () => { expect(remoteReply.entries?.length).to.equal(remoteRecords.size); expect(remoteReply.entries?.every(e => remoteRecords.has((e as RecordsWriteMessage).recordId))).to.be.true; - await testAgent.agent.syncManager.startSync({ interval: 0 }); + await testAgent.agent.syncManager.sync(); const records = new Set([...remoteRecords, ...localRecords]); const { reply: allRemoteReply } = await testAgent.agent.dwnManager.sendRequest(everythingQuery()); @@ -403,7 +150,309 @@ describe('SyncManagerLevel', () => { expect(allLocalReply.entries?.length).to.equal(records.size); expect(allLocalReply.entries?.every(e => records.has((e as RecordsWriteMessage).recordId))).to.be.true; - }).timeout(5000); + }).timeout(10_000); + + // tests must be run with a low MIN_SYNC_INTERVAL + it('check sync interval input', async () => { + const syncSpy = sinon.spy(testAgent.agent.syncManager, 'sync'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + testAgent.agent.syncManager.startSync({ interval: 300 }); + await sleep(1000); + expect(syncSpy.callCount).to.equal(3); + syncSpy.restore(); + }); + + // test must be run with MIN_SYNC_INTERVAL=100 + it('check sync default value passed', async () => { + const setIntervalSpy = sinon.spy(global, 'setInterval'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + testAgent.agent.syncManager.startSync({ }); + await sleep(500); + expect(setIntervalSpy.calledOnce).to.be.true; + expect(setIntervalSpy.getCall(0).args.at(1)).to.equal(100); + setIntervalSpy.restore(); + }); + + describe('batchOperations()', () => { + it('should only call once per remote DWN if pull direction is passed', async () => { + const batchOperationsSpy = sinon.spy(testAgent.agent.syncManager as any, 'batchOperations'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + await testAgent.agent.syncManager.sync('pull'); + expect(batchOperationsSpy.callCount).to.equal(testDwnUrls.length, 'pull direction is passed'); + expect(batchOperationsSpy.args.filter(arg => arg.includes('pull')).length).to.equal(1, `args must include pull ${batchOperationsSpy.args[0]}`); + batchOperationsSpy.restore(); + }); + + it('should only call once if push direction is passed', async () => { + const batchOperationsSpy = sinon.spy(testAgent.agent.syncManager as any, 'batchOperations'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + await testAgent.agent.syncManager.sync('push'); + expect(batchOperationsSpy.callCount).to.equal(1, 'push direction is passed'); + expect(batchOperationsSpy.args.filter(arg => arg.includes('push')).length).to.equal(1, `args must include push ${batchOperationsSpy.args[0]}`); + batchOperationsSpy.restore(); + }); + + it('should be called twice if no direction is passed', async () => { + const batchOperationsSpy = sinon.spy(testAgent.agent.syncManager as any, 'batchOperations'); + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + await testAgent.agent.syncManager.sync(); + expect(batchOperationsSpy.callCount).to.equal(2, 'no direction is passed'); + expect(batchOperationsSpy.args.filter(arg => arg.includes('pull')).length).to.equal(1, `args must include one pull ${batchOperationsSpy.args}`); + expect(batchOperationsSpy.args.filter(arg => arg.includes('push')).length).to.equal(1, `args must include one push ${batchOperationsSpy.args}`); + batchOperationsSpy.restore(); + }); + }); + describe('pull', () => { + it('synchronizes records for 1 identity from remote DWN to local DWN', async () => { + // Write a test record to Alice's remote DWN. + let writeResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, world!']) + }); + + // Get the record ID of the test record. + const testRecordId = (writeResponse.message as RecordsWriteMessage).recordId; + + // Confirm the record does NOT exist on Alice's local DWN. + let queryResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordId } } + }); + let localDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(localDwnQueryReply.entries).to.have.length(0); // Record doesn't exist on local DWN. + + // Register Alice's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // Execute Sync to pull all records from Alice's remote DWN to Alice's local DWN. + await testAgent.agent.syncManager.sync('pull'); + + // Confirm the record now DOES exist on Alice's local DWN. + queryResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordId } } + }); + localDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. + }); + + it('synchronizes records for multiple identities from remote DWN to local DWN', async () => { + // Create a second Identity to author the DWN messages. + const { did: bobDid } = await testAgent.createIdentity({ testDwnUrls }); + const bob = await testAgent.agent.identityManager.import({ + did : bobDid, + identity : { name: 'Bob', did: bobDid.did }, + kms : 'local' + }); + + // Write a test record to Alice's remote DWN. + let writeResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, Bob!']) + }); + + // Get the record ID of Alice's test record. + const testRecordIdAlice = (writeResponse.message as RecordsWriteMessage).recordId; + + // Write a test record to Bob's remote DWN. + writeResponse = await testAgent.agent.dwnManager.sendRequest({ + author : bob.did, + target : bob.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, Alice!']) + }); + + // Get the record ID of Bob's test record. + const testRecordIdBob = (writeResponse.message as RecordsWriteMessage).recordId; + + // Register Alice's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // Register Bob's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: bob.did + }); + + // Execute Sync to pull all records from Alice's and Bob's remote DWNs to their local DWNs. + await testAgent.agent.syncManager.sync('pull'); + + // Confirm the Alice test record exist on Alice's local DWN. + let queryResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordIdAlice } } + }); + let localDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. + + // Confirm the Bob test record exist on Bob's local DWN. + queryResponse = await testAgent.agent.dwnManager.sendRequest({ + author : bob.did, + target : bob.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordIdBob } } + }); + localDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(localDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(localDwnQueryReply.entries).to.have.length(1); // Record does exist on local DWN. + }).timeout(5000); + }); + + describe('push', () => { + it('synchronizes records for 1 identity from local DWN to remote DWN', async () => { + // Write a record that we can use for this test. + let writeResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, world!']) + }); + + // Get the record ID of the test record. + const testRecordId = (writeResponse.message as RecordsWriteMessage).recordId; + + // Confirm the record does NOT exist on Alice's remote DWN. + let queryResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordId } } + }); + let remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(remoteDwnQueryReply.entries).to.have.length(0); // Record doesn't exist on remote DWN. + + // Register Alice's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // Execute Sync to push all records from Alice's local DWN to Alice's remote DWN. + await testAgent.agent.syncManager.sync('push'); + + // Confirm the record now DOES exist on Alice's remote DWN. + queryResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordId } } + }); + remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. + }); + + it('synchronizes records for multiple identities from local DWN to remote DWN', async () => { + // Create a second Identity to author the DWN messages. + const { did: bobDid } = await testAgent.createIdentity({ testDwnUrls }); + const bob = await testAgent.agent.identityManager.import({ + did : bobDid, + identity : { name: 'Bob', did: bobDid.did }, + kms : 'local' + }); + + // Write a test record to Alice's local DWN. + let writeResponse = await testAgent.agent.dwnManager.processRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, Bob!']) + }); + + // Get the record ID of Alice's test record. + const testRecordIdAlice = (writeResponse.message as RecordsWriteMessage).recordId; + + // Write a test record to Bob's local DWN. + writeResponse = await testAgent.agent.dwnManager.processRequest({ + author : bob.did, + target : bob.did, + messageType : 'RecordsWrite', + messageOptions : { + dataFormat: 'text/plain' + }, + dataStream: new Blob(['Hello, Alice!']) + }); + + // Get the record ID of Bob's test record. + const testRecordIdBob = (writeResponse.message as RecordsWriteMessage).recordId; + + // Register Alice's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: alice.did + }); + + // Register Bob's DID to be synchronized. + await testAgent.agent.syncManager.registerIdentity({ + did: bob.did + }); + + // Execute Sync to push all records from Alice's and Bob's local DWNs to their remote DWNs. + await testAgent.agent.syncManager.sync('push'); + + // Confirm the Alice test record exist on Alice's remote DWN. + let queryResponse = await testAgent.agent.dwnManager.sendRequest({ + author : alice.did, + target : alice.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordIdAlice } } + }); + let remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. + + // Confirm the Bob test record exist on Bob's remote DWN. + queryResponse = await testAgent.agent.dwnManager.sendRequest({ + author : bob.did, + target : bob.did, + messageType : 'RecordsQuery', + messageOptions : { filter: { recordId: testRecordIdBob } } + }); + remoteDwnQueryReply = queryResponse.reply as RecordsQueryReply; + expect(remoteDwnQueryReply.status.code).to.equal(200); // Query was successfully executed. + expect(remoteDwnQueryReply.entries).to.have.length(1); // Record does exist on remote DWN. + }).timeout(5000); + }); }); }); }); \ No newline at end of file