Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ class _SourceRefreshOptions:
class _ExecutionOptions:
max_inflight_rows: int | None = None
max_inflight_bytes: int | None = None
timeout: datetime.timedelta | None = None


class FlowBuilder:
Expand Down
10 changes: 10 additions & 0 deletions python/cocoindex/op.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
)
from .runtime import to_async_call
from .index import IndexOptions
import datetime


class OpCategory(Enum):
Expand Down Expand Up @@ -154,6 +155,7 @@ class OpArgs:
- max_batch_size: The maximum batch size for the executor. Only valid if `batching` is True.
- behavior_version: The behavior version of the executor. Cache will be invalidated if it
changes. Must be provided if `cache` is True.
- timeout: Timeout in seconds for this function execution. None means use default (300s).
- arg_relationship: It specifies the relationship between an input argument and the output,
e.g. `(ArgRelationship.CHUNKS_BASE_TEXT, "content")` means the output is chunks for the
input argument with name `content`.
Expand All @@ -164,6 +166,7 @@ class OpArgs:
batching: bool = False
max_batch_size: int | None = None
behavior_version: int | None = None
timeout: datetime.timedelta | None = None
arg_relationship: tuple[ArgRelationship, str] | None = None


Expand Down Expand Up @@ -202,6 +205,7 @@ def _register_op_factory(

class _WrappedExecutor:
_executor: Any
_spec: Any
_args_info: list[_ArgInfo]
_kwargs_info: dict[str, _ArgInfo]
_result_encoder: Callable[[Any], Any]
Expand Down Expand Up @@ -391,6 +395,12 @@ def enable_cache(self) -> bool:
def behavior_version(self) -> int | None:
return op_args.behavior_version

def timeout(self) -> datetime.timedelta | None:
if op_args.timeout is not None:
return op_args.timeout

return None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can directly write return op_args.timeout here.


def batching_options(self) -> dict[str, Any] | None:
if op_args.batching:
return {
Expand Down
6 changes: 6 additions & 0 deletions rust/cocoindex/src/base/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ pub struct ExecutionOptions {

#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_inflight_bytes: Option<usize>,

#[serde(default, skip_serializing_if = "Option::is_none")]
pub timeout: Option<std::time::Duration>,
}

impl ExecutionOptions {
Expand Down Expand Up @@ -289,6 +292,9 @@ impl fmt::Display for ImportOpSpec {
pub struct TransformOpSpec {
pub inputs: Vec<OpArgBinding>,
pub op: OpSpec,

#[serde(default)]
pub execution_options: ExecutionOptions,
}

impl SpecFormatter for TransformOpSpec {
Expand Down
10 changes: 10 additions & 0 deletions rust/cocoindex/src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ use crate::{
};
use futures::future::{BoxFuture, try_join3};
use futures::{FutureExt, future::try_join_all};
use std::time::Duration;
use utils::fingerprint::Fingerprinter;

const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800);

#[derive(Debug)]
pub(super) enum ValueTypeBuilder {
Basic(BasicValueType),
Expand Down Expand Up @@ -804,16 +807,22 @@ impl AnalyzerContext {
let output =
op_scope.add_op_output(reactive_op_name.clone(), output_enriched_type)?;
let op_name = reactive_op_name.clone();
let op_kind = op.op.kind.clone();
let execution_options_timeout = op.execution_options.timeout;
async move {
trace!("Start building executor for transform op `{op_name}`");
let executor = executor.await.with_context(|| {
format!("Preparing for transform op: {op_name}")
})?;
let enable_cache = executor.enable_cache();
let behavior_version = executor.behavior_version();
let timeout = executor.timeout()
.or(execution_options_timeout)
.or(Some(TIMEOUT_THRESHOLD));
trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}");
let function_exec_info = AnalyzedFunctionExecInfo {
enable_cache,
timeout,
behavior_version,
fingerprinter: logic_fingerprinter
.with(&behavior_version)?,
Expand All @@ -828,6 +837,7 @@ impl AnalyzerContext {
}
Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp {
name: op_name,
op_kind,
inputs: input_value_mappings,
function_exec_info,
executor,
Expand Down
1 change: 1 addition & 0 deletions rust/cocoindex/src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ impl FlowBuilder {
})
.collect(),
op: spec,
execution_options: Default::default(),
}),
};

Expand Down
3 changes: 3 additions & 0 deletions rust/cocoindex/src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::base::spec::FieldName;
use crate::prelude::*;

use crate::ops::interface::*;
use std::time::Duration;
use utils::fingerprint::{Fingerprint, Fingerprinter};

#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
Expand Down Expand Up @@ -64,6 +65,7 @@ pub struct AnalyzedImportOp {

pub struct AnalyzedFunctionExecInfo {
pub enable_cache: bool,
pub timeout: Option<Duration>,
pub behavior_version: Option<u32>,

/// Fingerprinter of the function's behavior.
Expand All @@ -74,6 +76,7 @@ pub struct AnalyzedFunctionExecInfo {

pub struct AnalyzedTransformOp {
pub name: String,
pub op_kind: String,
pub inputs: Vec<AnalyzedValueMapping>,
pub function_exec_info: AnalyzedFunctionExecInfo,
pub executor: Box<dyn SimpleFunctionExecutor>,
Expand Down
85 changes: 74 additions & 11 deletions rust/cocoindex/src/execution/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::prelude::*;

use anyhow::{Context, Ok};
use futures::future::try_join_all;
use log::warn;
use tokio::time::Duration;

use crate::base::value::EstimatedByteSize;
use crate::base::{schema, value};
Expand All @@ -11,6 +13,9 @@ use utils::immutable::RefList;

use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, evaluate_with_cell};

const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800);
const WARNING_THRESHOLD: Duration = Duration::from_secs(30);

#[derive(Debug)]
pub struct ScopeValueBuilder {
// TODO: Share the same lock for values produced in the same execution scope, for stricter atomicity.
Expand Down Expand Up @@ -356,6 +361,43 @@ async fn evaluate_child_op_scope(
})
}

async fn evaluate_with_timeout_and_warning<F, T>(
eval_future: F,
timeout_duration: Duration,
warn_duration: Duration,
op_kind: String,
op_name: String,
) -> Result<T>
where
F: std::future::Future<Output = Result<T>>,
{
let mut eval_future = Box::pin(eval_future);
let mut warned = false;
let timeout_future = tokio::time::sleep(timeout_duration);
tokio::pin!(timeout_future);

loop {
tokio::select! {
res = &mut eval_future => {
return res;
}
_ = &mut timeout_future => {
return Err(anyhow!(
"Function '{}' ({}) timed out after {} seconds",
op_kind, op_name, timeout_duration.as_secs()
));
}
_ = tokio::time::sleep(warn_duration), if !warned => {
warn!(
"Function '{}' ({}) is taking longer than {}s",
op_kind, op_name, WARNING_THRESHOLD.as_secs()
);
warned = true;
}
}
}
}

async fn evaluate_op_scope(
op_scope: &AnalyzedOpScope,
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
Expand All @@ -378,6 +420,12 @@ async fn evaluate_op_scope(
input_values.push(value?);
}

let timeout_duration = op.function_exec_info.timeout.unwrap_or(TIMEOUT_THRESHOLD);
let warn_duration = WARNING_THRESHOLD;

let op_name_for_warning = op.name.clone();
let op_kind_for_warning = op.op_kind.clone();

let result = if op.function_exec_info.enable_cache {
let output_value_cell = memory.get_cache_entry(
|| {
Expand All @@ -391,18 +439,33 @@ async fn evaluate_op_scope(
&op.function_exec_info.output_type,
/*ttl=*/ None,
)?;
evaluate_with_cell(output_value_cell.as_ref(), move || {

let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || {
op.executor.evaluate(input_values)
})
.await
.and_then(|v| head_scope.define_field(&op.output, &v))
});
let v = evaluate_with_timeout_and_warning(
eval_future,
timeout_duration,
warn_duration,
op_kind_for_warning,
op_name_for_warning,
)
.await?;

head_scope.define_field(&op.output, &v)
} else {
op.executor
.evaluate(input_values)
.await
.and_then(|v| head_scope.define_field(&op.output, &v))
}
.with_context(|| format!("Evaluating Transform op `{}`", op.name,));
let eval_future = op.executor.evaluate(input_values);
let v = evaluate_with_timeout_and_warning(
eval_future,
timeout_duration,
warn_duration,
op_kind_for_warning,
op_name_for_warning,
)
.await?;

head_scope.define_field(&op.output, &v)
};

// Track transform operation completion
if let Some(ref op_stats) = operation_in_process_stats {
Expand All @@ -411,7 +474,7 @@ async fn evaluate_op_scope(
op_stats.finish_processing(&transform_key, 1);
}

result?
result.with_context(|| format!("Evaluating Transform op `{}`", op.name))?
}

AnalyzedReactiveOp::ForEach(op) => {
Expand Down
5 changes: 5 additions & 0 deletions rust/cocoindex/src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ pub trait SimpleFunctionExecutor: Send + Sync {
fn behavior_version(&self) -> Option<u32> {
None
}

/// Returns None to use the default timeout (1800s)
fn timeout(&self) -> Option<std::time::Duration> {
None
}
}

#[async_trait]
Expand Down
16 changes: 15 additions & 1 deletion rust/cocoindex/src/ops/py_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct PyFunctionExecutor {

enable_cache: bool,
behavior_version: Option<u32>,
timeout: Option<std::time::Duration>,
}

impl PyFunctionExecutor {
Expand Down Expand Up @@ -112,6 +113,10 @@ impl interface::SimpleFunctionExecutor for Arc<PyFunctionExecutor> {
fn behavior_version(&self) -> Option<u32> {
self.behavior_version
}

fn timeout(&self) -> Option<std::time::Duration> {
self.timeout
}
}

struct PyBatchedFunctionExecutor {
Expand All @@ -121,6 +126,7 @@ struct PyBatchedFunctionExecutor {

enable_cache: bool,
behavior_version: Option<u32>,
timeout: Option<std::time::Duration>,
batching_options: batching::BatchingOptions,
}

Expand Down Expand Up @@ -240,7 +246,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
.as_ref()
.ok_or_else(|| anyhow!("Python execution context is missing"))?
.clone();
let (prepare_fut, enable_cache, behavior_version, batching_options) =
let (prepare_fut, enable_cache, behavior_version, timeout, batching_options) =
Python::with_gil(|py| -> anyhow::Result<_> {
let prepare_coro = executor
.call_method(py, "prepare", (), None)
Expand All @@ -260,6 +266,11 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
.call_method(py, "behavior_version", (), None)
.to_result_with_py_trace(py)?
.extract::<Option<u32>>(py)?;
let timeout = executor
.call_method(py, "timeout", (), None)
.to_result_with_py_trace(py)?
.extract::<Option<u64>>(py)?
.map(std::time::Duration::from_secs);
let batching_options = executor
.call_method(py, "batching_options", (), None)
.to_result_with_py_trace(py)?
Expand All @@ -271,6 +282,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
prepare_fut,
enable_cache,
behavior_version,
timeout,
batching_options,
))
})?;
Expand All @@ -284,6 +296,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
result_type,
enable_cache,
behavior_version,
timeout,
batching_options,
}
.into_fn_executor(),
Expand All @@ -297,6 +310,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
result_type,
enable_cache,
behavior_version,
timeout,
}))
};
Ok(executor)
Expand Down
Loading