Skip to content

Commit

Permalink
Merge pull request #460 from brendandburns/watch
Browse files Browse the repository at this point in the history
Add initial support for the BOOKMARK message in watch.
  • Loading branch information
k8s-ci-robot authored Jun 5, 2020
2 parents 7590f31 + 5fa7604 commit 79736b9
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 6 deletions.
11 changes: 8 additions & 3 deletions examples/typescript/watch/watch-example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ kc.loadFromDefault();
const watch = new k8s.Watch(kc);
watch.watch('/api/v1/namespaces',
// optional query parameters can go here.
{},
{
allowWatchBookmarks: true,
},
// callback is called for each received object.
(type, obj) => {
(type, apiObj, watchObj) => {
if (type === 'ADDED') {
// tslint:disable-next-line:no-console
console.log('new object:');
Expand All @@ -18,12 +20,15 @@ watch.watch('/api/v1/namespaces',
} else if (type === 'DELETED') {
// tslint:disable-next-line:no-console
console.log('deleted object:');
} else if (type === 'BOOKMARK') {
// tslint:disable-next-line:no-console
console.log(`bookmark: ${watchObj.metadata.resourceVersion}`);
} else {
// tslint:disable-next-line:no-console
console.log('unknown type: ' + type);
}
// tslint:disable-next-line:no-console
console.log(obj);
console.log(apiObj);
},
// done callback is called if the watch terminates normally
(err) => {
Expand Down
17 changes: 16 additions & 1 deletion src/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export interface ObjectCache<T> {

export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, Informer<T> {
private objects: T[] = [];
private resourceVersion: string;
private readonly indexCache: { [key: string]: T[] } = {};
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T>> } = {};

Expand All @@ -24,6 +25,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[UPDATE] = [];
this.callbackCache[DELETE] = [];
this.callbackCache[ERROR] = [];
this.resourceVersion = '';
if (autoStart) {
this.doneHandler(null);
}
Expand Down Expand Up @@ -68,11 +70,18 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
return this.indexCache[namespace] as ReadonlyArray<T>;
}

public latestResourceVersion(): string {
return this.resourceVersion;
}

private async doneHandler(err: any) {
if (err) {
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(err));
return;
}
// TODO: Don't always list here for efficiency
// try to restart the watch from resourceVersion, but detect 410 GONE and relist in that case.
// Or if resourceVersion is empty.
const promise = this.listFn();
const result = await promise;
const list = result.body;
Expand Down Expand Up @@ -109,7 +118,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
addOrUpdateObject(namespaceList, obj);
}

private watchHandler(phase: string, obj: T) {
private watchHandler(phase: string, obj: T, watchObj?: any) {
switch (phase) {
case 'ADDED':
case 'MODIFIED':
Expand All @@ -132,6 +141,12 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
}
}
break;
case 'BOOKMARK':
// nothing to do, here for documentation, mostly.
break;
}
if (watchObj && watchObj.metadata) {
this.resourceVersion = watchObj.metadata.resourceVersion;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class Watch {
public async watch(
path: string,
queryParams: any,
callback: (phase: string, obj: any) => void,
callback: (phase: string, apiObj: any, watchObj?: any) => void,
done: (err: any) => void,
): Promise<any> {
const cluster = this.config.getCurrentCluster();
Expand All @@ -60,7 +60,7 @@ export class Watch {
stream.on('data', (line) => {
try {
const data = JSON.parse(line);
callback(data.type, data.object);
callback(data.type, data.object, data);
} catch (ignore) {
// ignore parse errors
}
Expand Down

0 comments on commit 79736b9

Please sign in to comment.