diff --git a/.github/workflows/build-assets.yml b/.github/workflows/build-assets.yml index bbdb68925..71b7ddc2c 100644 --- a/.github/workflows/build-assets.yml +++ b/.github/workflows/build-assets.yml @@ -103,7 +103,7 @@ jobs: "container_image": "rockylinux:9", "tag": "rocky", "archive_name_suffix": "-rocky", - "feature": "redis_events" + "feature": "redis" }, { "os": "ubuntu-22.04-arm", @@ -113,7 +113,7 @@ jobs: "container_image": null, "tag": "ubuntu", "archive_name_suffix": "", - "feature": "redis_events" + "feature": "redis" }, { "os": "ubuntu-22.04", @@ -163,7 +163,7 @@ jobs: "container_image": "rockylinux:9", "tag": "rocky", "archive_name_suffix": "-rocky", - "feature": "redis_events" + "feature": "redis" }, { "os": "ubuntu-22.04", @@ -173,7 +173,7 @@ jobs: "container_image": null, "tag": "ubuntu", "archive_name_suffix": "", - "feature": "redis_events" + "feature": "redis" } ] }' diff --git a/crates/scouter_sql/src/sql/schema.rs b/crates/scouter_sql/src/sql/schema.rs index 3742f3aa1..f97de1e7d 100644 --- a/crates/scouter_sql/src/sql/schema.rs +++ b/crates/scouter_sql/src/sql/schema.rs @@ -279,12 +279,12 @@ impl VersionResult { pub fn to_version(&self) -> Result { let mut version = Version::new(self.major as u64, self.minor as u64, self.patch as u64); - if self.pre_tag.is_some() { - version.pre = Prerelease::new(self.pre_tag.as_ref().unwrap())?; + if let Some(pre_tag) = &self.pre_tag { + version.pre = Prerelease::new(pre_tag)?; } - if self.build_tag.is_some() { - version.build = BuildMetadata::new(self.build_tag.as_ref().unwrap())?; + if let Some(build_tag) = &self.build_tag { + version.build = BuildMetadata::new(build_tag)?; } Ok(version) diff --git a/crates/scouter_tracing/src/tracer.rs b/crates/scouter_tracing/src/tracer.rs index dce097047..60be8f03f 100644 --- a/crates/scouter_tracing/src/tracer.rs +++ b/crates/scouter_tracing/src/tracer.rs @@ -51,6 +51,10 @@ use tracing::{debug, info, instrument, warn}; static TRACER_PROVIDER_STORE: RwLock>> = RwLock::new(None); static TRACE_METADATA_STORE: OnceLock = OnceLock::new(); +// Static ScouterQueue store for global access if needed +// This allows us to set the queue anytime get_tracer is called +static SCOUTER_QUEUE_STORE: RwLock>> = RwLock::new(None); + fn get_tracer_provider() -> Result>, TraceError> { TRACER_PROVIDER_STORE .read() @@ -165,6 +169,8 @@ fn get_trace_metadata_store() -> &'static TraceMetadataStore { /// * `transport_config` - Optional transport configuration for the Scouter exporter /// * `exporter` - Optional span exporter to use if you want to export spans to an OTLP collector /// * `batch_config` - Optional batch configuration for span exporting +/// * `sample_ratio` - Optional sampling ratio between 0.0 and 1.0 +/// * `scouter_queue` - Optional ScouterQueue to associate with the tracer for span queueing #[pyfunction] #[pyo3(signature = ( service_name="scouter_service".to_string(), @@ -173,6 +179,7 @@ fn get_trace_metadata_store() -> &'static TraceMetadataStore { exporter=None, batch_config=None, sample_ratio=None, + scouter_queue=None, ))] #[instrument(skip_all)] #[allow(clippy::too_many_arguments)] @@ -184,12 +191,12 @@ pub fn init_tracer( exporter: Option<&Bound<'_, PyAny>>, batch_config: Option>, sample_ratio: Option, -) -> Result<(), TraceError> { + scouter_queue: Option>, +) -> Result { debug!("Initializing tracer"); let transport_config = match transport_config { Some(config) => TransportConfig::from_py_config(config)?, None => { - // default to http transport config let config = HttpConfig::default(); TransportConfig::Http(config) } @@ -213,39 +220,43 @@ pub fn init_tracer( None }; - let mut store_guard = TRACER_PROVIDER_STORE - .write() - .map_err(|e| TraceError::PoisonError(e.to_string()))?; + // setupt the store provider in different scope to avoid deadlock + { + debug!("Setting up tracer provider store"); + let mut store_guard = TRACER_PROVIDER_STORE + .write() + .map_err(|e| TraceError::PoisonError(e.to_string()))?; - if store_guard.is_some() { - return Err(TraceError::InitializationError( - "Tracer provider already initialized. Call shutdown_tracer() first.".to_string(), - )); - } + if store_guard.is_some() { + return Err(TraceError::InitializationError( + "Tracer provider already initialized. Call shutdown_tracer() first.".to_string(), + )); + } - let resource = Resource::builder() - .with_service_name(service_name.clone()) - .with_attributes([KeyValue::new(SCOUTER_SCOPE, scope.clone())]) - .build(); + let resource = Resource::builder() + .with_service_name(service_name.clone()) + .with_attributes([KeyValue::new(SCOUTER_SCOPE, scope.clone())]) + .build(); - let scouter_export = ScouterSpanExporter::new(transport_config, &resource)?; + let scouter_export = ScouterSpanExporter::new(transport_config, &resource)?; - let mut span_exporter = if let Some(exporter) = exporter { - SpanExporterNum::from_pyobject(exporter).expect("failed to convert exporter") - } else { - SpanExporterNum::default() - }; + let mut span_exporter = if let Some(exporter) = exporter { + SpanExporterNum::from_pyobject(exporter).expect("failed to convert exporter") + } else { + SpanExporterNum::default() + }; - // set the sample ratio on the exporter (this will apply to both OTLP and Scouter exporters) - span_exporter.set_sample_ratio(clamped_sample_ratio); + span_exporter.set_sample_ratio(clamped_sample_ratio); - let provider = span_exporter - .build_provider(resource, scouter_export, batch_config) - .expect("failed to build tracer provider"); + let provider = span_exporter + .build_provider(resource, scouter_export, batch_config) + .expect("failed to build tracer provider"); - *store_guard = Some(Arc::new(provider)); + *store_guard = Some(Arc::new(provider)); + } - Ok(()) + // BaseTracer accesses store provider internally + BaseTracer::new(py, service_name, scouter_queue) } fn reset_current_context(py: Python, token: &Py) -> PyResult<()> { @@ -637,11 +648,33 @@ impl BaseTracer { impl BaseTracer { #[new] #[pyo3(signature = (name, queue=None))] - fn new(name: String, queue: Option>) -> Result { + fn new( + py: Python<'_>, + name: String, + queue: Option>, + ) -> Result { + debug!("Creating new BaseTracer instance"); let tracer = get_tracer(name)?; - // - Ok(BaseTracer { tracer, queue }) + // if queue is provided, set it on the tracer + let mut base_tracer = BaseTracer { + tracer, + queue: None, + }; + if let Some(queue) = queue { + // set queue on the tracer + base_tracer.set_scouter_queue(py, queue)?; + } else { + // check if global queue is set. If set, clone reference + let store_guard = SCOUTER_QUEUE_STORE + .read() + .map_err(|e| TraceError::PoisonError(e.to_string()))?; + if let Some(global_queue) = &*store_guard { + base_tracer.queue = Some(global_queue.clone_ref(py)); + } + } + + Ok(base_tracer) } pub fn set_scouter_queue( @@ -656,7 +689,14 @@ impl BaseTracer { let bound_queue = queue.bind(py); bound_queue.call_method1("_set_sample_ratio", (1.0,))?; + // update the store + let mut store_guard = SCOUTER_QUEUE_STORE + .write() + .map_err(|e| TraceError::PoisonError(e.to_string()))?; + *store_guard = Some(queue.clone_ref(py)); + self.queue = Some(queue); + Ok(()) } @@ -916,6 +956,13 @@ pub fn shutdown_tracer() -> Result<(), TraceError> { } get_trace_metadata_store().clear_all()?; + + // clear global scouter queue + let mut queue_store_guard = SCOUTER_QUEUE_STORE + .write() + .map_err(|e| TraceError::PoisonError(e.to_string()))?; + *queue_store_guard = None; + Ok(()) } diff --git a/py-scouter/tests/integration/api/conftest.py b/py-scouter/tests/integration/api/conftest.py index 22d1822c8..fce621f6d 100644 --- a/py-scouter/tests/integration/api/conftest.py +++ b/py-scouter/tests/integration/api/conftest.py @@ -365,7 +365,8 @@ async def lifespan(app: FastAPI): @app.post("/chat", response_model=TestResponse) async def chat(request: Request, payload: ChatRequest) -> TestResponse: - with tracer.start_as_current_span("genai_service") as active_span: + service_tracer = get_tracer("api-tracer") + with service_tracer.start_as_current_span("genai_service") as active_span: agent: Agent = request.app.state.agent prompt: Prompt = request.app.state.prompt