Skip to content

Commit

Permalink
fix: ensure correct run context for 'cassandra-driver' instrumentation (
Browse files Browse the repository at this point in the history
#2549)

Refs: #2430
  • Loading branch information
trentm authored and Alan Storm committed Feb 3, 2022
1 parent f132ae8 commit 3d951a9
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ Notes:
[float]
===== Bug fixes
* Fixes for run context handling for 'cassandra-driver' instrumentation.
({issues}2430[#2430])
* Fixes for run context handling for 'mongodb-core' instrumentation.
({issues}2430[#2430])
Expand Down
2 changes: 1 addition & 1 deletion docs/supported-technologies.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ The Node.js agent will automatically instrument the following modules to give yo
|=======================================================================
|Module |Version |Note
|https://www.npmjs.com/package/aws-sdk[aws-sdk] |>1 <3 |Will instrument SQS send/receive/delete messages, all S3 methods, all DynamoDB methods, and the SNS publish method
|https://www.npmjs.com/package/cassandra-driver[cassandra-driver] |>=3.0.0 |Will instrument all queries
|https://www.npmjs.com/package/cassandra-driver[cassandra-driver] |>=3.0.0 <5 |Will instrument all queries
|https://www.npmjs.com/package/elasticsearch[elasticsearch] |>=8.0.0 |Will instrument all queries
|https://www.npmjs.com/package/@elastic/elasticsearch[@elastic/elasticsearch] |>=7.0.0 <9.0.0 |Will instrument all queries
|https://www.npmjs.com/package/graphql[graphql] |>=0.7.0 <16.0.0 |Will instrument all queries
Expand Down
104 changes: 104 additions & 0 deletions examples/trace-cassandra-driver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// A small example showing Elastic APM tracing the 'cassandra-driver' package.
//
// This assumes a Cassandra server running on localhost. You can use:
// npm run docker:start cassandra
// to start a Cassandra docker container. Then `npm run docker:stop` to stop it.

const apm = require('../').start({ // elastic-apm-node
serviceName: 'example-trace-cassandra-driver',
logUncaughtExceptions: true
})

const cassandra = require('cassandra-driver')

const KEYSPACE = 'tracecassandradriver'
const TABLE = 'testtable'
let client

async function run () {
let res

client = new cassandra.Client({
contactPoints: ['localhost'],
localDataCenter: 'datacenter1'
})
await client.connect()
res = await client.execute('SELECT key FROM system.local')
console.log('select result:', res)

// Create a keyspace and table in which to play.
await client.execute(`
CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE} WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': 1
};
`)
await client.execute(`
CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE}(id uuid,text varchar,PRIMARY KEY(id));
`)

// Make a new client in our now-existing keyspace.
await client.shutdown()
client = new cassandra.Client({
contactPoints: ['localhost'],
localDataCenter: 'datacenter1',
keyspace: KEYSPACE
})

// Play in this keyspace and table.
const sqlInsert = `INSERT INTO ${TABLE} (id, text) VALUES (uuid(), ?)`
res = await client.batch([
{ query: sqlInsert, params: ['foo'] },
{ query: sqlInsert, params: ['bar'] },
{ query: sqlInsert, params: ['foo'] }
])
console.log('batch insert result:', res)

function useEachRow () {
console.log('-- client.eachRow')
// `eachRow` doesn't provide a Promise interface, so we promisify ourselves.
return new Promise((resolve, reject) => {
client.eachRow(
`SELECT id, text FROM ${TABLE} WHERE text=? ALLOW FILTERING`,
['foo'],
(n, row) => {
console.log('row %d: %j', n, row)
},
(err, res) => {
if (err) {
reject(err)
} else {
resolve(res)
}
}
)
})
}
await useEachRow()

console.log('-- client.stream')
const q = client.stream(`SELECT id, text FROM ${TABLE} WHERE text=? ALLOW FILTERING`, ['foo'])
for await (const row of q) {
console.log('row: %j', row)
}

await client.execute(`DROP TABLE ${TABLE}`)
}

// For tracing spans to be created, there must be an active APM transaction.
// Typically, a transaction is automatically started for incoming HTTP
// requests to a Node.js server. However, because this script is not running
// an HTTP server, we manually start a transaction. More details at:
// https://www.elastic.co/guide/en/apm/agent/nodejs/current/custom-transactions.html
const t1 = apm.startTransaction('t1')

run()
.catch(err => {
console.warn('run err:', err)
})
.finally(() => {
if (client) {
client.shutdown()
}
t1.end()
})
12 changes: 7 additions & 5 deletions lib/instrumentation/modules/cassandra-driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ module.exports = function (cassandra, agent, { version, enabled }) {
return cassandra
}

const ins = agent._instrumentation

if (cassandra.Client) {
if (semver.gte(version, '4.4.0')) {
// Prior to v4.4.0, the regular `connect` function would be called by the
Expand All @@ -31,7 +33,7 @@ module.exports = function (cassandra, agent, { version, enabled }) {

function wrapAsyncConnect (original) {
return async function wrappedAsyncConnect () {
const span = agent.startSpan('Cassandra: Connect', 'db', 'cassandra', 'connect')
const span = ins.createSpan('Cassandra: Connect', 'db', 'cassandra', 'connect')
try {
return await original.apply(this, arguments)
} finally {
Expand All @@ -42,7 +44,7 @@ module.exports = function (cassandra, agent, { version, enabled }) {

function wrapConnect (original) {
return function wrappedConnect (callback) {
const span = agent.startSpan('Cassandra: Connect', 'db', 'cassandra', 'connect')
const span = ins.createSpan('Cassandra: Connect', 'db', 'cassandra', 'connect')
if (!span) {
return original.apply(this, arguments)
}
Expand Down Expand Up @@ -80,7 +82,7 @@ module.exports = function (cassandra, agent, { version, enabled }) {

function wrapBatch (original) {
return function wrappedBatch (queries, options, callback) {
const span = agent.startSpan('Cassandra: Batch query', 'db', 'cassandra', 'query')
const span = ins.createSpan('Cassandra: Batch query', 'db', 'cassandra', 'query')
if (!span) {
return original.apply(this, arguments)
}
Expand Down Expand Up @@ -124,7 +126,7 @@ module.exports = function (cassandra, agent, { version, enabled }) {

function wrapExecute (original) {
return function wrappedExecute (query, params, options, callback) {
const span = agent.startSpan(null, 'db', 'cassandra', 'query')
const span = ins.createSpan(null, 'db', 'cassandra', 'query')
if (!span) {
return original.apply(this, arguments)
}
Expand Down Expand Up @@ -163,7 +165,7 @@ module.exports = function (cassandra, agent, { version, enabled }) {

function wrapEachRow (original) {
return function wrappedEachRow (query, params, options, rowCallback, callback) {
const span = agent.startSpan(null, 'db', 'cassandra', 'query')
const span = ins.createSpan(null, 'db', 'cassandra', 'query')
if (!span) {
return original.apply(this, arguments)
}
Expand Down
7 changes: 7 additions & 0 deletions test/instrumentation/modules/cassandra-driver/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ test('connect', function (t) {
agent.startTransaction('foo')

client.connect(assertCallback(t))
t.ok(agent.currentSpan === null, 'no currentSpan in sync code after cassandra-driver client command')
})
})

Expand All @@ -54,6 +55,7 @@ if (hasPromises) {
t.strictEqual(rows.length, 1, 'number of rows')
t.strictEqual(rows[0].key, 'local', 'result key')
})
t.ok(agent.currentSpan === null, 'no currentSpan in sync code after cassandra-driver client command')
})
})
}
Expand All @@ -74,6 +76,7 @@ test('execute - callback', function (t) {
t.strictEqual(rows.length, 1, 'number of rows')
t.strictEqual(rows[0].key, 'local', 'result key')
}))
t.ok(agent.currentSpan === null, 'no currentSpan in sync code after cassandra-driver client command')
})
})

Expand Down Expand Up @@ -104,6 +107,7 @@ if (hasPromises) {
agent.startTransaction('foo')

assertPromise(t, client.batch(queries))
t.ok(agent.currentSpan === null, 'no currentSpan in sync code after cassandra-driver client command')
})
})
}
Expand Down Expand Up @@ -136,6 +140,7 @@ test('batch - callback', function (t) {
client.batch(queries, assertCallback(t, function (err) {
t.error(err, 'no error')
}))
t.ok(agent.currentSpan === null, 'no currentSpan in sync code after cassandra-driver client command')
})
})

Expand All @@ -157,6 +162,7 @@ test('eachRow', function (t) {
t.error(err, 'no error')
agent.endTransaction()
})
t.ok(agent.currentSpan === null, 'no currentSpan in sync code after cassandra-driver client command')
})
})

Expand All @@ -173,6 +179,7 @@ test('stream', function (t) {
agent.startTransaction('foo')

const stream = client.stream(sql, [])
t.ok(agent.currentSpan === null, 'no currentSpan in sync code after cassandra-driver client command')
let rows = 0

stream.on('readable', function () {
Expand Down

0 comments on commit 3d951a9

Please sign in to comment.