Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DI] Batch outgoing http requests #5007

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 150 additions & 132 deletions integration-tests/debugger/basic.spec.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion integration-tests/debugger/snapshot-pruning.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe('Dynamic Instrumentation', function () {
beforeEach(t.triggerBreakpoint)

it('should prune snapshot if payload is too large', function (done) {
t.agent.on('debugger-input', ({ payload }) => {
t.agent.on('debugger-input', ({ payload: [payload] }) => {
assert.isBelow(Buffer.byteLength(JSON.stringify(payload)), 1024 * 1024) // 1MB
assert.deepEqual(payload['debugger.snapshot'].captures, {
lines: {
Expand Down
10 changes: 5 additions & 5 deletions integration-tests/debugger/snapshot.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe('Dynamic Instrumentation', function () {
beforeEach(t.triggerBreakpoint)

it('should capture a snapshot', function (done) {
t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
assert.deepEqual(Object.keys(captures), ['lines'])
assert.deepEqual(Object.keys(captures.lines), [String(t.breakpoint.line)])

Expand Down Expand Up @@ -114,7 +114,7 @@ describe('Dynamic Instrumentation', function () {
})

it('should respect maxReferenceDepth', function (done) {
t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
const { locals } = captures.lines[t.breakpoint.line]
delete locals.request
delete locals.fastify
Expand Down Expand Up @@ -150,7 +150,7 @@ describe('Dynamic Instrumentation', function () {
})

it('should respect maxLength', function (done) {
t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
const { locals } = captures.lines[t.breakpoint.line]

assert.deepEqual(locals.lstr, {
Expand All @@ -167,7 +167,7 @@ describe('Dynamic Instrumentation', function () {
})

it('should respect maxCollectionSize', function (done) {
t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
const { locals } = captures.lines[t.breakpoint.line]

assert.deepEqual(locals.arr, {
Expand Down Expand Up @@ -205,7 +205,7 @@ describe('Dynamic Instrumentation', function () {
}
}

t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
const { locals } = captures.lines[t.breakpoint.line]

assert.deepEqual(Object.keys(locals), [
Expand Down
8 changes: 5 additions & 3 deletions integration-tests/debugger/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ function setup ({ env, testApp } = {}) {
function triggerBreakpoint (url) {
// Trigger the breakpoint once probe is successfully installed
t.agent.on('debugger-diagnostics', ({ payload }) => {
if (payload.debugger.diagnostics.status === 'INSTALLED') {
t.axios.get(url)
}
payload.forEach((event) => {
if (event.debugger.diagnostics.status === 'INSTALLED') {
t.axios.get(url)
}
})
})
}

Expand Down
3 changes: 2 additions & 1 deletion packages/dd-trace/src/debugger/devtools_client/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ const config = module.exports = {
service: parentConfig.service,
commitSHA: parentConfig.commitSHA,
repositoryUrl: parentConfig.repositoryUrl,
parentThreadId
parentThreadId,
maxTotalPayloadSize: 5 * 1024 * 1024 // 5MB
}

updateUrl(parentConfig)
Expand Down
5 changes: 2 additions & 3 deletions packages/dd-trace/src/debugger/devtools_client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,8 @@ session.on('Debugger.paused', async ({ params }) => {
}

// TODO: Process template (DEBUG-2628)
send(probe.template, logger, dd, snapshot, (err) => {
if (err) log.error('Debugger error', err)
else ackEmitting(probe)
send(probe.template, logger, dd, snapshot, () => {
ackEmitting(probe)
})
}
})
Expand Down
36 changes: 36 additions & 0 deletions packages/dd-trace/src/debugger/devtools_client/json-buffer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict'

class JSONBuffer {
constructor ({ size, timeout, onFlush }) {
this._maxSize = size
this._timeout = timeout
this._onFlush = onFlush
this._reset()
}

_reset () {
clearTimeout(this._timer)
this._timer = null
this._partialJson = null
}

_flush () {
const json = `${this._partialJson}]`
this._reset()
this._onFlush(json)
}

write (str, size = Buffer.byteLength(str)) {
if (this._timer === null) {
this._partialJson = `[${str}`
this._timer = setTimeout(() => this._flush(), this._timeout)
} else if (Buffer.byteLength(this._partialJson) + size + 2 > this._maxSize) {
this._flush()
this.write(str, size)
} else {
this._partialJson += `,${str}`
}
}
}

module.exports = JSONBuffer
38 changes: 28 additions & 10 deletions packages/dd-trace/src/debugger/devtools_client/send.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ const { hostname: getHostname } = require('os')
const { stringify } = require('querystring')

const config = require('./config')
const JSONBuffer = require('./json-buffer')
const request = require('../../exporters/common/request')
const { GIT_COMMIT_SHA, GIT_REPOSITORY_URL } = require('../../plugins/util/tags')
const log = require('../../log')
const { version } = require('../../../../../package.json')

module.exports = send

const MAX_PAYLOAD_SIZE = 1024 * 1024 // 1MB
const MAX_LOG_PAYLOAD_SIZE = 1024 * 1024 // 1MB

const ddsource = 'dd_debugger'
const hostname = getHostname()
Expand All @@ -27,14 +29,10 @@ const ddtags = [

const path = `/debugger/v1/input?${stringify({ ddtags })}`

function send (message, logger, dd, snapshot, cb) {
const opts = {
method: 'POST',
url: config.url,
path,
headers: { 'Content-Type': 'application/json; charset=utf-8' }
}
let callbacks = []
const jsonBuffer = new JSONBuffer({ size: config.maxTotalPayloadSize, timeout: 1000, onFlush })

function send (message, logger, dd, snapshot, cb) {
const payload = {
ddsource,
hostname,
Expand All @@ -46,16 +44,36 @@ function send (message, logger, dd, snapshot, cb) {
}

let json = JSON.stringify(payload)
let size = Buffer.byteLength(json)

if (Buffer.byteLength(json) > MAX_PAYLOAD_SIZE) {
if (size > MAX_LOG_PAYLOAD_SIZE) {
// TODO: This is a very crude way to handle large payloads. Proper pruning will be implemented later (DEBUG-2624)
const line = Object.values(payload['debugger.snapshot'].captures.lines)[0]
line.locals = {
notCapturedReason: 'Snapshot was too large',
size: Object.keys(line.locals).length
}
json = JSON.stringify(payload)
size = Buffer.byteLength(json)
}

jsonBuffer.write(json, size)
callbacks.push(cb)
}

function onFlush (payload) {
const opts = {
method: 'POST',
url: config.url,
path,
headers: { 'Content-Type': 'application/json; charset=utf-8' }
}

request(json, opts, cb)
const _callbacks = callbacks
callbacks = []

request(payload, opts, (err) => {
if (err) log.error('Could not send debugger payload', err)
else _callbacks.forEach(cb => cb())
})
}
11 changes: 9 additions & 2 deletions packages/dd-trace/src/debugger/devtools_client/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const LRUCache = require('lru-cache')
const config = require('./config')
const JSONBuffer = require('./json-buffer')
const request = require('../../exporters/common/request')
const FormData = require('../../exporters/common/form-data')
const log = require('../../log')
Expand All @@ -25,6 +26,8 @@ const cache = new LRUCache({
ttlAutopurge: true
})

const jsonBuffer = new JSONBuffer({ size: config.maxTotalPayloadSize, timeout: 1000, onFlush })

const STATUSES = {
RECEIVED: 'RECEIVED',
INSTALLED: 'INSTALLED',
Expand Down Expand Up @@ -71,11 +74,15 @@ function ackError (err, { id: probeId, version }) {
}

function send (payload) {
jsonBuffer.write(JSON.stringify(payload))
}

function onFlush (payload) {
const form = new FormData()

form.append(
'event',
JSON.stringify(payload),
payload,
{ filename: 'event.json', contentType: 'application/json; charset=utf-8' }
)

Expand All @@ -87,7 +94,7 @@ function send (payload) {
}

request(form, options, (err) => {
if (err) log.error('[debugger:devtools_client] Error sending debugger payload', err)
if (err) log.error('[debugger:devtools_client] Error sending probe payload', err)
})
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
'use strict'

require('../../setup/mocha')

const JSONBuffer = require('../../../src/debugger/devtools_client/json-buffer')

const MAX_SAFE_SIGNED_INTEGER = 2 ** 31 - 1

describe('JSONBuffer', () => {
it('should call onFlush with the expected payload when the timeout is reached', function (done) {
const onFlush = (json) => {
const diff = Date.now() - start
expect(json).to.equal('[{"message":1},{"message":2},{"message":3}]')
expect(diff).to.be.within(95, 110)
done()
}

const jsonBuffer = new JSONBuffer({ size: Infinity, timeout: 100, onFlush })

const start = Date.now()
jsonBuffer.write(JSON.stringify({ message: 1 }))
jsonBuffer.write(JSON.stringify({ message: 2 }))
jsonBuffer.write(JSON.stringify({ message: 3 }))
})

it('should call onFlush with the expected payload when the size is reached', function (done) {
const expectedPayloads = [
'[{"message":1},{"message":2}]',
'[{"message":3},{"message":4}]'
]

const onFlush = (json) => {
expect(json).to.equal(expectedPayloads.shift())
if (expectedPayloads.length === 0) done()
}

const jsonBuffer = new JSONBuffer({ size: 30, timeout: MAX_SAFE_SIGNED_INTEGER, onFlush })

jsonBuffer.write(JSON.stringify({ message: 1 })) // size: 15
jsonBuffer.write(JSON.stringify({ message: 2 })) // size: 29
jsonBuffer.write(JSON.stringify({ message: 3 })) // size: 15 (flushed, and re-added)
jsonBuffer.write(JSON.stringify({ message: 4 })) // size: 29
jsonBuffer.write(JSON.stringify({ message: 5 })) // size: 15 (flushed, and re-added)
})
})
Loading
Loading