Skip to content

Commit

Permalink
Rename Queue to JSONBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
watson committed Dec 20, 2024
1 parent 857d4ef commit 39ba330
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 84 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

class JSONQueue {
class JSONBuffer {
constructor ({ size, timeout, onFlush }) {
this._maxSize = size
this._timeout = timeout
Expand All @@ -20,17 +20,17 @@ class JSONQueue {
this._onFlush(json)
}

add (str, size = Buffer.byteLength(str)) {
write (str, size = Buffer.byteLength(str)) {
if (this._timer === null) {
this._partialJson = `[${str}`
this._timer = setTimeout(() => this._flush(), this._timeout)
} else if (Buffer.byteLength(this._partialJson) + size + 2 > this._maxSize) {
this._flush()
this.add(str, size)
this.write(str, size)
} else {
this._partialJson += `,${str}`
}
}
}

module.exports = JSONQueue
module.exports = JSONBuffer
6 changes: 3 additions & 3 deletions packages/dd-trace/src/debugger/devtools_client/send.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { hostname: getHostname } = require('os')
const { stringify } = require('querystring')

const config = require('./config')
const JSONQueue = require('./queue')
const JSONBuffer = require('./json-buffer')
const request = require('../../exporters/common/request')
const { GIT_COMMIT_SHA, GIT_REPOSITORY_URL } = require('../../plugins/util/tags')
const log = require('../../log')
Expand All @@ -30,7 +30,7 @@ const ddtags = [
const path = `/debugger/v1/input?${stringify({ ddtags })}`

let callbacks = []
const queue = new JSONQueue({ size: config.maxTotalPayloadSize, timeout: 1000, onFlush })
const jsonBuffer = new JSONBuffer({ size: config.maxTotalPayloadSize, timeout: 1000, onFlush })

function send (message, logger, dd, snapshot, cb) {
const payload = {
Expand All @@ -57,7 +57,7 @@ function send (message, logger, dd, snapshot, cb) {
size = Buffer.byteLength(json)
}

queue.add(json, size)
jsonBuffer.write(json, size)
callbacks.push(cb)
}

Expand Down
6 changes: 3 additions & 3 deletions packages/dd-trace/src/debugger/devtools_client/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const LRUCache = require('lru-cache')
const config = require('./config')
const JSONQueue = require('./queue')
const JSONBuffer = require('./json-buffer')
const request = require('../../exporters/common/request')
const FormData = require('../../exporters/common/form-data')
const log = require('../../log')
Expand All @@ -26,7 +26,7 @@ const cache = new LRUCache({
ttlAutopurge: true
})

const queue = new JSONQueue({ size: config.maxTotalPayloadSize, timeout: 1000, onFlush })
const jsonBuffer = new JSONBuffer({ size: config.maxTotalPayloadSize, timeout: 1000, onFlush })

const STATUSES = {
RECEIVED: 'RECEIVED',
Expand Down Expand Up @@ -74,7 +74,7 @@ function ackError (err, { id: probeId, version }) {
}

function send (payload) {
queue.add(JSON.stringify(payload))
jsonBuffer.write(JSON.stringify(payload))
}

function onFlush (payload) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
'use strict'

require('../../setup/mocha')

const JSONBuffer = require('../../../src/debugger/devtools_client/json-buffer')

const MAX_SAFE_SIGNED_INTEGER = 2 ** 31 - 1

describe('JSONBuffer', () => {
it('should call onFlush with the expected payload when the timeout is reached', function (done) {
const onFlush = (json) => {
const diff = Date.now() - start
expect(json).to.equal('[{"message":1},{"message":2},{"message":3}]')
expect(diff).to.be.within(95, 110)
done()
}

const jsonBuffer = new JSONBuffer({ size: Infinity, timeout: 100, onFlush })

const start = Date.now()
jsonBuffer.write(JSON.stringify({ message: 1 }))
jsonBuffer.write(JSON.stringify({ message: 2 }))
jsonBuffer.write(JSON.stringify({ message: 3 }))
})

it('should call onFlush with the expected payload when the size is reached', function (done) {
const expectedPayloads = [
'[{"message":1},{"message":2}]',
'[{"message":3},{"message":4}]'
]

const onFlush = (json) => {
expect(json).to.equal(expectedPayloads.shift())
if (expectedPayloads.length === 0) done()
}

const jsonBuffer = new JSONBuffer({ size: 30, timeout: MAX_SAFE_SIGNED_INTEGER, onFlush })

jsonBuffer.write(JSON.stringify({ message: 1 })) // size: 15
jsonBuffer.write(JSON.stringify({ message: 2 })) // size: 29
jsonBuffer.write(JSON.stringify({ message: 3 })) // size: 15 (flushed, and re-added)
jsonBuffer.write(JSON.stringify({ message: 4 })) // size: 29
jsonBuffer.write(JSON.stringify({ message: 5 })) // size: 15 (flushed, and re-added)
})
})
45 changes: 0 additions & 45 deletions packages/dd-trace/test/debugger/devtools_client/queue.spec.js

This file was deleted.

18 changes: 9 additions & 9 deletions packages/dd-trace/test/debugger/devtools_client/send.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ require('../../setup/mocha')

const { hostname: getHostname } = require('os')
const { expectWithin, getRequestOptions } = require('./utils')
const JSONQueue = require('../../../src/debugger/devtools_client/queue')
const JSONBuffer = require('../../../src/debugger/devtools_client/json-buffer')
const { version } = require('../../../../../package.json')

process.env.DD_ENV = 'my-env'
Expand All @@ -21,39 +21,39 @@ const dd = { dd: true }
const snapshot = { snapshot: true }

describe('input message http requests', function () {
let send, request, queue
let send, request, jsonBuffer

beforeEach(function () {
request = sinon.spy()
request['@noCallThru'] = true

class JSONQueueSpy extends JSONQueue {
class JSONBufferSpy extends JSONBuffer {
constructor (...args) {
super(...args)
queue = this
sinon.spy(this, 'add')
jsonBuffer = this
sinon.spy(this, 'write')
}
}

send = proxyquire('../src/debugger/devtools_client/send', {
'./config': { service, commitSHA, repositoryUrl, url, '@noCallThru': true },
'./queue': JSONQueueSpy,
'./json-buffer': JSONBufferSpy,
'../../exporters/common/request': request
})
})

it('should queue instead of calling request directly', function () {
it('should buffer instead of calling request directly', function () {
const callback = sinon.spy()

send(message, logger, dd, snapshot, callback)
expect(request).to.not.have.been.called
expect(queue.add).to.have.been.calledOnceWith(
expect(jsonBuffer.write).to.have.been.calledOnceWith(
JSON.stringify(getPayload())
)
expect(callback).to.not.have.been.called
})

it('should call request with the expected payload once the queue is flushed', function (done) {
it('should call request with the expected payload once the buffer is flushed', function (done) {
const callback1 = sinon.spy()
const callback2 = sinon.spy()
const callback3 = sinon.spy()
Expand Down
40 changes: 20 additions & 20 deletions packages/dd-trace/test/debugger/devtools_client/status.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
require('../../setup/mocha')

const { expectWithin, getRequestOptions } = require('./utils')
const JSONQueue = require('../../../src/debugger/devtools_client/queue')
const JSONBuffer = require('../../../src/debugger/devtools_client/json-buffer')

const ddsource = 'dd_debugger'
const service = 'my-service'
const runtimeId = 'my-runtime-id'

describe('diagnostic message http requests', function () {
let statusproxy, request, queue
let statusproxy, request, jsonBuffer

const acks = [
['ackReceived', 'RECEIVED'],
Expand All @@ -23,17 +23,17 @@ describe('diagnostic message http requests', function () {
request = sinon.spy()
request['@noCallThru'] = true

class JSONQueueSpy extends JSONQueue {
class JSONBufferSpy extends JSONBuffer {
constructor (...args) {
super(...args)
queue = this
sinon.spy(this, 'add')
jsonBuffer = this
sinon.spy(this, 'write')
}
}

statusproxy = proxyquire('../src/debugger/devtools_client/status', {
'./config': { service, runtimeId, '@noCallThru': true },
'./queue': JSONQueueSpy,
'./json-buffer': JSONBufferSpy,
'../../exporters/common/request': request
})
})
Expand All @@ -57,51 +57,51 @@ describe('diagnostic message http requests', function () {
}
})

it('should queue instead of calling request directly', function () {
it('should buffer instead of calling request directly', function () {
ackFn({ id: 'foo', version: 0 })
expect(request).to.not.have.been.called
expect(queue.add).to.have.been.calledOnceWith(
expect(jsonBuffer.write).to.have.been.calledOnceWith(
JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception }))
)
})

it('should only add to queue once if no change', function () {
it('should only add to buffer once if no change', function () {
ackFn({ id: 'foo', version: 0 })
expect(queue.add).to.have.been.calledOnceWith(
expect(jsonBuffer.write).to.have.been.calledOnceWith(
JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception }))
)

ackFn({ id: 'foo', version: 0 })
expect(queue.add).to.have.been.calledOnce
expect(jsonBuffer.write).to.have.been.calledOnce
})

it('should add to queue again if version changes', function () {
it('should add to buffer again if version changes', function () {
ackFn({ id: 'foo', version: 0 })
expect(queue.add).to.have.been.calledOnceWith(
expect(jsonBuffer.write).to.have.been.calledOnceWith(
JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception }))
)

ackFn({ id: 'foo', version: 1 })
expect(queue.add).to.have.been.calledTwice
expect(queue.add.lastCall).to.have.been.calledWith(
expect(jsonBuffer.write).to.have.been.calledTwice
expect(jsonBuffer.write.lastCall).to.have.been.calledWith(
JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 1, status, exception }))
)
})

it('should add to queue again if probeId changes', function () {
it('should add to buffer again if probeId changes', function () {
ackFn({ id: 'foo', version: 0 })
expect(queue.add).to.have.been.calledOnceWith(
expect(jsonBuffer.write).to.have.been.calledOnceWith(
JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'foo', version: 0, status, exception }))
)

ackFn({ id: 'bar', version: 0 })
expect(queue.add).to.have.been.calledTwice
expect(queue.add.lastCall).to.have.been.calledWith(
expect(jsonBuffer.write).to.have.been.calledTwice
expect(jsonBuffer.write.lastCall).to.have.been.calledWith(
JSON.stringify(formatAsDiagnosticsEvent({ probeId: 'bar', version: 0, status, exception }))
)
})

it('should call request with the expected payload once the queue is flushed', function (done) {
it('should call request with the expected payload once the buffer is flushed', function (done) {
ackFn({ id: 'foo', version: 0 })
ackFn({ id: 'foo', version: 1 })
ackFn({ id: 'bar', version: 0 })
Expand Down

0 comments on commit 39ba330

Please sign in to comment.