Skip to content

Commit f18be9c

Browse files
authored
Fix outbound race condition where we don't have the source workload from XDS yet (#1092)
* Wait without demand (fixes #51193)
  Signed-off-by: Benjamin Leggett <[email protected]>
* Add some tests
  Signed-off-by: Benjamin Leggett <[email protected]>
* Fixup
  Signed-off-by: Benjamin Leggett <[email protected]>
---------
Signed-off-by: Benjamin Leggett <[email protected]>
1 parent 718f6f0 commit f18be9c

File tree

5 files changed

+182
-9
lines changed

5 files changed

+182
-9
lines changed

src/proxy/outbound.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::net::{IpAddr, SocketAddr};
1616
use std::sync::Arc;
1717

18-
use std::time::Instant;
18+
use std::time::{Duration, Instant};
1919

2020
use drain::Watch;
2121

@@ -464,9 +464,17 @@ impl OutboundConnection {
464464
network: self.pi.cfg.network.clone(),
465465
address: downstream,
466466
};
467-
let source_workload = match self.pi.state.fetch_workload(&downstream_network_addr).await {
467+
468+
let source_workload = match self
469+
.pi
470+
.state
471+
.wait_for_workload(&downstream_network_addr, Duration::from_millis(500))
472+
.await
473+
{
468474
Some(wl) => wl,
469-
None => return Err(Error::UnknownSource(downstream)),
475+
None => {
476+
return Err(Error::UnknownSource(downstream));
477+
}
470478
};
471479
if let Some(ref wl_info) = self.pi.proxy_workload_info {
472480
// make sure that the workload we fetched matches the workload info we got over ZDS.

src/state.rs

+139
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use std::fmt;
4444
use std::net::{IpAddr, SocketAddr};
4545
use std::str::FromStr;
4646
use std::sync::{Arc, RwLock, RwLockReadGuard};
47+
use std::time::Duration;
4748
use tracing::{debug, error, trace, warn};
4849

4950
use self::workload::ApplicationTunnel;
@@ -559,6 +560,47 @@ impl DemandProxyState {
559560
fetch(addr)
560561
}
561562

563+
// same as fetch_workload, but if the caller knows the workload is enroute already,
564+
// will retry on cache miss for a configured amount of time - returning the workload
565+
// when we get it, or nothing if the timeout is exceeded, whichever happens first
566+
pub async fn wait_for_workload(
567+
&self,
568+
addr: &NetworkAddress,
569+
deadline: Duration,
570+
) -> Option<Arc<Workload>> {
571+
debug!(%addr, "wait for workload");
572+
573+
// Take a watch listener *before* checking state (so we don't miss anything)
574+
let mut wl_sub = self.state.read().unwrap().workloads.new_subscriber();
575+
576+
debug!(%addr, "got sub, waiting for workload");
577+
578+
if let Some(wl) = self.fetch_workload(addr).await {
579+
return Some(wl);
580+
}
581+
582+
// We didn't find the workload we expected, so
583+
// loop until the subscriber wakes us on new workload,
584+
// or we hit the deadline timeout and give up
585+
let timeout = tokio::time::sleep(deadline);
586+
let subscriber = wl_sub.changed();
587+
tokio::pin!(timeout);
588+
tokio::pin!(subscriber);
589+
loop {
590+
tokio::select! {
591+
_ = &mut timeout => {
592+
warn!("timed out waiting for workload from xds");
593+
break None;
594+
},
595+
_ = &mut subscriber => {
596+
if let Some(wl) = self.fetch_workload(addr).await {
597+
break Some(wl);
598+
}
599+
}
600+
};
601+
}
602+
}
603+
562604
// only support workload
563605
pub async fn fetch_workload(&self, addr: &NetworkAddress) -> Option<Arc<Workload>> {
564606
// Wait for it on-demand, *if* needed
@@ -805,6 +847,103 @@ mod tests {
805847
use crate::{strng, test_helpers};
806848
use test_case::test_case;
807849

850+
#[tokio::test]
851+
async fn test_wait_for_workload() {
852+
let mut state = ProxyState::default();
853+
let delayed_wl = Arc::new(test_helpers::test_default_workload());
854+
state.workloads.insert(delayed_wl.clone(), true);
855+
856+
let mut registry = Registry::default();
857+
let metrics = Arc::new(crate::proxy::Metrics::new(&mut registry));
858+
let mock_proxy_state = DemandProxyState::new(
859+
Arc::new(RwLock::new(state)),
860+
None,
861+
ResolverConfig::default(),
862+
ResolverOpts::default(),
863+
metrics,
864+
);
865+
866+
// Some from Address
867+
let dst = NetworkAddress {
868+
network: strng::EMPTY,
869+
address: IpAddr::V4(Ipv4Addr::LOCALHOST),
870+
};
871+
872+
test_helpers::assert_eventually(
873+
Duration::from_secs(1),
874+
|| mock_proxy_state.wait_for_workload(&dst, Duration::from_millis(50)),
875+
Some(delayed_wl),
876+
)
877+
.await;
878+
}
879+
880+
#[tokio::test]
881+
async fn test_wait_for_workload_delay_fails() {
882+
let state = ProxyState::default();
883+
884+
let mut registry = Registry::default();
885+
let metrics = Arc::new(crate::proxy::Metrics::new(&mut registry));
886+
let mock_proxy_state = DemandProxyState::new(
887+
Arc::new(RwLock::new(state)),
888+
None,
889+
ResolverConfig::default(),
890+
ResolverOpts::default(),
891+
metrics,
892+
);
893+
894+
// Some from Address
895+
let dst = NetworkAddress {
896+
network: strng::EMPTY,
897+
address: IpAddr::V4(Ipv4Addr::LOCALHOST),
898+
};
899+
900+
test_helpers::assert_eventually(
901+
Duration::from_millis(10),
902+
|| mock_proxy_state.wait_for_workload(&dst, Duration::from_millis(5)),
903+
None,
904+
)
905+
.await;
906+
}
907+
908+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
909+
async fn test_wait_for_workload_eventually() {
910+
let state = ProxyState::default();
911+
let wrap_state = Arc::new(RwLock::new(state));
912+
let delayed_wl = Arc::new(test_helpers::test_default_workload());
913+
914+
let mut registry = Registry::default();
915+
let metrics = Arc::new(crate::proxy::Metrics::new(&mut registry));
916+
let mock_proxy_state = DemandProxyState::new(
917+
wrap_state.clone(),
918+
None,
919+
ResolverConfig::default(),
920+
ResolverOpts::default(),
921+
metrics,
922+
);
923+
924+
// Some from Address
925+
let dst = NetworkAddress {
926+
network: strng::EMPTY,
927+
address: IpAddr::V4(Ipv4Addr::LOCALHOST),
928+
};
929+
930+
let expected_wl = delayed_wl.clone();
931+
let t = tokio::spawn(async move {
932+
test_helpers::assert_eventually(
933+
Duration::from_millis(500),
934+
|| mock_proxy_state.wait_for_workload(&dst, Duration::from_millis(250)),
935+
Some(expected_wl),
936+
)
937+
.await;
938+
});
939+
wrap_state
940+
.write()
941+
.unwrap()
942+
.workloads
943+
.insert(delayed_wl, true);
944+
t.await.expect("should not fail");
945+
}
946+
808947
#[tokio::test]
809948
async fn lookup_address() {
810949
let mut state = ProxyState::default();

src/state/workload.rs

+30-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use std::str::FromStr;
3333
use std::sync::Arc;
3434
use std::{fmt, net};
3535
use thiserror::Error;
36+
use tokio::sync::watch::{Receiver, Sender};
3637
use tracing::{error, trace};
3738
use xds::istio::workload::ApplicationTunnel as XdsApplicationTunnel;
3839
use xds::istio::workload::GatewayAddress as XdsGatewayAddress;
@@ -589,8 +590,13 @@ pub fn network_addr(network: Strng, vip: IpAddr) -> NetworkAddress {
589590
}
590591

591592
/// A WorkloadStore encapsulates all information about workloads in the mesh
592-
#[derive(Default, Debug)]
593+
#[derive(Debug)]
593594
pub struct WorkloadStore {
595+
// TODO this could be expanded to Sender<Workload> + a full subscriber/streaming
596+
// model, but for now just notifying watchers to wake when _any_ insert happens
597+
// is simpler (and only requires a channel size of 1)
598+
insert_notifier: Sender<()>,
599+
594600
/// byAddress maps workload network addresses to workloads
595601
by_addr: HashMap<NetworkAddress, Arc<Workload>>,
596602
/// byUid maps workload UIDs to workloads
@@ -601,7 +607,26 @@ pub struct WorkloadStore {
601607
by_identity: HashMap<Identity, HashSet<Strng>>,
602608
}
603609

610+
impl Default for WorkloadStore {
611+
fn default() -> Self {
612+
WorkloadStore {
613+
insert_notifier: Sender::new(()),
614+
by_addr: Default::default(),
615+
by_hostname: Default::default(),
616+
by_identity: Default::default(),
617+
by_uid: Default::default(),
618+
}
619+
}
620+
}
621+
604622
impl WorkloadStore {
623+
// Returns a new subscriber. Note that subscribers are only guaranteed to be notified on
624+
// new values sent _after_ their creation, so callers should create, check current state,
625+
// then sub.
626+
pub fn new_subscriber(&self) -> Receiver<()> {
627+
self.insert_notifier.subscribe()
628+
}
629+
605630
pub fn insert(&mut self, w: Arc<Workload>, track_identity: bool) {
606631
// First, remove the entry entirely to make sure things are cleaned up properly.
607632
self.remove(&w.uid);
@@ -621,6 +646,10 @@ impl WorkloadStore {
621646
.or_default()
622647
.insert(w.uid.clone());
623648
}
649+
650+
// We have stored a newly inserted workload, notify watchers
651+
// (if any) to wake.
652+
self.insert_notifier.send_replace(());
624653
}
625654

626655
pub fn remove(&mut self, uid: &Strng) -> Option<Workload> {

src/telemetry.rs

-3
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,6 @@ where
222222
mut writer: Writer<'_>,
223223
event: &Event<'_>,
224224
) -> std::fmt::Result {
225-
use tracing_log::NormalizeEvent;
226-
use tracing_subscriber::fmt::time::FormatTime;
227-
use tracing_subscriber::fmt::time::SystemTime;
228225
let normalized_meta = event.normalized_metadata();
229226
SystemTime.format_time(&mut writer)?;
230227
let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());

src/xds/client.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -848,8 +848,8 @@ mod tests {
848848
};
849849
while start_time.elapsed().unwrap() < TEST_TIMEOUT && !matched {
850850
sleep(POLL_RATE).await;
851-
let wl = source.fetch_workload(&ip_network_addr).await;
852-
matched = wl.as_deref() == converted.as_ref(); // Option<Workload> is Ok to compare without needing to unwrap
851+
let wl = source.fetch_workload(&ip_network_addr);
852+
matched = wl.await.as_deref() == converted.as_ref(); // Option<Workload> is Ok to compare without needing to unwrap
853853
}
854854
}
855855

0 commit comments

Comments
 (0)