 from __future__ import annotations
 
 import concurrent.futures as cf
+import copy
 import functools
 import io
 import logging
@@ -48,8 +49,8 @@ class TracingQueueItem:
 
     Attributes:
         priority (str): The priority of the item.
-        action (str): The action associated with the item.
         item (Any): The item itself.
+        otel_context (Optional[Context]): The OTEL context of the item.
     """
 
     priority: str
@@ -158,9 +159,27 @@ def _tracing_thread_handle_batch(
     tracing_queue: Queue,
     batch: list[TracingQueueItem],
     use_multipart: bool,
+    mark_task_done: bool = True,
+    ops: Optional[
+        list[Union[SerializedRunOperation, SerializedFeedbackOperation]]
+    ] = None,
 ) -> None:
+    """Handle a batch of tracing queue items by sending them to LangSmith.
+
+    Args:
+        client: The LangSmith client to use for sending data.
+        tracing_queue: The queue containing tracing items (used for task_done calls).
+        batch: List of tracing queue items to process.
+        use_multipart: Whether to use the multipart endpoint for sending data.
+        mark_task_done: Whether to mark queue tasks as done after processing.
+            Set to False when called from parallel execution to avoid double counting.
+        ops: Pre-combined serialized operations to use instead of combining from batch.
+            If None, operations will be combined from the batch items.
+    """
     try:
-        ops = combine_serialized_queue_operations([item.item for item in batch])
+        if ops is None:
+            ops = combine_serialized_queue_operations([item.item for item in batch])
+
         if use_multipart:
             client._multipart_ingest_ops(ops)
         else:
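For context, here is a minimal sketch (not part of the diff) of the two calling conventions the new `mark_task_done` and `ops` parameters allow; it assumes `client`, `tracing_queue`, and `batch` are already bound as in the surrounding module.

```python
# Sketch only: assumes client, tracing_queue, and batch are bound as in this module.
# Default path: the handler combines ops itself and marks each queue task done.
_tracing_thread_handle_batch(client, tracing_queue, batch, use_multipart=True)

# Parallel/hybrid path: the caller combines ops once and defers task_done bookkeeping,
# so the same batch can be handed to several handlers without double counting.
ops = combine_serialized_queue_operations([item.item for item in batch])
_tracing_thread_handle_batch(
    client,
    tracing_queue,
    batch,
    use_multipart=True,
    mark_task_done=False,
    ops=ops,
)
```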
@@ -180,22 +199,45 @@ def _tracing_thread_handle_batch(
             "Error details:",
             exc_info=True,
         )
-        # exceptions are logged elsewhere, but we need to make sure the
-        # background thread continues to run
-        pass
     finally:
-        for _ in batch:
-            tracing_queue.task_done()
+        if mark_task_done:
+            for _ in batch:
+                try:
+                    tracing_queue.task_done()
+                except ValueError as e:
+                    if "task_done() called too many times" in str(e):
+                        # This can happen during shutdown when multiple threads
+                        # process the same queue items. It's harmless.
+                        logger.debug(
+                            f"Ignoring harmless task_done error during shutdown: {e}"
+                        )
+                    else:
+                        raise
 
 
 def _otel_tracing_thread_handle_batch(
     client: Client,
     tracing_queue: Queue,
     batch: list[TracingQueueItem],
+    mark_task_done: bool = True,
+    ops: Optional[
+        list[Union[SerializedRunOperation, SerializedFeedbackOperation]]
+    ] = None,
 ) -> None:
-    """Handle a batch of tracing queue items by exporting them to OTEL."""
+    """Handle a batch of tracing queue items by exporting them to OTEL.
+
+    Args:
+        client: The LangSmith client containing the OTEL exporter.
+        tracing_queue: The queue containing tracing items (used for task_done calls).
+        batch: List of tracing queue items to process.
+        mark_task_done: Whether to mark queue tasks as done after processing.
+            Set to False when called from parallel execution to avoid double counting.
+        ops: Pre-combined serialized operations to use instead of combining from batch.
+            If None, operations will be combined from the batch items.
+    """
     try:
-        ops = combine_serialized_queue_operations([item.item for item in batch])
+        if ops is None:
+            ops = combine_serialized_queue_operations([item.item for item in batch])
 
         run_ops = [op for op in ops if isinstance(op, SerializedRunOperation)]
         otel_context_map = {
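The new `except ValueError` branches guard against `task_done()` being called more times than items were queued, which the standard library reports with exactly this message. A self-contained illustration (not part of the diff):

```python
# Standard-library behavior the except-branch above relies on.
from queue import Queue

q: Queue = Queue()
q.put("item")
q.get()
q.task_done()
try:
    q.task_done()  # one call more than items that were queued
except ValueError as e:
    print(e)  # task_done() called too many times
```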
@@ -215,17 +257,129 @@ def _otel_tracing_thread_handle_batch(
 
     except Exception:
         logger.error(
-            "LangSmith tracing error: Failed to submit OTEL trace data.\n"
+            "OTEL tracing error: Failed to submit trace data.\n"
             "This does not affect your application's runtime.\n"
             "Error details:",
             exc_info=True,
         )
-        # Exceptions are logged elsewhere, but we need to make sure the
-        # background thread continues to run
     finally:
-        # Mark all items in the batch as done
+        if mark_task_done:
+            for _ in batch:
+                try:
+                    tracing_queue.task_done()
+                except ValueError as e:
+                    if "task_done() called too many times" in str(e):
+                        # This can happen during shutdown when multiple threads
+                        # process the same queue items. It's harmless.
+                        logger.debug(
+                            f"Ignoring harmless task_done error during shutdown: {e}"
+                        )
+                    else:
+                        raise
+
+
+def _hybrid_tracing_thread_handle_batch(
+    client: Client,
+    tracing_queue: Queue,
+    batch: list[TracingQueueItem],
+    use_multipart: bool,
+    mark_task_done: bool = True,
+) -> None:
+    """Handle a batch of tracing queue items by sending to both LangSmith and OTEL.
+
+    Args:
+        client: The LangSmith client to use for sending data.
+        tracing_queue: The queue containing tracing items (used for task_done calls).
+        batch: List of tracing queue items to process.
+        use_multipart: Whether to use the multipart endpoint for LangSmith.
+        mark_task_done: Whether to mark queue tasks as done after processing.
+            Set to False primarily for testing when items weren't actually queued.
+    """
+    # Combine operations once to avoid race conditions
+    ops = combine_serialized_queue_operations([item.item for item in batch])
+
+    # Create copies for each thread to avoid shared mutation
+    langsmith_ops = copy.deepcopy(ops)
+    otel_ops = copy.deepcopy(ops)
+
+    # Use ThreadPoolExecutor for parallel execution
+    with cf.ThreadPoolExecutor(max_workers=2) as executor:
+        # Submit both tasks
+        future_langsmith = executor.submit(
+            _tracing_thread_handle_batch,
+            client,
+            tracing_queue,
+            batch,
+            use_multipart,
+            False,  # Don't mark tasks done - we'll do it once at the end
+            langsmith_ops,
+        )
+        future_otel = executor.submit(
+            _otel_tracing_thread_handle_batch,
+            client,
+            tracing_queue,
+            batch,
+            False,  # Don't mark tasks done - we'll do it once at the end
+            otel_ops,
+        )
+
+        # Wait for both to complete
+        future_langsmith.result()
+        future_otel.result()
+
+    # Mark all tasks as done once, only if requested
+    if mark_task_done:
         for _ in batch:
-            tracing_queue.task_done()
+            try:
+                tracing_queue.task_done()
+            except ValueError as e:
+                if "task_done() called too many times" in str(e):
+                    # This can happen during shutdown when multiple threads
+                    # process the same queue items. It's harmless.
+                    logger.debug(
+                        f"Ignoring harmless task_done error during shutdown: {e}"
+                    )
+                else:
+                    raise
+
+
+def _is_using_internal_otlp_provider(client: Client) -> bool:
+    """Check if the client is using LangSmith's internal OTLP provider.
+
+    Returns True if using LangSmith's internal provider, False if the user
+    provided their own.
+    """
+    if not hasattr(client, "otel_exporter") or client.otel_exporter is None:
+        return False
+
+    try:
+        # Check if OTEL is available, then use OpenTelemetry's standard API
+        # to inspect the global TracerProvider
+        if not ls_utils.is_truish(ls_utils.get_env_var("OTEL_ENABLED")):
+            return False
+
+        # Get the global TracerProvider and check its resource attributes
+        from opentelemetry import trace  # type: ignore[import]
+
+        tracer_provider = trace.get_tracer_provider()
+        if hasattr(tracer_provider, "resource") and hasattr(
+            tracer_provider.resource, "attributes"
+        ):
+            is_internal = tracer_provider.resource.attributes.get(
+                "langsmith.internal_provider", False
+            )
+            logger.debug(
+                f"TracerProvider resource check: "
+                f"langsmith.internal_provider={is_internal}"
+            )
+            return is_internal
+
+        return False
+    except Exception as e:
+        logger.debug(
+            f"Could not determine TracerProvider type: {e}, assuming user-provided"
+        )
+        return False
 
 
 def get_size_limit_from_env() -> Optional[int]:
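The `_is_using_internal_otlp_provider` helper keys off a `langsmith.internal_provider` resource attribute on the global `TracerProvider`. Below is a standalone sketch of that mechanism using the OpenTelemetry SDK; setting the attribute here is only for illustration, since in practice it is assumed to be set by LangSmith's own provider.

```python
# Illustration only: shows how a resource attribute on the global TracerProvider
# becomes visible to the attributes.get() check in the helper above.
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider

provider = TracerProvider(
    resource=Resource.create({"langsmith.internal_provider": True})
)
trace.set_tracer_provider(provider)

attrs = trace.get_tracer_provider().resource.attributes
print(attrs.get("langsmith.internal_provider", False))  # True -> treated as internal
```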
@@ -267,6 +421,29 @@ def _ensure_ingest_config(
     return default_config
 
 
+def get_tracing_mode() -> tuple[bool, bool]:
+    """Get the current tracing mode configuration.
+
+    Returns:
+        tuple[bool, bool]:
+            - hybrid_otel_and_langsmith: True if both OTEL and LangSmith tracing
+              are enabled, which is the default behavior when OTEL_ENABLED is set
+              to true and OTEL_ONLY is not set to true
+            - is_otel_only: True if only OTEL tracing is enabled
+    """
+    otel_enabled = ls_utils.is_truish(ls_utils.get_env_var("OTEL_ENABLED"))
+    otel_only = ls_utils.is_truish(ls_utils.get_env_var("OTEL_ONLY"))
+
+    # If OTEL is not enabled, neither mode should be active
+    if not otel_enabled:
+        return False, False
+
+    hybrid_otel_and_langsmith = not otel_only
+    is_otel_only = otel_only
+
+    return hybrid_otel_and_langsmith, is_otel_only
+
+
 def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None:
     client = client_ref()
     if client is None:
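The decision table implemented by `get_tracing_mode` is small enough to restate directly. The sketch below uses a hypothetical `_expected_mode` helper purely to document the mapping; the real function resolves the flags through `ls_utils.get_env_var`, so the exact environment variable names (for example, a `LANGSMITH_` prefix) depend on that helper.

```python
# Hypothetical restatement of get_tracing_mode()'s contract, for documentation only.
def _expected_mode(otel_enabled: bool, otel_only: bool) -> tuple[bool, bool]:
    if not otel_enabled:
        return False, False                # OTEL disabled: neither mode is active
    return (not otel_only, otel_only)      # hybrid unless OTEL_ONLY is truthy

assert _expected_mode(False, False) == (False, False)
assert _expected_mode(True, False) == (True, False)    # hybrid: LangSmith + OTEL
assert _expected_mode(True, True) == (False, True)     # OTEL-only
```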
@@ -351,21 +528,41 @@ def keep_thread_active() -> bool:
             )
             sub_threads.append(new_thread)
             new_thread.start()
+
+        hybrid_otel_and_langsmith, is_otel_only = get_tracing_mode()
         if next_batch := _tracing_thread_drain_queue(tracing_queue, limit=size_limit):
-            if client.otel_exporter is not None:
+            if hybrid_otel_and_langsmith:
+                # Hybrid mode: both OTEL and LangSmith
+                _hybrid_tracing_thread_handle_batch(
+                    client, tracing_queue, next_batch, use_multipart
+                )
+            elif is_otel_only:
+                # OTEL-only mode
                 _otel_tracing_thread_handle_batch(client, tracing_queue, next_batch)
             else:
+                # LangSmith-only mode
                 _tracing_thread_handle_batch(
                     client, tracing_queue, next_batch, use_multipart
                 )
 
-    # drain the queue on exit
+    # drain the queue on exit - apply same logic
+    hybrid_otel_and_langsmith, is_otel_only = get_tracing_mode()
     while next_batch := _tracing_thread_drain_queue(
         tracing_queue, limit=size_limit, block=False
     ):
-        if client.otel_exporter is not None:
+        if hybrid_otel_and_langsmith:
+            # Hybrid mode cleanup
+            logger.debug("Hybrid mode cleanup")
+            _hybrid_tracing_thread_handle_batch(
+                client, tracing_queue, next_batch, use_multipart
+            )
+        elif is_otel_only:
+            # OTEL-only cleanup
+            logger.debug("OTEL-only cleanup")
             _otel_tracing_thread_handle_batch(client, tracing_queue, next_batch)
         else:
+            # LangSmith-only cleanup
+            logger.debug("LangSmith-only cleanup")
             _tracing_thread_handle_batch(
                 client, tracing_queue, next_batch, use_multipart
             )
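The hybrid branch above fans a single drained batch out to both sinks via `_hybrid_tracing_thread_handle_batch`. Here is a self-contained sketch of that fan-out pattern (deep-copied payload per sink, two workers, a single point of bookkeeping); the sink functions are placeholders, not LangSmith APIs.

```python
import concurrent.futures as cf
import copy

def send_to_langsmith(ops: list) -> None:  # placeholder sink
    print("langsmith:", ops)

def send_to_otel(ops: list) -> None:  # placeholder sink
    print("otel:", ops)

batch_ops = [{"id": "run-1", "op": "post"}]
with cf.ThreadPoolExecutor(max_workers=2) as executor:
    # Each sink gets its own copy so neither can mutate the other's payload.
    f_ls = executor.submit(send_to_langsmith, copy.deepcopy(batch_ops))
    f_otel = executor.submit(send_to_otel, copy.deepcopy(batch_ops))
    f_ls.result()
    f_otel.result()
# Queue bookkeeping (task_done) happens once here, after both sinks finish.
```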
@@ -378,7 +575,7 @@ def tracing_control_thread_func_compress_parallel(
     client = client_ref()
     if client is None:
         return
-
+    logger.debug("Tracing control thread func compress parallel called")
     if (
         client.compressed_traces is None
         or client._data_available_event is None
@@ -542,22 +739,41 @@ def _tracing_sub_thread_func(
     ):
         if next_batch := _tracing_thread_drain_queue(tracing_queue, limit=size_limit):
             seen_successive_empty_queues = 0
-            if client.otel_exporter is not None:
+
+            hybrid_otel_and_langsmith, is_otel_only = get_tracing_mode()
+            if hybrid_otel_and_langsmith:
+                # Hybrid mode: both OTEL and LangSmith
+                _hybrid_tracing_thread_handle_batch(
+                    client, tracing_queue, next_batch, use_multipart
+                )
+            elif is_otel_only:
+                # OTEL-only mode
                 _otel_tracing_thread_handle_batch(client, tracing_queue, next_batch)
             else:
+                # LangSmith-only mode
                 _tracing_thread_handle_batch(
                     client, tracing_queue, next_batch, use_multipart
                 )
         else:
             seen_successive_empty_queues += 1
 
-    # drain the queue on exit
+    # drain the queue on exit - apply same logic
+    hybrid_otel_and_langsmith, is_otel_only = get_tracing_mode()
     while next_batch := _tracing_thread_drain_queue(
         tracing_queue, limit=size_limit, block=False
     ):
-        if client.otel_exporter is not None:
+        if hybrid_otel_and_langsmith:
+            # Hybrid mode cleanup
+            _hybrid_tracing_thread_handle_batch(
+                client, tracing_queue, next_batch, use_multipart
+            )
+        elif is_otel_only:
+            # OTEL-only cleanup
+            logger.debug("OTEL-only cleanup")
             _otel_tracing_thread_handle_batch(client, tracing_queue, next_batch)
         else:
+            # LangSmith-only cleanup
+            logger.debug("LangSmith-only cleanup")
             _tracing_thread_handle_batch(
                 client, tracing_queue, next_batch, use_multipart
             )
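Both loops above end by draining whatever is already queued with `block=False` before the thread exits. A standalone sketch of that non-blocking drain pattern follows; the real `_tracing_thread_drain_queue` also applies size limits and batching, so this only shows the shape of the idea.

```python
from queue import Empty, Queue

def drain_without_blocking(q: Queue, limit: int = 100) -> list:
    """Pull at most `limit` items already in the queue, never waiting for more."""
    items = []
    while len(items) < limit:
        try:
            items.append(q.get(block=False))
        except Empty:
            break
    return items

q: Queue = Queue()
for i in range(3):
    q.put(i)
print(drain_without_blocking(q))  # [0, 1, 2] -- leftovers flushed before exit
```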