Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/build-assets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ jobs:
"container_image": "rockylinux:9",
"tag": "rocky",
"archive_name_suffix": "-rocky",
"feature": "redis_events"
"feature": "redis"
},
{
"os": "ubuntu-22.04-arm",
Expand All @@ -113,7 +113,7 @@ jobs:
"container_image": null,
"tag": "ubuntu",
"archive_name_suffix": "",
"feature": "redis_events"
"feature": "redis"
},
{
"os": "ubuntu-22.04",
Expand Down Expand Up @@ -163,7 +163,7 @@ jobs:
"container_image": "rockylinux:9",
"tag": "rocky",
"archive_name_suffix": "-rocky",
"feature": "redis_events"
"feature": "redis"
},
{
"os": "ubuntu-22.04",
Expand All @@ -173,7 +173,7 @@ jobs:
"container_image": null,
"tag": "ubuntu",
"archive_name_suffix": "",
"feature": "redis_events"
"feature": "redis"
}
]
}'
Expand Down
8 changes: 4 additions & 4 deletions crates/scouter_sql/src/sql/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ impl VersionResult {
pub fn to_version(&self) -> Result<Version, SqlError> {
let mut version = Version::new(self.major as u64, self.minor as u64, self.patch as u64);

if self.pre_tag.is_some() {
version.pre = Prerelease::new(self.pre_tag.as_ref().unwrap())?;
if let Some(pre_tag) = &self.pre_tag {
version.pre = Prerelease::new(pre_tag)?;
}

if self.build_tag.is_some() {
version.build = BuildMetadata::new(self.build_tag.as_ref().unwrap())?;
if let Some(build_tag) = &self.build_tag {
version.build = BuildMetadata::new(build_tag)?;
}

Ok(version)
Expand Down
107 changes: 77 additions & 30 deletions crates/scouter_tracing/src/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ use tracing::{debug, info, instrument, warn};
static TRACER_PROVIDER_STORE: RwLock<Option<Arc<SdkTracerProvider>>> = RwLock::new(None);
static TRACE_METADATA_STORE: OnceLock<TraceMetadataStore> = OnceLock::new();

// Static ScouterQueue store for global access if needed
// This allows us to set the queue anytime get_tracer is called
static SCOUTER_QUEUE_STORE: RwLock<Option<Py<ScouterQueue>>> = RwLock::new(None);

fn get_tracer_provider() -> Result<Option<Arc<SdkTracerProvider>>, TraceError> {
TRACER_PROVIDER_STORE
.read()
Expand Down Expand Up @@ -165,6 +169,8 @@ fn get_trace_metadata_store() -> &'static TraceMetadataStore {
/// * `transport_config` - Optional transport configuration for the Scouter exporter
/// * `exporter` - Optional span exporter to use if you want to export spans to an OTLP collector
/// * `batch_config` - Optional batch configuration for span exporting
/// * `sample_ratio` - Optional sampling ratio between 0.0 and 1.0
/// * `scouter_queue` - Optional ScouterQueue to associate with the tracer for span queueing
#[pyfunction]
#[pyo3(signature = (
service_name="scouter_service".to_string(),
Expand All @@ -173,6 +179,7 @@ fn get_trace_metadata_store() -> &'static TraceMetadataStore {
exporter=None,
batch_config=None,
sample_ratio=None,
scouter_queue=None,
))]
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
Expand All @@ -184,12 +191,12 @@ pub fn init_tracer(
exporter: Option<&Bound<'_, PyAny>>,
batch_config: Option<Py<BatchConfig>>,
sample_ratio: Option<f64>,
) -> Result<(), TraceError> {
scouter_queue: Option<Py<ScouterQueue>>,
) -> Result<BaseTracer, TraceError> {
debug!("Initializing tracer");
let transport_config = match transport_config {
Some(config) => TransportConfig::from_py_config(config)?,
None => {
// default to http transport config
let config = HttpConfig::default();
TransportConfig::Http(config)
}
Expand All @@ -213,39 +220,43 @@ pub fn init_tracer(
None
};

let mut store_guard = TRACER_PROVIDER_STORE
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
// setupt the store provider in different scope to avoid deadlock
{
debug!("Setting up tracer provider store");
let mut store_guard = TRACER_PROVIDER_STORE
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;

if store_guard.is_some() {
return Err(TraceError::InitializationError(
"Tracer provider already initialized. Call shutdown_tracer() first.".to_string(),
));
}
if store_guard.is_some() {
return Err(TraceError::InitializationError(
"Tracer provider already initialized. Call shutdown_tracer() first.".to_string(),
));
}

let resource = Resource::builder()
.with_service_name(service_name.clone())
.with_attributes([KeyValue::new(SCOUTER_SCOPE, scope.clone())])
.build();
let resource = Resource::builder()
.with_service_name(service_name.clone())
.with_attributes([KeyValue::new(SCOUTER_SCOPE, scope.clone())])
.build();

let scouter_export = ScouterSpanExporter::new(transport_config, &resource)?;
let scouter_export = ScouterSpanExporter::new(transport_config, &resource)?;

let mut span_exporter = if let Some(exporter) = exporter {
SpanExporterNum::from_pyobject(exporter).expect("failed to convert exporter")
} else {
SpanExporterNum::default()
};
let mut span_exporter = if let Some(exporter) = exporter {
SpanExporterNum::from_pyobject(exporter).expect("failed to convert exporter")
} else {
SpanExporterNum::default()
};

// set the sample ratio on the exporter (this will apply to both OTLP and Scouter exporters)
span_exporter.set_sample_ratio(clamped_sample_ratio);
span_exporter.set_sample_ratio(clamped_sample_ratio);

let provider = span_exporter
.build_provider(resource, scouter_export, batch_config)
.expect("failed to build tracer provider");
let provider = span_exporter
.build_provider(resource, scouter_export, batch_config)
.expect("failed to build tracer provider");

*store_guard = Some(Arc::new(provider));
*store_guard = Some(Arc::new(provider));
}

Ok(())
// BaseTracer accesses store provider internally
BaseTracer::new(py, service_name, scouter_queue)
}

fn reset_current_context(py: Python, token: &Py<PyAny>) -> PyResult<()> {
Expand Down Expand Up @@ -637,11 +648,33 @@ impl BaseTracer {
impl BaseTracer {
#[new]
#[pyo3(signature = (name, queue=None))]
fn new(name: String, queue: Option<Py<ScouterQueue>>) -> Result<Self, TraceError> {
fn new(
py: Python<'_>,
name: String,
queue: Option<Py<ScouterQueue>>,
) -> Result<Self, TraceError> {
debug!("Creating new BaseTracer instance");
let tracer = get_tracer(name)?;

//
Ok(BaseTracer { tracer, queue })
// if queue is provided, set it on the tracer
let mut base_tracer = BaseTracer {
tracer,
queue: None,
};
if let Some(queue) = queue {
// set queue on the tracer
base_tracer.set_scouter_queue(py, queue)?;
} else {
// check if global queue is set. If set, clone reference
let store_guard = SCOUTER_QUEUE_STORE
.read()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
if let Some(global_queue) = &*store_guard {
base_tracer.queue = Some(global_queue.clone_ref(py));
}
}

Ok(base_tracer)
}

pub fn set_scouter_queue(
Expand All @@ -656,7 +689,14 @@ impl BaseTracer {
let bound_queue = queue.bind(py);
bound_queue.call_method1("_set_sample_ratio", (1.0,))?;

// update the store
let mut store_guard = SCOUTER_QUEUE_STORE
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
*store_guard = Some(queue.clone_ref(py));

self.queue = Some(queue);

Ok(())
}

Expand Down Expand Up @@ -916,6 +956,13 @@ pub fn shutdown_tracer() -> Result<(), TraceError> {
}

get_trace_metadata_store().clear_all()?;

// clear global scouter queue
let mut queue_store_guard = SCOUTER_QUEUE_STORE
.write()
.map_err(|e| TraceError::PoisonError(e.to_string()))?;
*queue_store_guard = None;

Ok(())
}

Expand Down
3 changes: 2 additions & 1 deletion py-scouter/tests/integration/api/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ async def lifespan(app: FastAPI):

@app.post("/chat", response_model=TestResponse)
async def chat(request: Request, payload: ChatRequest) -> TestResponse:
with tracer.start_as_current_span("genai_service") as active_span:
service_tracer = get_tracer("api-tracer")
with service_tracer.start_as_current_span("genai_service") as active_span:
agent: Agent = request.app.state.agent
prompt: Prompt = request.app.state.prompt

Expand Down