From f8f6f785537db3902948d738536f97f72605facc Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Tue, 4 Jan 2022 14:45:20 -0800 Subject: [PATCH] fix: ensure correct run-context for pg instrumentation (#2506) Refs: #2430 --- CHANGELOG.asciidoc | 2 + examples/trace-pg.js | 16 ++- lib/instrumentation/modules/pg.js | 138 ++++++++++----------- test/instrumentation/modules/pg/pg.test.js | 29 ++++- 4 files changed, 106 insertions(+), 79 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 0bb91b3d1a..625b72763c 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -45,6 +45,8 @@ Notes: AWS span. This change also ensures captured errors from failing client commands are a child of the AWS span. ({issues}2430[#2430]) +* Fixes for run context handling for 'pg' instrumentation. ({issues}2430[#2430]) + [[release-notes-3.26.0]] ==== 3.26.0 2021/12/07 diff --git a/examples/trace-pg.js b/examples/trace-pg.js index a7ea623649..353e36fe12 100755 --- a/examples/trace-pg.js +++ b/examples/trace-pg.js @@ -3,9 +3,8 @@ // // By default this will use a Postgres on localhost with user 'postgres'. // You can use: -// npm run docker:start -// to start a Postgres container (and other containers used for testing of -// this project). +// npm run docker:start postgres +// to start a Postgres container. Then `npm run docker:stop` to stop it. const apm = require('../').start({ // elastic-apm-node serviceName: 'example-trace-pg' @@ -27,12 +26,11 @@ client.connect(function (err) { // an HTTP server, we manually start a transaction. More details at: // https://www.elastic.co/guide/en/apm/agent/nodejs/current/custom-transactions.html apm.startTransaction('t1') -client.query('SELECT $1::text as message', ['Hello world!'], (err, res) => { - if (err) { - console.log('[t1] Failure: err is', err) - } else { - console.log('[t1] Success: message is %s', res.rows[0].message) - } +client.query('SELECT $1::text as message', ['hi'], (err, res) => { + console.log('[t1] err=%s res=%s', err && err.message, !err && res.rows[0].message) +}) +client.query('SELECT $1::text as message', ['bye'], (err, res) => { + console.log('[t1] err=%s res=%s', err && err.message, !err && res.rows[0].message) apm.endTransaction() }) diff --git a/lib/instrumentation/modules/pg.js b/lib/instrumentation/modules/pg.js index 5fed898176..4a34fd12a8 100644 --- a/lib/instrumentation/modules/pg.js +++ b/lib/instrumentation/modules/pg.js @@ -10,12 +10,15 @@ var symbols = require('../../symbols') var { getDBDestination } = require('../context') module.exports = function (pg, agent, { version, enabled }) { + if (!enabled) { + return pg + } if (!semver.satisfies(version, '>=4.0.0 <9.0.0')) { agent.logger.debug('pg version %s not supported - aborting...', version) return pg } - patchClient(pg.Client, 'pg.Client', agent, enabled) + patchClient(pg.Client, 'pg.Client', agent) // Trying to access the pg.native getter will trigger and log the warning // "Cannot find module 'pg-native'" to STDERR if the module isn't installed. @@ -29,7 +32,7 @@ module.exports = function (pg, agent, { version, enabled }) { pg.__defineGetter__('native', function () { var native = getter() if (native && native.Client) { - patchClient(native.Client, 'pg.native.Client', agent, enabled) + patchClient(native.Client, 'pg.native.Client', agent) } return native }) @@ -38,86 +41,83 @@ module.exports = function (pg, agent, { version, enabled }) { return pg } -function patchClient (Client, klass, agent, enabled) { - if (!enabled) return - +function patchClient (Client, klass, agent) { agent.logger.debug('shimming %s.prototype.query', klass) shimmer.wrap(Client.prototype, 'query', wrapQuery) function wrapQuery (orig, name) { return function wrappedFunction (sql) { - var span = agent.startSpan('SQL', 'db', 'postgresql', 'query') - var id = span && span.transaction.id - - if (sql && typeof sql.text === 'string') sql = sql.text - - agent.logger.debug('intercepted call to %s.prototype.%s %o', klass, name, { id: id, sql: sql }) + agent.logger.debug('intercepted call to %s.prototype.%s', klass, name) + const ins = agent._instrumentation + const span = ins.createSpan('SQL', 'db', 'postgresql', 'query') + if (!span) { + return orig.apply(this, arguments) + } - if (span) { - // get connection parameters from Client - let host, port - if (typeof this.connectionParameters === 'object') { - ({ host, port } = this.connectionParameters) - } - span.setDestinationContext(getDBDestination(span, host, port)) + let sqlText = sql + if (sql && typeof sql.text === 'string') { + sqlText = sql.text + } + if (typeof sqlText === 'string') { + span.setDbContext({ statement: sqlText, type: 'sql' }) + span.name = sqlSummary(sqlText) + } else { + agent.logger.debug('unable to parse sql form pg module (type: %s)', typeof sqlText) + } - var args = arguments - var index = args.length - 1 - var cb = args[index] + // Get connection parameters from Client. + let host, port + if (typeof this.connectionParameters === 'object') { + ({ host, port } = this.connectionParameters) + } + span.setDestinationContext(getDBDestination(span, host, port)) - if (this[symbols.knexStackObj]) { - span.customStackTrace(this[symbols.knexStackObj]) - this[symbols.knexStackObj] = null - } + if (this[symbols.knexStackObj]) { + span.customStackTrace(this[symbols.knexStackObj]) + this[symbols.knexStackObj] = null + } - if (Array.isArray(cb)) { - index = cb.length - 1 - cb = cb[index] - } + let index = arguments.length - 1 + let cb = arguments[index] + if (Array.isArray(cb)) { + index = cb.length - 1 + cb = cb[index] + } - if (typeof sql === 'string') { - span.setDbContext({ statement: sql, type: 'sql' }) - span.name = sqlSummary(sql) + const spanRunContext = ins.currRunContext().enterSpan(span) + const onQueryEnd = ins.bindFunctionToRunContext(spanRunContext, (_err) => { + agent.logger.debug('intercepted end of %s.prototype.%s', klass, name) + span.end() + }) + + if (typeof cb === 'function') { + arguments[index] = ins.bindFunction((err, res) => { + onQueryEnd(err) + return cb(err, res) + }) + return orig.apply(this, arguments) + } else { + var queryOrPromise = orig.apply(this, arguments) + + // It is important to prefer `.on` to `.then` for pg <7 >=6.3.0, because + // `query.then` is broken in those versions. See + // https://github.com/brianc/node-postgres/commit/b5b49eb895727e01290e90d08292c0d61ab86322#r23267714 + if (typeof queryOrPromise.on === 'function') { + queryOrPromise.on('end', onQueryEnd) + queryOrPromise.on('error', onQueryEnd) + if (queryOrPromise instanceof EventEmitter) { + ins.bindEmitter(queryOrPromise) + } + } else if (typeof queryOrPromise.then === 'function') { + queryOrPromise.then( + () => { onQueryEnd() }, + onQueryEnd + ) } else { - agent.logger.debug('unable to parse sql form pg module (type: %s)', typeof sql) - } - - const onQueryEnd = (_err) => { - agent.logger.debug('intercepted end of %s.prototype.%s %o', klass, name, { id: id }) - span.end() + agent.logger.debug('ERROR: unknown pg query type: %s', typeof queryOrPromise) } - if (typeof cb === 'function') { - args[index] = agent._instrumentation.bindFunction((err, res) => { - onQueryEnd(err) - return cb(err, res) - }) - return orig.apply(this, arguments) - } else { - var queryOrPromise = orig.apply(this, arguments) - - // It is import to prefer `.on` to `.then` for pg <7 >=6.3.0, because - // `query.then` is broken in those versions. See - // https://github.com/brianc/node-postgres/commit/b5b49eb895727e01290e90d08292c0d61ab86322#r23267714 - if (typeof queryOrPromise.on === 'function') { - queryOrPromise.on('end', onQueryEnd) - queryOrPromise.on('error', onQueryEnd) - if (queryOrPromise instanceof EventEmitter) { - agent._instrumentation.bindEmitter(queryOrPromise) - } - } else if (typeof queryOrPromise.then === 'function') { - queryOrPromise.then( - () => { onQueryEnd() }, - onQueryEnd - ) - } else { - agent.logger.debug('ERROR: unknown pg query type: %s %o', typeof queryOrPromise, { id: id }) - } - - return queryOrPromise - } - } else { - return orig.apply(this, arguments) + return queryOrPromise } } } diff --git a/test/instrumentation/modules/pg/pg.test.js b/test/instrumentation/modules/pg/pg.test.js index e451003155..059fd731bf 100644 --- a/test/instrumentation/modules/pg/pg.test.js +++ b/test/instrumentation/modules/pg/pg.test.js @@ -42,6 +42,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') queryable.query(sql, basicQueryCallback(t)) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') }) }) @@ -54,6 +55,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') queryable.query(sql, [1], basicQueryCallback(t)) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') }) }) @@ -66,6 +68,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') queryable.query({ text: sql }, basicQueryCallback(t)) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') }) }) @@ -78,6 +81,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') queryable.query({ text: sql }, [1], basicQueryCallback(t)) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') }) }) @@ -90,6 +94,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') queryable.query({ text: sql, values: [1] }, basicQueryCallback(t)) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') }) }) @@ -102,6 +107,7 @@ factories.forEach(function (f) { factory(function () { var trans = agent.startTransaction('foo') queryable.query(sql) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') setTimeout(function () { trans.end() }, 250) @@ -121,6 +127,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') var stream = queryable.query(new pg.Query(sql)) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') basicQueryStream(stream, t) }) }) @@ -136,6 +143,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') var stream = queryable.query(sql) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') basicQueryStream(stream, t) }) }) @@ -149,6 +157,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') var stream = queryable.query(sql, [1]) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') basicQueryStream(stream, t) }) }) @@ -162,6 +171,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') var stream = queryable.query({ text: sql }) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') basicQueryStream(stream, t) }) }) @@ -175,6 +185,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') var stream = queryable.query({ text: sql }, [1]) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') basicQueryStream(stream, t) }) }) @@ -188,6 +199,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') var stream = queryable.query({ text: sql, values: [1] }) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') basicQueryStream(stream, t) }) }) @@ -206,6 +218,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') var p = queryable.query(sql) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') basicQueryPromise(p, t) }) }) @@ -219,6 +232,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') var p = queryable.query(sql, [1]) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') basicQueryPromise(p, t) }) }) @@ -232,6 +246,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') var p = queryable.query({ text: sql }) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') basicQueryPromise(p, t) }) }) @@ -245,6 +260,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') var p = queryable.query({ text: sql }, [1]) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') basicQueryPromise(p, t) }) }) @@ -258,6 +274,7 @@ factories.forEach(function (f) { factory(function () { agent.startTransaction('foo') var p = queryable.query({ text: sql, values: [1] }) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') basicQueryPromise(p, t) }) }) @@ -275,6 +292,7 @@ factories.forEach(function (f) { t.strictEqual(trans.name, 'foo') data.spans.forEach(function (span) { assertSpan(t, span, sql) + t.equal(span.parent_id, trans.id, 'each span is a child of the transaction') }) t.end() @@ -322,6 +340,7 @@ factories.forEach(function (f) { const span = findObjInArray(data.spans, 'transaction_id', trans.id) t.ok(span, 'transaction should have span') assertSpan(t, span, sql) + t.equal(span.parent_id, trans.id, 'the span is a child of the transaction') }) t.end() @@ -374,6 +393,7 @@ if (global.Promise || semver.satisfies(pgVersion, '<6')) { t.strictEqual(trans.name, 'foo') data.spans.forEach(function (span) { assertSpan(t, span, sql) + t.equal(span.parent_id, trans.id, 'each span is a child of the transaction') }) t.end() @@ -438,6 +458,7 @@ if (global.Promise || semver.satisfies(pgVersion, '<6')) { connector(function (err, client, release) { t.error(err) client.query(sql, basicQueryCallback(t)) + t.ok(agent.currentSpan === null, 'no currentSpan in sync code after pg .query') if (semver.gte(pgVersion, '7.5.0')) { release() @@ -480,7 +501,9 @@ if (global.Promise || semver.satisfies(pgVersion, '<6')) { .then(function () { t.fail('query should have rejected') }) - .catch(function () {}) + .catch(function () { + t.ok(agent.currentSpan === null, 'no currentSpan in promise catch after pg .query') + }) .then(function () { setTimeout(function () { release() @@ -495,6 +518,7 @@ if (global.Promise || semver.satisfies(pgVersion, '<6')) { function basicQueryCallback (t) { return function queryCallback (err, result, fields) { + t.ok(agent.currentSpan === null, 'no currentSpan in pg .query callback') t.error(err) t.strictEqual(result.rows[0].solution, 2) agent.endTransaction() @@ -504,13 +528,16 @@ function basicQueryCallback (t) { function basicQueryStream (stream, t) { var results = 0 stream.on('error', function (err) { + t.ok(agent.currentSpan === null, 'pg span should not be active in user code') t.error(err) }) stream.on('row', function (row) { + t.ok(agent.currentSpan === null, 'pg span should not be active in user code') results++ t.strictEqual(row.solution, 2) }) stream.on('end', function () { + t.ok(agent.currentSpan === null, 'pg span should not be active in user code') t.strictEqual(results, 1) agent.endTransaction() })