Skip to content

Commit

Permalink
feat: 'elasticsearchCaptureBodyUrls' config option (#2873)
Browse files Browse the repository at this point in the history
This adds a new config option, per the
'elasticsearch_capture_body_urls' cross-agent spec, to control for which
ES client requests the request body is captured. By default it is just
the body of search-related APIs. The use case for configuring this is
to capture the body of *all* requests for replay in Kibana performance work. This
option is not expected to be used often.
  • Loading branch information
trentm authored Aug 18, 2022
1 parent a950fe8 commit ee08f45
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 162 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ Notes:
[float]
===== Features
- Improve the captured information for Elasticsearch client instrumentation.
For all outgoing Elasticsearch client requests, the full HTTP url is
now captured (stored in the "url.original" field). For Elasticsearch requests
that do a search the outgoing request body is captured (to the
"span.db.statement" field), as before, but the format has changed to only
hold the request body. Before this change the "span.db.statement" would
also hold any HTTP query parameters. These are now more naturally captured
with "url.original". ({issues}2019[#2019])
+
This change also introduces the <<elasticsearch-capture-body-urls>>
configuration option to enable controlling which Elasticsearch REST API
paths are considered for body capture. ({pull}2873[#2873])
- Support instrumenting core modules when require'd with the optional
https://nodejs.org/api/modules.html#core-modules['node:'-prefix].
For example `require('node:http')` will now be instrumented.
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1356,3 +1356,15 @@ require('elastic-apm-node').start({
exitSpanMinDuration: '10ms'
})
----

[[elasticsearch-capture-body-urls]]
==== `elasticsearchCaptureBodyUrls`

* *Type:* Array of wildcard patterns
* *Default:* `['*/_search', '*/_search/template', '*/_msearch', '*/_msearch/template', '*/_async_search', '*/_count', '*/_sql', '*/_eql/search' ]`
* *Env:* `ELASTIC_APM_ELASTICSEARCH_CAPTURE_BODY_URLS`
// Spec name: elasticsearch_capture_body_urls

The URL path patterns for which the APM agent will capture the request body of outgoing requests to Elasticsearch made with the `@elastic/elasticsearch` module (or the legacy `elasticsearch` module). The default setting captures the body for https://www.elastic.co/guide/en/elasticsearch/reference/current/rest-apis.html[Elasticsearch REST APIs] making a search.

The captured request body (if any) is stored on the `span.db.statement` field. Captured request bodies are truncated to a maximum length defined by <<long-field-max-length>>.
2 changes: 2 additions & 0 deletions examples/trace-elasticsearch7.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const apm = require('../').start({ // elastic-apm-node
logUncaughtExceptions: true
})

// Note that version 7 is *not* installed by default. To use v7 you'll need to:
// npm install @elastic/elasticsearch@7
const { Client } = require('@elastic/elasticsearch')

const client = new Client({
Expand Down
1 change: 1 addition & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ declare namespace apm {
contextPropagationOnly?: boolean;
disableInstrumentations?: string | string[];
disableSend?: boolean;
elasticsearchCaptureBodyUrls?: Array<string>;
environment?: string;
errorMessageMaxLength?: string; // DEPRECATED: use `longFieldMaxLength`.
errorOnAbortedRequests?: boolean;
Expand Down
19 changes: 19 additions & 0 deletions lib/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ var DEFAULTS = {
contextPropagationOnly: false,
disableInstrumentations: [],
disableSend: false,
elasticsearchCaptureBodyUrls: [
'*/_search', '*/_search/template', '*/_msearch', '*/_msearch/template',
'*/_async_search', '*/_count', '*/_sql', '*/_eql/search'
],
environment: process.env.NODE_ENV || 'development',
errorOnAbortedRequests: false,
exitSpanMinDuration: '0ms',
Expand Down Expand Up @@ -125,6 +129,7 @@ var ENV_TABLE = {
environment: 'ELASTIC_APM_ENVIRONMENT',
exitSpanMinDuration: 'ELASTIC_APM_EXIT_SPAN_MIN_DURATION',
ignoreMessageQueues: ['ELASTIC_IGNORE_MESSAGE_QUEUES', 'ELASTIC_APM_IGNORE_MESSAGE_QUEUES'],
elasticsearchCaptureBodyUrls: 'ELASTIC_APM_ELASTICSEARCH_CAPTURE_BODY_URLS',
errorMessageMaxLength: 'ELASTIC_APM_ERROR_MESSAGE_MAX_LENGTH',
errorOnAbortedRequests: 'ELASTIC_APM_ERROR_ON_ABORTED_REQUESTS',
filterHttpHeaders: 'ELASTIC_APM_FILTER_HTTP_HEADERS',
Expand Down Expand Up @@ -289,6 +294,7 @@ var MINUS_ONE_EQUAL_INFINITY = [

var ARRAY_OPTS = [
'disableInstrumentations',
'elasticsearchCaptureBodyUrls',
'sanitizeFieldNames',
'transactionIgnoreUrls',
'ignoreMessageQueues'
Expand Down Expand Up @@ -336,6 +342,7 @@ function initialConfig (logger) {
cfg.ignoreUrlRegExp = []
cfg.ignoreUserAgentStr = []
cfg.ignoreUserAgentRegExp = []
cfg.elasticsearchCaptureBodyUrlsRegExp = []
cfg.transactionIgnoreUrlRegExp = []
cfg.sanitizeFieldNamesRegExp = []
cfg.ignoreMessageQueuesRegExp = []
Expand All @@ -356,6 +363,7 @@ class Config {
this.ignoreUrlRegExp = []
this.ignoreUserAgentStr = []
this.ignoreUserAgentRegExp = []
this.elasticsearchCaptureBodyUrlsRegExp = []
this.transactionIgnoreUrlRegExp = []
this.sanitizeFieldNamesRegExp = []
this.ignoreMessageQueuesRegExp = []
Expand Down Expand Up @@ -700,6 +708,7 @@ function normalize (opts, logger) {
normalizeDurationOptions(opts, logger)
normalizeBools(opts, logger)
normalizeIgnoreOptions(opts)
normalizeElasticsearchCaptureBodyUrls(opts)
normalizeSanitizeFieldNames(opts)
normalizeContextManager(opts, logger) // Must be after normalizeBools().
normalizeCloudProvider(opts, logger)
Expand Down Expand Up @@ -926,6 +935,16 @@ function normalizeIgnoreOptions (opts) {
}
}

// Compile every wildcard pattern listed in `opts.elasticsearchCaptureBodyUrls`
// with a `WildcardMatcher` and append each compiled result to
// `opts.elasticsearchCaptureBodyUrlsRegExp`. Nothing happens when the option
// is unset (or otherwise falsy).
function normalizeElasticsearchCaptureBodyUrls (opts) {
  if (!opts.elasticsearchCaptureBodyUrls) {
    return
  }

  const matcher = new WildcardMatcher()
  for (const pattern of opts.elasticsearchCaptureBodyUrls) {
    const compiled = matcher.compile(pattern)
    opts.elasticsearchCaptureBodyUrlsRegExp.push(compiled)
  }
}

function normalizeNumbers (opts) {
for (const key of NUM_OPTS) {
if (key in opts) opts[key] = Number(opts[key])
Expand Down
100 changes: 43 additions & 57 deletions lib/instrumentation/elasticsearch-shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,64 +10,50 @@
// - elasticsearch - the legacy Elasticsearch JS client
// - @elastic/elasticsearch - the new Elasticsearch JS client

const querystring = require('querystring')

// URL paths matching the following pattern will have their query params and
// request body captured in the span (as `context.db.statement`). We match
// a complete URL path component to attempt to avoid accidental matches of
// user data, like `GET /my_index_search/...`.
const pathIsAQuery = /\/(_search|_msearch|_count|_async_search|_sql|_eql)(\/|$)/

// (This is exported for testing.)
exports.pathIsAQuery = pathIsAQuery

// Set the span's `context.db` from the Elasticsearch request querystring and
// body, if the request path looks like it is a query API.
//
// Some ES endpoints, e.g. '_search', support both query params and a body.
// We encode both into 'span.context.db.statement', separated by '\n\n'
// if both are present. E.g. for a possible msearch:
//
// search_type=query_then_fetch&typed_keys=false
//
// {}
// {"query":{"query_string":{"query":"pants"}}}
//
// `path`, `query`, and `body` can all be null or undefined.
exports.setElasticsearchDbContext = function (span, path, query, body) {
if (path && pathIsAQuery.test(path)) {
const parts = []
if (query) {
if (typeof (query) === 'string') {
parts.push(query)
} else if (typeof (query) === 'object') {
const encodedQuery = querystring.encode(query)
if (encodedQuery) {
parts.push(encodedQuery)
}
}
}
if (body) {
if (typeof (body) === 'string') {
parts.push(body)
} else if (Buffer.isBuffer(body) || typeof body.pipe === 'function') {
// Never serialize a Buffer or a Readable. These guards mirror
// `shouldSerialize()` in the ES client, e.g.:
// https://github.com/elastic/elastic-transport-js/blob/069172506d1fcd544b23747d8c2d497bab053038/src/Transport.ts#L614-L618
} else if (Array.isArray(body)) {
try {
parts.push(body.map(JSON.stringify).join('\n') + '\n') // ndjson
} catch (_ignoredErr) {}
} else if (typeof (body) === 'object') {
try {
parts.push(JSON.stringify(body))
} catch (_ignoredErr) {}
}
// Decide whether the ES request body may be captured for `path`: true only
// when `path` is truthy and at least one of the compiled
// `elasticsearchCaptureBodyUrls` patterns matches it.
function shouldCaptureBody (path, elasticsearchCaptureBodyUrlsRegExp) {
  return Boolean(path) &&
    elasticsearchCaptureBodyUrlsRegExp.some((pattern) => pattern.test(path))
}

span.setDbContext({
type: 'elasticsearch',
statement: parts.join('\n\n')
})
/**
 * Build the `span.context.db.statement` value for an ES client request, or
 * return `undefined` when no statement should be recorded.
 * https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-db.md#elasticsearch_capture_body_urls-configuration
 *
 * The body is only considered when it is truthy and `path` matches one of the
 * given capture patterns. Strings are returned as-is, arrays are serialized as
 * ndjson (one JSON document per line, with a trailing newline) and other
 * objects as a single JSON document. Serialization failures are swallowed and
 * yield `undefined`.
 *
 * @param {string | null | undefined} path - Request URL path.
 * @param {string | object | Array | Buffer | null | undefined} body - Request
 *    body as handed to the ES client; may also be a stream-like object with a
 *    `pipe` function.
 * @param {RegExp[]} elasticsearchCaptureBodyUrlsRegExp - Patterns selecting
 *    the paths for which the body is captured.
 * @return {string | undefined}
 */
function getElasticsearchDbStatement (path, body, elasticsearchCaptureBodyUrlsRegExp) {
  if (!body || !shouldCaptureBody(path, elasticsearchCaptureBodyUrlsRegExp)) {
    return undefined
  }

  if (typeof body === 'string') {
    return body
  }

  // Never serialize a Buffer or a Readable. These guards mirror
  // `shouldSerialize()` in the ES client, e.g.:
  // https://github.com/elastic/elastic-transport-js/blob/069172506d1fcd544b23747d8c2d497bab053038/src/Transport.ts#L614-L618
  if (Buffer.isBuffer(body) || typeof body.pipe === 'function') {
    return undefined
  }

  if (Array.isArray(body)) {
    try {
      return body.map(JSON.stringify).join('\n') + '\n' // ndjson
    } catch (_ignoredErr) {}
    return undefined
  }

  if (typeof body === 'object') {
    try {
      return JSON.stringify(body)
    } catch (_ignoredErr) {}
  }

  return undefined
}

// Module exports: only `getElasticsearchDbStatement` is exposed.
module.exports = {
getElasticsearchDbStatement
}
58 changes: 42 additions & 16 deletions lib/instrumentation/modules/@elastic/elasticsearch.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@
// 2. Users cannot access `apm.currentSpan` inside a diagnostic event handler.

const semver = require('semver')
const { URL, URLSearchParams } = require('url')

const { getDBDestination } = require('../../context')
const { setElasticsearchDbContext } = require('../../elasticsearch-shared')
const { getElasticsearchDbStatement } = require('../../elasticsearch-shared')
const shimmer = require('../../shimmer')

module.exports = function (elasticsearch, agent, { version, enabled }) {
Expand All @@ -53,6 +54,7 @@ module.exports = function (elasticsearch, agent, { version, enabled }) {
const doubleCallsRequestIfNoCb = semver.lt(version, '7.7.0')
const ins = agent._instrumentation
const isGteV8 = semver.satisfies(version, '>=8', { includePrerelease: true })
const elasticsearchCaptureBodyUrlsRegExp = agent._conf.elasticsearchCaptureBodyUrlsRegExp

agent.logger.debug('shimming elasticsearch.Transport.prototype.{request,getConnection}')
shimmer.wrap(elasticsearch.Transport && elasticsearch.Transport.prototype, 'request', wrapRequest)
Expand Down Expand Up @@ -127,15 +129,20 @@ module.exports = function (elasticsearch, agent, { version, enabled }) {
const spanRunContext = parentRunContext.enterSpan(span)
const finish = ins.bindFunctionToRunContext(spanRunContext, (err, result) => {
// Set DB context.
// In @elastic/elasticsearch@7, `Transport#request` encodes
// `params.{querystring,body}` in-place; use it. In >=8 this encoding is
// no longer in-place. A better eventual solution would be to wrap
// `Connection.request` to capture the serialized params.
setElasticsearchDbContext(
span,
params && params.path,
params && params.querystring,
params && (params.body || params.bulkBody))
const dbContext = {
type: 'elasticsearch'
}
if (params) {
const statement = getElasticsearchDbStatement(
params.path,
params.body || params.bulkBody,
elasticsearchCaptureBodyUrlsRegExp
)
if (statement) {
dbContext.statement = statement
}
}
span.setDbContext(dbContext)

// Set destination context.
// Use the connection from wrappedGetConnection() above, if that worked.
Expand All @@ -152,7 +159,7 @@ module.exports = function (elasticsearch, agent, { version, enabled }) {
span.setDestinationContext(getDBDestination(span,
connUrl && connUrl.hostname, connUrl && connUrl.port))

// Gather some HTTP context from the "DiagnosticResult" object.
// Gather some HTTP context.
// We are *not* including the response headers b/c they are boring:
//
// X-elastic-product: Elasticsearch
Expand All @@ -170,6 +177,8 @@ module.exports = function (elasticsearch, agent, { version, enabled }) {
// The result is that with Promise usage of v7, ES client requests that
// are queued behind the "product-check" and that reject, won't have a
// `diagResult`.
const httpContext = {}
let haveHttpContext = false
let diagResult = isGteV8 ? null : result
if (!diagResult) {
diagResult = diagResultFromSpan.get(span)
Expand All @@ -178,12 +187,11 @@ module.exports = function (elasticsearch, agent, { version, enabled }) {
}
}
if (diagResult) {
const httpContext = {}
let haveHttpContext = false
if (diagResult.statusCode) {
haveHttpContext = true
httpContext.status_code = diagResult.statusCode
}

// *Not* currently adding the response headers; see the rationale in the comment above.
if (diagResult.headers && 'content-length' in diagResult.headers) {
const contentLength = Number(diagResult.headers['content-length'])
Expand All @@ -192,9 +200,27 @@ module.exports = function (elasticsearch, agent, { version, enabled }) {
httpContext.response = { encoded_body_size: contentLength }
}
}
if (haveHttpContext) {
span.setHttpContext(httpContext)
}
}

// Reconstruct the full URL (including query params).
let origin
if (connUrl) {
origin = connUrl.origin
} else if (diagResult && diagResult.meta && diagResult.meta.connection && diagResult.meta.connection.url) {
try {
origin = new URL(diagResult.meta.connection.url).origin
} catch (_ignoredErr) {}
}
if (origin && params && params.path) {
const fullUrl = new URL(origin)
fullUrl.pathname = params.path
fullUrl.search = new URLSearchParams(params.querystring).toString()
httpContext.url = fullUrl.toString()
haveHttpContext = true
}

if (haveHttpContext) {
span.setHttpContext(httpContext)
}

if (err) {
Expand Down
Loading

0 comments on commit ee08f45

Please sign in to comment.