Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/datadog-instrumentations/src/openai.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ const V4_PACKAGE_SHIMS = [
methods: ['create'],
streamedResponse: true
},
{
file: 'resources/responses/responses',
targetClass: 'Responses',
baseResource: 'responses',
methods: ['create'],
streamedResponse: true,
versions: ['>=4.87.0']
},
{
file: 'resources/embeddings',
targetClass: 'Embeddings',
Expand Down
25 changes: 24 additions & 1 deletion packages/datadog-plugin-openai/src/stream-helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,31 @@ function constructChatCompletionResponseFromStreamedChunks (chunks, n) {
})
}

/**
 * Reassembles the final response object from a stream of OpenAI Responses API chunks.
 * The Responses API emits typed events (`response.output_text.delta`,
 * `response.output_text.done`, `response.output_item.done`, and a terminal
 * `response.done`/`response.incomplete`/`response.completed` event); the terminal
 * event already carries the fully-assembled response with its output array and usage,
 * so we only need to locate it rather than merge the deltas ourselves.
 * @param {Array<Record<string, any>>} chunks - streamed event objects, in arrival order
 * @returns {Record<string, any>|undefined} the complete response, or undefined when no
 *   terminal event was observed (e.g. the stream was abandoned mid-flight)
 */
function constructResponseResponseFromStreamedChunks (chunks) {
  const terminalStatuses = ['done', 'incomplete', 'completed']

  // Walk backwards so the most recent terminal event wins.
  let idx = chunks.length
  while (idx--) {
    const { response } = chunks[idx]
    if (response && terminalStatuses.includes(response.status)) {
      return response
    }
  }
}

module.exports = {
convertBuffersToObjects,
constructCompletionResponseFromStreamedChunks,
constructChatCompletionResponseFromStreamedChunks
constructChatCompletionResponseFromStreamedChunks,
constructResponseResponseFromStreamedChunks
}
47 changes: 46 additions & 1 deletion packages/datadog-plugin-openai/src/tracing.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ const { MEASURED } = require('../../../ext/tags')
const {
convertBuffersToObjects,
constructCompletionResponseFromStreamedChunks,
constructChatCompletionResponseFromStreamedChunks
constructChatCompletionResponseFromStreamedChunks,
constructResponseResponseFromStreamedChunks
} = require('./stream-helpers')

const { DD_MAJOR } = require('../../../version')
Expand Down Expand Up @@ -59,6 +60,8 @@ class OpenAiTracingPlugin extends TracingPlugin {
response = constructCompletionResponseFromStreamedChunks(chunks, n)
} else if (methodName === 'createChatCompletion') {
response = constructChatCompletionResponseFromStreamedChunks(chunks, n)
} else if (methodName === 'createResponse') {
response = constructResponseResponseFromStreamedChunks(chunks)
}

ctx.result = { data: response }
Expand Down Expand Up @@ -134,6 +137,10 @@ class OpenAiTracingPlugin extends TracingPlugin {
case 'createEdit':
createEditRequestExtraction(tags, payload, openaiStore)
break

case 'createResponse':
createResponseRequestExtraction(tags, payload, openaiStore)
break
}

span.addTags(tags)
Expand Down Expand Up @@ -313,6 +320,10 @@ function normalizeMethodName (methodName) {
case 'embeddings.create':
return 'createEmbedding'

// responses
case 'responses.create':
return 'createResponse'

// files
case 'files.create':
return 'createFile'
Expand Down Expand Up @@ -376,6 +387,16 @@ function createEditRequestExtraction (tags, payload, openaiStore) {
openaiStore.instruction = instruction
}

/**
 * Extracts request-side tags for the Responses API (`responses.create`) and
 * stashes the raw payload for later response correlation.
 * @param {Record<string, string>} tags - span tags to populate in place
 * @param {Record<string, any>} payload - the request body passed to the SDK
 * @param {Record<string, any>} openaiStore - per-request scratch store
 */
function createResponseRequestExtraction (tags, payload, openaiStore) {
  const { model } = payload
  if (model) {
    tags['openai.request.model'] = model
  }

  // Keep the whole request around so response extraction can reference it.
  openaiStore.responseData = payload
}

/**
 * Tags a model-retrieval request with the identifier being looked up.
 * Assigns unconditionally, mirroring the caller-supplied payload shape.
 * @param {Record<string, string>} tags - span tags to populate in place
 * @param {Record<string, any>} payload - request payload carrying the model id
 */
function retrieveModelRequestExtraction (tags, payload) {
  const { id } = payload
  tags['openai.request.id'] = id
}
Expand Down Expand Up @@ -410,6 +431,10 @@ function responseDataExtractionByMethod (methodName, tags, body, openaiStore) {
commonCreateResponseExtraction(tags, body, openaiStore, methodName)
break

case 'createResponse':
createResponseResponseExtraction(tags, body, openaiStore)
break

case 'listFiles':
case 'listFineTunes':
case 'listFineTuneEvents':
Expand Down Expand Up @@ -513,6 +538,26 @@ function commonCreateResponseExtraction (tags, body, openaiStore, methodName) {
openaiStore.choices = body.choices
}

/**
 * Extracts response-side tags for the Responses API and stores the full body
 * on the request-scoped store.
 * Only truthy fields are tagged, so absent/empty values leave no tag behind.
 * @param {Record<string, string>} tags - span tags to populate in place
 * @param {Record<string, any>} body - the (possibly stream-reassembled) response
 * @param {Record<string, any>} openaiStore - per-request scratch store
 */
function createResponseResponseExtraction (tags, body, openaiStore) {
  const fieldToTag = {
    id: 'openai.response.id',
    status: 'openai.response.status',
    model: 'openai.response.model'
  }

  for (const [field, tagName] of Object.entries(fieldToTag)) {
    if (body[field]) {
      tags[tagName] = body[field]
    }
  }

  // Retain the full response for potential future use.
  openaiStore.response = body
}

// The server almost always responds with JSON
function coerceResponseBody (body, methodName) {
switch (methodName) {
Expand Down
Loading
Loading