Skip to content

Commit

Permalink
clean up sync interface, add new tests, remove chaos script
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Sep 19, 2023
1 parent 9a76b46 commit c645760
Show file tree
Hide file tree
Showing 4 changed files with 379 additions and 538 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/tests-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ jobs:
run: npm run test:node --ws -- --color
env:
TEST_DWN_URLS: http://localhost:3000,http://localhost:3001
MIN_SYNC_INTERVAL: 100

- name: Upload test coverage to Codecov
uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3.1.4
Expand Down Expand Up @@ -119,3 +120,4 @@ jobs:
run: npm run test:browser --ws -- --color
env:
TEST_DWN_URLS: http://localhost:3000,http://localhost:3001
MIN_SYNC_INTERVAL: 100
132 changes: 70 additions & 62 deletions packages/agent/src/sync-manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { BatchOperation } from 'level';
import type {
Event,
EventsGetReply,
GenericMessage,
MessagesGetReply,
Expand All @@ -16,10 +17,29 @@ import type { Web5ManagedAgent } from './types/agent.js';

import { webReadableToIsomorphicNodeReadable } from './utils.js';

const checkNumber = (n?: string) => isNaN(parseInt(n || '')) ? undefined : parseInt(n || '');
// arbitrary number for now, but we should enforce some sane minimum
// allow for environment to set a minimum
const MIN_SYNC_INTERVAL = checkNumber(process?.env.MIN_SYNC_INTERVAL) ?? 5000;

type SyncDirection = 'pull' | 'push';

interface SyncOptions {
interval?: number
direction?: SyncDirection
}

export interface SyncManager {
agent: Web5ManagedAgent;
registerIdentity(options: { did: string }): Promise<void>;
startSync(options: { interval: number }): Promise<void>;

// sync will run the sync operation once.
// if a direction is passed, it will only sync in that direction.
sync(direction?: SyncDirection): Promise<void>;

// startSync will run sync on an interval
// if a direction is provided, it will only sync in that direction.
startSync(options?: SyncOptions): Promise<void>;
stopSync(): void;
}

Expand Down Expand Up @@ -91,7 +111,7 @@ export class SyncManagerLevel implements SyncManager {
await this._db.clear();
}

public async pull(): Promise<void> {
private async pull(): Promise<void> {
const pullQueue = this.getPullQueue();
const pullJobs = await pullQueue.iterator().all();

Expand Down Expand Up @@ -219,7 +239,7 @@ export class SyncManagerLevel implements SyncManager {
await pullQueue.batch(deleteOperations as any);
}

public async push(): Promise<void> {
private async push(): Promise<void> {
const pushQueue = this.getPushQueue();
const pushJobs = await pushQueue.iterator().all();

Expand Down Expand Up @@ -285,37 +305,21 @@ export class SyncManagerLevel implements SyncManager {
await registeredIdentities.put(did, '');
}

public startSync(options: {
interval: number
}): Promise<void> {
const { interval } = options;

// interval 0 means start instantly and don't repeat.
if (interval === 0) {
return new Promise( async (resolve,reject) => {
try {
await this.sync();
resolve();

} catch(error) {
reject(error);
}
})
}

public startSync(options: SyncOptions = {}): Promise<void> {
const { interval = MIN_SYNC_INTERVAL, direction } = options;
return new Promise((resolve, reject) => {
if (this._syncIntervalId) {
clearInterval(this._syncIntervalId);
}

this._syncIntervalId = setInterval(async () => {
try {
await this.sync();
await this.sync(direction);
} catch (error) {
this.stopSync();
reject(error);
}
}, interval);
}, interval >= MIN_SYNC_INTERVAL ? interval : MIN_SYNC_INTERVAL);
});
}

Expand All @@ -326,10 +330,13 @@ export class SyncManagerLevel implements SyncManager {
}
}

private async sync(): Promise<void> {
await this.enqueueOperations();
await this.push();
await this.pull();
public async sync(direction?: SyncDirection): Promise<void> {
await this.enqueueOperations(direction);
// enqueue operations handles the direction logic.
// we can just run both operations and only enqueued events will sync.
await Promise.all([
this.push(), this.pull()
]);
}

private createOperationKey(did: string, dwnUrl: string, watermark: string, messageCid: string): string {
Expand All @@ -338,47 +345,47 @@ export class SyncManagerLevel implements SyncManager {

private dbBatchOperationPut(did: string, dwnUrl: string, watermark: string, messageCid: string): DbBatchOperation {
const key = this.createOperationKey(did, dwnUrl, watermark, messageCid);
return { type: 'put', key, value: '' }
return { type: 'put', key, value: '' };
}

async enqueueOperations(direction?: 'pull' | 'push') {
/**
* Enqueues the operations needed for sync based on the supplied direction.
*
* @param direction the optional direction in which you would like to enqueue sync events for.
* If no direction is supplied it will sync in both directions.
*/
async enqueueOperations(direction?: SyncDirection) {
const syncPeerState = await this.getSyncPeerState();
for (let syncState of syncPeerState) {
const localEvents = await this.getLocalDwnEvents({
did: syncState.did,
watermark: syncState.pushWatermark,
});
const remoteEvents = await this.getRemoteEvents({
did: syncState.did,
dwnUrl: syncState.dwnUrl,
watermark: syncState.pullWatermark,
});

const pullOperations: DbBatchOperation[] = [];
if (direction === undefined || direction === 'pull') {
remoteEvents.forEach(remoteEvent => {
if (localEvents.findIndex(localEvent => localEvent.messageCid === remoteEvent.messageCid) < 0) {
const operation = this.dbBatchOperationPut(syncState.did, syncState.dwnUrl, remoteEvent.watermark, remoteEvent.messageCid);
pullOperations.push(operation);
}
for (let syncState of syncPeerState) {
const batchPromises = [];
if (direction === undefined || direction === 'push') {
const localEventsPromise = this.getLocalDwnEvents({
did : syncState.did,
watermark : syncState.pushWatermark,
});
batchPromises.push(this.batchOperations('push', localEventsPromise, syncState));
}

const pushOperations: DbBatchOperation[] = [];
if (direction === undefined || direction === 'push') {
localEvents.forEach(localEvent => {
if(remoteEvents.findIndex(remoteEvent => remoteEvent.messageCid === localEvent.messageCid) < 0) {
const operation = this.dbBatchOperationPut(syncState.did, syncState.dwnUrl, localEvent.watermark, localEvent.messageCid);
pushOperations.push(operation);
}
if(direction === undefined || direction === 'pull') {
const remoteEventsPromise = this.getRemoteEvents({
did : syncState.did,
dwnUrl : syncState.dwnUrl,
watermark : syncState.pullWatermark,
});
batchPromises.push(this.batchOperations('pull', remoteEventsPromise, syncState));
}

await this.getPullQueue().batch(pullOperations as any);
await this.getPushQueue().batch(pushOperations as any);
await Promise.all(batchPromises);
}
}

private async batchOperations(direction: SyncDirection, eventsPromise: Promise<Event[]>, syncState: SyncState): Promise<void> {
const { did, dwnUrl } = syncState;
const operations: DbBatchOperation[] = [];
(await eventsPromise).forEach(e => operations.push(this.dbBatchOperationPut(did, dwnUrl, e.watermark, e.messageCid)));
return direction === 'pull' ? this.getPullQueue().batch(operations as any) : this.getPushQueue().batch(operations as any);
}

private async getLocalDwnEvents(options:{ did: string, watermark?: string }) {
const { did, watermark } = options;
let eventsReply = {} as EventsGetReply;
Expand All @@ -394,6 +401,7 @@ export class SyncManagerLevel implements SyncManager {

private async getRemoteEvents(options: { did: string, dwnUrl: string, watermark?: string }) {
const { did, dwnUrl, watermark } = options;

let eventsReply = {} as EventsGetReply;

const eventsGetMessage = await this.agent.dwnManager.createMessage({
Expand All @@ -419,7 +427,7 @@ export class SyncManagerLevel implements SyncManager {
author: string,
messageCid: string
): Promise<DwnMessage | undefined> {
let messagesGetResponse = await this.agent.dwnManager.processRequest({
const messagesGetResponse = await this.agent.dwnManager.processRequest({
author : author,
target : author,
messageType : 'MessagesGet',
Expand Down Expand Up @@ -540,16 +548,16 @@ export class SyncManagerLevel implements SyncManager {
const wm = await watermarkStore.get(wmKey);
const split = wm.split('~');
if (split.length !== 2) {
return {}
return {};
}

let pull;
let push;
if (split[0] !== '0') {
pull = split[0]
pull = split[0];
}
if (split[1] !== '0') {
push = split[1]
push = split[1];
}

return { pull, push };
Expand All @@ -567,11 +575,11 @@ export class SyncManagerLevel implements SyncManager {
const watermarkStore = this.getWatermarkStore();

if (pullWatermark === undefined) {
pullWatermark = '0'
pullWatermark = '0';
}

if (pushWatermark === undefined) {
pushWatermark = '0'
pushWatermark = '0';
}

await watermarkStore.put(wmKey, `${pullWatermark}~${pushWatermark}`);
Expand Down
Loading

0 comments on commit c645760

Please sign in to comment.