From f094d3fd939663ba485db3ef5b2bdcbbe4f14684 Mon Sep 17 00:00:00 2001 From: Thomas Watson Date: Thu, 12 Dec 2024 17:01:51 +0100 Subject: [PATCH] [DI] Batch outgoing http requests --- integration-tests/debugger/basic.spec.js | 193 ++++++++++-------- .../debugger/snapshot-pruning.spec.js | 2 +- integration-tests/debugger/snapshot.spec.js | 10 +- integration-tests/debugger/utils.js | 8 +- .../src/debugger/devtools_client/config.js | 3 +- .../src/debugger/devtools_client/index.js | 5 +- .../src/debugger/devtools_client/queue.js | 36 ++++ .../src/debugger/devtools_client/send.js | 38 +++- .../src/debugger/devtools_client/status.js | 11 +- .../debugger/devtools_client/queue.spec.js | 45 ++++ .../debugger/devtools_client/send.spec.js | 100 +++++++++ .../debugger/devtools_client/status.spec.js | 97 ++++++--- .../test/debugger/devtools_client/utils.js | 20 +- 13 files changed, 428 insertions(+), 140 deletions(-) create mode 100644 packages/dd-trace/src/debugger/devtools_client/queue.js create mode 100644 packages/dd-trace/test/debugger/devtools_client/queue.spec.js create mode 100644 packages/dd-trace/test/debugger/devtools_client/send.spec.js diff --git a/integration-tests/debugger/basic.spec.js b/integration-tests/debugger/basic.spec.js index 57c0c4a67a8..fa9e74ddfcb 100644 --- a/integration-tests/debugger/basic.spec.js +++ b/integration-tests/debugger/basic.spec.js @@ -46,20 +46,22 @@ describe('Dynamic Instrumentation', function () { }) t.agent.on('debugger-diagnostics', ({ payload }) => { - const expected = expectedPayloads.shift() - assertObjectContains(payload, expected) - assertUUID(payload.debugger.diagnostics.runtimeId) - - if (payload.debugger.diagnostics.status === 'INSTALLED') { - t.axios.get(t.breakpoint.url) - .then((response) => { - assert.strictEqual(response.status, 200) - assert.deepStrictEqual(response.data, { hello: 'bar' }) - }) - .catch(done) - } else { - endIfDone() - } + payload.forEach((event) => { + const expected = expectedPayloads.shift() + assertObjectContains(event, expected) + assertUUID(event.debugger.diagnostics.runtimeId) + + if (event.debugger.diagnostics.status === 'INSTALLED') { + t.axios.get(t.breakpoint.url) + .then((response) => { + assert.strictEqual(response.status, 200) + assert.deepStrictEqual(response.data, { hello: 'bar' }) + }) + .catch(done) + } else { + endIfDone() + } + }) }) t.agent.addRemoteConfig(t.rcConfig) @@ -107,11 +109,13 @@ describe('Dynamic Instrumentation', function () { }) t.agent.on('debugger-diagnostics', ({ payload }) => { - const expected = expectedPayloads.shift() - assertObjectContains(payload, expected) - assertUUID(payload.debugger.diagnostics.runtimeId) - if (payload.debugger.diagnostics.status === 'INSTALLED') triggers.shift()() - endIfDone() + payload.forEach((event) => { + const expected = expectedPayloads.shift() + assertObjectContains(event, expected) + assertUUID(event.debugger.diagnostics.runtimeId) + if (event.debugger.diagnostics.status === 'INSTALLED') triggers.shift()() + endIfDone() + }) }) t.agent.addRemoteConfig(t.rcConfig) @@ -146,18 +150,20 @@ describe('Dynamic Instrumentation', function () { }) t.agent.on('debugger-diagnostics', ({ payload }) => { - const expected = expectedPayloads.shift() - assertObjectContains(payload, expected) - assertUUID(payload.debugger.diagnostics.runtimeId) - - if (payload.debugger.diagnostics.status === 'INSTALLED') { - t.agent.removeRemoteConfig(t.rcConfig.id) - // Wait a little to see if we get any follow-up `debugger-diagnostics` messages - setTimeout(() => { - payloadsProcessed = true - endIfDone() - }, pollInterval * 2 * 1000) // wait twice as long as the RC poll interval - } + payload.forEach((event) => { + const expected = expectedPayloads.shift() + assertObjectContains(event, expected) + assertUUID(event.debugger.diagnostics.runtimeId) + + if (event.debugger.diagnostics.status === 'INSTALLED') { + t.agent.removeRemoteConfig(t.rcConfig.id) + // Wait a little to see if we get any follow-up `debugger-diagnostics` messages + setTimeout(() => { + payloadsProcessed = true + endIfDone() + }, pollInterval * 2 * 1000) // wait twice as long as the RC poll interval + } + }) }) t.agent.addRemoteConfig(t.rcConfig) @@ -205,19 +211,21 @@ describe('Dynamic Instrumentation', function () { }] t.agent.on('debugger-diagnostics', ({ payload }) => { - const expected = expectedPayloads.shift() - assertObjectContains(payload, expected) - const { diagnostics } = payload.debugger - assertUUID(diagnostics.runtimeId) - - if (diagnostics.status === 'ERROR') { - assert.property(diagnostics, 'exception') - assert.hasAllKeys(diagnostics.exception, ['message', 'stacktrace']) - assert.typeOf(diagnostics.exception.message, 'string') - assert.typeOf(diagnostics.exception.stacktrace, 'string') - } + payload.forEach((event) => { + const expected = expectedPayloads.shift() + assertObjectContains(event, expected) + const { diagnostics } = event.debugger + assertUUID(diagnostics.runtimeId) + + if (diagnostics.status === 'ERROR') { + assert.property(diagnostics, 'exception') + assert.hasAllKeys(diagnostics.exception, ['message', 'stacktrace']) + assert.typeOf(diagnostics.exception.message, 'string') + assert.typeOf(diagnostics.exception.stacktrace, 'string') + } - endIfDone() + endIfDone() + }) }) t.agent.addRemoteConfig({ @@ -238,6 +246,10 @@ describe('Dynamic Instrumentation', function () { t.triggerBreakpoint() t.agent.on('debugger-input', ({ payload }) => { + assert.isArray(payload) + assert.lengthOf(payload, 1) + payload = payload[0] + const expected = { ddsource: 'dd_debugger', hostname: os.hostname(), @@ -304,10 +316,12 @@ describe('Dynamic Instrumentation', function () { ] t.agent.on('debugger-diagnostics', ({ payload }) => { - if (payload.debugger.diagnostics.status === 'INSTALLED') triggers.shift()().catch(done) + payload.forEach((event) => { + if (event.debugger.diagnostics.status === 'INSTALLED') triggers.shift()().catch(done) + }) }) - t.agent.on('debugger-input', ({ payload }) => { + t.agent.on('debugger-input', ({ payload: [payload] }) => { assert.strictEqual(payload.message, expectedMessages.shift()) if (expectedMessages.length === 0) done() }) @@ -317,17 +331,19 @@ describe('Dynamic Instrumentation', function () { it('should not trigger if probe is deleted', function (done) { t.agent.on('debugger-diagnostics', ({ payload }) => { - if (payload.debugger.diagnostics.status === 'INSTALLED') { - t.agent.once('remote-confg-responded', async () => { - await t.axios.get(t.breakpoint.url) - // We want to wait enough time to see if the client triggers on the breakpoint so that the test can fail - // if it does, but not so long that the test times out. - // TODO: Is there some signal we can use instead of a timer? - setTimeout(done, pollInterval * 2 * 1000) // wait twice as long as the RC poll interval - }) + payload.forEach((event) => { + if (event.debugger.diagnostics.status === 'INSTALLED') { + t.agent.once('remote-confg-responded', async () => { + await t.axios.get(t.breakpoint.url) + // We want to wait enough time to see if the client triggers on the breakpoint so that the test can fail + // if it does, but not so long that the test times out. + // TODO: Is there some signal we can use instead of a timer? + setTimeout(done, pollInterval * 2 * 1000) // wait twice as long as the RC poll interval + }) - t.agent.removeRemoteConfig(t.rcConfig.id) - } + t.agent.removeRemoteConfig(t.rcConfig.id) + } + }) }) t.agent.on('debugger-input', () => { @@ -436,39 +452,42 @@ describe('Dynamic Instrumentation', function () { it('should remove the last breakpoint completely before trying to add a new one', function (done) { const rcConfig2 = t.generateRemoteConfig() - t.agent.on('debugger-diagnostics', ({ payload: { debugger: { diagnostics: { status, probeId } } } }) => { - if (status !== 'INSTALLED') return - - if (probeId === t.rcConfig.config.id) { - // First INSTALLED payload: Try to trigger the race condition. - t.agent.removeRemoteConfig(t.rcConfig.id) - t.agent.addRemoteConfig(rcConfig2) - } else { - // Second INSTALLED payload: Perform an HTTP request to see if we successfully handled the race condition. - let finished = false - - // If the race condition occurred, the debugger will have been detached from the main thread and the new - // probe will never trigger. If that's the case, the following timer will fire: - const timer = setTimeout(() => { - done(new Error('Race condition occurred!')) - }, 1000) - - // If we successfully handled the race condition, the probe will trigger, we'll get a probe result and the - // following event listener will be called: - t.agent.once('debugger-input', () => { - clearTimeout(timer) - finished = true - done() - }) + t.agent.on('debugger-diagnostics', ({ payload }) => { + payload.forEach((event) => { + const { status, probeId } = event.debugger.diagnostics + if (status !== 'INSTALLED') return + + if (probeId === t.rcConfig.config.id) { + // First INSTALLED payload: Try to trigger the race condition. + t.agent.removeRemoteConfig(t.rcConfig.id) + t.agent.addRemoteConfig(rcConfig2) + } else { + // Second INSTALLED payload: Perform an HTTP request to see if we successfully handled the race condition. + let finished = false + + // If the race condition occurred, the debugger will have been detached from the main thread and the new + // probe will never trigger. If that's the case, the following timer will fire: + const timer = setTimeout(() => { + done(new Error('Race condition occurred!')) + }, 2000) + + // If we successfully handled the race condition, the probe will trigger, we'll get a probe result and the + // following event listener will be called: + t.agent.once('debugger-input', () => { + clearTimeout(timer) + finished = true + done() + }) - // Perform HTTP request to try and trigger the probe - t.axios.get(t.breakpoint.url).catch((err) => { - // If the request hasn't fully completed by the time the tests ends and the target app is destroyed, Axios - // will complain with a "socket hang up" error. Hence this sanity check before calling `done(err)`. If we - // later add more tests below this one, this shouuldn't be an issue. - if (!finished) done(err) - }) - } + // Perform HTTP request to try and trigger the probe + t.axios.get(t.breakpoint.url).catch((err) => { + // If the request hasn't fully completed by the time the tests ends and the target app is destroyed, Axios + // will complain with a "socket hang up" error. Hence this sanity check before calling `done(err)`. If we + // later add more tests below this one, this shouuldn't be an issue. + if (!finished) done(err) + }) + } + }) }) t.agent.addRemoteConfig(t.rcConfig) diff --git a/integration-tests/debugger/snapshot-pruning.spec.js b/integration-tests/debugger/snapshot-pruning.spec.js index c1ba218dd1c..b94d6afcce3 100644 --- a/integration-tests/debugger/snapshot-pruning.spec.js +++ b/integration-tests/debugger/snapshot-pruning.spec.js @@ -11,7 +11,7 @@ describe('Dynamic Instrumentation', function () { beforeEach(t.triggerBreakpoint) it('should prune snapshot if payload is too large', function (done) { - t.agent.on('debugger-input', ({ payload }) => { + t.agent.on('debugger-input', ({ payload: [payload] }) => { assert.isBelow(Buffer.byteLength(JSON.stringify(payload)), 1024 * 1024) // 1MB assert.deepEqual(payload['debugger.snapshot'].captures, { lines: { diff --git a/integration-tests/debugger/snapshot.spec.js b/integration-tests/debugger/snapshot.spec.js index e3d17b225c4..7804ca94c91 100644 --- a/integration-tests/debugger/snapshot.spec.js +++ b/integration-tests/debugger/snapshot.spec.js @@ -11,7 +11,7 @@ describe('Dynamic Instrumentation', function () { beforeEach(t.triggerBreakpoint) it('should capture a snapshot', function (done) { - t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => { + t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => { assert.deepEqual(Object.keys(captures), ['lines']) assert.deepEqual(Object.keys(captures.lines), [String(t.breakpoint.line)]) @@ -114,7 +114,7 @@ describe('Dynamic Instrumentation', function () { }) it('should respect maxReferenceDepth', function (done) { - t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => { + t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => { const { locals } = captures.lines[t.breakpoint.line] delete locals.request delete locals.fastify @@ -150,7 +150,7 @@ describe('Dynamic Instrumentation', function () { }) it('should respect maxLength', function (done) { - t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => { + t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => { const { locals } = captures.lines[t.breakpoint.line] assert.deepEqual(locals.lstr, { @@ -167,7 +167,7 @@ describe('Dynamic Instrumentation', function () { }) it('should respect maxCollectionSize', function (done) { - t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => { + t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => { const { locals } = captures.lines[t.breakpoint.line] assert.deepEqual(locals.arr, { @@ -205,7 +205,7 @@ describe('Dynamic Instrumentation', function () { } } - t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => { + t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => { const { locals } = captures.lines[t.breakpoint.line] assert.deepEqual(Object.keys(locals), [ diff --git a/integration-tests/debugger/utils.js b/integration-tests/debugger/utils.js index bca970dea87..86bdd2fb91e 100644 --- a/integration-tests/debugger/utils.js +++ b/integration-tests/debugger/utils.js @@ -50,9 +50,11 @@ function setup () { function triggerBreakpoint (url) { // Trigger the breakpoint once probe is successfully installed t.agent.on('debugger-diagnostics', ({ payload }) => { - if (payload.debugger.diagnostics.status === 'INSTALLED') { - t.axios.get(url) - } + payload.forEach((event) => { + if (event.debugger.diagnostics.status === 'INSTALLED') { + t.axios.get(url) + } + }) }) } diff --git a/packages/dd-trace/src/debugger/devtools_client/config.js b/packages/dd-trace/src/debugger/devtools_client/config.js index 7783bc84d75..950d8938872 100644 --- a/packages/dd-trace/src/debugger/devtools_client/config.js +++ b/packages/dd-trace/src/debugger/devtools_client/config.js @@ -9,7 +9,8 @@ const config = module.exports = { service: parentConfig.service, commitSHA: parentConfig.commitSHA, repositoryUrl: parentConfig.repositoryUrl, - parentThreadId + parentThreadId, + maxTotalPayloadSize: 5 * 1024 * 1024 // 5MB } updateUrl(parentConfig) diff --git a/packages/dd-trace/src/debugger/devtools_client/index.js b/packages/dd-trace/src/debugger/devtools_client/index.js index 7ca828786ac..cddd1283181 100644 --- a/packages/dd-trace/src/debugger/devtools_client/index.js +++ b/packages/dd-trace/src/debugger/devtools_client/index.js @@ -122,9 +122,8 @@ session.on('Debugger.paused', async ({ params }) => { } // TODO: Process template (DEBUG-2628) - send(probe.template, logger, snapshot, (err) => { - if (err) log.error('Debugger error', err) - else ackEmitting(probe) + send(probe.template, logger, snapshot, () => { + ackEmitting(probe) }) } }) diff --git a/packages/dd-trace/src/debugger/devtools_client/queue.js b/packages/dd-trace/src/debugger/devtools_client/queue.js new file mode 100644 index 00000000000..42f40f26ca4 --- /dev/null +++ b/packages/dd-trace/src/debugger/devtools_client/queue.js @@ -0,0 +1,36 @@ +'use strict' + +class JSONQueue { + constructor ({ size, timeout, onFlush }) { + this._maxSize = size + this._timeout = timeout + this._onFlush = onFlush + this._reset() + } + + _reset () { + clearTimeout(this._timer) + this._timer = null + this._partialJson = null + } + + _flush () { + const json = `${this._partialJson}]` + this._reset() + this._onFlush(json) + } + + add (str, size = Buffer.byteLength(str)) { + if (this._timer === null) { + this._partialJson = `[${str}` + this._timer = setTimeout(() => this._flush(), this._timeout) + } else if (Buffer.byteLength(this._partialJson) + size + 2 > this._maxSize) { + this._flush() + this.add(str, size) + } else { + this._partialJson += `,${str}` + } + } +} + +module.exports = JSONQueue diff --git a/packages/dd-trace/src/debugger/devtools_client/send.js b/packages/dd-trace/src/debugger/devtools_client/send.js index f2ba5befd46..0584c3da6d2 100644 --- a/packages/dd-trace/src/debugger/devtools_client/send.js +++ b/packages/dd-trace/src/debugger/devtools_client/send.js @@ -4,12 +4,14 @@ const { hostname: getHostname } = require('os') const { stringify } = require('querystring') const config = require('./config') +const JSONQueue = require('./queue') const request = require('../../exporters/common/request') const { GIT_COMMIT_SHA, GIT_REPOSITORY_URL } = require('../../plugins/util/tags') +const log = require('../../log') module.exports = send -const MAX_PAYLOAD_SIZE = 1024 * 1024 // 1MB +const MAX_LOG_PAYLOAD_SIZE = 1024 * 1024 // 1MB const ddsource = 'dd_debugger' const hostname = getHostname() @@ -22,14 +24,10 @@ const ddtags = [ const path = `/debugger/v1/input?${stringify({ ddtags })}` -function send (message, logger, snapshot, cb) { - const opts = { - method: 'POST', - url: config.url, - path, - headers: { 'Content-Type': 'application/json; charset=utf-8' } - } +let callbacks = [] +const queue = new JSONQueue({ size: config.maxTotalPayloadSize, timeout: 1000, onFlush }) +function send (message, logger, snapshot, cb) { const payload = { ddsource, hostname, @@ -40,8 +38,9 @@ function send (message, logger, snapshot, cb) { } let json = JSON.stringify(payload) + let size = Buffer.byteLength(json) - if (Buffer.byteLength(json) > MAX_PAYLOAD_SIZE) { + if (size > MAX_LOG_PAYLOAD_SIZE) { // TODO: This is a very crude way to handle large payloads. Proper pruning will be implemented later (DEBUG-2624) const line = Object.values(payload['debugger.snapshot'].captures.lines)[0] line.locals = { @@ -49,7 +48,26 @@ function send (message, logger, snapshot, cb) { size: Object.keys(line.locals).length } json = JSON.stringify(payload) + size = Buffer.byteLength(json) + } + + queue.add(json, size) + callbacks.push(cb) +} + +function onFlush (payload) { + const opts = { + method: 'POST', + url: config.url, + path, + headers: { 'Content-Type': 'application/json; charset=utf-8' } } - request(json, opts, cb) + const _callbacks = callbacks + callbacks = [] + + request(payload, opts, (err) => { + if (err) log.error('Could not send debugger payload', err) + else _callbacks.forEach(cb => cb()) + }) } diff --git a/packages/dd-trace/src/debugger/devtools_client/status.js b/packages/dd-trace/src/debugger/devtools_client/status.js index b228d7e50b7..936d77c3d09 100644 --- a/packages/dd-trace/src/debugger/devtools_client/status.js +++ b/packages/dd-trace/src/debugger/devtools_client/status.js @@ -2,6 +2,7 @@ const LRUCache = require('lru-cache') const config = require('./config') +const JSONQueue = require('./queue') const request = require('../../exporters/common/request') const FormData = require('../../exporters/common/form-data') const log = require('../../log') @@ -25,6 +26,8 @@ const cache = new LRUCache({ ttlAutopurge: true }) +const queue = new JSONQueue({ size: config.maxTotalPayloadSize, timeout: 1000, onFlush }) + const STATUSES = { RECEIVED: 'RECEIVED', INSTALLED: 'INSTALLED', @@ -71,11 +74,15 @@ function ackError (err, { id: probeId, version }) { } function send (payload) { + queue.add(JSON.stringify(payload)) +} + +function onFlush (payload) { const form = new FormData() form.append( 'event', - JSON.stringify(payload), + payload, { filename: 'event.json', contentType: 'application/json; charset=utf-8' } ) @@ -87,7 +94,7 @@ function send (payload) { } request(form, options, (err) => { - if (err) log.error('[debugger:devtools_client] Error sending debugger payload', err) + if (err) log.error('[debugger:devtools_client] Error sending probe payload', err) }) } diff --git a/packages/dd-trace/test/debugger/devtools_client/queue.spec.js b/packages/dd-trace/test/debugger/devtools_client/queue.spec.js new file mode 100644 index 00000000000..cf3891df083 --- /dev/null +++ b/packages/dd-trace/test/debugger/devtools_client/queue.spec.js @@ -0,0 +1,45 @@ +'use strict' + +require('../../setup/mocha') + +const JSONQueue = require('../../../src/debugger/devtools_client/queue') + +const MAX_SAFE_SIGNED_INTEGER = 2 ** 31 - 1 + +describe('JSONQueue', () => { + it('should call onFlush with the expected payload when the timeout is reached', function (done) { + const onFlush = (json) => { + const diff = Date.now() - start + expect(json).to.equal('[{"message":1},{"message":2},{"message":3}]') + expect(diff).to.be.within(95, 110) + done() + } + + const queue = new JSONQueue({ size: Infinity, timeout: 100, onFlush }) + + const start = Date.now() + queue.add(JSON.stringify({ message: 1 })) + queue.add(JSON.stringify({ message: 2 })) + queue.add(JSON.stringify({ message: 3 })) + }) + + it('should call onFlush with the expected payload when the size is reached', function (done) { + const expectedPayloads = [ + '[{"message":1},{"message":2}]', + '[{"message":3},{"message":4}]' + ] + + const onFlush = (json) => { + expect(json).to.equal(expectedPayloads.shift()) + if (expectedPayloads.length === 0) done() + } + + const queue = new JSONQueue({ size: 30, timeout: MAX_SAFE_SIGNED_INTEGER, onFlush }) + + queue.add(JSON.stringify({ message: 1 })) // size: 15 + queue.add(JSON.stringify({ message: 2 })) // size: 29 + queue.add(JSON.stringify({ message: 3 })) // size: 15 (flushed, and re-added) + queue.add(JSON.stringify({ message: 4 })) // size: 29 + queue.add(JSON.stringify({ message: 5 })) // size: 15 (flushed, and re-added) + }) +}) diff --git a/packages/dd-trace/test/debugger/devtools_client/send.spec.js b/packages/dd-trace/test/debugger/devtools_client/send.spec.js new file mode 100644 index 00000000000..c83cdd4c09a --- /dev/null +++ b/packages/dd-trace/test/debugger/devtools_client/send.spec.js @@ -0,0 +1,100 @@ +'use strict' + +require('../../setup/mocha') + +const { hostname: getHostname } = require('os') +const { expectWithin, getRequestOptions } = require('./utils') +const JSONQueue = require('../../../src/debugger/devtools_client/queue') + +const service = 'my-service' +const commitSHA = 'my-commit-sha' +const repositoryUrl = 'my-repository-url' +const url = 'my-url' +const ddsource = 'dd_debugger' +const hostname = getHostname() +const message = { message: true } +const logger = { logger: true } +const snapshot = { snapshot: true } + +describe('input message http requests', function () { + let send, request, queue + + beforeEach(function () { + request = sinon.spy() + request['@noCallThru'] = true + + class JSONQueueSpy extends JSONQueue { + constructor (...args) { + super(...args) + queue = this + sinon.spy(this, 'add') + } + } + + send = proxyquire('../src/debugger/devtools_client/send', { + './config': { service, commitSHA, repositoryUrl, url, '@noCallThru': true }, + './queue': JSONQueueSpy, + '../../exporters/common/request': request + }) + }) + + it('should queue instead of calling request directly', function () { + const callback = sinon.spy() + + send(message, logger, snapshot, callback) + expect(request).to.not.have.been.called + expect(queue.add).to.have.been.calledOnceWith( + JSON.stringify(getPayload()) + ) + expect(callback).to.not.have.been.called + }) + + it('should call request with the expected payload once the queue is flushed', function (done) { + const callback1 = sinon.spy() + const callback2 = sinon.spy() + const callback3 = sinon.spy() + + send({ message: 1 }, logger, snapshot, callback1) + send({ message: 2 }, logger, snapshot, callback2) + send({ message: 3 }, logger, snapshot, callback3) + expect(request).to.not.have.been.called + + expectWithin(1200, () => { + expect(request).to.have.been.calledOnceWith(JSON.stringify([ + getPayload({ message: 1 }), + getPayload({ message: 2 }), + getPayload({ message: 3 }) + ])) + + const opts = getRequestOptions(request) + expect(opts).to.have.property('method', 'POST') + expect(opts).to.have.property( + 'path', + '/debugger/v1/input?ddtags=git.commit.sha%3Amy-commit-sha%2Cgit.repository_url%3Amy-repository-url' + ) + + expect(callback1).to.not.have.been.calledOnce + expect(callback2).to.not.have.been.calledOnce + expect(callback3).to.not.have.been.calledOnce + + request.firstCall.callback() + + expect(callback1).to.have.been.calledOnce + expect(callback2).to.have.been.calledOnce + expect(callback3).to.have.been.calledOnce + + done() + }) + }) +}) + +function getPayload (_message = message) { + return { + ddsource, + hostname, + service, + message: _message, + logger, + 'debugger.snapshot': snapshot + } +} diff --git a/packages/dd-trace/test/debugger/devtools_client/status.spec.js b/packages/dd-trace/test/debugger/devtools_client/status.spec.js index 365d86d6e96..ef1992feffd 100644 --- a/packages/dd-trace/test/debugger/devtools_client/status.spec.js +++ b/packages/dd-trace/test/debugger/devtools_client/status.spec.js @@ -2,12 +2,15 @@ require('../../setup/mocha') +const { expectWithin, getRequestOptions } = require('./utils') +const JSONQueue = require('../../../src/debugger/devtools_client/queue') + const ddsource = 'dd_debugger' const service = 'my-service' const runtimeId = 'my-runtime-id' -describe('diagnostic message http request caching', function () { - let statusproxy, request +describe('diagnostic message http requests', function () { + let statusproxy, request, queue const acks = [ ['ackReceived', 'RECEIVED'], @@ -20,8 +23,17 @@ describe('diagnostic message http request caching', function () { request = sinon.spy() request['@noCallThru'] = true + class JSONQueueSpy extends JSONQueue { + constructor (...args) { + super(...args) + queue = this + sinon.spy(this, 'add') + } + } + statusproxy = proxyquire('../src/debugger/devtools_client/status', { './config': { service, runtimeId, '@noCallThru': true }, + './queue': JSONQueueSpy, '../../exporters/common/request': request }) }) @@ -45,54 +57,85 @@ describe('diagnostic message http request caching', function () { } }) - it('should only call once if no change', function () { + it('should queue instead of calling request directly', function () { + ackFn({ id: 'foo', version: 0 }) + expect(request).to.not.have.been.called + expect(queue.add).to.have.been.calledOnceWith( + JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception })) + ) + }) + + it('should only add to queue once if no change', function () { ackFn({ id: 'foo', version: 0 }) - expect(request).to.have.been.calledOnce - assertRequestData(request, { probeId: 'foo', version: 0, status, exception }) + expect(queue.add).to.have.been.calledOnceWith( + JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception })) + ) ackFn({ id: 'foo', version: 0 }) - expect(request).to.have.been.calledOnce + expect(queue.add).to.have.been.calledOnce }) - it('should call again if version changes', function () { + it('should add to queue again if version changes', function () { ackFn({ id: 'foo', version: 0 }) - expect(request).to.have.been.calledOnce - assertRequestData(request, { probeId: 'foo', version: 0, status, exception }) + expect(queue.add).to.have.been.calledOnceWith( + JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception })) + ) ackFn({ id: 'foo', version: 1 }) - expect(request).to.have.been.calledTwice - assertRequestData(request, { probeId: 'foo', version: 1, status, exception }) + expect(queue.add).to.have.been.calledTwice + expect(queue.add.lastCall).to.have.been.calledWith( + JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 1, status, exception })) + ) }) - it('should call again if probeId changes', function () { + it('should add to queue again if probeId changes', function () { ackFn({ id: 'foo', version: 0 }) - expect(request).to.have.been.calledOnce - assertRequestData(request, { probeId: 'foo', version: 0, status, exception }) + expect(queue.add).to.have.been.calledOnceWith( + JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception })) + ) ackFn({ id: 'bar', version: 0 }) - expect(request).to.have.been.calledTwice - assertRequestData(request, { probeId: 'bar', version: 0, status, exception }) + expect(queue.add).to.have.been.calledTwice + expect(queue.add.lastCall).to.have.been.calledWith( + JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'bar', version: 0, status, exception })) + ) + }) + + it('should call request with the expected payload once the queue is flushed', function (done) { + ackFn({ id: 'foo', version: 0 }) + ackFn({ id: 'foo', version: 1 }) + ackFn({ id: 'bar', version: 0 }) + expect(request).to.not.have.been.called + + expectWithin(1200, () => { + expect(request).to.have.been.calledOnce + + const payload = getFormPayload(request) + + expect(payload).to.deep.equal([ + formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception }), + formatAsDiagnosticsEvent({ probeId: 'foo', version: 1, status, exception }), + formatAsDiagnosticsEvent({ probeId: 'bar', version: 0, status, exception }) + ]) + + const opts = getRequestOptions(request) + expect(opts).to.have.property('method', 'POST') + expect(opts).to.have.property('path', '/debugger/v1/diagnostics') + + done() + }) }) }) } }) -function assertRequestData (request, { probeId, version, status, exception }) { - const payload = getFormPayload(request) +function formatAsDiagnosticsEvent ({ probeId, version, status, exception }) { const diagnostics = { probeId, runtimeId, probeVersion: version, status } // Error requests will also contain an `exception` property if (exception) diagnostics.exception = exception - expect(payload).to.deep.equal({ ddsource, service, debugger: { diagnostics } }) - - const opts = getRequestOptions(request) - expect(opts).to.have.property('method', 'POST') - expect(opts).to.have.property('path', '/debugger/v1/diagnostics') -} - -function getRequestOptions (request) { - return request.lastCall.args[1] + return { ddsource, service, debugger: { diagnostics } } } function getFormPayload (request) { diff --git a/packages/dd-trace/test/debugger/devtools_client/utils.js b/packages/dd-trace/test/debugger/devtools_client/utils.js index e15d567a7c1..2da3216cea1 100644 --- a/packages/dd-trace/test/debugger/devtools_client/utils.js +++ b/packages/dd-trace/test/debugger/devtools_client/utils.js @@ -3,7 +3,21 @@ const { randomUUID } = require('node:crypto') module.exports = { - generateProbeConfig + expectWithin, + generateProbeConfig, + getRequestOptions +} + +function expectWithin (timeout, fn, start = Date.now(), backoff = 1) { + try { + fn() + } catch (e) { + if (Date.now() - start > timeout) { + throw e + } else { + setTimeout(expectWithin, backoff, timeout, fn, start, backoff < 128 ? backoff * 2 : backoff) + } + } } function generateProbeConfig (breakpoint, overrides = {}) { @@ -23,3 +37,7 @@ function generateProbeConfig (breakpoint, overrides = {}) { ...overrides } } + +function getRequestOptions (request) { + return request.lastCall.args[1] +}