Skip to content

Commit

Permalink
fix: ensure correct run context for '@elastic/elasticsearch' instrume…
Browse files Browse the repository at this point in the history
…ntation (#2550)

This is a re-write of the @elastic/elasticsearch instrumentation that
stops using the ES client observability events
    https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/observability.html
and switches to patching Transport#request() instead. This is necessary
to be able to bind that `request()` invocation to the RunContext for the
Span we've created, without using `apm.startSpan(...)`, which bleeds the
RunContext out to user code. (Patching Transport#request() is what the
legacy 'elasticsearch' instrumentation is also doing.)

Also: '@elastic/elasticsearch-canary' TAV tests were failing because an
ES service container was not being setup for its test run; we didn't see
this in CI before because the Jenkins TAV config doesn't have the
canary module.

Refs: #2430
  • Loading branch information
trentm authored Jan 31, 2022
1 parent bb32bc2 commit ddad434
Show file tree
Hide file tree
Showing 12 changed files with 277 additions and 213 deletions.
1 change: 1 addition & 0 deletions .ci/.jenkins_tav.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
TAV:
- '@elastic/elasticsearch'
- '@elastic/elasticsearch-canary'
- '@hapi/hapi'
- '@koa/router'
- apollo-server-express
Expand Down
2 changes: 1 addition & 1 deletion .ci/scripts/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ elif [[ -n "${TAV_MODULE}" ]]; then
cassandra-driver)
DOCKER_COMPOSE_FILE=docker-compose-cassandra.yml
;;
elasticsearch|@elastic/elasticsearch)
elasticsearch|@elastic/elasticsearch|@elastic/elasticsearch-canary)
DOCKER_COMPOSE_FILE=docker-compose-elasticsearch.yml
;;
mysql|mysql2)
Expand Down
2 changes: 1 addition & 1 deletion .tav.yml
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ elasticsearch:
commands: node test/instrumentation/modules/@elastic/elasticsearch.test.js
'@elastic/elasticsearch-canary':
name: '@elastic/elasticsearch-canary'
versions: '^8.0.0-canary.35'
versions: '>=8.0.0-canary.36 || >=8.1.0-canary.2'
node: '>=12'
commands: node test/instrumentation/modules/@elastic/elasticsearch-canary.test.js

Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ Notes:
[float]
===== Bug fixes
* Fixes for run context handling for '@elastic/elasticsearch' instrumentation.
({issues}2430[#2430])
* Fixes for run context handling for 'cassandra-driver' instrumentation.
({issues}2430[#2430])
Expand Down
60 changes: 38 additions & 22 deletions examples/trace-elasticsearch7.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
// A small example showing Elastic APM tracing @elastic/elasticsearch version 7.
//
// This assumes an Elasticsearch running on localhost. You can use:
// npm run docker:start
// to start an Elasticsearch docker container (and other containers used for
// testing of this project). Then `npm run docker:stop` to stop them.
// npm run docker:start elasticsearch
// to start an Elasticsearch docker container. Then the following to stop:
// npm run docker:stop

const apm = require('../').start({ // elastic-apm-node
serviceName: 'example-trace-elasticsearch'
serviceName: 'example-trace-elasticsearch7',
logUncaughtExceptions: true
})

const { Client } = require('@elastic/elasticsearch')
Expand All @@ -17,27 +18,42 @@ const client = new Client({
node: `http://${process.env.ES_HOST || 'localhost'}:9200`
})

// For tracing spans to be created, there must be an active transaction.
// Typically, a transaction is automatically started for incoming HTTP
// requests to a Node.js server. However, because this script is not running
// an HTTP server, we manually start a transaction. More details at:
// https://www.elastic.co/guide/en/apm/agent/nodejs/current/custom-transactions.html
apm.startTransaction('t1')
client.ping()
.then((res) => { console.log('ping response:', res) })
.catch((err) => { console.log('ping error:', err) })
.finally(() => { apm.endTransaction() })

// Example using async/await style.
async function awaitStyle () {
apm.startTransaction('t2')
async function run () {
// For tracing spans to be created, there must be an active transaction.
// Typically, a transaction is automatically started for incoming HTTP
// requests to a Node.js server. However, because this script is not running
// an HTTP server, we manually start a transaction. More details at:
// https://www.elastic.co/guide/en/apm/agent/nodejs/current/custom-transactions.html
const t1 = apm.startTransaction('t1')

// Using await.
try {
const res = await client.search({ q: 'pants' })
console.log('search response:', res)
console.log('search succeeded: hits:', res.body.hits)
} catch (err) {
console.log('search error:', err)
console.log('search error:', err.message)
} finally {
apm.endTransaction()
t1.end()
}

// Using Promises directly.
const t2 = apm.startTransaction('t2')
client.ping()
.then(_res => { console.log('ping succeeded') })
.catch(err => { console.log('ping error:', err) })
// Another request to have two concurrent requests. Also use a bogus index
// to trigger an error and see APM error capture.
client.search({ index: 'no-such-index', q: 'pants' })
.then(_res => { console.log('search succeeded') })
.catch(err => { console.log('search error:', err.message) })
.finally(() => { t2.end() })

// Callback style.
const t3 = apm.startTransaction('t3')
client.ping(function (err, _res) {
console.log('ping', err ? `error ${err.name}: ${err.message}` : 'succeeded')
t3.end()
})
}
awaitStyle()

run()
94 changes: 50 additions & 44 deletions examples/trace-elasticsearch8.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,68 +3,74 @@
// A small example showing Elastic APM tracing @elastic/elasticsearch version 8.
//
// This assumes an Elasticsearch running on localhost. You can use:
// npm run docker:start
// to start an Elasticsearch docker container (and other containers used for
// testing of this project). Then `npm run docker:stop` to stop them.
// npm run docker:start elasticsearch
// to start an Elasticsearch docker container. Then the following to stop:
// npm run docker:stop

const apm = require('../').start({ // elastic-apm-node
serviceName: 'example-trace-elasticsearch'
serviceName: 'example-trace-elasticsearch8',
logUncaughtExceptions: true
})

// Currently, pre-releases of v8 are published as the "...-canary" package name.
const { Client } = require('@elastic/elasticsearch-canary')
// eslint-disable-next-line no-unused-vars
const { Client, HttpConnection } = require('@elastic/elasticsearch-canary')

const client = new Client({
// With version 8 of the client, you can use `HttpConnection` to use the old
// HTTP client:
// Connection: HttpConnection,
// Connection: HttpConnection,
node: `http://${process.env.ES_HOST || 'localhost'}:9200`
})

// For tracing spans to be created, there must be an active transaction.
// Typically, a transaction is automatically started for incoming HTTP
// requests to a Node.js server. However, because this script is not running
// an HTTP server, we manually start a transaction. More details at:
// https://www.elastic.co/guide/en/apm/agent/nodejs/current/custom-transactions.html
apm.startTransaction('t1')
client.ping()
.then((res) => { console.log('[example 1] ping response:', res) })
.catch((err) => { console.log('[example 1] ping error:', err) })
.finally(() => { apm.endTransaction() })
async function run () {
// For tracing spans to be created, there must be an active transaction.
// Typically, a transaction is automatically started for incoming HTTP
// requests to a Node.js server. However, because this script is not running
// an HTTP server, we manually start a transaction. More details at:
// https://www.elastic.co/guide/en/apm/agent/nodejs/current/custom-transactions.html
const t1 = apm.startTransaction('t1')

// Example using async/await style.
async function awaitStyle () {
apm.startTransaction('t2')
// Using await.
try {
const res = await client.search({ q: 'pants' })
console.log('[example 2] search response:', res)
console.log('[example 1] search succeeded: hits:', res.hits)
} catch (err) {
console.log('[example 2] search error:', err)
console.log('[example 1] search error:', err.message)
} finally {
apm.endTransaction()
t1.end()
}
}
awaitStyle()

// Example aborting requests using AbortController.
async function abortExample () {
apm.startTransaction('t3')
// eslint-disable-next-line no-undef
const ac = new AbortController()
setImmediate(() => {
ac.abort()
})
try {
const res = await client.search(
{ query: { match_all: {} } },
{ signal: ac.signal })
console.log('[example 3] search response:', res)
} catch (err) {
console.log('[example 3] search error:', err)
} finally {
apm.endTransaction()
// Using Promises directly.
const t2 = apm.startTransaction('t2')
client.ping()
.then(_res => { console.log('[example 2] ping succeeded') })
.catch(err => { console.log('[example 2] ping error:', err) })
// Another request to have two concurrent requests. Also use a bogus index
// to trigger an error and see APM error capture.
client.search({ index: 'no-such-index', q: 'pants' })
.then(_res => { console.log('[example 2] search succeeded') })
.catch(err => { console.log('[example 2] search error:', err.message) })
.finally(() => { t2.end() })

// Example aborting requests using AbortController (node v15 and above).
if (global.AbortController) {
const t3 = apm.startTransaction('t3')
const ac = new AbortController() // eslint-disable-line no-undef
setImmediate(() => {
ac.abort()
})
try {
const res = await client.search(
{ query: { match_all: {} } },
{ signal: ac.signal })
console.log('[example 3] search response:', res)
} catch (err) {
console.log('[example 3] search error:', err)
} finally {
t3.end()
}
}
}
if (global.AbortController) {
abortExample()
}

run()
27 changes: 13 additions & 14 deletions lib/instrumentation/elasticsearch-shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,13 @@ exports.pathIsAQuery = pathIsAQuery
// {"query":{"query_string":{"query":"pants"}}}
//
// `path`, `query`, and `body` can all be null or undefined.
exports.setElasticsearchDbContext = function (span, path, query, body, isLegacy) {
exports.setElasticsearchDbContext = function (span, path, query, body) {
if (path && pathIsAQuery.test(path)) {
// From @elastic/elasticsearch: A read of Transport.js suggests query and
// body will always be serialized strings, however the documented
// `TransportRequestParams` (`ConnectionRequestParams` in version 8)
// allows for non-strings, so we will be defensive.
//
// From legacy elasticsearch: query will be an object and body will be an
// object, or an array of objects, e.g. for bulk endpoints.
const parts = []
if (query) {
if (typeof (query) === 'string') {
parts.push(query)
} else if (isLegacy && typeof (query) === 'object') {
} else if (typeof (query) === 'object') {
const encodedQuery = querystring.encode(query)
if (encodedQuery) {
parts.push(encodedQuery)
Expand All @@ -51,12 +44,18 @@ exports.setElasticsearchDbContext = function (span, path, query, body, isLegacy)
if (body) {
if (typeof (body) === 'string') {
parts.push(body)
} else if (isLegacy) {
if (Array.isArray(body)) {
parts.push(body.map(JSON.stringify).join('\n')) // ndjson
} else if (typeof (body) === 'object') {
} else if (Buffer.isBuffer(body) || typeof body.pipe === 'function') {
// Never serialize a Buffer or a Readable. These guards mirror
// `shouldSerialize()` in the ES client, e.g.:
// https://github.com/elastic/elastic-transport-js/blob/069172506d1fcd544b23747d8c2d497bab053038/src/Transport.ts#L614-L618
} else if (Array.isArray(body)) {
try {
parts.push(body.map(JSON.stringify).join('\n') + '\n') // ndjson
} catch (_ignoredErr) {}
} else if (typeof (body) === 'object') {
try {
parts.push(JSON.stringify(body))
}
} catch (_ignoredErr) {}
}
}

Expand Down
Loading

0 comments on commit ddad434

Please sign in to comment.