Skip to content

Commit

Permalink
add collection replication state
Browse files Browse the repository at this point in the history
  • Loading branch information
kilbot committed Dec 18, 2023
1 parent dea33f0 commit 6907a59
Show file tree
Hide file tree
Showing 12 changed files with 440 additions and 13 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"packageManager": "[email protected]",
"dependencies": {
"@shelf/fast-natural-order-by": "^2.0.0",
"lodash": "^4.17.21",
"rxjs": "^7.8.1",
"typescript": "^5.3.3"
},
Expand Down
Empty file removed src/api-service.ts
Empty file.
236 changes: 236 additions & 0 deletions src/collection-replication-state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
import { BehaviorSubject, Observable, Subscription, Subject, interval } from 'rxjs';
import {
filter,
tap,
map,
switchMap,
startWith,
debounceTime,
distinctUntilChanged,
} from 'rxjs/operators';

import { isArrayOfIntegers } from './utils';

import type { RxCollection } from 'rxdb';

export interface QueryHooks {
preEndpoint?: (collection: RxCollection) => string;
}

interface CollectionReplicationConfig<T extends RxCollection> {
collection: T;
httpClient: any;
hooks?: any;
}

export class CollectionReplicationState<T extends RxCollection> {
private hooks: CollectionReplicationConfig<T>['hooks'];
public readonly endpoint: string;
public readonly collection: T;
public readonly httpClient: any;

/**
* Internal State
*/
private isCanceled = false;
private lastFetchRemoteIDsTime = null;
private pollingTime = 1000 * 60 * 5; // 5 minutes

/**
*
*/
public readonly subs: Subscription[] = [];
public readonly subjects = {
error: new Subject<Error>(),
remoteIDs: new BehaviorSubject<number[]>([]), // emits all remote ids that are known to the replication
localIDs: new BehaviorSubject<number[]>([]), // emits all local ids that are known to the replication
lastModified: new BehaviorSubject<string>(null), // emits the date of the last modified document
paused: new BehaviorSubject<boolean>(true), // true when the replication is paused, start true
};

/**
*
*/
readonly error$: Observable<Error> = this.subjects.error.asObservable();
readonly remoteIDs$: Observable<number[]> = this.subjects.remoteIDs.asObservable();
readonly localIDs$: Observable<number[]> = this.subjects.localIDs.asObservable();
readonly lastModified$: Observable<string> = this.subjects.lastModified.asObservable();
readonly paused$: Observable<boolean> = this.subjects.paused.asObservable();

/**
*
*/
constructor({ collection, httpClient, hooks }: CollectionReplicationConfig<T>) {
this.collection = collection;
this.httpClient = httpClient;
this.hooks = hooks || {};
this.endpoint = this.getEndpoint();

/**
* Subscribe to the remoteIDs local document
*/
this.subs.push(
collection.getLocal$('audit').subscribe((doc) => {
if (doc) {
this.subjects.remoteIDs.next(doc.get('remoteIDs'));
}
})
);

/**
* Subscribe to collection changes and track the array of localIDs and lastModified date
* @TODO - categories and tags don't have a date_modified_gmt field, what to do?
* @TODO - get the IDs without having to construict the whole document
*/
this.subs.push(
this.collection
.find({
selector: {
id: { $ne: null },
},
sort:
this.collection.name === 'products/categories' ||
this.collection.name === 'products/tags'
? undefined
: [{ date_modified_gmt: 'desc' }],
})
.$.subscribe((docs) => {
/**
* @TODO - I need to change the tax-rates schema to make sure id is always integer
*/
const ids = docs.map((doc) => parseInt(doc.get('id'), 10));
const lastModified = docs[0]?.get('date_modified_gmt');
this.subjects.localIDs.next(ids);
this.subjects.lastModified.next(lastModified);
})
);

/**
*
*/
this.subs.push(
this.paused$
.pipe(
switchMap((isPaused) => (isPaused ? [] : interval(this.pollingTime).pipe(startWith(0)))),
filter(() => !this.subjects.paused.getValue())
)
.subscribe(async () => {
this.run();
})
);
}

/**
* Run the collection replication
*/
async run() {
await this.auditIDs();
// await this.syncLastModified();
// await this.syncRemoteIDs();
}

/**
* Allow the user to override the endpoint, eg: variations collection will have
* /products/<parent_id>/variations endpoint
*/
getEndpoint(): string {
if (this.hooks.preEndpoint) {
return this.hooks.preEndpoint(this.collection);
}
return this.collection.name;
}

/**
*
*/
async auditIDs() {
if (this.isStopped()) {
return;
}

let remoteIDs;
if (this.lastFetchRemoteIDsTime < new Date().getTime() - this.pollingTime) {
remoteIDs = await this.fetchRemoteIDs();
if (isArrayOfIntegers(remoteIDs)) {
await this.collection.upsertLocal('audit', { remoteIDs });
this.subjects.remoteIDs.next(remoteIDs);
}
} else {
remoteIDs = this.subjects.remoteIDs.getValue();
}

if (!Array.isArray(remoteIDs) || remoteIDs.length === 0) {
return;
}

/**
* @TODO - variations can be orphaned at the moment, we need a relationship table with parent
*/
const remove = this.subjects.localIDs.getValue().filter((id) => !remoteIDs.includes(id));
if (remove.length > 0 && this.collection.name !== 'variations') {
// deletion should be rare, only when an item is deleted from the server
console.warn('removing', remove, 'from', this.collection.name);
await this.collection.find({ selector: { id: { $in: remove } } }).remove();
}
}

/**
* Makes a request the the endpoint to fetch all remote IDs
* - can be overwritten by the fetchRemoteIDs hook, this is required for variations
*/
async fetchRemoteIDs(): Promise<number[]> {
if (this.hooks?.fetchRemoteIDs) {
/**
* @HACK - this is a bit hacky, but it works for now
* Variations is one collection locally, containing all variations
* But it is multiple endpoints on the server, one for each product
* Maybe we should have a separate collection for each variable product?
*/
return this.hooks?.fetchRemoteIDs(this.endpoint, this.collection);
}

try {
const response = await this.httpClient.get(this.endpoint, {
params: { fields: ['id'], posts_per_page: -1 },
});

if (!response.data || !Array.isArray(response.data)) {
throw new Error('Invalid response data for remote IDs');
}

return response.data.map((doc) => doc.id);
} catch (error) {
this.subjects.error.next(error);
}
}

/**
* We need to a way to pause and start the replication, eg: when the user is offline
*/
start() {
this.subjects.paused.next(false);
}

pause() {
this.subjects.paused.next(true);
}

isStopped() {
return this.isCanceled || this.subjects.paused.getValue();
}

/**
* Cancel
*
* Make sure we clean up subscriptions:
* - things we subscribe to in this class, also
* - complete the observables accessible from this class
*/
cancel() {
this.isCanceled = true;
this.subs.forEach((sub) => sub.unsubscribe());

// Complete subjects
this.subjects.error.complete();
}
}
53 changes: 46 additions & 7 deletions src/manager.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
import { Observable, Subject, Subscription } from 'rxjs';

import { CollectionReplicationState } from './collection-replication-state';
import { QueryReplicationState } from './query-replication-state';
import { Query } from './query-state';

import type { QueryParams, QueryHooks } from './query-state';
import type { RxDatabase } from 'rxdb';
import type { RxDatabase, RxCollection } from 'rxdb';

export class Manager<T extends RxDatabase> {
private queries: Map<string, Query<any>> = new Map();
export class Manager<TDatabase extends RxDatabase> {
private isCanceled = false;

/**
* Registry of all query and replication states
*/
private queries: Map<string, Query<RxCollection>> = new Map();
private collectionReplicationStates: Map<string, CollectionReplicationState<RxCollection>> =
new Map();
private queryReplicationStates: Map<string, QueryReplicationState> = new Map();

/**
*
*/
Expand All @@ -23,7 +32,7 @@ export class Manager<T extends RxDatabase> {
readonly error$: Observable<Error> = this.subjects.error.asObservable();

constructor(
private localDB: T,
private localDB: TDatabase,
private httpClient
) {}

Expand All @@ -49,15 +58,17 @@ export class Manager<T extends RxDatabase> {
}: {
queryKey: (string | number | object)[];
collectionName: string;
initialParams: QueryParams;
initialParams?: QueryParams;
hooks?: QueryHooks;
locale?: string;
}) {
const key = this.serializeQueryKey(queryKey);
if (key && !this.queries.has(key)) {
const collection = this.getCollection(collectionName);
if (collection) {
const query = new Query({ collection, initialParams, hooks });
const query = new Query<typeof collection>({ collection, initialParams, hooks });
const collectionReplication = this.registerCollectionReplication(collectionName);

/**
* Subscribe to query errors and pipe them to the error subject
*/
Expand All @@ -66,6 +77,7 @@ export class Manager<T extends RxDatabase> {
this.subjects.error.next(error);
})
);

this.queries.set(key, query);
}
}
Expand All @@ -79,7 +91,7 @@ export class Manager<T extends RxDatabase> {
return this.localDB[collectionName];
}

getQuery<T>(queryKey: (string | number | object)[]): Query<T> | undefined {
getQuery(queryKey: (string | number | object)[]) {
const key = this.serializeQueryKey(queryKey);
const query = this.queries.get(key);

Expand All @@ -100,6 +112,33 @@ export class Manager<T extends RxDatabase> {
}
}

/**
* There is one collection replication state per collection
*/
registerCollectionReplication(collectionName: string) {
if (!this.collectionReplicationStates.has(collectionName)) {
const collection = this.getCollection(collectionName);

const collectionReplication = new CollectionReplicationState({
httpClient: this.httpClient,
collection,
});

/**
* Subscribe to query errors and pipe them to the error subject
*/
this.subs.push(
collectionReplication.error$.subscribe((error) => {
this.subjects.error.next(error);
})
);

this.collectionReplicationStates.set(collectionName, collectionReplication);
}

return this.collectionReplicationStates.get(collectionName);
}

/**
* Cancel
*
Expand Down
1 change: 1 addition & 0 deletions src/query-replication-state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export class QueryReplicationState {}
2 changes: 1 addition & 1 deletion src/query-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export class Query<T extends RxCollection> {
/**
*
*/
constructor({ collection, initialParams, hooks }: QueryConfig<T>) {
constructor({ collection, initialParams = {}, hooks }: QueryConfig<T>) {
this.collection = collection;
this.subjects.params.next(initialParams);
this.hooks = hooks || {};
Expand Down
Empty file removed src/replication-state.ts
Empty file.
2 changes: 1 addition & 1 deletion src/use-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { QueryParams, QueryHooks } from './query-state';
interface QueryOptions {
queryKey: (string | number | object)[];
collectionName: string;
initialParams: QueryParams;
initialParams?: QueryParams;
hooks?: QueryHooks;
locale?: string;
}
Expand Down
8 changes: 8 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
*
* @param value
* @returns
*/
export function isArrayOfIntegers(value) {
return Array.isArray(value) && value.every((item) => Number.isInteger(item));
}
Loading

0 comments on commit 6907a59

Please sign in to comment.