Skip to content

Commit

Permalink
[DI] Batch outgoing http requests
Browse files Browse the repository at this point in the history
  • Loading branch information
watson committed Dec 12, 2024
1 parent c6defbc commit c8456c4
Show file tree
Hide file tree
Showing 13 changed files with 428 additions and 140 deletions.
193 changes: 106 additions & 87 deletions integration-tests/debugger/basic.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,22 @@ describe('Dynamic Instrumentation', function () {
})

t.agent.on('debugger-diagnostics', ({ payload }) => {
const expected = expectedPayloads.shift()
assertObjectContains(payload, expected)
assertUUID(payload.debugger.diagnostics.runtimeId)

if (payload.debugger.diagnostics.status === 'INSTALLED') {
t.axios.get(t.breakpoint.url)
.then((response) => {
assert.strictEqual(response.status, 200)
assert.deepStrictEqual(response.data, { hello: 'foo' })
})
.catch(done)
} else {
endIfDone()
}
payload.forEach((event) => {
const expected = expectedPayloads.shift()
assertObjectContains(event, expected)
assertUUID(event.debugger.diagnostics.runtimeId)

if (event.debugger.diagnostics.status === 'INSTALLED') {
t.axios.get(t.breakpoint.url)
.then((response) => {
assert.strictEqual(response.status, 200)
assert.deepStrictEqual(response.data, { hello: 'foo' })
})
.catch(done)
} else {
endIfDone()
}
})
})

t.agent.addRemoteConfig(t.rcConfig)
Expand Down Expand Up @@ -107,11 +109,13 @@ describe('Dynamic Instrumentation', function () {
})

t.agent.on('debugger-diagnostics', ({ payload }) => {
const expected = expectedPayloads.shift()
assertObjectContains(payload, expected)
assertUUID(payload.debugger.diagnostics.runtimeId)
if (payload.debugger.diagnostics.status === 'INSTALLED') triggers.shift()()
endIfDone()
payload.forEach((event) => {
const expected = expectedPayloads.shift()
assertObjectContains(event, expected)
assertUUID(event.debugger.diagnostics.runtimeId)
if (event.debugger.diagnostics.status === 'INSTALLED') triggers.shift()()
endIfDone()
})
})

t.agent.addRemoteConfig(t.rcConfig)
Expand Down Expand Up @@ -146,18 +150,20 @@ describe('Dynamic Instrumentation', function () {
})

t.agent.on('debugger-diagnostics', ({ payload }) => {
const expected = expectedPayloads.shift()
assertObjectContains(payload, expected)
assertUUID(payload.debugger.diagnostics.runtimeId)

if (payload.debugger.diagnostics.status === 'INSTALLED') {
t.agent.removeRemoteConfig(t.rcConfig.id)
// Wait a little to see if we get any follow-up `debugger-diagnostics` messages
setTimeout(() => {
payloadsProcessed = true
endIfDone()
}, pollInterval * 2 * 1000) // wait twice as long as the RC poll interval
}
payload.forEach((event) => {
const expected = expectedPayloads.shift()
assertObjectContains(event, expected)
assertUUID(event.debugger.diagnostics.runtimeId)

if (event.debugger.diagnostics.status === 'INSTALLED') {
t.agent.removeRemoteConfig(t.rcConfig.id)
// Wait a little to see if we get any follow-up `debugger-diagnostics` messages
setTimeout(() => {
payloadsProcessed = true
endIfDone()
}, pollInterval * 2 * 1000) // wait twice as long as the RC poll interval
}
})
})

t.agent.addRemoteConfig(t.rcConfig)
Expand Down Expand Up @@ -205,19 +211,21 @@ describe('Dynamic Instrumentation', function () {
}]

t.agent.on('debugger-diagnostics', ({ payload }) => {
const expected = expectedPayloads.shift()
assertObjectContains(payload, expected)
const { diagnostics } = payload.debugger
assertUUID(diagnostics.runtimeId)

if (diagnostics.status === 'ERROR') {
assert.property(diagnostics, 'exception')
assert.hasAllKeys(diagnostics.exception, ['message', 'stacktrace'])
assert.typeOf(diagnostics.exception.message, 'string')
assert.typeOf(diagnostics.exception.stacktrace, 'string')
}
payload.forEach((event) => {
const expected = expectedPayloads.shift()
assertObjectContains(event, expected)
const { diagnostics } = event.debugger
assertUUID(diagnostics.runtimeId)

if (diagnostics.status === 'ERROR') {
assert.property(diagnostics, 'exception')
assert.hasAllKeys(diagnostics.exception, ['message', 'stacktrace'])
assert.typeOf(diagnostics.exception.message, 'string')
assert.typeOf(diagnostics.exception.stacktrace, 'string')
}

endIfDone()
endIfDone()
})
})

t.agent.addRemoteConfig({
Expand All @@ -238,6 +246,10 @@ describe('Dynamic Instrumentation', function () {
t.triggerBreakpoint()

t.agent.on('debugger-input', ({ payload }) => {
assert.isArray(payload)
assert.lengthOf(payload, 1)
payload = payload[0]

const expected = {
ddsource: 'dd_debugger',
hostname: os.hostname(),
Expand Down Expand Up @@ -304,10 +316,12 @@ describe('Dynamic Instrumentation', function () {
]

t.agent.on('debugger-diagnostics', ({ payload }) => {
if (payload.debugger.diagnostics.status === 'INSTALLED') triggers.shift()().catch(done)
payload.forEach((event) => {
if (event.debugger.diagnostics.status === 'INSTALLED') triggers.shift()().catch(done)
})
})

t.agent.on('debugger-input', ({ payload }) => {
t.agent.on('debugger-input', ({ payload: [payload] }) => {
assert.strictEqual(payload.message, expectedMessages.shift())
if (expectedMessages.length === 0) done()
})
Expand All @@ -317,17 +331,19 @@ describe('Dynamic Instrumentation', function () {

it('should not trigger if probe is deleted', function (done) {
t.agent.on('debugger-diagnostics', ({ payload }) => {
if (payload.debugger.diagnostics.status === 'INSTALLED') {
t.agent.once('remote-confg-responded', async () => {
await t.axios.get(t.breakpoint.url)
// We want to wait enough time to see if the client triggers on the breakpoint so that the test can fail
// if it does, but not so long that the test times out.
// TODO: Is there some signal we can use instead of a timer?
setTimeout(done, pollInterval * 2 * 1000) // wait twice as long as the RC poll interval
})
payload.forEach((event) => {
if (event.debugger.diagnostics.status === 'INSTALLED') {
t.agent.once('remote-confg-responded', async () => {
await t.axios.get(t.breakpoint.url)
// We want to wait enough time to see if the client triggers on the breakpoint so that the test can fail
// if it does, but not so long that the test times out.
// TODO: Is there some signal we can use instead of a timer?
setTimeout(done, pollInterval * 2 * 1000) // wait twice as long as the RC poll interval
})

t.agent.removeRemoteConfig(t.rcConfig.id)
}
t.agent.removeRemoteConfig(t.rcConfig.id)
}
})
})

t.agent.on('debugger-input', () => {
Expand All @@ -342,39 +358,42 @@ describe('Dynamic Instrumentation', function () {
it('should remove the last breakpoint completely before trying to add a new one', function (done) {
const rcConfig2 = t.generateRemoteConfig()

t.agent.on('debugger-diagnostics', ({ payload: { debugger: { diagnostics: { status, probeId } } } }) => {
if (status !== 'INSTALLED') return

if (probeId === t.rcConfig.config.id) {
// First INSTALLED payload: Try to trigger the race condition.
t.agent.removeRemoteConfig(t.rcConfig.id)
t.agent.addRemoteConfig(rcConfig2)
} else {
// Second INSTALLED payload: Perform an HTTP request to see if we successfully handled the race condition.
let finished = false

// If the race condition occurred, the debugger will have been detached from the main thread and the new
// probe will never trigger. If that's the case, the following timer will fire:
const timer = setTimeout(() => {
done(new Error('Race condition occurred!'))
}, 1000)

// If we successfully handled the race condition, the probe will trigger, we'll get a probe result and the
// following event listener will be called:
t.agent.once('debugger-input', () => {
clearTimeout(timer)
finished = true
done()
})
t.agent.on('debugger-diagnostics', ({ payload }) => {
payload.forEach((event) => {
const { status, probeId } = event.debugger.diagnostics
if (status !== 'INSTALLED') return

if (probeId === t.rcConfig.config.id) {
// First INSTALLED payload: Try to trigger the race condition.
t.agent.removeRemoteConfig(t.rcConfig.id)
t.agent.addRemoteConfig(rcConfig2)
} else {
// Second INSTALLED payload: Perform an HTTP request to see if we successfully handled the race condition.
let finished = false

// If the race condition occurred, the debugger will have been detached from the main thread and the new
// probe will never trigger. If that's the case, the following timer will fire:
const timer = setTimeout(() => {
done(new Error('Race condition occurred!'))
}, 2000)

// If we successfully handled the race condition, the probe will trigger, we'll get a probe result and the
// following event listener will be called:
t.agent.once('debugger-input', () => {
clearTimeout(timer)
finished = true
done()
})

// Perform HTTP request to try and trigger the probe
t.axios.get(t.breakpoint.url).catch((err) => {
// If the request hasn't fully completed by the time the tests ends and the target app is destroyed, Axios
// will complain with a "socket hang up" error. Hence this sanity check before calling `done(err)`. If we
// later add more tests below this one, this shouuldn't be an issue.
if (!finished) done(err)
})
}
// Perform HTTP request to try and trigger the probe
t.axios.get(t.breakpoint.url).catch((err) => {
// If the request hasn't fully completed by the time the tests ends and the target app is destroyed, Axios
// will complain with a "socket hang up" error. Hence this sanity check before calling `done(err)`. If we
// later add more tests below this one, this shouuldn't be an issue.
if (!finished) done(err)
})
}
})
})

t.agent.addRemoteConfig(t.rcConfig)
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/debugger/snapshot-pruning.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe('Dynamic Instrumentation', function () {
beforeEach(t.triggerBreakpoint)

it('should prune snapshot if payload is too large', function (done) {
t.agent.on('debugger-input', ({ payload }) => {
t.agent.on('debugger-input', ({ payload: [payload] }) => {
assert.isBelow(Buffer.byteLength(JSON.stringify(payload)), 1024 * 1024) // 1MB
assert.deepEqual(payload['debugger.snapshot'].captures, {
lines: {
Expand Down
10 changes: 5 additions & 5 deletions integration-tests/debugger/snapshot.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe('Dynamic Instrumentation', function () {
beforeEach(t.triggerBreakpoint)

it('should capture a snapshot', function (done) {
t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
assert.deepEqual(Object.keys(captures), ['lines'])
assert.deepEqual(Object.keys(captures.lines), [String(t.breakpoint.line)])

Expand Down Expand Up @@ -114,7 +114,7 @@ describe('Dynamic Instrumentation', function () {
})

it('should respect maxReferenceDepth', function (done) {
t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
const { locals } = captures.lines[t.breakpoint.line]
delete locals.request
delete locals.fastify
Expand Down Expand Up @@ -150,7 +150,7 @@ describe('Dynamic Instrumentation', function () {
})

it('should respect maxLength', function (done) {
t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
const { locals } = captures.lines[t.breakpoint.line]

assert.deepEqual(locals.lstr, {
Expand All @@ -167,7 +167,7 @@ describe('Dynamic Instrumentation', function () {
})

it('should respect maxCollectionSize', function (done) {
t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
const { locals } = captures.lines[t.breakpoint.line]

assert.deepEqual(locals.arr, {
Expand Down Expand Up @@ -205,7 +205,7 @@ describe('Dynamic Instrumentation', function () {
}
}

t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
const { locals } = captures.lines[t.breakpoint.line]

assert.deepEqual(Object.keys(locals), [
Expand Down
8 changes: 5 additions & 3 deletions integration-tests/debugger/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ function setup () {
function triggerBreakpoint () {
// Trigger the breakpoint once probe is successfully installed
t.agent.on('debugger-diagnostics', ({ payload }) => {
if (payload.debugger.diagnostics.status === 'INSTALLED') {
t.axios.get(breakpoint.url)
}
payload.forEach((event) => {
if (event.debugger.diagnostics.status === 'INSTALLED') {
t.axios.get(breakpoint.url)
}
})
})
}

Expand Down
3 changes: 2 additions & 1 deletion packages/dd-trace/src/debugger/devtools_client/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ const config = module.exports = {
service: parentConfig.service,
commitSHA: parentConfig.commitSHA,
repositoryUrl: parentConfig.repositoryUrl,
parentThreadId
parentThreadId,
maxTotalPayloadSize: 5 * 1024 * 1024 // 5MB
}

updateUrl(parentConfig)
Expand Down
5 changes: 2 additions & 3 deletions packages/dd-trace/src/debugger/devtools_client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ session.on('Debugger.paused', async ({ params }) => {
}

// TODO: Process template (DEBUG-2628)
send(probe.template, logger, snapshot, (err) => {
if (err) log.error('Debugger error', err)
else ackEmitting(probe)
send(probe.template, logger, snapshot, () => {
ackEmitting(probe)
})
}
})
Expand Down
36 changes: 36 additions & 0 deletions packages/dd-trace/src/debugger/devtools_client/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict'

class JSONQueue {
constructor ({ size, timeout, onFlush }) {
this._maxSize = size
this._timeout = timeout
this._onFlush = onFlush
this._reset()
}

_reset () {
clearTimeout(this._timer)
this._timer = null
this._partialJson = null
}

_flush () {
const json = `${this._partialJson}]`
this._reset()
this._onFlush(json)
}

add (str, size = Buffer.byteLength(str)) {
if (this._timer === null) {
this._partialJson = `[${str}`
this._timer = setTimeout(() => this._flush(), this._timeout)
} else if (Buffer.byteLength(this._partialJson) + size + 2 > this._maxSize) {
this._flush()
this.add(str, size)
} else {
this._partialJson += `,${str}`
}
}
}

module.exports = JSONQueue
Loading

0 comments on commit c8456c4

Please sign in to comment.