Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .sg/rules/core-raising-dispatch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ ignores:
# Allow-listed existing callsites
- "ddtrace/contrib/internal/langchain/patch.py"
- "ddtrace/contrib/internal/openai/patch.py"
# Tests for raising_dispatch itself
- "tests/internal/test_context_events_api.py"
rule:
any:
# Match function calls to raising_dispatch
Expand Down
72 changes: 72 additions & 0 deletions benchmarks/core_api/config.yaml
Original file line number Diff line number Diff line change
@@ -1,36 +1,108 @@
# --- has_listeners microbenchmark ---
has_listeners_no_listeners:
listeners: 0
set_item_count: 0
get_item_exists: false
listener_raises: false
has_listeners_with_listeners:
listeners: 10
set_item_count: 0
get_item_exists: false
listener_raises: false

# --- dispatch: listener count sweep ---
core_dispatch_no_listeners:
listeners: 0
set_item_count: 0
get_item_exists: false
listener_raises: false
core_dispatch_1_listener:
listeners: 1
set_item_count: 0
get_item_exists: false
listener_raises: false
core_dispatch_listeners:
listeners: 10
set_item_count: 0
get_item_exists: false
listener_raises: false
core_dispatch_50_listeners:
listeners: 50
set_item_count: 0
get_item_exists: false
listener_raises: false

# --- dispatch: empty args tuple ---
core_dispatch_no_args_no_listeners:
listeners: 0
set_item_count: 0
get_item_exists: false
listener_raises: false
core_dispatch_no_args_listeners:
listeners: 10
set_item_count: 0
get_item_exists: false
listener_raises: false

# --- dispatch: exception path (listeners raise, config._raise=False) ---
core_dispatch_exception_no_listeners:
listeners: 0
set_item_count: 0
get_item_exists: false
listener_raises: true
core_dispatch_exception_listeners:
listeners: 10
set_item_count: 0
get_item_exists: false
listener_raises: true

# --- dispatch_with_results: listener count sweep ---
core_dispatch_with_results_no_listeners:
listeners: 0
set_item_count: 0
get_item_exists: false
listener_raises: false
core_dispatch_with_results_1_listener:
listeners: 1
set_item_count: 0
get_item_exists: false
listener_raises: false
core_dispatch_with_results_listeners:
listeners: 10
set_item_count: 0
get_item_exists: false
listener_raises: false
core_dispatch_with_results_50_listeners:
listeners: 50
set_item_count: 0
get_item_exists: false
listener_raises: false

# --- context_with_data ---
context_with_data_no_listeners:
listeners: 0
set_item_count: 0
get_item_exists: false
listener_raises: false
context_with_data_listeners:
listeners: 10
set_item_count: 0
get_item_exists: false
listener_raises: false

# --- context item get/set ---
set_item:
listeners: 0
set_item_count: 100
get_item_exists: false
listener_raises: false
get_item_missing:
listeners: 0
set_item_count: 0
get_item_exists: false
listener_raises: false
get_item_exists:
listeners: 0
set_item_count: 0
get_item_exists: true
listener_raises: false
28 changes: 24 additions & 4 deletions benchmarks/core_api/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ class CoreAPIScenario(bm.Scenario):
listeners: int
set_item_count: int
get_item_exists: bool
listener_raises: bool # whether registered listeners raise exceptions

def run(self):
# Activate a number of no-op listeners for known events
for _ in range(self.listeners):
if self.listener_raises:

def listener(*_):
pass
def listener(*_):
raise ValueError("benchmark listener exception")

else:

def listener(*_):
pass

core.on(CUSTOM_EVENT_NAME, listener)
core.on("context.started.with_data", listener)
Expand All @@ -28,6 +34,16 @@ def listener(*_):
if self.get_item_exists:
core.set_item("key", "value")

def has_listeners(loops):
"""Measure the cost of core.has_listeners"""
for _ in range(loops):
core.has_listeners(CUSTOM_EVENT_NAME)

def core_dispatch_no_args(loops):
"""Measure dispatch cost with an empty args tuple"""
for _ in range(loops):
core.dispatch(CUSTOM_EVENT_NAME, ())

def core_dispatch(loops):
"""Measure the cost to dispatch an event on the hub"""
for _ in range(loops):
Expand Down Expand Up @@ -59,7 +75,11 @@ def get_item(loops):
for _ in range(loops):
core.find_item("key")

if "core_dispatch_with_results" in self.scenario_name:
if "has_listeners" in self.scenario_name:
yield has_listeners
elif "core_dispatch_no_args" in self.scenario_name:
yield core_dispatch_no_args
elif "core_dispatch_with_results" in self.scenario_name:
yield core_dispatch_with_results
elif "core_dispatch" in self.scenario_name:
yield core_dispatch
Expand Down
10 changes: 5 additions & 5 deletions ddtrace/contrib/internal/botocore/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def prep_context_injection(ctx, endpoint_name, operation, trace_operation, param

core.dispatch(
"botocore.prep_context_injection.post",
[ctx, cloud_service, schematization_function, injection_function, trace_operation],
(ctx, cloud_service, schematization_function, injection_function, trace_operation),
)


Expand Down Expand Up @@ -274,7 +274,7 @@ def patched_api_call_fallback(original_func, instance, args, kwargs, function_va
) as ctx,
ctx.span,
):
core.dispatch("botocore.patched_api_call.started", [ctx])
core.dispatch("botocore.patched_api_call.started", (ctx,))
if args and config.botocore["distributed_tracing"]:
prep_context_injection(ctx, endpoint_name, operation, trace_operation, params)

Expand All @@ -283,14 +283,14 @@ def patched_api_call_fallback(original_func, instance, args, kwargs, function_va
except botocore.exceptions.ClientError as e:
core.dispatch(
"botocore.patched_api_call.exception",
[
(
ctx,
e.response,
botocore.exceptions.ClientError,
config.botocore.operations[ctx.span.resource].is_error_code,
],
),
)
raise
else:
core.dispatch("botocore.patched_api_call.success", [ctx, result])
core.dispatch("botocore.patched_api_call.success", (ctx, result))
return result
31 changes: 11 additions & 20 deletions ddtrace/contrib/internal/botocore/services/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,10 @@ def traced_stream_read(traced_stream, original_read, amt=None):
handler.chunks.append(json.loads(body))
if traced_stream.__wrapped__.tell() == int(traced_stream.__wrapped__._content_length):
formatted_response = _extract_text_and_response_reason(execution_ctx, handler.chunks[0])
core.dispatch(
"botocore.bedrock.process_response",
[execution_ctx, formatted_response],
)
core.dispatch("botocore.bedrock.process_response", (execution_ctx, formatted_response))
return body
except Exception:
core.dispatch("botocore.patched_bedrock_api_call.exception", [execution_ctx, sys.exc_info()])
core.dispatch("botocore.patched_bedrock_api_call.exception", (execution_ctx, sys.exc_info()))
raise


Expand All @@ -53,13 +50,10 @@ def traced_stream_readlines(traced_stream, original_readlines):
for line in lines:
handler.chunks.append(json.loads(line))
formatted_response = _extract_text_and_response_reason(execution_ctx, handler.chunks[0])
core.dispatch(
"botocore.bedrock.process_response",
[execution_ctx, formatted_response],
)
core.dispatch("botocore.bedrock.process_response", (execution_ctx, formatted_response))
return lines
except Exception:
core.dispatch("botocore.patched_bedrock_api_call.exception", [execution_ctx, sys.exc_info()])
core.dispatch("botocore.patched_bedrock_api_call.exception", (execution_ctx, sys.exc_info()))
raise


Expand All @@ -69,18 +63,15 @@ def process_chunk(self, chunk, iterator=None):

def handle_exception(self, exception):
core.dispatch(
"botocore.patched_bedrock_api_call.exception", [self.options.get("execution_ctx", {}), sys.exc_info()]
"botocore.patched_bedrock_api_call.exception", (self.options.get("execution_ctx", {}), sys.exc_info())
)

def finalize_stream(self, exception=None):
if exception:
return
execution_ctx = self.options.get("execution_ctx", {})
formatted_response = _extract_streamed_response(execution_ctx, self.chunks)
core.dispatch(
"botocore.bedrock.process_response",
[execution_ctx, formatted_response],
)
core.dispatch("botocore.bedrock.process_response", (execution_ctx, formatted_response))


class BotocoreConverseStreamHandler(StreamHandler):
Expand All @@ -92,14 +83,14 @@ def process_chunk(self, chunk: dict[str, Any], iterator=None):
def handle_exception(self, exception):
stream_processor = self.options.get("stream_processor", None)
execution_ctx = self.options.get("execution_ctx", {})
core.dispatch("botocore.bedrock.process_response_converse", [execution_ctx, stream_processor])
core.dispatch("botocore.bedrock.process_response_converse", (execution_ctx, stream_processor))

def finalize_stream(self, exception=None):
if exception:
return
stream_processor = self.options.get("stream_processor", None)
execution_ctx = self.options.get("execution_ctx", {})
core.dispatch("botocore.bedrock.process_response_converse", [execution_ctx, stream_processor])
core.dispatch("botocore.bedrock.process_response_converse", (execution_ctx, stream_processor))


def make_botocore_streaming_body_traced_stream(streaming_body, execution_ctx):
Expand Down Expand Up @@ -391,7 +382,7 @@ def handle_bedrock_request(ctx: core.ExecutionContext) -> None:
else _extract_request_params_for_invoke(ctx["params"], ctx["model_provider"])
)

core.dispatch("botocore.patched_bedrock_api_call.started", [ctx, request_params])
core.dispatch("botocore.patched_bedrock_api_call.started", (ctx, request_params))
if ctx["bedrock_integration"].llmobs_enabled:
ctx.set_item("llmobs.request_params", request_params)

Expand Down Expand Up @@ -441,7 +432,7 @@ def handle_bedrock_response(
)

if ctx["resource"] == "Converse":
core.dispatch("botocore.bedrock.process_response_converse", [ctx, result])
core.dispatch("botocore.bedrock.process_response_converse", (ctx, result))
return result
if ctx["resource"] == "ConverseStream":
if "stream" in result:
Expand Down Expand Up @@ -483,5 +474,5 @@ def patched_bedrock_api_call(original_func, instance, args, kwargs, function_var
result = handle_bedrock_response(ctx, result)
return result
except Exception:
core.dispatch("botocore.patched_bedrock_api_call.exception", [ctx, sys.exc_info()])
core.dispatch("botocore.patched_bedrock_api_call.exception", (ctx, sys.exc_info()))
raise
20 changes: 10 additions & 10 deletions ddtrace/contrib/internal/botocore/services/kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def update_record(ctx, record: dict[str, Any], stream: str, inject_trace_context
if data_obj is not None:
core.dispatch(
"botocore.kinesis.update_record",
[ctx, stream, data_obj, record, inject_trace_context],
(ctx, stream, data_obj, record, inject_trace_context),
)

try:
Expand Down Expand Up @@ -86,7 +86,7 @@ def _patched_kinesis_api_call(parent_ctx, original_func, instance, args, kwargs,
try:
start_ns = time_ns()
is_getrecords_call = True
core.dispatch(f"botocore.{endpoint_name}.{operation}.pre", [params])
core.dispatch(f"botocore.{endpoint_name}.{operation}.pre", (params,))
result = original_func(*args, **kwargs)

records = result["Records"]
Expand All @@ -96,7 +96,7 @@ def _patched_kinesis_api_call(parent_ctx, original_func, instance, args, kwargs,
time_estimate = record.get("ApproximateArrivalTimestamp", datetime.now()).timestamp()
core.dispatch(
f"botocore.{endpoint_name}.{operation}.post",
[
(
parent_ctx,
params,
time_estimate,
Expand All @@ -105,7 +105,7 @@ def _patched_kinesis_api_call(parent_ctx, original_func, instance, args, kwargs,
result,
config.botocore.propagation_enabled,
extract_DD_json,
],
),
)

except Exception as e:
Expand Down Expand Up @@ -156,7 +156,7 @@ def _patched_kinesis_api_call(parent_ctx, original_func, instance, args, kwargs,
) as ctx,
ctx.span,
):
core.dispatch("botocore.patched_kinesis_api_call.started", [ctx])
core.dispatch("botocore.patched_kinesis_api_call.started", (ctx,))

if is_kinesis_put_operation:
records_to_process = select_records_for_injection(params, bool(config.botocore["distributed_tracing"]))
Expand All @@ -165,25 +165,25 @@ def _patched_kinesis_api_call(parent_ctx, original_func, instance, args, kwargs,

try:
if not is_getrecords_call:
core.dispatch(f"botocore.{endpoint_name}.{operation}.pre", [params])
core.dispatch(f"botocore.{endpoint_name}.{operation}.pre", (params,))
result = original_func(*args, **kwargs)
core.dispatch(f"botocore.{endpoint_name}.{operation}.post", [params, result])
core.dispatch(f"botocore.{endpoint_name}.{operation}.post", (params, result))

if getrecords_error:
raise getrecords_error

core.dispatch("botocore.patched_kinesis_api_call.success", [ctx, result])
core.dispatch("botocore.patched_kinesis_api_call.success", (ctx, result))
return result

except botocore.exceptions.ClientError as e:
core.dispatch(
"botocore.patched_kinesis_api_call.exception",
[
(
ctx,
e.response,
botocore.exceptions.ClientError,
config.botocore.operations[ctx.span.resource].is_error_code,
],
),
)
raise
elif is_getrecords_call:
Expand Down
Loading
Loading