126 changes: 90 additions & 36 deletions grpc/src/client/load_balancing/child_manager.rs
@@ -37,7 +37,7 @@
ChannelController, LbConfig, LbPolicy, LbPolicyBuilder, LbPolicyOptions, LbState,
WeakSubchannel, WorkScheduler,
};
use crate::client::name_resolution::{Address, ResolverUpdate};
use crate::client::name_resolution::{Address, Endpoint, ResolverUpdate};
use crate::client::ConnectivityState;
use crate::rt::Runtime;

@@ -50,6 +50,7 @@
update_sharder: Box<dyn ResolverUpdateSharder<T>>,
pending_work: Arc<Mutex<HashSet<usize>>>,
runtime: Arc<dyn Runtime>,
updated: bool,
}

struct Child<T> {
@@ -81,6 +82,47 @@
) -> Result<Box<dyn Iterator<Item = ChildUpdate<T>>>, Box<dyn Error + Send + Sync>>;
}

/// EndpointSharder shards a resolver update into its individual endpoints,
/// with each endpoint serving as the unique identifier for a child.
///
/// By implementing the ResolverUpdateSharder trait, it lets any
/// load-balancing (LB) policy built on the ChildManager maintain one
/// child policy per endpoint.
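///
/// A minimal usage sketch, assuming a `"pick_first"` child policy is
/// registered and a `runtime` handle is in scope (both names are
/// illustrative assumptions):
///
/// ```ignore
/// let builder: Arc<dyn LbPolicyBuilder> =
///     GLOBAL_LB_REGISTRY.get_policy("pick_first").unwrap();
/// let child_manager = ChildManager::new(
///     Box::new(EndpointSharder::new(builder)),
///     runtime,
/// );
/// ```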
pub struct EndpointSharder {
pub builder: Arc<dyn LbPolicyBuilder>,
}

// Creates a ChildUpdate for each endpoint received.
impl ResolverUpdateSharder<Endpoint> for EndpointSharder {
fn shard_update(
&self,
resolver_update: ResolverUpdate,
) -> Result<Box<dyn Iterator<Item = ChildUpdate<Endpoint>>>, Box<dyn Error + Send + Sync>> {
// Propagate a resolver error to the caller instead of panicking on it.
let endpoints = resolver_update.endpoints?;
let update: Vec<_> = endpoints
.into_iter()
.map(|e| ChildUpdate {
child_identifier: e.clone(),
child_policy_builder: self.builder.clone(),
child_update: ResolverUpdate {
attributes: resolver_update.attributes.clone(),
endpoints: Ok(vec![e]),
service_config: resolver_update.service_config.clone(),
resolution_note: resolver_update.resolution_note.clone(),
},
})
.collect();
Ok(Box::new(update.into_iter()))
}
}

impl EndpointSharder {
pub fn new(builder: Arc<dyn LbPolicyBuilder>) -> Self {
Self { builder }
}
}

impl<T> ChildManager<T> {
/// Creates a new ChildManager LB policy. shard_update is called whenever a
/// resolver_update operation occurs.
@@ -94,6 +136,7 @@
children: Default::default(),
pending_work: Default::default(),
runtime,
updated: false,
}
}

@@ -158,8 +201,44 @@
// Update the tracked state if the child produced an update.
if let Some(state) = channel_controller.picker_update {
self.children[child_idx].state = state;
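// Record that a child changed state; has_updated() consumes this flag.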
self.updated = true;
};
}

// Forwards a resolver update (and optional LB config) to every child
// policy, then resolves each child's wrapped controller so that the
// ChildManager's accounting reflects any updates the children produced.
//
// TODO: this post-processing step can be eliminated by capturing the right
// state inside the WrappedController, however it is fairly complex. Decide
// which way is better.
pub(crate) fn forward_update_to_children(
&mut self,
channel_controller: &mut dyn ChannelController,
resolver_update: ResolverUpdate,
config: Option<&LbConfig>,
) {
for child_idx in 0..self.children.len() {
let child = &mut self.children[child_idx];
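// Give each child its own WrappedController so that any picker update
// it produces can be attributed back to that child below.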
let mut channel_controller = WrappedController::new(channel_controller);
let _ = child.policy.resolver_update(
resolver_update.clone(),
config,
&mut channel_controller,
);
self.resolve_child_controller(channel_controller, child_idx);
}
}

/// Reports whether any child has produced a picker update since the last
/// call, resetting the flag in the process.
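///
/// A sketch of the intended polling pattern (hypothetical parent-policy
/// code, with `controller`, `update`, and `config` assumed in scope):
///
/// ```ignore
/// child_manager.forward_update_to_children(controller, update, config);
/// if child_manager.has_updated() {
///     // Recompute and publish the aggregate picker.
/// }
/// ```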
pub fn has_updated(&mut self) -> bool {
mem::take(&mut self.updated)
}

/// Returns true if the ChildManager currently has any children.
pub fn has_children(&self) -> bool {
!self.children.is_empty()
}
}

impl<T: PartialEq + Hash + Eq + Send + Sync + 'static> LbPolicy for ChildManager<T> {
@@ -306,8 +385,13 @@
}
}

fn exit_idle(&mut self, _channel_controller: &mut dyn ChannelController) {
todo!("implement exit_idle")
fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) {
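// Ask each child to exit idle, then fold any resulting picker/state
// updates back into the ChildManager's accounting.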
for child_idx in 0..self.children.len() {
let child = &mut self.children[child_idx];
let mut channel_controller = WrappedController::new(channel_controller);
child.policy.exit_idle(&mut channel_controller);
self.resolve_child_controller(channel_controller, child_idx);
}
}
}

@@ -360,7 +444,8 @@
#[cfg(test)]
mod test {
use crate::client::load_balancing::child_manager::{
Child, ChildManager, ChildUpdate, ChildWorkScheduler, ResolverUpdateSharder,
Child, ChildManager, ChildUpdate, ChildWorkScheduler, EndpointSharder,
ResolverUpdateSharder,
};
use crate::client::load_balancing::test_utils::{
self, StubPolicy, StubPolicyData, StubPolicyFuncs, TestChannelController, TestEvent,
@@ -384,37 +469,6 @@
use tokio::sync::mpsc;
use tonic::metadata::MetadataMap;

// TODO: This needs to be moved to a common place that can be shared between
// round_robin and this test. This EndpointSharder maps endpoints to
// children policies.
struct EndpointSharder {
builder: Arc<dyn LbPolicyBuilder>,
}

impl ResolverUpdateSharder<Endpoint> for EndpointSharder {
fn shard_update(
&self,
resolver_update: ResolverUpdate,
) -> Result<Box<dyn Iterator<Item = ChildUpdate<Endpoint>>>, Box<dyn Error + Send + Sync>>
{
let mut sharded_endpoints = Vec::new();
for endpoint in resolver_update.endpoints.unwrap().iter() {
let child_update = ChildUpdate {
child_identifier: endpoint.clone(),
child_policy_builder: self.builder.clone(),
child_update: ResolverUpdate {
attributes: resolver_update.attributes.clone(),
endpoints: Ok(vec![endpoint.clone()]),
service_config: resolver_update.service_config.clone(),
resolution_note: resolver_update.resolution_note.clone(),
},
};
sharded_endpoints.push(child_update);
}
Ok(Box::new(sharded_endpoints.into_iter()))
}
}

// Sets up the test environment.
//
// Performs the following:
@@ -444,7 +498,7 @@
let (tx_events, rx_events) = mpsc::unbounded_channel::<TestEvent>();
let tcc = Box::new(TestChannelController { tx_events });
let builder: Arc<dyn LbPolicyBuilder> = GLOBAL_LB_REGISTRY.get_policy(test_name).unwrap();
let endpoint_sharder = EndpointSharder { builder };
let endpoint_sharder = EndpointSharder::new(builder);
let child_manager = ChildManager::new(Box::new(endpoint_sharder), default_runtime());
(rx_events, Box::new(child_manager), tcc)
}
@@ -536,7 +590,7 @@
picker: Arc::new(QueuingPicker {}),
});
}),
..Default::default()

[GitHub Actions / clippy warning on line 593: struct update has no effect, all the fields in the struct have already been specified]
}
}

1 change: 0 additions & 1 deletion grpc/src/client/load_balancing/mod.rs
@@ -57,7 +57,6 @@ pub mod pick_first;
pub mod round_robin;
#[cfg(test)]
pub mod test_utils;
pub mod utils;

pub(crate) mod registry;
use super::{service_config::LbConfig, subchannel::SubchannelStateWatcher};