Skip to content

Commit

Permalink
listen for collection reset
Browse files Browse the repository at this point in the history
  • Loading branch information
kilbot committed Dec 25, 2023
1 parent df78147 commit ab3877a
Show file tree
Hide file tree
Showing 11 changed files with 297 additions and 88 deletions.
23 changes: 22 additions & 1 deletion src/collection-replication-state.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BehaviorSubject, Observable, Subscription, Subject, interval } from 'rxjs';
import { BehaviorSubject, Observable, Subscription, Subject, interval, combineLatest } from 'rxjs';
import {
filter,
tap,
Expand Down Expand Up @@ -194,6 +194,27 @@ export class CollectionReplicationState<T extends RxCollection> {
}
}

/**
* Total count is the number of remote IDs, plus any local docs without an ID
*/
get total$() {
return combineLatest([this.remoteIDs$, this.unsyncedLocalDocs$]).pipe(
map(([remoteIDs, unsyncedLocalDocs]) => {
return remoteIDs.length + unsyncedLocalDocs.length;
})
);
}

get unsyncedLocalDocs$() {
return this.collection.find({ selector: { id: null } }).$;
}

getUnsyncedRemoteIDs() {
const remoteIDs = this.subjects.remoteIDs.getValue();
const localIDs = this.subjects.localIDs.getValue();
return remoteIDs.filter((id) => !localIDs.includes(id));
}

/**
* We need to a way to pause and start the replication, eg: when the user is offline
*/
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ import { Query } from './query-state';

export { QueryProvider, useQueryManager } from './provider';
export { useQuery } from './use-query';
export { useReplicationState } from './use-replication-state';
export type { Query };
162 changes: 103 additions & 59 deletions src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,54 @@ import { Observable, Subject, Subscription } from 'rxjs';
import { CollectionReplicationState } from './collection-replication-state';
import { QueryReplicationState } from './query-replication-state';
import { Query } from './query-state';
import { buildUrlWithParams } from './utils';

import type { QueryParams, QueryHooks } from './query-state';
import type { RxDatabase, RxCollection } from 'rxdb';

/**
*
*/
export interface ResisterQueryConfig {
queryKeys: (string | number | object)[];
collectionName: string;
initialParams?: QueryParams;
hooks?: QueryHooks;
locale?: string;
endpoint?: string;
}

/**
*
*/
export class Manager<TDatabase extends RxDatabase> {
private isCanceled = false;

/**
* Registry of all RxDB queries, indexed by query key
* Registry of all RxDB queries, indexed by queryKeys
*/
public queries: Map<string, Query<RxCollection>> = new Map();

/**
* Registry of all replication states, indexed by endpoint
* Registry of all replication states, indexed by endpoint & query params
* - for most collections collection.name = endpoint
* - special case for product variations, where:
* -- endpoint = 'products/<parent_id>/variations' and
* -- endpoint = 'products/variations' (for all variation queries, ie: barcode)
* - a query replication state will have the endpoint and query params, eg:
* -- 'products?stock_status=instock'
*
* NOTE: replication states can be shared between queryKeys
*/
private collectionReplicationStates: Map<string, CollectionReplicationState<RxCollection>> =
new Map();
private queryReplicationStates: Map<string, Map<string, QueryReplicationState<RxCollection>>> =
new Map();
public replicationStates: Map<
string,
CollectionReplicationState<RxCollection> | QueryReplicationState<RxCollection>
> = new Map();

/**
*
*/
public queryKeyToReplicationsMap: Map<string, string[]> = new Map();

/**
*
Expand All @@ -43,18 +68,42 @@ export class Manager<TDatabase extends RxDatabase> {
constructor(
private localDB: TDatabase,
private httpClient
) {}
) {
/**
* Subscribe to localDB to detect if collection is reset
*/
this.subs.push(
this.localDB.reset$.subscribe((col) => {
// find all queries that use this collection
this.queries.forEach((query, key) => {
if (query.collection.name === col.name) {
// cancel all replications
const endpoints = this.queryKeyToReplicationsMap.get(key);
endpoints.forEach((endpoint) => {
const replication = this.replicationStates.get(endpoint);
replication.cancel();
this.replicationStates.delete(endpoint);
});
this.queryKeyToReplicationsMap.delete(key);
// cancel the query
this.queries.delete(key);
query.cancel();
}
});
})
);
}

serializeQueryKey(queryKey: (string | number | object)[]): string {
stringify(params: any): string {
try {
return JSON.stringify(queryKey);
return JSON.stringify(params);
} catch (error) {
this.subjects.error.next(new Error(`Failed to serialize query key: ${error}`));
}
}

hasQuery(queryKey: (string | number | object)[]): boolean {
const key = this.serializeQueryKey(queryKey);
hasQuery(queryKeys: (string | number | object)[]): boolean {
const key = this.stringify(queryKeys);
return this.queries.has(key);
}

Expand All @@ -64,20 +113,18 @@ export class Manager<TDatabase extends RxDatabase> {
initialParams,
hooks = {},
locale,
}: {
queryKeys: (string | number | object)[];
collectionName: string;
initialParams?: QueryParams;
hooks?: QueryHooks;
locale?: string;
}) {
const key = this.serializeQueryKey(queryKeys);
...args
}: ResisterQueryConfig) {
const key = this.stringify(queryKeys);
const endpoint = args.endpoint || collectionName;

if (key && !this.queries.has(key)) {
const collection = this.getCollection(collectionName);
if (collection) {
const query = new Query<typeof collection>({ collection, initialParams, hooks });
const endpoint = this.getEndpoint(collection, hooks);
const query = new Query<typeof collection>({ id: key, collection, initialParams, hooks });
const collectionReplication = this.registerCollectionReplication({ collection, endpoint });
this.addQueryKeyToReplicationsMap(key, endpoint);
collectionReplication.start();

/**
* Subscribe to query params and register a new replication state for the query
Expand All @@ -86,12 +133,14 @@ export class Manager<TDatabase extends RxDatabase> {
this.subs.push(
query.params$.subscribe((params) => {
const apiQueryParams = this.getApiQueryParams(params);
this.registerQueryReplication(
apiQueryParams,
const queryEndpoint = buildUrlWithParams(endpoint, apiQueryParams);
const queryReplication = this.registerQueryReplication({
collectionReplication,
collection,
endpoint
);
endpoint: queryEndpoint,
});
this.addQueryKeyToReplicationsMap(key, queryEndpoint);
queryReplication.start();
})
);

Expand All @@ -107,6 +156,7 @@ export class Manager<TDatabase extends RxDatabase> {
this.queries.set(key, query);
}
}

return this.getQuery(queryKeys);
}

Expand All @@ -118,7 +168,7 @@ export class Manager<TDatabase extends RxDatabase> {
}

getQuery(queryKeys: (string | number | object)[]) {
const key = this.serializeQueryKey(queryKeys);
const key = this.stringify(queryKeys);
const query = this.queries.get(key);

if (!query) {
Expand All @@ -129,7 +179,7 @@ export class Manager<TDatabase extends RxDatabase> {
}

deregisterQuery(queryKeys: (string | number | object)[]): void {
const key = this.serializeQueryKey(queryKeys);
const key = this.stringify(queryKeys);
// cancel the query
const query = this.queries.get(key);
if (query) {
Expand All @@ -138,22 +188,31 @@ export class Manager<TDatabase extends RxDatabase> {
}
}

/**
* Allow the user to override the endpoint, eg: variations collection will have
* /products/<parent_id>/variations endpoint
*/
getEndpoint(collection, hooks): string {
if (hooks.preEndpoint) {
return hooks.preEndpoint(collection);
addQueryKeyToReplicationsMap(key: string, endpoint: string) {
if (!this.queryKeyToReplicationsMap.has(key)) {
this.queryKeyToReplicationsMap.set(key, []);
}
const replications = this.queryKeyToReplicationsMap.get(key);
if (!replications.includes(endpoint)) {
replications.push(endpoint);
}
return collection.name;
}

getReplicationStatesByQueryID(key: string) {
const endpoints = this.queryKeyToReplicationsMap.get(key);
return endpoints.map((endpoint) => this.replicationStates.get(endpoint));
}

getReplicationStatesByQueryKeys(queryKeys: (string | number | object)[]) {
const key = this.stringify(queryKeys);
return this.getReplicationStatesByQueryID(key);
}

/**
* There is one replication state per collection
*/
registerCollectionReplication({ collection, endpoint }) {
if (!this.collectionReplicationStates.has(endpoint)) {
if (!this.replicationStates.has(endpoint)) {
const collectionReplication = new CollectionReplicationState({
httpClient: this.httpClient,
collection,
Expand All @@ -169,33 +228,19 @@ export class Manager<TDatabase extends RxDatabase> {
})
);

this.collectionReplicationStates.set(endpoint, collectionReplication);
this.replicationStates.set(endpoint, collectionReplication);
}

return this.collectionReplicationStates.get(endpoint);
return this.replicationStates.get(endpoint);
}

/**
* There is one replication state per unique query
*/
registerQueryReplication(apiQueryParams, collectionReplication, collection, endpoint) {
if (!this.queryReplicationStates.has(endpoint)) {
this.queryReplicationStates.set(endpoint, new Map());
}
const queryStates = this.queryReplicationStates.get(endpoint);
const apiQueryKey = this.serializeQueryKey(apiQueryParams);

// pause all other queries
queryStates.forEach((state) => {
state.pause();
});

// if there is no query state for this query, create one
let queryState = queryStates.get(apiQueryKey);
if (!queryState) {
queryState = new QueryReplicationState({
registerQueryReplication({ endpoint, collectionReplication, collection }) {
if (!this.replicationStates.has(endpoint)) {
const queryReplication = new QueryReplicationState({
httpClient: this.httpClient,
apiQueryParams,
collectionReplication,
collection,
endpoint,
Expand All @@ -205,16 +250,15 @@ export class Manager<TDatabase extends RxDatabase> {
* Subscribe to query errors and pipe them to the error subject
*/
this.subs.push(
queryState.error$.subscribe((error) => {
queryReplication.error$.subscribe((error) => {
this.subjects.error.next(error);
})
);

queryStates.set(apiQueryKey, queryState);
this.replicationStates.set(endpoint, queryReplication);
}

// start the query
queryState.start();
return this.replicationStates.get(endpoint);
}

/**
Expand Down
Loading

0 comments on commit ab3877a

Please sign in to comment.