Skip to content

Commit

Permalink
add open telemetry tracing (#170)
Browse files Browse the repository at this point in the history
Closes: #149
  • Loading branch information
HomelessDinosaur authored Apr 11, 2023
2 parents ba31660 + 6052bdb commit ded1901
Show file tree
Hide file tree
Showing 20 changed files with 2,713 additions and 2,387 deletions.
10 changes: 9 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@nitric/sdk",
"description": "Nitric NodeJS client sdk",
"nitric": "v0.20.0",
"nitric": "v0.22.0",
"author": "Nitric <https://github.com/nitrictech>",
"repository": "https://github.com/nitrictech/node-sdk",
"main": "lib/index.js",
Expand Down Expand Up @@ -31,6 +31,14 @@
],
"dependencies": {
"@grpc/grpc-js": "1.8.1",
"@opentelemetry/api": "^1.4.1",
"@opentelemetry/exporter-trace-otlp-http": "^0.36.1",
"@opentelemetry/instrumentation": "^0.36.1",
"@opentelemetry/instrumentation-grpc": "^0.36.1",
"@opentelemetry/instrumentation-http": "^0.36.1",
"@opentelemetry/resources": "^1.10.1",
"@opentelemetry/sdk-trace-node": "^1.10.1",
"@opentelemetry/semantic-conventions": "^1.10.1",
"google-protobuf": "3.14.0",
"tslib": "^2.1.0"
},
Expand Down
20 changes: 8 additions & 12 deletions src/faas/v0/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ describe('NitricTriggger.toGrpcTriggerResponse', () => {
let response: TriggerResponse;

beforeEach(() => {
const ctx: TriggerContext = TriggerContext.fromGrpcTriggerRequest(
request
);
const ctx: TriggerContext =
TriggerContext.fromGrpcTriggerRequest(request);
ctx.http.res.body = 'test';
response = HttpContext.toGrpcTriggerResponse(ctx);
});
Expand All @@ -168,9 +167,8 @@ describe('NitricTriggger.toGrpcTriggerResponse', () => {
let response: TriggerResponse;

beforeEach(() => {
const ctx: TriggerContext = TriggerContext.fromGrpcTriggerRequest(
request
);
const ctx: TriggerContext =
TriggerContext.fromGrpcTriggerRequest(request);
ctx.http.res.body = { any: 'object' };
response = HttpContext.toGrpcTriggerResponse(ctx);
});
Expand All @@ -190,9 +188,8 @@ describe('NitricTriggger.toGrpcTriggerResponse', () => {
let response: TriggerResponse;

beforeEach(() => {
const ctx: TriggerContext = TriggerContext.fromGrpcTriggerRequest(
request
);
const ctx: TriggerContext =
TriggerContext.fromGrpcTriggerRequest(request);
ctx.http.res.body = new TextEncoder().encode('response text');
response = HttpContext.toGrpcTriggerResponse(ctx);
});
Expand All @@ -214,9 +211,8 @@ describe('NitricTriggger.toGrpcTriggerResponse', () => {
let response: TriggerResponse;

beforeEach(() => {
const ctx: TriggerContext = TriggerContext.fromGrpcTriggerRequest(
request
);
const ctx: TriggerContext =
TriggerContext.fromGrpcTriggerRequest(request);
ctx.http.res.headers['Content-Type'] = ['application/json'];
ctx.http.res.body = new TextEncoder().encode(
'{"json":"which is already text"}'
Expand Down
70 changes: 54 additions & 16 deletions src/faas/v0/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import {
TopicResponseContext,
HttpResponseContext,
HeaderValue,
TraceContext,
} from '@nitric/api/proto/faas/v1/faas_pb';
import * as api from '@opentelemetry/api';
import { jsonResponse } from './json';

export abstract class TriggerContext<
Expand Down Expand Up @@ -89,9 +91,11 @@ export abstract class TriggerContext<

export abstract class AbstractRequest {
readonly data: string | Uint8Array;
readonly traceContext: api.Context;

protected constructor(data: string | Uint8Array) {
protected constructor(data: string | Uint8Array, traceContext: api.Context) {
this.data = data;
this.traceContext = traceContext;
}

text(): string {
Expand Down Expand Up @@ -122,6 +126,7 @@ interface HttpRequestArgs {
params: Record<string, string>;
query: Record<string, string[]>;
headers: Record<string, string[]>;
traceContext?: api.Context;
}

export class HttpRequest extends AbstractRequest {
Expand All @@ -131,8 +136,16 @@ export class HttpRequest extends AbstractRequest {
public readonly query: Record<string, string[]>;
public readonly headers: Record<string, string[] | string>;

constructor({ data, method, path, params, query, headers }: HttpRequestArgs) {
super(data);
constructor({
data,
method,
path,
params,
query,
headers,
traceContext,
}: HttpRequestArgs) {
super(data, traceContext);
this.method = method;
this.path = path;
this.params = params;
Expand Down Expand Up @@ -174,12 +187,28 @@ export class HttpResponse {
export class EventRequest extends AbstractRequest {
public readonly topic: string;

constructor(data: string | Uint8Array, topic: string) {
super(data);
constructor(
data: string | Uint8Array,
topic: string,
traceContext: api.Context
) {
super(data, traceContext);
this.topic = topic;
}
}

// Propagate the context to the root context
const getTraceContext = (traceContext: TraceContext): api.Context => {
const traceContextObject: Record<string, string> = traceContext
? traceContext
.getValuesMap()
.toObject()
.reduce((prev, [k, v]) => (prev[k] = v), {})
: {};

return api.propagation.extract(api.context.active(), traceContextObject);
};

export class HttpContext extends TriggerContext<HttpRequest, HttpResponse> {
public get http(): HttpContext {
return this;
Expand All @@ -189,23 +218,27 @@ export class HttpContext extends TriggerContext<HttpRequest, HttpResponse> {
const http = trigger.getHttp();
const ctx = new HttpContext();

const headers = ((http
.getHeadersMap()
// getEntryList claims to return [string, faas.HeaderValue][], but really returns [string, string[][]][]
// we force the type to match the real return type.
.getEntryList() as unknown) as [string, string[][]][]).reduce(
const headers = (
http
.getHeadersMap()
// getEntryList claims to return [string, faas.HeaderValue][], but really returns [string, string[][]][]
// we force the type to match the real return type.
.getEntryList() as unknown as [string, string[][]][]
).reduce(
(acc, [key, [val]]) => ({
...acc,
[key.toLowerCase()]: val.length === 1 ? val[0] : val,
}),
{}
);

const query = ((http
.getQueryParamsMap()
// getEntryList claims to return [string, faas.HeaderValue][], but really returns [string, string[][]][]
// we force the type to match the real return type.
.getEntryList() as unknown) as [string, string[][]][]).reduce(
const query = (
http
.getQueryParamsMap()
// getEntryList claims to return [string, faas.HeaderValue][], but really returns [string, string[][]][]
// we force the type to match the real return type.
.getEntryList() as unknown as [string, string[][]][]
).reduce(
(acc, [key, [val]]) => ({
...acc,
[key]: val.length === 1 ? val[0] : val,
Expand Down Expand Up @@ -259,6 +292,7 @@ export class HttpContext extends TriggerContext<HttpRequest, HttpResponse> {
// check for old headers if new headers is unpopulated. This is for backwards compatibility.
headers: Object.keys(headers).length ? headers : oldHeaders,
method: http.getMethod(),
traceContext: getTraceContext(trigger.getTraceContext()),
});

ctx.response = new HttpResponse({
Expand Down Expand Up @@ -330,7 +364,11 @@ export class EventContext extends TriggerContext<EventRequest, EventResponse> {
const topic = trigger.getTopic();
const ctx = new EventContext();

ctx.request = new EventRequest(trigger.getData_asU8(), topic.getTopic());
ctx.request = new EventRequest(
trigger.getData_asU8(),
topic.getTopic(),
getTraceContext(trigger.getTraceContext())
);

ctx.response = {
success: true,
Expand Down
14 changes: 7 additions & 7 deletions src/faas/v0/json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ export const json = (): HttpMiddleware => (ctx: HttpContext, next) => {
* @param ctx HttpContext
* @returns HttpContext with body property set with an encoded JSON string and json headers set.
*/
export const jsonResponse = (ctx: HttpContext) => (
data: string | number | boolean | Record<string, any>
) => {
ctx.res.body = new TextEncoder().encode(JSON.stringify(data));
ctx.res.headers['Content-Type'] = ['application/json'];
export const jsonResponse =
(ctx: HttpContext) =>
(data: string | number | boolean | Record<string, any>) => {
ctx.res.body = new TextEncoder().encode(JSON.stringify(data));
ctx.res.headers['Content-Type'] = ['application/json'];

return ctx;
};
return ctx;
};
3 changes: 3 additions & 0 deletions src/faas/v0/start.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import {
ClientMessage,
} from '@nitric/api/proto/faas/v1/faas_pb';

jest.mock('./traceProvider');

// We only need to handle half of the duplex stream
class MockClientStream<Req, Resp> {
public receivedMessages: Req[] = [];
Expand Down Expand Up @@ -59,6 +61,7 @@ describe('faas.start', () => {
streamSpy = jest
.spyOn(FaasServiceClient.prototype, 'triggerStream')
.mockReturnValueOnce(mockStream as any);

const startPromise = start(f);
mockStream.emit('end', 'EOF');

Expand Down
17 changes: 13 additions & 4 deletions src/faas/v0/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import * as grpc from '@grpc/grpc-js';
import { SERVICE_BIND } from '../../constants';

import { FaasServiceClient } from '@nitric/api/proto/faas/v1/faas_grpc_pb';
Expand All @@ -36,19 +35,22 @@ import {
createHandler,
EventMiddleware,
GenericMiddleware,
HttpContext,
HttpMiddleware,
TriggerContext,
TriggerMiddleware,
} from '.';

import newTracerProvider from './traceProvider';

import {
ApiWorkerOptions,
CronWorkerOptions,
RateWorkerOptions,
SubscriptionWorkerOptions,
} from '../../resources';

import * as grpc from '@grpc/grpc-js';

class FaasWorkerOptions {}

type FaasClientOptions =
Expand Down Expand Up @@ -117,6 +119,8 @@ export class Faas {
* @returns a promise that resolves when the server terminates
*/
async start(...handlers: TriggerMiddleware[]): Promise<void> {
const provider = newTracerProvider();

this.anyHandler = handlers.length && createHandler(...handlers);
if (!this.httpHandler && !this.eventHandler && !this.anyHandler) {
throw new Error('A handler function must be provided.');
Expand Down Expand Up @@ -151,10 +155,12 @@ export class Faas {
let triggerType = 'Unknown';
if (ctx.http) {
triggerType = 'HTTP';
handler = this.getHttpHandler() as GenericMiddleware<TriggerContext>;
handler =
this.getHttpHandler() as GenericMiddleware<TriggerContext>;
} else if (ctx.event) {
triggerType = 'Event';
handler = this.getEventHandler() as GenericMiddleware<TriggerContext>;
handler =
this.getEventHandler() as GenericMiddleware<TriggerContext>;
} else {
console.error(
`received an unexpected trigger type, are you using an outdated version of the SDK?`
Expand Down Expand Up @@ -261,6 +267,9 @@ export class Faas {
res();
});
});

// Shutdown the trace provider, flushing the stream and stopping listeners
await provider?.shutdown();
}
}

Expand Down
67 changes: 67 additions & 0 deletions src/faas/v0/traceProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2021, Nitric Technologies Pty Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import {
ConsoleSpanExporter,
BatchSpanProcessor,
NodeTracerProvider,
} from '@opentelemetry/sdk-trace-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { Resource } from '@opentelemetry/resources';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { GrpcInstrumentation } from '@opentelemetry/instrumentation-grpc';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import { TraceIdRatioBasedSampler } from '@opentelemetry/sdk-trace-node';

/**
* Creates a new node tracer provider
* If it is a local run, it will output to the console. If it is run on the cloud it will output to localhost:4317
* @returns a tracer provider
*/
const newTracerProvider = (): NodeTracerProvider => {
// Add trace provider
const localRun = !process.env.OTELCOL_BIN;
const samplePercentage = localRun
? 100 // local default to 100
: Number.parseInt(process.env.NITRIC_TRACE_SAMPLE_PERCENT) || 0;

const provider = new NodeTracerProvider({
resource: new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: process.env.NITRIC_STACK ?? '',
[SemanticResourceAttributes.SERVICE_VERSION]:
process.env.npm_package_version ?? '0.0.1',
}),
sampler: new TraceIdRatioBasedSampler(samplePercentage),
});

registerInstrumentations({
instrumentations: [new HttpInstrumentation(), new GrpcInstrumentation()],
tracerProvider: provider,
});

const traceExporter = localRun // If running locally
? new ConsoleSpanExporter()
: new OTLPTraceExporter({
url: 'http://localhost:4317',
});

const processor = new BatchSpanProcessor(traceExporter);

provider.addSpanProcessor(processor);
provider.register();

return provider;
};

export default newTracerProvider;
8 changes: 7 additions & 1 deletion src/gen/proto/document/v1/document_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@

var jspb = require('google-protobuf');
var goog = jspb;
var global = Function('return this')();
var global = (function() {
if (this) { return this; }
if (typeof window !== 'undefined') { return window; }
if (typeof global !== 'undefined') { return global; }
if (typeof self !== 'undefined') { return self; }
return Function('return this')();
}.call(null));

var google_protobuf_struct_pb = require('google-protobuf/google/protobuf/struct_pb.js');
goog.object.extend(proto, google_protobuf_struct_pb);
Expand Down
Loading

0 comments on commit ded1901

Please sign in to comment.