Skip to content

Commit 0557b0c

Browse files
committed
implement waiting for config and online (first draft)
Signed-off-by: Travis <[email protected]>
1 parent 6754120 commit 0557b0c

File tree

7 files changed

+128
-33
lines changed

7 files changed

+128
-33
lines changed

src/client/app_configuration_client.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ pub trait ConfigurationProvider {
6767
/// For remote configurations, it returns whether it's connected to the
6868
/// remote or not
6969
fn is_online(&self) -> Result<bool>;
70+
71+
/// For remote configurations: Blocks until it's connected to the remote.
72+
fn wait_until_online(&self);
73+
74+
/// Blocks until a configuration is available.
75+
/// Note: This is different than wait_until_online, as configuration could be available
76+
/// through alternate sources (Cache / Fallback)
77+
fn wait_until_configuration_is_available(&self);
7078
}
7179

7280
/// AppConfiguration client for browsing, and evaluating features and properties.

src/client/app_configuration_http.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,15 @@ impl<T: LiveConfiguration> ConfigurationProvider for AppConfigurationClientHttp<
9292
fn is_online(&self) -> Result<bool> {
9393
self.live_configuration.is_online()
9494
}
95+
96+
fn wait_until_online(&self) {
97+
self.live_configuration.wait_until_online();
98+
}
99+
100+
fn wait_until_configuration_is_available(&self) {
101+
self.live_configuration
102+
.wait_until_configuration_is_available();
103+
}
95104
}
96105

97106
#[cfg(test)]

src/client/app_configuration_ibm_cloud.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,14 @@ impl ConfigurationProvider for AppConfigurationClientIBMCloud {
8686
fn is_online(&self) -> Result<bool> {
8787
self.client.is_online()
8888
}
89+
90+
fn wait_until_online(&self) {
91+
self.client.wait_until_online();
92+
}
93+
94+
fn wait_until_configuration_is_available(&self) {
95+
self.client.wait_until_configuration_is_available();
96+
}
8997
}
9098

9199
#[cfg(test)]

src/client/app_configuration_offline.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use crate::errors::Result;
1616
use crate::models::{Configuration, FeatureSnapshot, PropertySnapshot};
1717
use crate::ConfigurationProvider;
18+
use log::error;
1819

1920
/// AppConfiguration client using a local file with a configuration snapshot
2021
#[derive(Debug)]
@@ -55,4 +56,14 @@ impl ConfigurationProvider for AppConfigurationOffline {
5556
fn is_online(&self) -> Result<bool> {
5657
Ok(false)
5758
}
59+
60+
fn wait_until_online(&self) {
61+
error!("Waiting for AppConfigurationOffline to get online. This will never happen.");
62+
std::thread::park(); // block forever
63+
}
64+
65+
fn wait_until_configuration_is_available(&self) {
66+
// No wait required:
67+
// AppConfigurationOffline always has a configuration available.
68+
}
5869
}

src/models/configuration.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::ConfigurationDataError;
2222
use super::feature_snapshot::FeatureSnapshot;
2323
use super::property_snapshot::PropertySnapshot;
2424
use crate::ConfigurationProvider;
25+
use log::error;
2526

2627
/// Represents all the configuration data needed for the client to perform
2728
/// feature/propery evaluation.
@@ -192,6 +193,15 @@ impl ConfigurationProvider for Configuration {
192193
fn is_online(&self) -> Result<bool> {
193194
Ok(false)
194195
}
196+
197+
fn wait_until_online(&self) {
198+
error!("Waiting for Configuration to get online. This will never happen.");
199+
std::thread::park(); // block forever
200+
}
201+
202+
fn wait_until_configuration_is_available(&self) {
203+
// No wait required
204+
}
195205
}
196206

197207
#[cfg(test)]

src/network/live_configuration/live_configuration.rs

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::sync::{Arc, Mutex};
15+
use std::sync::{Arc, Condvar, Mutex};
1616

1717
use super::current_mode::CurrentModeOfflineReason;
1818
use super::update_thread_worker::UpdateThreadWorker;
1919
use super::{CurrentMode, Error, OfflineMode, Result};
2020
use crate::models::Configuration;
2121
use crate::network::http_client::ServerClient;
22+
use crate::network::live_configuration::current_mode;
2223
use crate::utils::{ThreadHandle, ThreadStatus};
2324
use crate::{ConfigurationId, ConfigurationProvider};
2425

@@ -37,10 +38,10 @@ pub trait LiveConfiguration: ConfigurationProvider {
3738
pub(crate) struct LiveConfigurationImpl {
3839
/// Configuration object that will be returned to consumers. This is also the object
3940
/// that the thread in the backend will be updating.
40-
configuration: Arc<Mutex<Option<Configuration>>>,
41+
configuration: Arc<(Mutex<Option<Configuration>>, Condvar)>,
4142

4243
/// Current operation mode.
43-
current_mode: Arc<Mutex<CurrentMode>>,
44+
current_mode: Arc<(Mutex<CurrentMode>, Condvar)>,
4445

4546
/// Handler to the internal thread that takes care of updating the [`LiveConfigurationImpl::configuration`].
4647
update_thread: ThreadHandle<Result<()>>,
@@ -57,10 +58,11 @@ impl LiveConfigurationImpl {
5758
server_client: T,
5859
configuration_id: ConfigurationId,
5960
) -> Self {
60-
let configuration = Arc::new(Mutex::new(None));
61-
let current_mode = Arc::new(Mutex::new(CurrentMode::Offline(
62-
CurrentModeOfflineReason::Initializing,
63-
)));
61+
let configuration = Arc::new((Mutex::new(None), Condvar::new()));
62+
let current_mode = Arc::new((
63+
Mutex::new(CurrentMode::Offline(CurrentModeOfflineReason::Initializing)),
64+
Condvar::new(),
65+
));
6466

6567
let worker = UpdateThreadWorker::new(
6668
server_client,
@@ -84,20 +86,25 @@ impl LiveConfigurationImpl {
8486
/// configured for this object.
8587
fn get_configuration(&self) -> Result<Configuration> {
8688
// TODO: Can we return a reference instead?
87-
match &*self.current_mode.lock()? {
89+
let (current_mode_mutex, _) = &*self.current_mode;
90+
match &*current_mode_mutex.lock()? {
8891
CurrentMode::Online => {
89-
match &*self.configuration.lock()? {
92+
let (configuration_mutex, _) = &*self.configuration;
93+
match &*configuration_mutex.lock()? {
9094
// We store the configuration retrieved from the server into the Arc<Mutex> before switching the flag to Online
9195
None => unreachable!(),
9296
Some(configuration) => Ok(configuration.clone()),
9397
}
9498
}
9599
CurrentMode::Offline(current_mode_offline_reason) => match &self.offline_mode {
96100
OfflineMode::Fail => Err(Error::Offline(current_mode_offline_reason.clone())),
97-
OfflineMode::Cache => match &*self.configuration.lock()? {
98-
None => Err(Error::ConfigurationNotYetAvailable),
99-
Some(configuration) => Ok(configuration.clone()),
100-
},
101+
OfflineMode::Cache => {
102+
let (configuration_mutex, _) = &*self.configuration;
103+
match &*configuration_mutex.lock()? {
104+
None => Err(Error::ConfigurationNotYetAvailable),
105+
Some(configuration) => Ok(configuration.clone()),
106+
}
107+
}
101108
OfflineMode::FallbackData(app_configuration_offline) => {
102109
Ok(app_configuration_offline.config_snapshot.clone())
103110
}
@@ -107,13 +114,16 @@ impl LiveConfigurationImpl {
107114
"Thread finished with status: {:?}",
108115
result
109116
))),
110-
OfflineMode::Cache => match &*self.configuration.lock()? {
111-
None => Err(Error::UnrecoverableError(format!(
112-
"Initial configuration failed to retrieve: {:?}",
113-
result
114-
))),
115-
Some(configuration) => Ok(configuration.clone()),
116-
},
117+
OfflineMode::Cache => {
118+
let (configuration_mutex, _) = &*self.configuration;
119+
match &*configuration_mutex.lock()? {
120+
None => Err(Error::UnrecoverableError(format!(
121+
"Initial configuration failed to retrieve: {:?}",
122+
result
123+
))),
124+
Some(configuration) => Ok(configuration.clone()),
125+
}
126+
}
117127
OfflineMode::FallbackData(app_configuration_offline) => {
118128
Ok(app_configuration_offline.config_snapshot.clone())
119129
}
@@ -142,6 +152,24 @@ impl ConfigurationProvider for LiveConfigurationImpl {
142152
fn is_online(&self) -> crate::Result<bool> {
143153
Ok(self.get_current_mode()? == CurrentMode::Online)
144154
}
155+
156+
fn wait_until_configuration_is_available(&self) {
157+
let (configuration_mutex, condition_variable) = &*self.configuration;
158+
let configuration_guard = configuration_mutex.lock().unwrap();
159+
condition_variable
160+
.wait_while(configuration_guard, |configuration| configuration.is_none())
161+
.unwrap();
162+
}
163+
164+
fn wait_until_online(&self) {
165+
let (current_mode_mutex, condition_variable) = &*self.current_mode;
166+
let current_mode_guard = current_mode_mutex.lock().unwrap();
167+
condition_variable
168+
.wait_while(current_mode_guard, |current_mode| {
169+
*current_mode == CurrentMode::Online
170+
})
171+
.unwrap();
172+
}
145173
}
146174

147175
impl LiveConfiguration for LiveConfigurationImpl {
@@ -150,7 +178,8 @@ impl LiveConfiguration for LiveConfigurationImpl {
150178
}
151179

152180
fn get_current_mode(&self) -> Result<CurrentMode> {
153-
Ok(self.current_mode.lock()?.clone())
181+
let (current_mode_mutex, _) = &*self.current_mode;
182+
Ok(current_mode_mutex.lock()?.clone())
154183
}
155184
}
156185

src/network/live_configuration/update_thread_worker.rs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@
1313
// limitations under the License.
1414

1515
use std::sync::mpsc::Receiver;
16-
use std::sync::{Arc, Mutex};
16+
use std::sync::{Arc, Condvar, Mutex};
1717

1818
use super::current_mode::CurrentModeOfflineReason;
1919
use super::CurrentMode;
2020
use super::{Error, Result};
2121
use crate::models::Configuration;
2222
use crate::network::http_client::{ServerClient, WebsocketReader};
23+
use crate::network::live_configuration::current_mode;
2324
use crate::network::NetworkError;
2425
use crate::ConfigurationId;
2526

@@ -28,16 +29,16 @@ pub(crate) const SERVER_HEARTBEAT: &str = "test message";
2829
pub(crate) struct UpdateThreadWorker<T: ServerClient> {
2930
server_client: T,
3031
configuration_id: ConfigurationId,
31-
configuration: Arc<Mutex<Option<Configuration>>>,
32-
current_mode: Arc<Mutex<CurrentMode>>,
32+
configuration: Arc<(Mutex<Option<Configuration>>, Condvar)>,
33+
current_mode: Arc<(Mutex<CurrentMode>, Condvar)>,
3334
}
3435

3536
impl<T: ServerClient> UpdateThreadWorker<T> {
3637
pub(crate) fn new(
3738
server_client: T,
3839
configuration_id: ConfigurationId,
39-
configuration: Arc<Mutex<Option<Configuration>>>,
40-
current_mode: Arc<Mutex<CurrentMode>>,
40+
configuration: Arc<(Mutex<Option<Configuration>>, Condvar)>,
41+
current_mode: Arc<(Mutex<CurrentMode>, Condvar)>,
4142
) -> Self {
4243
Self {
4344
server_client,
@@ -93,7 +94,10 @@ impl<T: ServerClient> UpdateThreadWorker<T> {
9394
/// [`UpdateThreadWorker::current_mode`] is set to [`CurrentMode::Defunct`].
9495
pub(crate) fn run(&self, thread_termination_receiver: Receiver<()>) -> Result<()> {
9596
let result = self.run_internal(thread_termination_receiver);
96-
*self.current_mode.lock().unwrap() = CurrentMode::Defunct(result.clone());
97+
let (current_mode_mutex, condition_variable) = &*self.current_mode;
98+
let mut current_mode = current_mode_mutex.lock().unwrap();
99+
*current_mode = CurrentMode::Defunct(result.clone());
100+
condition_variable.notify_all();
97101
result
98102
}
99103

@@ -103,14 +107,27 @@ impl<T: ServerClient> UpdateThreadWorker<T> {
103107
fn update_configuration_from_server_and_current_mode(&self) -> Result<()> {
104108
match self.server_client.get_configuration(&self.configuration_id) {
105109
Ok(config) => {
106-
*self.configuration.lock()? = Some(config);
107-
*self.current_mode.lock()? = CurrentMode::Online;
110+
let (current_config_mutex, condition_variable) = &*self.configuration;
111+
let mut current_config = current_config_mutex.lock()?;
112+
*current_config = Some(config);
113+
condition_variable.notify_all();
114+
115+
let (current_mode_mutex, condition_variable) = &*self.current_mode;
116+
let mut current_mode = current_mode_mutex.lock()?;
117+
*current_mode = CurrentMode::Online;
118+
condition_variable.notify_all();
119+
108120
Ok(())
109121
}
110122
Err(e) => {
111123
Self::recoverable_error(e)?;
112-
*self.current_mode.lock()? =
124+
125+
let (current_mode_mutex, condition_variable) = &*self.current_mode;
126+
let mut current_mode = current_mode_mutex.lock()?;
127+
*current_mode =
113128
CurrentMode::Offline(CurrentModeOfflineReason::FailedToGetNewConfiguration);
129+
condition_variable.notify_all();
130+
114131
Ok(())
115132
}
116133
}
@@ -125,10 +142,11 @@ impl<T: ServerClient> UpdateThreadWorker<T> {
125142
/// there is any error receiving the messages. It's up to the caller to implement
126143
/// the recovery procedure for these scenarios.
127144
fn handle_websocket_message<WS: WebsocketReader>(&self, mut socket: WS) -> Result<Option<WS>> {
145+
let (current_mode_mutex, condition_variable) = &*self.current_mode;
128146
match socket.read_msg() {
129147
Ok(msg) => match msg {
130148
tungstenite::Message::Text(utf8_bytes) => {
131-
let current_mode_clone = self.current_mode.lock()?.clone();
149+
let current_mode_clone = current_mode_mutex.lock()?.clone();
132150
match (utf8_bytes.as_str(), current_mode_clone) {
133151
(SERVER_HEARTBEAT, CurrentMode::Offline(_)) => {
134152
self.update_configuration_from_server_and_current_mode()?;
@@ -141,8 +159,9 @@ impl<T: ServerClient> UpdateThreadWorker<T> {
141159
Ok(Some(socket))
142160
}
143161
tungstenite::Message::Close(_) => {
144-
*self.current_mode.lock()? =
162+
*current_mode_mutex.lock()? =
145163
CurrentMode::Offline(CurrentModeOfflineReason::WebsocketClosed);
164+
condition_variable.notify_all();
146165
Ok(None)
147166
}
148167
_ => {
@@ -151,8 +170,9 @@ impl<T: ServerClient> UpdateThreadWorker<T> {
151170
}
152171
},
153172
Err(_) => {
154-
*self.current_mode.lock()? =
173+
*current_mode_mutex.lock()? =
155174
CurrentMode::Offline(CurrentModeOfflineReason::WebsocketError);
175+
condition_variable.notify_all();
156176
Ok(None)
157177
}
158178
}

0 commit comments

Comments
 (0)