Skip to content

Commit

Permalink
add pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
kilbot committed Dec 26, 2023
1 parent ab3877a commit ea7fb33
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 51 deletions.
18 changes: 16 additions & 2 deletions src/collection-replication-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ export class CollectionReplicationState<T extends RxCollection> {
readonly lastModified$: Observable<string> = this.subjects.lastModified.asObservable();
readonly paused$: Observable<boolean> = this.subjects.paused.asObservable();

/**
*
*/
private firstSyncResolver: (() => void) | null = null;
public readonly firstSync: Promise<void>;

/**
*
*/
Expand All @@ -67,6 +73,11 @@ export class CollectionReplicationState<T extends RxCollection> {
this.hooks = hooks || {};
this.endpoint = endpoint;

// Initialize the firstSync promise
this.firstSync = new Promise<void>((resolve) => {
this.firstSyncResolver = resolve;
});

/**
* Subscribe to the remoteIDs local document
*/
Expand Down Expand Up @@ -126,8 +137,11 @@ export class CollectionReplicationState<T extends RxCollection> {
*/
async run() {
await this.auditIDs();
// await this.syncLastModified();
// await this.syncRemoteIDs();

if (this.firstSyncResolver) {
this.firstSyncResolver();
this.firstSyncResolver = null; // Clear the resolver to prevent future calls
}
}

/**
Expand Down
73 changes: 44 additions & 29 deletions src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,38 @@ export class Manager<TDatabase extends RxDatabase> {
collectionReplication.start();

/**
* Subscribe to query params and register a new replication state for the query
* - also cancel the previous query replication
* Add some subscriptions for the Query Class
* - these will be completed when the query instance is cancelled
*/
this.subs.push(
query.subs.push(
/**
* Subscribe to query params and register a new replication state for the query
* - also cancel the previous query replication
*/
query.params$.subscribe((params) => {
const apiQueryParams = this.getApiQueryParams(params);
const queryEndpoint = buildUrlWithParams(endpoint, apiQueryParams);
const queryReplication = this.registerQueryReplication({
collectionReplication,
collection,
endpoint: queryEndpoint,
});
this.addQueryKeyToReplicationsMap(key, queryEndpoint);
queryReplication.start();

if (!this.replicationStates.has(queryEndpoint)) {
const queryReplication = this.registerQueryReplication({
collectionReplication,
collection,
endpoint: queryEndpoint,
});

this.addQueryKeyToReplicationsMap(key, queryEndpoint);

/**
* Subscribe to the query trigger and trigger the query replication
*/
query.subs.push(
query.triggerServerQuery$.subscribe((page) => {
queryReplication.nextPage();
})
);

queryReplication.start();
}
})
);

Expand Down Expand Up @@ -238,27 +256,24 @@ export class Manager<TDatabase extends RxDatabase> {
* There is one replication state per unique query
*/
registerQueryReplication({ endpoint, collectionReplication, collection }) {
if (!this.replicationStates.has(endpoint)) {
const queryReplication = new QueryReplicationState({
httpClient: this.httpClient,
collectionReplication,
collection,
endpoint,
});

/**
* Subscribe to query errors and pipe them to the error subject
*/
this.subs.push(
queryReplication.error$.subscribe((error) => {
this.subjects.error.next(error);
})
);
const queryReplication = new QueryReplicationState({
httpClient: this.httpClient,
collectionReplication,
collection,
endpoint,
});

this.replicationStates.set(endpoint, queryReplication);
}
/**
* Subscribe to query errors and pipe them to the error subject
*/
this.subs.push(
queryReplication.error$.subscribe((error) => {
this.subjects.error.next(error);
})
);

return this.replicationStates.get(endpoint);
this.replicationStates.set(endpoint, collectionReplication);
return queryReplication;
}

/**
Expand Down
91 changes: 75 additions & 16 deletions src/query-replication-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,19 @@ export class QueryReplicationState<T extends RxCollection> {
public readonly subjects = {
error: new Subject<Error>(),
paused: new BehaviorSubject<boolean>(true), // true when the replication is paused, start true
active: new BehaviorSubject<boolean>(false), // true when something is running, false when not
};

readonly paused$: Observable<boolean> = this.subjects.paused.asObservable();

/**
*
*/
readonly error$: Observable<Error> = this.subjects.error.asObservable();
readonly paused$: Observable<boolean> = this.subjects.paused.asObservable();
readonly active$: Observable<boolean> = this.subjects.active.asObservable();

/**
*
*/
constructor({
collection,
httpClient,
Expand All @@ -60,6 +64,9 @@ export class QueryReplicationState<T extends RxCollection> {
*
*/
this.subs.push(
/**
* Pause/Start the replication
*/
this.paused$
.pipe(
switchMap((isPaused) => (isPaused ? [] : interval(this.pollingTime).pipe(startWith(0)))),
Expand All @@ -68,36 +75,47 @@ export class QueryReplicationState<T extends RxCollection> {
.subscribe(async () => {
this.run();
})

/**
*
*/
);
}

/**
*
*/
async run() {
await this.collectionReplication.firstSync;
await this.runPull();
}

/**
*
*/
async runPull() {
if (this.isStopped()) {
if (this.isStopped() || this.subjects.active.getValue()) {
return;
}

this.subjects.active.next(true);
const include = this.collectionReplication.getUnsyncedRemoteIDs();
const lastModified = this.collectionReplication.subjects.lastModified.getValue();

/**
* If there is nothing to fetch, then return
*/
if (isEmpty(include) && !lastModified) {
return;
}

try {
let response;
const include = this.collectionReplication.getUnsyncedRemoteIDs();

if (isEmpty(include)) {
response = await this.httpClient.get(this.endpoint);
response = await this.fetchLastModified({ lastModified });
} else {
response = await this.httpClient.post(
this.endpoint,
{
include,
},
{
headers: {
'X-HTTP-Method-Override': 'GET',
},
}
);
response = await this.fetchRemoteIDs({ include });
}

if (!response.data) {
Expand All @@ -115,9 +133,50 @@ export class QueryReplicationState<T extends RxCollection> {
await this.collection.bulkUpsert(documents);
} catch (error) {
this.subjects.error.next(error);
} finally {
this.subjects.active.next(false);
}
}

/**
*
*/
async fetchRemoteIDs({ include }) {
const response = await this.httpClient.post(
this.endpoint,
{
include,
},
{
headers: {
'X-HTTP-Method-Override': 'GET',
},
}
);

return response;
}

/**
*
*/
async fetchLastModified({ lastModified }) {
const response = await this.httpClient.get(this.endpoint, {
params: {
modified_after: lastModified,
},
});

return response;
}

/**
*
*/
nextPage() {
this.run();
}

/**
* We need to a way to pause and start the replication, eg: when the user is offline
*/
Expand Down
47 changes: 44 additions & 3 deletions src/query-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ export class Query<T extends RxCollection> {
private isCanceled = false;
private whereClauses: WhereClause[] = [];
private hooks: QueryConfig<T>['hooks'];
private paginationEndReached = false;
private pageSize: number;

/**
*
Expand All @@ -53,6 +55,9 @@ export class Query<T extends RxCollection> {
result: new BehaviorSubject<DocumentType<T>[]>([]),
error: new Subject<Error>(),
cancel: new Subject<void>(),
currentPage: new BehaviorSubject<number>(1),
paginatedResult: new BehaviorSubject<DocumentType<T>[]>([]),
triggerServerQuery: new Subject<void>(),
};

/**
Expand All @@ -62,9 +67,13 @@ export class Query<T extends RxCollection> {
readonly result$: Observable<DocumentType<T>[]> = this.subjects.result.asObservable();
readonly error$: Observable<Error> = this.subjects.error.asObservable();
readonly cancel$: Observable<void> = this.subjects.cancel.asObservable();
readonly currentPage$: Observable<number> = this.subjects.currentPage.asObservable();
readonly paginatedResult$: Observable<DocumentType<T>[]> =
this.subjects.paginatedResult.asObservable();
readonly triggerServerQuery$: Observable<void> = this.subjects.triggerServerQuery.asObservable();

readonly resource = new ObservableResource(this.result$);
readonly paginatedResource = new ObservableResource(this.result$);
readonly paginatedResource = new ObservableResource(this.paginatedResult$);

/**
*
Expand All @@ -74,6 +83,7 @@ export class Query<T extends RxCollection> {
this.collection = collection;
this.subjects.params.next(initialParams);
this.hooks = hooks || {};
this.pageSize = 10;

/**
* Keep track of what we are subscribed to
Expand All @@ -84,7 +94,25 @@ export class Query<T extends RxCollection> {
*/
this.find$.subscribe((result) => {
this.subjects.result.next(result);
})
}),

/**
* Subscribe to result$ and emit paginated results
*/
this.result$
.pipe(
switchMap((items) =>
this.currentPage$.pipe(
map((currentPage) => {
const end = currentPage * this.pageSize;
const pageItems = items.slice(0, end);
this.paginationEndReached = pageItems.length < end;
return pageItems;
})
)
)
)
.subscribe(this.subjects.paginatedResult)
);
}

Expand Down Expand Up @@ -211,7 +239,20 @@ export class Query<T extends RxCollection> {
/**
* Pagination
*/
nextPage() {}
nextPage() {
if (!this.paginationEndReached) {
const page = this.subjects.currentPage.value + 1;
this.subjects.currentPage.next(page);
} else {
// the Query Replication will listen for this event and trigger a server query
this.subjects.triggerServerQuery.next();
}
}

paginationReset() {
this.paginationEndReached = false;
this.subjects.currentPage.next(1);
}

/**
* Cancel
Expand Down
2 changes: 1 addition & 1 deletion src/use-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ export const useQuery = (queryOptions: QueryOptions) => {

/**
* This is a hack for when when collection is reset:
* - re-register query
* - re-render components that use this query
* - on re-render the query is recreated
*/
const trigger = useObservableState(query.cancel$.pipe(map(() => trigger + 1)), 0);

Expand Down

0 comments on commit ea7fb33

Please sign in to comment.