Skip to content

Commit

Permalink
bring parts of sync up outside of push/pull
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Sep 19, 2023
1 parent f96b106 commit 9a76b46
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 109 deletions.
217 changes: 122 additions & 95 deletions packages/agent/src/sync-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ export interface SyncManager {
registerIdentity(options: { did: string }): Promise<void>;
startSync(options: { interval: number }): Promise<void>;
stopSync(): void;
push(): Promise<void>;
pull(): Promise<void>;
}

export type SyncManagerOptions = {
Expand All @@ -31,12 +29,12 @@ export type SyncManagerOptions = {
db?: Level;
};

type SyncDirection = 'push' | 'pull';

type SyncState = {
did: string;
dwnUrl: string;
watermark: string | undefined;
pullWatermark: string | undefined;
pushWatermark: string | undefined;
}

type DwnMessage = {
Expand Down Expand Up @@ -94,9 +92,6 @@ export class SyncManagerLevel implements SyncManager {
}

public async pull(): Promise<void> {
const syncPeerState = await this.getSyncPeerState({ syncDirection: 'pull' });
await this.enqueueOperations({ syncDirection: 'pull', syncPeerState });

const pullQueue = this.getPullQueue();
const pullJobs = await pullQueue.iterator().all();

Expand Down Expand Up @@ -225,9 +220,6 @@ export class SyncManagerLevel implements SyncManager {
}

public async push(): Promise<void> {
const syncPeerState = await this.getSyncPeerState({ syncDirection: 'push' });
await this.enqueueOperations({ syncDirection: 'push', syncPeerState });

const pushQueue = this.getPushQueue();
const pushJobs = await pushQueue.iterator().all();

Expand Down Expand Up @@ -296,7 +288,20 @@ export class SyncManagerLevel implements SyncManager {
public startSync(options: {
interval: number
}): Promise<void> {
const { interval = 120_000 } = options;
const { interval } = options;

// interval 0 means start instantly and don't repeat.
if (interval === 0) {
return new Promise( async (resolve,reject) => {
try {
await this.sync();
resolve();

} catch(error) {
reject(error);
}
})
}

return new Promise((resolve, reject) => {
if (this._syncIntervalId) {
Expand All @@ -305,8 +310,7 @@ export class SyncManagerLevel implements SyncManager {

this._syncIntervalId = setInterval(async () => {
try {
await this.push();
await this.pull();
await this.sync();
} catch (error) {
this.stopSync();
reject(error);
Expand All @@ -322,90 +326,93 @@ export class SyncManagerLevel implements SyncManager {
}
}

private async enqueueOperations(options: {
syncDirection: SyncDirection,
syncPeerState: SyncState[]
}) {
const { syncDirection, syncPeerState } = options;

for (let syncState of syncPeerState) {
// Get the event log from the remote DWN if pull sync, or local DWN if push sync.
const eventLog = await this.getDwnEventLog({
did : syncState.did,
dwnUrl : syncState.dwnUrl,
syncDirection,
watermark : syncState.watermark
});
private async sync(): Promise<void> {
await this.enqueueOperations();
await this.push();
await this.pull();
}

const syncOperations: DbBatchOperation[] = [];
private createOperationKey(did: string, dwnUrl: string, watermark: string, messageCid: string): string {
return [did, dwnUrl, watermark, messageCid].join('~');
}

for (let event of eventLog) {
/** Use "did~dwnUrl~watermark~messageCid" as the key in the sync queue.
* Note: It is critical that `watermark` precedes `messageCid` to
* ensure that when the sync jobs are pulled off the queue, they
* are lexographically sorted oldest to newest. */
const operationKey = [
syncState.did,
syncState.dwnUrl,
event.watermark,
event.messageCid
].join('~');
private dbBatchOperationPut(did: string, dwnUrl: string, watermark: string, messageCid: string): DbBatchOperation {
const key = this.createOperationKey(did, dwnUrl, watermark, messageCid);
return { type: 'put', key, value: '' }
}

const operation: DbBatchOperation = { type: 'put', key: operationKey, value: '' };
async enqueueOperations(direction?: 'pull' | 'push') {
const syncPeerState = await this.getSyncPeerState();
for (let syncState of syncPeerState) {
const localEvents = await this.getLocalDwnEvents({
did: syncState.did,
watermark: syncState.pushWatermark,
});
const remoteEvents = await this.getRemoteEvents({
did: syncState.did,
dwnUrl: syncState.dwnUrl,
watermark: syncState.pullWatermark,
});

syncOperations.push(operation);
const pullOperations: DbBatchOperation[] = [];
if (direction === undefined || direction === 'pull') {
remoteEvents.forEach(remoteEvent => {
if (localEvents.findIndex(localEvent => localEvent.messageCid === remoteEvent.messageCid) < 0) {
const operation = this.dbBatchOperationPut(syncState.did, syncState.dwnUrl, remoteEvent.watermark, remoteEvent.messageCid);
pullOperations.push(operation);
}
});
}

if (syncOperations.length > 0) {
const syncQueue = (syncDirection === 'pull')
? this.getPullQueue()
: this.getPushQueue();
await syncQueue.batch(syncOperations as any);
const pushOperations: DbBatchOperation[] = [];
if (direction === undefined || direction === 'push') {
localEvents.forEach(localEvent => {
if(remoteEvents.findIndex(remoteEvent => remoteEvent.messageCid === localEvent.messageCid) < 0) {
const operation = this.dbBatchOperationPut(syncState.did, syncState.dwnUrl, localEvent.watermark, localEvent.messageCid);
pushOperations.push(operation);
}
});
}

await this.getPullQueue().batch(pullOperations as any);
await this.getPushQueue().batch(pushOperations as any);
}
}

private async getDwnEventLog(options: {
did: string,
dwnUrl: string,
syncDirection: SyncDirection,
watermark?: string
}) {
const { did, dwnUrl, syncDirection, watermark } = options;

private async getLocalDwnEvents(options:{ did: string, watermark?: string }) {
const { did, watermark } = options;
let eventsReply = {} as EventsGetReply;
({ reply: eventsReply } = await this.agent.dwnManager.processRequest({
author : did,
target : did,
messageType : 'EventsGet',
messageOptions : { watermark }
}));

return eventsReply.events ?? [];
}

if (syncDirection === 'pull') {
// When sync is a pull, get the event log from the remote DWN.
const eventsGetMessage = await this.agent.dwnManager.createMessage({
author : did,
messageType : 'EventsGet',
messageOptions : { watermark }
});
private async getRemoteEvents(options: { did: string, dwnUrl: string, watermark?: string }) {
const { did, dwnUrl, watermark } = options;
let eventsReply = {} as EventsGetReply;

try {
eventsReply = await this.agent.rpcClient.sendDwnRequest({
dwnUrl : dwnUrl,
targetDid : did,
message : eventsGetMessage
});
} catch {
// If a particular DWN service endpoint is unreachable, silently ignore.
}
const eventsGetMessage = await this.agent.dwnManager.createMessage({
author : did,
messageType : 'EventsGet',
messageOptions : { watermark }
});

} else if (syncDirection === 'push') {
// When sync is a push, get the event log from the local DWN.
({ reply: eventsReply } = await this.agent.dwnManager.processRequest({
author : did,
target : did,
messageType : 'EventsGet',
messageOptions : { watermark }
}));
try {
eventsReply = await this.agent.rpcClient.sendDwnRequest({
dwnUrl : dwnUrl,
targetDid : did,
message : eventsGetMessage
});
} catch {
// If a particular DWN service endpoint is unreachable, silently ignore.
}

const eventLog = eventsReply.events ?? [];

return eventLog;
return eventsReply.events ?? [];
}

private async getDwnMessage(
Expand Down Expand Up @@ -482,11 +489,7 @@ export class SyncManagerLevel implements SyncManager {
return dwnMessage;
}

private async getSyncPeerState(options: {
syncDirection: SyncDirection
}): Promise<SyncState[]> {
const { syncDirection } = options;

private async getSyncPeerState(): Promise<SyncState[]> {
// Get a list of the DIDs of all registered identities.
const registeredIdentities = await this._db.sublevel('registeredIdentities').keys().all();

Expand Down Expand Up @@ -521,33 +524,57 @@ export class SyncManagerLevel implements SyncManager {
/** Get the watermark (or undefined) for each (DID, DWN service endpoint, sync direction)
* combination and add it to the sync peer state array. */
for (let dwnUrl of service.serviceEndpoint.nodes) {
const watermark = await this.getWatermark(did, dwnUrl, syncDirection);
syncPeerState.push({ did, dwnUrl, watermark });
const watermark = await this.getWatermark(did, dwnUrl);
syncPeerState.push({ did, dwnUrl, pullWatermark: watermark.pull, pushWatermark: watermark.push });
}
}

return syncPeerState;
}

private async getWatermark(did: string, dwnUrl: string, direction: SyncDirection) {
const wmKey = `${did}~${dwnUrl}~${direction}`;
private async getWatermark(did: string, dwnUrl: string): Promise<{ pull?:string, push?: string }> {
const wmKey = `${did}~${dwnUrl}`;
const watermarkStore = this.getWatermarkStore();

try {
return await watermarkStore.get(wmKey);
const wm = await watermarkStore.get(wmKey);
const split = wm.split('~');
if (split.length !== 2) {
return {}
}

let pull;
let push;
if (split[0] !== '0') {
pull = split[0]
}
if (split[1] !== '0') {
push = split[1]
}

return { pull, push };
} catch(error: any) {
// Don't throw when a key wasn't found.
if (error.notFound) {
return undefined;
return {};
}
throw new Error('invalid watermark');
}
}

private async setWatermark(did: string, dwnUrl: string, direction: SyncDirection, watermark: string) {
const wmKey = `${did}~${dwnUrl}~${direction}`;
private async setWatermark(did: string, dwnUrl: string, pullWatermark?: string, pushWatermark?: string) {
const wmKey = `${did}~${dwnUrl}`;
const watermarkStore = this.getWatermarkStore();

await watermarkStore.put(wmKey, watermark);
if (pullWatermark === undefined) {
pullWatermark = '0'
}

if (pushWatermark === undefined) {
pushWatermark = '0'
}

await watermarkStore.put(wmKey, `${pullWatermark}~${pushWatermark}`);
}

/**
Expand Down
3 changes: 1 addition & 2 deletions packages/agent/tests/chaos/chaos-monkey.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ describe('Chaos Monkey', () => {
let { reply: replyRemote } = await testAgent.agent.dwnManager.sendRequest(testQuery);

const startSync = Date.now();
await testAgent.agent.syncManager.push();
await testAgent.agent.syncManager.pull();
await testAgent.agent.syncManager.startSync({ interval: 0 });
const endSync = Date.now();

const remoteEntries = (replyRemote.entries || []).filter(e => (reply.entries || []).findIndex(le => (le as RecordsWriteMessage).recordId === (e as RecordsWriteMessage).recordId) < 0);
Expand Down
Loading

0 comments on commit 9a76b46

Please sign in to comment.