Skip to content

Commit e551d92

Browse files
feat: add ability to periodically request debug headers (#139)
* feat: add ability to periodically request debug headers
1 parent 2140577 commit e551d92

File tree

8 files changed

+181
-6
lines changed

8 files changed

+181
-6
lines changed

Diff for: protos/grpc_gcp.proto

+8
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,14 @@ message ChannelPoolConfig {
3535
// New channel will be created once it get hit, until we reach the max size
3636
// of the channel pool.
3737
uint32 max_concurrent_streams_low_watermark = 3;
38+
39+
// This will request debug response headers from the load balancer.
40+
// The headers are meant to help diagnose issues when connecting to GCP
41+
// services. The headers are primarily useful to support engineers that will
42+
// be able to decrypt them. The headers have a fairly large payload (~1kib),
43+
// so will be requested at most once per this period. A negative number will
44+
// request the headers for every request, 0 will never request headers.
45+
uint32 debug_header_interval_secs = 5;
3846
}
3947

4048
message MethodConfig {

Diff for: src/channel_ref.ts

+27
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ export class ChannelRef {
2727
private readonly channelId: number;
2828
private affinityCount: number;
2929
private activeStreamsCount: number;
30+
private debugHeadersRequestedAt: Date | null;
31+
private shouldForceDebugHeadersOnNextRequest: boolean;
32+
private closed: boolean;
3033

3134
/**
3235
* @param channel The underlying grpc channel.
@@ -44,6 +47,18 @@ export class ChannelRef {
4447
this.channelId = channelId;
4548
this.affinityCount = affinityCount ? affinityCount : 0;
4649
this.activeStreamsCount = activeStreamsCount ? activeStreamsCount : 0;
50+
this.debugHeadersRequestedAt = null;
51+
this.shouldForceDebugHeadersOnNextRequest = false;
52+
this.closed = false;
53+
}
54+
55+
close() {
56+
this.closed = true;
57+
this.channel.close();
58+
}
59+
60+
isClosed() {
61+
return this.closed;
4762
}
4863

4964
affinityCountIncr() {
@@ -70,6 +85,18 @@ export class ChannelRef {
7085
return this.activeStreamsCount;
7186
}
7287

88+
forceDebugHeadersOnNextRequest() {
89+
this.shouldForceDebugHeadersOnNextRequest = true;
90+
}
91+
notifyDebugHeadersRequested() {
92+
this.debugHeadersRequestedAt = new Date();
93+
this.shouldForceDebugHeadersOnNextRequest = false;
94+
}
95+
96+
getDebugHeadersRequestedAt(): Date | null {
97+
return this.debugHeadersRequestedAt;
98+
}
99+
73100
getChannel() {
74101
return this.channel;
75102
}

Diff for: src/gcp_channel_factory.ts

+39-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {promisify} from 'util';
2121

2222
import {ChannelRef} from './channel_ref';
2323
import * as protoRoot from './generated/grpc_gcp';
24-
24+
import {connectivityState} from '@grpc/grpc-js';
2525
import ApiConfig = protoRoot.grpc.gcp.ApiConfig;
2626
import IAffinityConfig = protoRoot.grpc.gcp.IAffinityConfig;
2727

@@ -34,6 +34,8 @@ export interface GcpChannelFactoryInterface extends grpcType.ChannelInterface {
3434
getAffinityConfig(methodName: string): IAffinityConfig;
3535
bind(channelRef: ChannelRef, affinityKey: string): void;
3636
unbind(boundKey?: string): void;
37+
shouldRequestDebugHeaders(lastRequested: Date | null) : boolean;
38+
3739
}
3840

3941
export interface GcpChannelFactoryConstructor {
@@ -61,6 +63,7 @@ export function getGcpChannelFactoryClass(
6163
private channelRefs: ChannelRef[] = [];
6264
private target: string;
6365
private credentials: grpcType.ChannelCredentials;
66+
private debugHeaderIntervalSecs: number;
6467

6568
/**
6669
* @param address The address of the server to connect to.
@@ -84,6 +87,7 @@ export function getGcpChannelFactoryClass(
8487
this.minSize = 1;
8588
this.maxSize = 10;
8689
this.maxConcurrentStreamsLowWatermark = 100;
90+
this.debugHeaderIntervalSecs = 0;
8791
const gcpApiConfig = options.gcpApiConfig;
8892
if (gcpApiConfig) {
8993
if (gcpApiConfig.channelPool) {
@@ -98,6 +102,7 @@ export function getGcpChannelFactoryClass(
98102
if (this.maxSize < this.minSize) {
99103
throw new Error('Invalid channelPool config: minSize must <= maxSize')
100104
}
105+
this.debugHeaderIntervalSecs = channelPool.debugHeaderIntervalSecs || 0;
101106
}
102107
this.initMethodToAffinityMap(gcpApiConfig);
103108
}
@@ -187,9 +192,33 @@ export function getGcpChannelFactoryClass(
187192
);
188193
const channelRef = new ChannelRef(grpcChannel, size);
189194
this.channelRefs.push(channelRef);
195+
196+
if (this.debugHeaderIntervalSecs) {
197+
this.setupDebugHeadersOnChannelTransition(channelRef);
198+
}
199+
190200
return channelRef;
191201
}
192202

203+
private setupDebugHeadersOnChannelTransition(channel: ChannelRef) {
204+
const self = this;
205+
206+
if (channel.isClosed()) {
207+
return;
208+
}
209+
210+
let currentState = channel.getChannel().getConnectivityState(false);
211+
if (currentState == connectivityState.SHUTDOWN) {
212+
return;
213+
}
214+
215+
channel.getChannel().watchConnectivityState(currentState, Infinity, (e) => {
216+
channel.forceDebugHeadersOnNextRequest();
217+
self.setupDebugHeadersOnChannelTransition(channel);
218+
});
219+
}
220+
221+
193222
/**
194223
* Get AffinityConfig associated with a certain method.
195224
* @param methodName Method name of the request.
@@ -198,6 +227,14 @@ export function getGcpChannelFactoryClass(
198227
return this.methodToAffinity[methodName];
199228
}
200229

230+
shouldRequestDebugHeaders(lastRequested: Date | null) : boolean {
231+
if (this.debugHeaderIntervalSecs < 0) return true;
232+
else if (this.debugHeaderIntervalSecs == 0) return false;
233+
else if (!lastRequested) return true;
234+
235+
return new Date().getTime() - lastRequested.getTime() > this.debugHeaderIntervalSecs * 1000;
236+
}
237+
201238
/**
202239
* Bind channel with affinity key.
203240
* @param channelRef ChannelRef instance that contains the grpc channel.
@@ -232,7 +269,7 @@ export function getGcpChannelFactoryClass(
232269
*/
233270
close(): void {
234271
this.channelRefs.forEach(ref => {
235-
ref.getChannel().close();
272+
ref.close();
236273
});
237274
}
238275

Diff for: src/generated/grpc_gcp.d.ts

+6
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ export namespace grpc {
115115

116116
/** ChannelPoolConfig maxConcurrentStreamsLowWatermark */
117117
maxConcurrentStreamsLowWatermark?: (number|null);
118+
119+
/** ChannelPoolConfig debugHeaderIntervalSecs */
120+
debugHeaderIntervalSecs?: (number|null);
118121
}
119122

120123
/** Represents a ChannelPoolConfig. */
@@ -138,6 +141,9 @@ export namespace grpc {
138141
/** ChannelPoolConfig maxConcurrentStreamsLowWatermark. */
139142
public maxConcurrentStreamsLowWatermark: number;
140143

144+
/** ChannelPoolConfig debugHeaderIntervalSecs. */
145+
public debugHeaderIntervalSecs: number;
146+
141147
/**
142148
* Creates a new ChannelPoolConfig instance using the specified properties.
143149
* @param [properties] Properties to set

Diff for: src/generated/grpc_gcp.js

+22
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ $root.grpc = (function() {
273273
* @property {number|null} [minSize] ChannelPoolConfig minSize
274274
* @property {number|Long|null} [idleTimeout] ChannelPoolConfig idleTimeout
275275
* @property {number|null} [maxConcurrentStreamsLowWatermark] ChannelPoolConfig maxConcurrentStreamsLowWatermark
276+
* @property {number|null} [debugHeaderIntervalSecs] ChannelPoolConfig debugHeaderIntervalSecs
276277
*/
277278

278279
/**
@@ -322,6 +323,14 @@ $root.grpc = (function() {
322323
*/
323324
ChannelPoolConfig.prototype.maxConcurrentStreamsLowWatermark = 0;
324325

326+
/**
327+
* ChannelPoolConfig debugHeaderIntervalSecs.
328+
* @member {number} debugHeaderIntervalSecs
329+
* @memberof grpc.gcp.ChannelPoolConfig
330+
* @instance
331+
*/
332+
ChannelPoolConfig.prototype.debugHeaderIntervalSecs = 0;
333+
325334
/**
326335
* Creates a new ChannelPoolConfig instance using the specified properties.
327336
* @function create
@@ -354,6 +363,8 @@ $root.grpc = (function() {
354363
writer.uint32(/* id 3, wireType 0 =*/24).uint32(message.maxConcurrentStreamsLowWatermark);
355364
if (message.minSize != null && Object.hasOwnProperty.call(message, "minSize"))
356365
writer.uint32(/* id 4, wireType 0 =*/32).uint32(message.minSize);
366+
if (message.debugHeaderIntervalSecs != null && Object.hasOwnProperty.call(message, "debugHeaderIntervalSecs"))
367+
writer.uint32(/* id 5, wireType 0 =*/40).uint32(message.debugHeaderIntervalSecs);
357368
return writer;
358369
};
359370

@@ -400,6 +411,9 @@ $root.grpc = (function() {
400411
case 3:
401412
message.maxConcurrentStreamsLowWatermark = reader.uint32();
402413
break;
414+
case 5:
415+
message.debugHeaderIntervalSecs = reader.uint32();
416+
break;
403417
default:
404418
reader.skipType(tag & 7);
405419
break;
@@ -447,6 +461,9 @@ $root.grpc = (function() {
447461
if (message.maxConcurrentStreamsLowWatermark != null && message.hasOwnProperty("maxConcurrentStreamsLowWatermark"))
448462
if (!$util.isInteger(message.maxConcurrentStreamsLowWatermark))
449463
return "maxConcurrentStreamsLowWatermark: integer expected";
464+
if (message.debugHeaderIntervalSecs != null && message.hasOwnProperty("debugHeaderIntervalSecs"))
465+
if (!$util.isInteger(message.debugHeaderIntervalSecs))
466+
return "debugHeaderIntervalSecs: integer expected";
450467
return null;
451468
};
452469

@@ -477,6 +494,8 @@ $root.grpc = (function() {
477494
message.idleTimeout = new $util.LongBits(object.idleTimeout.low >>> 0, object.idleTimeout.high >>> 0).toNumber(true);
478495
if (object.maxConcurrentStreamsLowWatermark != null)
479496
message.maxConcurrentStreamsLowWatermark = object.maxConcurrentStreamsLowWatermark >>> 0;
497+
if (object.debugHeaderIntervalSecs != null)
498+
message.debugHeaderIntervalSecs = object.debugHeaderIntervalSecs >>> 0;
480499
return message;
481500
};
482501

@@ -502,6 +521,7 @@ $root.grpc = (function() {
502521
object.idleTimeout = options.longs === String ? "0" : 0;
503522
object.maxConcurrentStreamsLowWatermark = 0;
504523
object.minSize = 0;
524+
object.debugHeaderIntervalSecs = 0;
505525
}
506526
if (message.maxSize != null && message.hasOwnProperty("maxSize"))
507527
object.maxSize = message.maxSize;
@@ -514,6 +534,8 @@ $root.grpc = (function() {
514534
object.maxConcurrentStreamsLowWatermark = message.maxConcurrentStreamsLowWatermark;
515535
if (message.minSize != null && message.hasOwnProperty("minSize"))
516536
object.minSize = message.minSize;
537+
if (message.debugHeaderIntervalSecs != null && message.hasOwnProperty("debugHeaderIntervalSecs"))
538+
object.debugHeaderIntervalSecs = message.debugHeaderIntervalSecs;
517539
return object;
518540
};
519541

Diff for: src/index.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,13 @@ export = (grpc: GrpcModule) => {
6868
function gcpCallInvocationTransformer<RequestType, ResponseType>(
6969
callProperties: grpcType.CallProperties<RequestType, ResponseType>
7070
): grpcType.CallProperties<RequestType, ResponseType> {
71-
const channelFactory = callProperties.channel;
72-
if (!channelFactory || !(channelFactory instanceof GcpChannelFactory)) {
71+
if (!callProperties.channel || !(callProperties.channel instanceof GcpChannelFactory)) {
7372
// The gcpCallInvocationTransformer needs to use gcp channel factory.
7473
return callProperties;
7574
}
7675

76+
const channelFactory = callProperties.channel as GcpChannelFactoryInterface;
77+
7778
const argument = callProperties.argument;
7879
const metadata = callProperties.metadata;
7980
const call = callProperties.call;
@@ -152,6 +153,11 @@ export = (grpc: GrpcModule) => {
152153
: [];
153154
newCallOptions.interceptors = interceptors.concat([postProcessInterceptor]);
154155

156+
if (channelFactory.shouldRequestDebugHeaders(channelRef.getDebugHeadersRequestedAt())) {
157+
metadata.set('x-return-encrypted-headers', 'all_response');
158+
channelRef.notifyDebugHeadersRequested();
159+
}
160+
155161
return {
156162
argument,
157163
metadata,

Diff for: test/integration/local_service_test.js

+69
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
const protoLoader = require('@grpc/proto-loader');
2626
const assert = require('assert');
2727
const getGrpcGcpObjects = require('../../build/src');
28+
const { promisify } = require('util')
2829

2930
const PROTO_PATH = __dirname + '/../../protos/test_service.proto';
3031
const packageDef = protoLoader.loadSync(PROTO_PATH);
@@ -129,6 +130,74 @@ for (const grpcLibName of ['grpc', '@grpc/grpc-js']) {
129130
});
130131
});
131132

133+
// describe('Debug headers', () => {
134+
// let client;
135+
// beforeEach(() => {
136+
// const channelOptions = {
137+
// channelFactoryOverride: grpcGcp.gcpChannelFactoryOverride,
138+
// callInvocationTransformer: grpcGcp.gcpCallInvocationTransformer,
139+
// gcpApiConfig: grpcGcp.createGcpApiConfig({
140+
// channelPool: {
141+
// maxSize: 1,
142+
// maxConcurrentStreamsLowWatermark: 1,
143+
// debugHeaderIntervalSecs: 1
144+
// },
145+
// }),
146+
// };
147+
148+
// client = new Client(
149+
// 'localhost:' + port,
150+
// grpc.credentials.createInsecure(),
151+
// channelOptions
152+
// );
153+
// });
154+
// afterEach(() => {
155+
// client.close();
156+
// });
157+
// it('with unary call', async () => {
158+
// function makeCallAndReturnMeta() {
159+
// return new Promise((resolve, reject) => {
160+
// let lastMeta = null;
161+
162+
// const call = client.unary({}, new grpc.Metadata(), (err, data) => {
163+
// if (err) reject(err);
164+
// else resolve(lastMeta);
165+
// });
166+
167+
// call.on('metadata', meta => lastMeta = meta);
168+
// });
169+
// }
170+
// let m1 = await makeCallAndReturnMeta();
171+
// assert.deepStrictEqual(m1.get('x-return-encrypted-headers'), ['all_response']);
172+
173+
// let m2 = await makeCallAndReturnMeta();
174+
// assert.deepStrictEqual(m2.get('x-return-encrypted-headers'), []);
175+
176+
// await promisify(setTimeout)(1100);
177+
178+
// let m3 = await makeCallAndReturnMeta();
179+
// assert.deepStrictEqual(m3.get('x-return-encrypted-headers'), ['all_response']);
180+
// });
181+
// it('with server streaming call', async () => {
182+
// function makeCallAndReturnMeta() {
183+
// return new Promise((resolve, reject) => {
184+
// const call = client.serverStream({}, new grpc.Metadata());
185+
// call.on('metadata', meta => resolve(meta));
186+
// call.on('data', (d) => {})
187+
// });
188+
// }
189+
// let m1 = await makeCallAndReturnMeta();
190+
// assert.deepStrictEqual(m1.get('x-return-encrypted-headers'), ['all_response']);
191+
192+
// let m2 = await makeCallAndReturnMeta();
193+
// assert.deepStrictEqual(m2.get('x-return-encrypted-headers'), []);
194+
195+
// await promisify(setTimeout)(1100);
196+
197+
// let m3 = await makeCallAndReturnMeta();
198+
// assert.deepStrictEqual(m3.get('x-return-encrypted-headers'), ['all_response']);
199+
// });
200+
// });
132201
describe('Echo metadata', () => {
133202
let metadata;
134203
let client;

Diff for: test/integration/spanner_test.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ const grpcLibName = process.argv[argIndex + 1];
5555

5656
describe('Using ' + grpcLibName, () => {
5757
before(async function () {
58-
this.timeout(60000);
58+
this.timeout(120000);
5959
// Create test instance.
6060
console.log(`Creating instance ${instance.formattedName_}.`);
6161
const [, instOp] = await instance.create({
@@ -101,7 +101,7 @@ describe('Using ' + grpcLibName, () => {
101101
});
102102

103103
after(async function () {
104-
this.timeout(60000);
104+
this.timeout(120000);
105105
// Delete test instance.
106106
console.log(`Deleting instance ${instance.id}...`);
107107
await instance.delete();

0 commit comments

Comments
 (0)