Skip to content

Commit 15b316a

Browse files
authored
Refactor update thread (#20)
- removed unused branches - removed unwrap - removed unneeded sleep
2 parents 39416eb + 33f5ba0 commit 15b316a

File tree

1 file changed

+48
-54
lines changed

1 file changed

+48
-54
lines changed

src/client/app_configuration_client.rs

Lines changed: 48 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub use crate::client::feature_proxy::FeatureProxy;
1818
use crate::client::http;
1919
use crate::client::property::Property;
2020
pub use crate::client::property_proxy::PropertyProxy;
21-
use crate::errors::{ConfigurationAccessError, Result};
21+
use crate::errors::{ConfigurationAccessError, Result, Error};
2222
use crate::models::Segment;
2323
use std::collections::{HashMap, HashSet};
2424
use std::net::TcpStream;
@@ -83,7 +83,7 @@ impl AppConfigurationClient {
8383
}
8484

8585
fn get_configuration_snapshot(
86-
access_token: &String,
86+
access_token: &str,
8787
region: &str,
8888
guid: &str,
8989
environment_id: &str,
@@ -100,6 +100,37 @@ impl AppConfigurationClient {
100100
ConfigurationSnapshot::new(environment_id, configuration)
101101
}
102102

103+
fn wait_for_configuration_update(
104+
socket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
105+
access_token: &str,
106+
region: &str,
107+
guid: &str,
108+
collection_id: &str,
109+
environment_id: &str,
110+
) -> Result<ConfigurationSnapshot>{
111+
loop{
112+
// read() blocks until something happens.
113+
match socket.read()? {
114+
Message::Text(text) => match text.as_str() {
115+
"test message" => {} // periodically sent by the server
116+
_ => {
117+
return Self::get_configuration_snapshot(
118+
access_token,
119+
region,
120+
guid,
121+
environment_id,
122+
collection_id,
123+
);
124+
}
125+
},
126+
Message::Close(_) => {
127+
return Err(Error::Other("Connection closed by the server".into()));
128+
},
129+
_ => {}
130+
}
131+
}
132+
}
133+
103134
fn update_configuration_on_change(
104135
mut socket: WebSocket<MaybeTlsStream<TcpStream>>,
105136
latest_config_snapshot: Arc<Mutex<ConfigurationSnapshot>>,
@@ -111,62 +142,25 @@ impl AppConfigurationClient {
111142
) -> std::sync::mpsc::Sender<()> {
112143
let (sender, receiver) = std::sync::mpsc::channel();
113144

114-
thread::spawn(move || loop {
115-
// If the sender has gone (AppConfiguration instance is dropped), then finish this thread
116-
if let Err(e) = receiver.try_recv() {
117-
if e == std::sync::mpsc::TryRecvError::Disconnected {
118-
break;
119-
}
120-
}
121-
122-
// Wait for new data
123-
match socket.read() {
124-
Ok(Message::Text(text)) => match text.as_str() {
125-
"test message" => {
126-
println!("\t*** Test message received.");
127-
}
128-
_ => {
129-
let config_result = Self::get_configuration_snapshot(
130-
&access_token,
131-
&region,
132-
&guid,
133-
&environment_id,
134-
&collection_id,
135-
);
136-
let mut config_snapshot = latest_config_snapshot.lock().unwrap();
137-
match config_result {
138-
Ok(config) => *config_snapshot = config,
139-
Err(e) => println!("Error getting config snapshot: {}", e),
140-
}
145+
thread::spawn(move || {
146+
loop {
147+
// If the sender has gone (AppConfiguration instance is dropped), then finish this thread
148+
if let Err(e) = receiver.try_recv() {
149+
if e == std::sync::mpsc::TryRecvError::Disconnected {
150+
break;
141151
}
142-
},
143-
Ok(Message::Close(_)) => {
144-
println!("Connection closed by the server.");
145-
break;
146-
}
147-
Ok(Message::Binary(data)) => {
148-
println!("\t*** Received a message that has binary data {:?}", data);
149-
}
150-
Ok(Message::Ping(data)) => {
151-
println!("\t*** Received a ping message {:?}", data);
152152
}
153-
Ok(Message::Pong(data)) => {
154-
println!("\t*** Received a pong message {:?}", data);
155-
}
156-
Ok(Message::Frame(frame)) => {
157-
println!("\t*** Received a frame message {:?}", frame);
158-
}
159-
Err(e) => {
160-
// TODO: how to handle temporary connectivity issues / errors?
161-
// In current implementation we would terminate this thread.
162-
// Effectively freezing the configuration.
163-
println!("Error: {}", e);
164-
break;
153+
154+
let config_snapshot = Self::wait_for_configuration_update(&mut socket, &access_token, &region, &guid, &collection_id, &environment_id);
155+
156+
match config_snapshot{
157+
Ok(config_snapshot) => *latest_config_snapshot.lock()? = config_snapshot,
158+
Err(e) => {println!("Waiting for configuration update failed. Stopping to monitor for changes.: {e}"); break;}
165159
}
166160
}
167-
168-
thread::sleep(Duration::from_millis(100));
169-
});
161+
Ok::<(), Error>(())
162+
}
163+
);
170164

171165
sender
172166
}

0 commit comments

Comments
 (0)