Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions nativelink-config/examples/stores-config.json5
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@
"endpoints": [
{"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
],
"connections_per_endpoint": "5",
"rpc_timeout_s": "5m",
"store_type": "ac"
}
},
Expand Down
4 changes: 2 additions & 2 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,12 @@ pub struct GrpcSpec {
/// Limit the number of simultaneous upstream requests to this many. A
/// value of zero is treated as unlimited. If the limit is reached the
/// request is queued.
#[serde(default)]
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub max_concurrent_requests: usize,

/// The number of connections to make to each specified endpoint to balance
/// the load over multiple TCP connections. Default 1.
#[serde(default)]
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub connections_per_endpoint: usize,
}

Expand Down
117 changes: 117 additions & 0 deletions nativelink-config/src/serde_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,43 @@ pub fn convert_string_with_shellexpand<'de, D: Deserializer<'de>>(
Ok((*(shellexpand::env(&value).map_err(de::Error::custom)?)).to_string())
}

pub fn convert_boolean_with_shellexpand<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
D: Deserializer<'de>,
T: TryFrom<bool>,
<T as TryFrom<bool>>::Error: fmt::Display,
{
struct BooleanExpandVisitor<T: TryFrom<bool>>(PhantomData<T>);

impl<T> Visitor<'_> for BooleanExpandVisitor<T>
where
T: TryFrom<bool>,
<T as TryFrom<bool>>::Error: fmt::Display,
{
type Value = T;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a boolean or a shell-expandable string that is a boolean")
}

fn visit_bool<E: de::Error>(self, v: bool) -> Result<Self::Value, E> {
T::try_from(v).map_err(de::Error::custom)
}

fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
if v.is_empty() {
return Err(de::Error::custom("empty string is not a valid number"));
}
let expanded = shellexpand::env(v).map_err(de::Error::custom)?;
let s = expanded.as_ref().trim().to_lowercase();
let parsed = s.parse::<bool>().map_err(de::Error::custom)?;
T::try_from(parsed).map_err(de::Error::custom)
}
}

deserializer.deserialize_any(BooleanExpandVisitor::<T>(PhantomData))
}

/// Same as `convert_string_with_shellexpand`, but supports `Vec<String>`.
///
/// # Errors
Expand Down Expand Up @@ -249,6 +286,86 @@ where
deserializer.deserialize_any(DataSizeVisitor::<T>(PhantomData))
}

/// # Errors
///
/// Will return `Err` if deserialization fails.
pub fn convert_optional_data_size_with_shellexpand<'de, D, T>(
deserializer: D,
) -> Result<Option<T>, D::Error>
where
D: Deserializer<'de>,
T: TryFrom<u128>,
<T as TryFrom<u128>>::Error: fmt::Display,
{
struct DataSizeVisitor<T: TryFrom<u128>>(PhantomData<T>);

impl<'de, T> Visitor<'de> for DataSizeVisitor<T>
where
T: TryFrom<u128>,
<T as TryFrom<u128>>::Error: fmt::Display,
{
type Value = Option<T>;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("an optional number of bytes as an integer, or a string with a data size format (e.g., \"1GB\", \"500MB\", \"1.5TB\")")
}

fn visit_none<E: de::Error>(self) -> Result<Self::Value, E> {
Ok(None)
}

fn visit_unit<E: de::Error>(self) -> Result<Self::Value, E> {
Ok(None)
}

fn visit_some<D2: Deserializer<'de>>(
self,
deserializer: D2,
) -> Result<Self::Value, D2::Error> {
deserializer.deserialize_any(self)
}

fn visit_u64<E: de::Error>(self, v: u64) -> Result<Self::Value, E> {
T::try_from(u128::from(v))
.map(Some)
.map_err(de::Error::custom)
}

fn visit_i64<E: de::Error>(self, v: i64) -> Result<Self::Value, E> {
if v < 0 {
return Err(de::Error::custom("Negative data size is not allowed"));
}
let v_u128 = u128::try_from(v).map_err(de::Error::custom)?;
T::try_from(v_u128).map(Some).map_err(de::Error::custom)
}

fn visit_u128<E: de::Error>(self, v: u128) -> Result<Self::Value, E> {
T::try_from(v).map(Some).map_err(de::Error::custom)
}

fn visit_i128<E: de::Error>(self, v: i128) -> Result<Self::Value, E> {
if v < 0 {
return Err(de::Error::custom("Negative data size is not allowed"));
}
let v_u128 = u128::try_from(v).map_err(de::Error::custom)?;
T::try_from(v_u128).map(Some).map_err(de::Error::custom)
}

fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
let expanded = shellexpand::env(v).map_err(de::Error::custom)?;
let s = expanded.as_ref().trim();
if v.is_empty() {
return Err(de::Error::custom("Missing value in a size field"));
}
let byte_size = Byte::parse_str(s, true).map_err(de::Error::custom)?;
let bytes = byte_size.as_u128();
T::try_from(bytes).map(Some).map_err(de::Error::custom)
}
}

deserializer.deserialize_option(DataSizeVisitor::<T>(PhantomData))
}

/// # Errors
///
/// Will return `Err` if deserialization fails.
Expand Down
61 changes: 41 additions & 20 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use rand::Rng;
use serde::{Deserialize, Serialize};

use crate::serde_utils::{
convert_data_size_with_shellexpand, convert_duration_with_shellexpand,
convert_numeric_with_shellexpand, convert_optional_numeric_with_shellexpand,
convert_boolean_with_shellexpand, convert_data_size_with_shellexpand,
convert_duration_with_shellexpand, convert_numeric_with_shellexpand,
convert_optional_data_size_with_shellexpand, convert_optional_numeric_with_shellexpand,
convert_optional_string_with_shellexpand, convert_string_with_shellexpand,
convert_vec_string_with_shellexpand,
};
Expand Down Expand Up @@ -472,6 +473,8 @@ pub enum StoreSpec {
/// "endpoints": [
/// {"address": "grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"}
/// ],
/// "connections_per_endpoint": "5",
/// "rpc_timeout_s": "5m",
/// "store_type": "ac"
/// }
/// ```
Expand Down Expand Up @@ -542,6 +545,7 @@ pub struct ShardConfig {
/// all the store's weights divided by the individual store's weight.
///
/// Default: 1
#[serde(deserialize_with = "convert_optional_numeric_with_shellexpand")]
pub weight: Option<u32>,
}

Expand Down Expand Up @@ -618,7 +622,7 @@ pub struct FilesystemSpec {
/// runtime.
/// A value of 0 means unlimited (no concurrency limit).
/// Default: 0
#[serde(default)]
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub max_concurrent_writes: usize,
}

Expand All @@ -632,7 +636,7 @@ pub struct ExperimentalOntapS3Spec {
pub vserver_name: String,
#[serde(deserialize_with = "convert_string_with_shellexpand")]
pub bucket: String,
#[serde(default)]
#[serde(default, deserialize_with = "convert_optional_string_with_shellexpand")]
pub root_certificates: Option<String>,

/// Common retry and upload configuration
Expand Down Expand Up @@ -786,15 +790,15 @@ pub struct VerifySpec {
/// an upload of data.
///
/// This should be set to false for AC, but true for CAS stores.
#[serde(default)]
#[serde(default, deserialize_with = "convert_boolean_with_shellexpand")]
pub verify_size: bool,

/// If the data should be hashed and verify that the key matches the
/// computed hash. The hash function is automatically determined based
/// request and if not set will use the global default.
///
/// This should be set to false for AC, but true for CAS stores.
#[serde(default)]
#[serde(default, deserialize_with = "convert_boolean_with_shellexpand")]
pub verify_hash: bool,
}

Expand Down Expand Up @@ -930,24 +934,28 @@ pub struct ExperimentalGcsSpec {
/// Chunk size for resumable uploads.
///
/// Default: 2MB
#[serde(
default,
deserialize_with = "convert_optional_data_size_with_shellexpand"
)]
pub resumable_chunk_size: Option<usize>,

/// Common retry and upload configuration
#[serde(flatten)]
pub common: CommonObjectSpec,

/// Error if authentication was not found.
#[serde(default)]
#[serde(default, deserialize_with = "convert_boolean_with_shellexpand")]
pub authentication_required: bool,

/// Connection timeout in milliseconds.
/// Default: 3000
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub connection_timeout_s: u64,

/// Read timeout in milliseconds.
/// Default: 3000
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub read_timeout_s: u64,
}

Expand Down Expand Up @@ -981,17 +989,26 @@ pub struct CommonObjectSpec {
/// upload will be aborted and the client will likely receive an error.
///
/// Default: 5MB.
#[serde(
default,
deserialize_with = "convert_optional_data_size_with_shellexpand"
)]
pub max_retry_buffer_per_request: Option<usize>,

/// Maximum number of concurrent `UploadPart` requests per `MultipartUpload`.
///
/// Default: 10.
///
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub multipart_max_concurrent_uploads: Option<usize>,

/// Allow unencrypted HTTP connections. Only use this for local testing.
///
/// Default: false
#[serde(default)]
#[serde(default, deserialize_with = "convert_boolean_with_shellexpand")]
pub insecure_allow_http: bool,

/// Disable http/2 connections and only use http/1.1. Default client
Expand All @@ -1001,7 +1018,7 @@ pub struct CommonObjectSpec {
/// underlying network environment, S3, or GCS API servers specify otherwise.
///
/// Default: false
#[serde(default)]
#[serde(default, deserialize_with = "convert_boolean_with_shellexpand")]
pub disable_http2: bool,
}

Expand Down Expand Up @@ -1050,29 +1067,33 @@ pub struct GrpcEndpoint {
/// The TLS configuration to use to connect to the endpoint (if grpcs).
pub tls_config: Option<ClientTlsConfig>,
/// The maximum concurrency to allow on this endpoint.
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub concurrency_limit: Option<usize>,

/// Timeout for establishing a TCP connection to the endpoint (seconds).
/// If not set or 0, defaults to 30 seconds.
#[serde(default)]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub connect_timeout_s: u64,

/// TCP keepalive interval (seconds). Sends TCP keepalive probes at this
/// interval to detect dead connections at the OS level.
/// If not set or 0, defaults to 30 seconds.
#[serde(default)]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub tcp_keepalive_s: u64,

/// HTTP/2 keepalive interval (seconds). Sends HTTP/2 PING frames at this
/// interval to detect dead connections at the application level.
/// If not set or 0, defaults to 30 seconds.
#[serde(default)]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub http2_keepalive_interval_s: u64,

/// HTTP/2 keepalive timeout (seconds). If a PING response is not received
/// within this duration, the connection is considered dead.
/// If not set or 0, defaults to 20 seconds.
#[serde(default)]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub http2_keepalive_timeout_s: u64,
}

Expand All @@ -1096,20 +1117,20 @@ pub struct GrpcSpec {
/// Limit the number of simultaneous upstream requests to this many. A
/// value of zero is treated as unlimited. If the limit is reached the
/// request is queued.
#[serde(default)]
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub max_concurrent_requests: usize,

/// The number of connections to make to each specified endpoint to balance
/// the load over multiple TCP connections. Default 1.
#[serde(default)]
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub connections_per_endpoint: usize,

/// Maximum time (seconds) allowed for a single RPC request (e.g. a
/// ByteStream.Write call) before it is cancelled. This prevents
/// individual RPCs from hanging forever on dead connections.
///
/// Default: 120 (seconds)
#[serde(default)]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub rpc_timeout_s: u64,
}

Expand Down Expand Up @@ -1175,7 +1196,7 @@ pub struct RedisSpec {
/// organize your data according to the shared prefix.
///
/// Default: (Empty String / No Prefix)
#[serde(default)]
#[serde(default, deserialize_with = "convert_string_with_shellexpand")]
pub key_prefix: String,

/// Set the mode Redis is operating in.
Expand Down Expand Up @@ -1396,7 +1417,7 @@ pub struct ExperimentalMongoSpec {
/// Enable `MongoDB` change streams for real-time updates.
/// Required for scheduler subscriptions.
/// Default: false
#[serde(default)]
#[serde(default, deserialize_with = "convert_boolean_with_shellexpand")]
pub enable_change_streams: bool,

/// Write concern 'w' parameter.
Expand Down
Loading
Loading