diff --git a/rust/cocoindex/src/ops/py_factory.rs b/rust/cocoindex/src/ops/py_factory.rs index 94ad4c88..01c3775c 100644 --- a/rust/cocoindex/src/ops/py_factory.rs +++ b/rust/cocoindex/src/ops/py_factory.rs @@ -81,7 +81,8 @@ impl PyFunctionExecutor { .transpose()? .as_ref(), ) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| format!("while calling user-configured function"))?; Ok(result.into_bound(py)) } } @@ -230,7 +231,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { PyTuple::new(py, args.into_iter())?, Some(&kwargs.into_py_dict(py)?), ) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| format!("while building user-configured function"))?; let (result_type, executor) = result .extract::<(crate::py::Pythonized, Py)>(py)?; Ok(( @@ -253,7 +255,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { Python::with_gil(|py| -> anyhow::Result<_> { let prepare_coro = executor .call_method(py, "prepare", (), None) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| format!("while preparing user-configured function"))?; let prepare_fut = from_py_future( py, &pyo3_async_runtimes::TaskLocals::new( @@ -360,6 +363,7 @@ impl interface::SourceExecutor for PySourceExecutor { py_source_executor .call_method(py, "list_async", (pythonize(py, options)?,), None) .to_result_with_py_trace(py) + .with_context(|| format!("while listing user-configured source")) })?; // Create a stream that pulls from the Python async iterator one item at a time @@ -399,7 +403,13 @@ impl interface::SourceExecutor for PySourceExecutor { ), None, ) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| { + format!( + "while fetching user-configured source for key: {:?}", + &key_clone + ) + })?; let task_locals = pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone()); Ok(from_py_future( @@ -439,7 +449,8 @@ impl PySourceExecutor { let next_item_coro = Python::with_gil(|py| -> Result<_> { let coro = py_async_iter .call_method0(py, "__anext__") - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| format!("while iterating over user-configured source"))?; let task_locals = pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone()); Ok(from_py_future(py, &task_locals, coro.into_bound(py))?) @@ -572,7 +583,7 @@ impl PySourceExecutor { impl interface::SourceFactory for PySourceConnectorFactory { async fn build( self: Arc, - _source_name: &str, + source_name: &str, spec: serde_json::Value, context: Arc, ) -> Result<( @@ -590,7 +601,13 @@ impl interface::SourceFactory for PySourceConnectorFactory { let value_type_result = self .py_source_connector .call_method(py, "get_table_type", (), None) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| { + format!( + "while fetching table type from user-configured source `{}`", + source_name + ) + })?; let table_type: schema::EnrichedValueType = depythonize(&value_type_result.into_bound(py))?; Ok(table_type) @@ -619,14 +636,20 @@ impl interface::SourceFactory for PySourceConnectorFactory { table_type.typ ), }; - + let source_name = source_name.to_string(); let executor_fut = async move { // Create the executor using the async create_executor method let create_future = Python::with_gil(|py| -> Result<_> { let create_coro = self .py_source_connector .call_method(py, "create_executor", (pythonize(py, &spec)?,), None) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| { + format!( + "while constructing executor for user-configured source `{}`", + source_name + ) + })?; let task_locals = pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone()); let create_future = from_py_future(py, &task_locals, create_coro.into_bound(py))?; @@ -637,13 +660,25 @@ impl interface::SourceFactory for PySourceConnectorFactory { let (py_source_executor_context, provides_ordinal) = Python::with_gil(|py| -> Result<_> { - let executor_context = - py_executor_context_result.to_result_with_py_trace(py)?; + let executor_context = py_executor_context_result + .to_result_with_py_trace(py) + .with_context(|| { + format!( + "while getting executor context for user-configured source `{}`", + source_name + ) + })?; // Get provides_ordinal from the executor context let provides_ordinal = executor_context .call_method(py, "provides_ordinal", (), None) - .to_result_with_py_trace(py)? + .to_result_with_py_trace(py) + .with_context(|| { + format!( + "while calling provides_ordinal for user-configured source `{}`", + source_name + ) + })? .extract::(py)?; Ok((executor_context, provides_ordinal)) @@ -743,20 +778,38 @@ impl interface::TargetFactory for PyExportTargetFactory { ), None, ) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| { + format!( + "while setting up export context for user-configured target `{}`", + &data_collection.name + ) + })?; // Call the `get_persistent_key` method to get the persistent key. let persistent_key = self .py_target_connector .call_method(py, "get_persistent_key", (&py_export_ctx,), None) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| { + format!( + "while getting persistent key for user-configured target `{}`", + &data_collection.name + ) + })?; let persistent_key: serde_json::Value = depythonize(&persistent_key.into_bound(py))?; let setup_state = self .py_target_connector .call_method(py, "get_setup_state", (&py_export_ctx,), None) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| { + format!( + "while getting setup state for user-configured target `{}`", + &data_collection.name + ) + })?; let setup_state: serde_json::Value = depythonize(&setup_state.into_bound(py))?; anyhow::Ok((py_export_ctx, persistent_key, setup_state)) @@ -770,7 +823,13 @@ impl interface::TargetFactory for PyExportTargetFactory { let prepare_coro = factory .py_target_connector .call_method(py, "prepare_async", (&py_export_ctx,), None) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| { + format!( + "while preparing user-configured target `{}`", + &data_collection.name + ) + })?; let task_locals = pyo3_async_runtimes::TaskLocals::new( py_exec_ctx.event_loop.bind(py).clone(), ); @@ -839,7 +898,10 @@ impl interface::TargetFactory for PyExportTargetFactory { ), None, ) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| { + format!("while calling check_state_compatibility in user-configured target") + })?; let compatibility: SetupStateCompatibility = depythonize(&result.into_bound(py))?; Ok(compatibility) })?; @@ -851,7 +913,10 @@ impl interface::TargetFactory for PyExportTargetFactory { let result = self .py_target_connector .call_method(py, "describe_resource", (pythonize(py, key)?,), None) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| { + format!("while calling describe_resource in user-configured target") + })?; let description = result.extract::(py)?; Ok(description) }) @@ -908,7 +973,10 @@ impl interface::TargetFactory for PyExportTargetFactory { (pythonize(py, &setup_changes)?,), None, ) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| { + format!("while calling apply_setup_changes_async in user-configured target") + })?; let task_locals = pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone()); Ok(from_py_future( @@ -918,7 +986,11 @@ impl interface::TargetFactory for PyExportTargetFactory { )?) })? .await; - Python::with_gil(move |py| py_result.to_result_with_py_trace(py))?; + Python::with_gil(move |py| { + py_result + .to_result_with_py_trace(py) + .with_context(|| format!("while applying setup changes in user-configured target")) + })?; Ok(()) } @@ -969,7 +1041,8 @@ impl interface::TargetFactory for PyExportTargetFactory { let result_coro = self .py_target_connector .call_method(py, "mutate_async", (py_args,), None) - .to_result_with_py_trace(py)?; + .to_result_with_py_trace(py) + .with_context(|| format!("while calling mutate_async in user-configured target"))?; let task_locals = pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone()); Ok(from_py_future( @@ -980,7 +1053,11 @@ impl interface::TargetFactory for PyExportTargetFactory { })? .await; - Python::with_gil(move |py| py_result.to_result_with_py_trace(py))?; + Python::with_gil(move |py| { + py_result + .to_result_with_py_trace(py) + .with_context(|| format!("while applying mutations in user-configured target")) + })?; Ok(()) } }