Skip to content

Commit 3735152

Browse files
committed
Only decode JSON input buffer in Anthropic Claude streaming
_decode_tool_use was only used when _tool_json_input_buf was found, but we were decoding the entire _content_block after adding _tool_json_input_buf to it. The _content_block overall which could contain non-JSON elements (e.g. {}), causing failures. To fix this, we have removed _decode_tool_use helper function and inlined JSON decoding logic directly into content_block_stop handler in _process_anthropic_claude_chunk, where we only use it to decode _tool_json_input_buf before appending to _content_block. Patch based on open-telemetry/opentelemetry-python-contrib#3875 with code copied directly from https://github.com/open-telemetry/opentelemetry-python-contrib/blob/v0.54b1/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py#L289
1 parent e85c964 commit 3735152

File tree

2 files changed

+163
-5
lines changed

2 files changed

+163
-5
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,88 @@ def patched_extract_tool_calls(
334334
tool_calls.append(tool_call)
335335
return tool_calls
336336

337+
# TODO: The following code is to patch a bedrock bug that was fixed in
338+
# opentelemetry-instrumentation-botocore==0.60b0 in:
339+
# https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3875
340+
# Remove this code once we've bumped opentelemetry-instrumentation-botocore dependency to 0.60b0
341+
def patched_process_anthropic_claude_chunk(self, chunk):
342+
# pylint: disable=too-many-return-statements,too-many-branches
343+
if not (message_type := chunk.get("type")):
344+
return
345+
346+
if message_type == "message_start":
347+
# {'type': 'message_start', 'message': {'id': 'id', 'type': 'message', 'role': 'assistant', 'model': 'claude-2.0', 'content': [], 'stop_reason': None, 'stop_sequence': None, 'usage': {'input_tokens': 18, 'output_tokens': 1}}}
348+
if chunk.get("message", {}).get("role") == "assistant":
349+
self._record_message = True
350+
message = chunk["message"]
351+
self._message = {
352+
"role": message["role"],
353+
"content": message.get("content", []),
354+
}
355+
return
356+
357+
if message_type == "content_block_start":
358+
# {'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}}
359+
# {'type': 'content_block_start', 'index': 1, 'content_block': {'type': 'tool_use', 'id': 'id', 'name': 'func_name', 'input': {}}}
360+
if self._record_message:
361+
block = chunk.get("content_block", {})
362+
if block.get("type") == "text":
363+
self._content_block = block
364+
elif block.get("type") == "tool_use":
365+
self._content_block = block
366+
return
367+
368+
if message_type == "content_block_delta":
369+
# {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Here'}}
370+
# {'type': 'content_block_delta', 'index': 1, 'delta': {'type': 'input_json_delta', 'partial_json': ''}}
371+
if self._record_message:
372+
delta = chunk.get("delta", {})
373+
if delta.get("type") == "text_delta":
374+
self._content_block["text"] += delta.get("text", "")
375+
elif delta.get("type") == "input_json_delta":
376+
self._tool_json_input_buf += delta.get("partial_json", "")
377+
return
378+
379+
if message_type == "content_block_stop":
380+
# {'type': 'content_block_stop', 'index': 0}
381+
if self._tool_json_input_buf:
382+
try:
383+
self._content_block["input"] = json.loads(
384+
self._tool_json_input_buf
385+
)
386+
except json.JSONDecodeError:
387+
self._content_block["input"] = self._tool_json_input_buf
388+
self._message["content"].append(self._content_block)
389+
self._content_block = {}
390+
self._tool_json_input_buf = ""
391+
return
392+
393+
if message_type == "message_delta":
394+
# {'type': 'message_delta', 'delta': {'stop_reason': 'end_turn', 'stop_sequence': None}, 'usage': {'output_tokens': 123}}
395+
if (
396+
stop_reason := chunk.get("delta", {}).get("stop_reason")
397+
) is not None:
398+
self._response["stopReason"] = stop_reason
399+
return
400+
401+
if message_type == "message_stop":
402+
# {'type': 'message_stop', 'amazon-bedrock-invocationMetrics': {'inputTokenCount': 18, 'outputTokenCount': 123, 'invocationLatency': 5250, 'firstByteLatency': 290}}
403+
if invocation_metrics := chunk.get(
404+
"amazon-bedrock-invocationMetrics"
405+
):
406+
self._process_invocation_metrics(invocation_metrics)
407+
408+
if self._record_message:
409+
self._response["output"] = {"message": self._message}
410+
self._record_message = False
411+
self._message = None
412+
413+
self._stream_done_callback(self._response)
414+
return
415+
337416
bedrock_utils.ConverseStreamWrapper.__init__ = patched_init
338417
bedrock_utils.ConverseStreamWrapper._process_event = patched_process_event
418+
bedrock_utils.InvokeModelWithResponseStreamWrapper._process_anthropic_claude_chunk = patched_process_anthropic_claude_chunk
339419
bedrock_utils.extract_tool_calls = patched_extract_tool_calls
340420

341421
# END The OpenTelemetry Authors code

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/patches/test_instrumentation_patch.py

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,6 @@ def _run_patch_behaviour_tests(self):
120120
self._test_unpatched_botocore_propagator()
121121
self._test_unpatched_gevent_instrumentation()
122122
self._test_unpatched_starlette_instrumentation()
123-
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
124-
# Bedrock Runtime tests
125-
self._test_unpatched_converse_stream_wrapper()
126-
self._test_unpatched_extract_tool_calls()
127123

128124
# Apply patches
129125
apply_instrumentation_patches()
@@ -222,6 +218,17 @@ def _test_unpatched_botocore_instrumentation(self):
222218
# DynamoDB
223219
self.assertTrue("dynamodb" in _KNOWN_EXTENSIONS, "Upstream has removed a DynamoDB extension")
224220

221+
# Bedrock Runtime tests
222+
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
223+
self._test_unpatched_converse_stream_wrapper()
224+
self._test_unpatched_extract_tool_calls()
225+
226+
# TODO: remove these tests once we bump botocore instrumentation version to 0.60b0
227+
self._test_unpatched_process_anthropic_claude_chunk({"location": "Seattle"}, {"location": "Seattle"})
228+
self._test_unpatched_process_anthropic_claude_chunk(None, None)
229+
self._test_unpatched_process_anthropic_claude_chunk({}, {})
230+
231+
225232
def _test_unpatched_gevent_instrumentation(self):
226233
self.assertFalse(gevent.monkey.is_module_patched("os"), "gevent os module has been patched")
227234
self.assertFalse(gevent.monkey.is_module_patched("thread"), "gevent thread module has been patched")
@@ -267,10 +274,14 @@ def _test_patched_botocore_instrumentation(self):
267274
# Bedrock Agent Operation
268275
self._test_patched_bedrock_agent_instrumentation()
269276

270-
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
271277
# Bedrock Runtime
278+
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
272279
self._test_patched_converse_stream_wrapper()
273280
self._test_patched_extract_tool_calls()
281+
# TODO: remove these tests once we bump botocore instrumentation version to 0.60b0
282+
self._test_patched_process_anthropic_claude_chunk({"location": "Seattle"}, {"location": "Seattle"})
283+
self._test_patched_process_anthropic_claude_chunk(None, None)
284+
self._test_patched_process_anthropic_claude_chunk({}, {})
274285

275286
# Bedrock Agent Runtime
276287
self.assertTrue("bedrock-agent-runtime" in _KNOWN_EXTENSIONS)
@@ -679,6 +690,73 @@ def _test_patched_extract_tool_calls(self):
679690
result = bedrock_utils.extract_tool_calls(message_with_string_content, True)
680691
self.assertIsNone(result)
681692

693+
def _test_patched_process_anthropic_claude_chunk(self, input_value: Dict[str, str], expected_output: Dict[str, str]):
694+
self._test_process_anthropic_claude_chunk(input_value, expected_output, False)
695+
696+
def _test_unpatched_process_anthropic_claude_chunk(self, input_value: Dict[str, str], expected_output: Dict[str, str]):
697+
self._test_process_anthropic_claude_chunk(input_value, expected_output, True)
698+
699+
def _test_process_anthropic_claude_chunk(self, input_value: Dict[str, str], expected_output: Dict[str, str], expect_exception: bool):
700+
"""Test that _process_anthropic_claude_chunk handles various tool_use input formats."""
701+
wrapper = bedrock_utils.InvokeModelWithResponseStreamWrapper(
702+
stream=MagicMock(),
703+
stream_done_callback=MagicMock,
704+
stream_error_callback=MagicMock,
705+
model_id="anthropic.claude-3-5-sonnet-20240620-v1:0",
706+
)
707+
708+
# Simulate message_start
709+
wrapper._process_anthropic_claude_chunk(
710+
{
711+
"type": "message_start",
712+
"message": {
713+
"role": "assistant",
714+
"content": [],
715+
},
716+
}
717+
)
718+
719+
# Simulate content_block_start with specified input
720+
content_block = {
721+
"type": "tool_use",
722+
"id": "test_id",
723+
"name": "test_tool",
724+
}
725+
if input_value is not None:
726+
content_block["input"] = input_value
727+
728+
wrapper._process_anthropic_claude_chunk(
729+
{
730+
"type": "content_block_start",
731+
"index": 0,
732+
"content_block": content_block,
733+
}
734+
)
735+
736+
# Simulate content_block_stop
737+
try:
738+
wrapper._process_anthropic_claude_chunk(
739+
{"type": "content_block_stop", "index": 0}
740+
)
741+
except TypeError:
742+
if expect_exception:
743+
return
744+
else:
745+
raise
746+
747+
# Verify the message content
748+
self.assertEqual(len(wrapper._message["content"]), 1)
749+
tool_block = wrapper._message["content"][0]
750+
self.assertEqual(tool_block["type"], "tool_use")
751+
self.assertEqual(tool_block["id"], "test_id")
752+
self.assertEqual(tool_block["name"], "test_tool")
753+
754+
if expected_output is not None:
755+
self.assertEqual(tool_block["input"], expected_output)
756+
self.assertIsInstance(tool_block["input"], dict)
757+
else:
758+
self.assertNotIn("input", tool_block)
759+
682760
def _test_patched_bedrock_agent_instrumentation(self):
683761
"""For bedrock-agent service, both extract_attributes and on_success provides attributes,
684762
the attributes depend on the API being invoked."""

0 commit comments

Comments
 (0)