From 5fa7604718e4f4456add9460f7f1624c4fb61bf9 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Thu, 4 Jun 2020 16:37:45 -0700 Subject: [PATCH] Add initial support for the BOOKMARK message in watch. --- examples/typescript/watch/watch-example.ts | 11 ++++++++--- src/cache.ts | 17 ++++++++++++++++- src/watch.ts | 4 ++-- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/examples/typescript/watch/watch-example.ts b/examples/typescript/watch/watch-example.ts index e4c52baca9..3a4723085e 100644 --- a/examples/typescript/watch/watch-example.ts +++ b/examples/typescript/watch/watch-example.ts @@ -6,9 +6,11 @@ kc.loadFromDefault(); const watch = new k8s.Watch(kc); watch.watch('/api/v1/namespaces', // optional query parameters can go here. - {}, + { + allowWatchBookmarks: true, + }, // callback is called for each received object. - (type, obj) => { + (type, apiObj, watchObj) => { if (type === 'ADDED') { // tslint:disable-next-line:no-console console.log('new object:'); @@ -18,12 +20,15 @@ watch.watch('/api/v1/namespaces', } else if (type === 'DELETED') { // tslint:disable-next-line:no-console console.log('deleted object:'); + } else if (type === 'BOOKMARK') { + // tslint:disable-next-line:no-console + console.log(`bookmark: ${watchObj.metadata.resourceVersion}`); } else { // tslint:disable-next-line:no-console console.log('unknown type: ' + type); } // tslint:disable-next-line:no-console - console.log(obj); + console.log(apiObj); }, // done callback is called if the watch terminates normally (err) => { diff --git a/src/cache.ts b/src/cache.ts index 4fcee68963..1c1fd87bee 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -9,6 +9,7 @@ export interface ObjectCache { export class ListWatch implements ObjectCache, Informer { private objects: T[] = []; + private resourceVersion: string; private readonly indexCache: { [key: string]: T[] } = {}; private readonly callbackCache: { [key: string]: Array> } = {}; @@ -24,6 +25,7 @@ export class ListWatch implements ObjectCache, In this.callbackCache[UPDATE] = []; this.callbackCache[DELETE] = []; this.callbackCache[ERROR] = []; + this.resourceVersion = ''; if (autoStart) { this.doneHandler(null); } @@ -68,11 +70,18 @@ export class ListWatch implements ObjectCache, In return this.indexCache[namespace] as ReadonlyArray; } + public latestResourceVersion(): string { + return this.resourceVersion; + } + private async doneHandler(err: any) { if (err) { this.callbackCache[ERROR].forEach((elt: ObjectCallback) => elt(err)); return; } + // TODO: Don't always list here for efficiency + // try to restart the watch from resourceVersion, but detect 410 GONE and relist in that case. + // Or if resourceVersion is empty. const promise = this.listFn(); const result = await promise; const list = result.body; @@ -109,7 +118,7 @@ export class ListWatch implements ObjectCache, In addOrUpdateObject(namespaceList, obj); } - private watchHandler(phase: string, obj: T) { + private watchHandler(phase: string, obj: T, watchObj?: any) { switch (phase) { case 'ADDED': case 'MODIFIED': @@ -132,6 +141,12 @@ export class ListWatch implements ObjectCache, In } } break; + case 'BOOKMARK': + // nothing to do, here for documentation, mostly. + break; + } + if (watchObj && watchObj.metadata) { + this.resourceVersion = watchObj.metadata.resourceVersion; } } } diff --git a/src/watch.ts b/src/watch.ts index 648f437de1..9593c056d1 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -34,7 +34,7 @@ export class Watch { public async watch( path: string, queryParams: any, - callback: (phase: string, obj: any) => void, + callback: (phase: string, apiObj: any, watchObj?: any) => void, done: (err: any) => void, ): Promise { const cluster = this.config.getCurrentCluster(); @@ -60,7 +60,7 @@ export class Watch { stream.on('data', (line) => { try { const data = JSON.parse(line); - callback(data.type, data.object); + callback(data.type, data.object, data); } catch (ignore) { // ignore parse errors }