Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ from your [IBMCloud account](https://cloud.ibm.com/).
Create your client with the context (environment and collection) you want to connect to

```rust
use appconfiguration_rust_sdk::{AppConfigurationClient, Entity, Result, Value, Feature};
use appconfiguration_rust_sdk::{
AppConfigurationClient, AppConfigurationClientIBMCloud,
Entity, Result, Value, Feature
};

// Create the client connecting to the server
let client = AppConfigurationClient::new(&apikey, &region, &guid, &environment_id, &collection_id)?;
let client = AppConfigurationClientIBMCloud::new(&apikey, &region, &guid, &environment_id, &collection_id)?;

// Get the feature you want to evaluate for your entities
let feature = client.get_feature("AB_testing_feature")?;
Expand Down
11 changes: 8 additions & 3 deletions examples/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::{collections::HashMap, env, thread, time::Duration};

use appconfiguration_rust_sdk::{AppConfigurationClient, Entity, Feature, Property, Value};
use appconfiguration_rust_sdk::{AppConfigurationClient, AppConfigurationClientIBMCloud, Entity, Feature, Property, Value};
use dotenvy::dotenv;
use std::error::Error;

Expand Down Expand Up @@ -48,8 +48,13 @@ fn main() -> std::result::Result<(), Box<dyn Error>> {
let feature_id = env::var("FEATURE_ID").expect("FEATURE_ID should be set.");
let property_id = env::var("PROPERTY_ID").expect("PROPERTY_ID should be set.");

let client =
AppConfigurationClient::new(&apikey, &region, &guid, &environment_id, &collection_id)?;
let client = AppConfigurationClientIBMCloud::new(
&apikey,
&region,
&guid,
&environment_id,
&collection_id,
)?;

let entity = CustomerEntity {
id: "user123".to_string(),
Expand Down
308 changes: 12 additions & 296 deletions src/client/app_configuration_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,316 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::client::cache::ConfigurationSnapshot;
use crate::Result;

use crate::client::feature_proxy::FeatureProxy;
use crate::client::feature_snapshot::FeatureSnapshot;
pub use crate::client::feature_proxy::FeatureProxy;
use crate::client::http;
use crate::client::property_proxy::PropertyProxy;
use crate::client::property_snapshot::PropertySnapshot;
pub use crate::client::property_proxy::PropertyProxy;
use crate::errors::{ConfigurationAccessError, Error, Result};
use crate::models::Segment;
use std::collections::{HashMap, HashSet};
use std::net::TcpStream;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use tungstenite::stream::MaybeTlsStream;
use tungstenite::Message;
use tungstenite::WebSocket;

/// App Configuration client for browsing, and evaluating features and
/// properties.
#[derive(Debug)]
pub struct AppConfigurationClient {
pub(crate) latest_config_snapshot: Arc<Mutex<ConfigurationSnapshot>>,
pub(crate) _thread_terminator: std::sync::mpsc::Sender<()>,
}
Comment on lines -32 to -38
Copy link
Collaborator Author

@jgsogo jgsogo Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of this, and the implementation, has been moved to src/client/app_configuration_ibm_cloud.rs. There are no actual changes.


impl AppConfigurationClient {
/// Creates a client to retrieve configurations for a specific collection.
/// To uniquely address a collection the following is required:
/// - `region`
/// - `guid`: Identifies an instance
/// - `environment_id`
/// - `collection_id`
/// In addition `api_key` is required for authentication
pub fn new(
apikey: &str,
region: &str,
guid: &str,
environment_id: &str,
collection_id: &str,
) -> Result<Self> {
let access_token = http::get_access_token(&apikey)?;

// Populate initial configuration
let latest_config_snapshot: Arc<Mutex<ConfigurationSnapshot>> =
Arc::new(Mutex::new(Self::get_configuration_snapshot(
&access_token,
region,
guid,
environment_id,
collection_id,
)?));

// start monitoring configuration
let terminator = Self::update_cache_in_background(
latest_config_snapshot.clone(),
apikey,
region,
guid,
environment_id,
collection_id,
)?;

let client = AppConfigurationClient {
latest_config_snapshot,
_thread_terminator: terminator,
};

Ok(client)
}

fn get_configuration_snapshot(
access_token: &str,
region: &str,
guid: &str,
environment_id: &str,
collection_id: &str,
) -> Result<ConfigurationSnapshot> {
let configuration = http::get_configuration(
// TODO: access_token might expire. This will cause issues with long-running apps
&access_token,
&region,
&guid,
&collection_id,
&environment_id,
)?;
ConfigurationSnapshot::new(environment_id, configuration)
}

fn wait_for_configuration_update(
socket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
access_token: &str,
region: &str,
guid: &str,
collection_id: &str,
environment_id: &str,
) -> Result<ConfigurationSnapshot> {
loop {
// read() blocks until something happens.
match socket.read()? {
Message::Text(text) => match text.as_str() {
"test message" => {} // periodically sent by the server
_ => {
return Self::get_configuration_snapshot(
access_token,
region,
guid,
environment_id,
collection_id,
);
}
},
Message::Close(_) => {
return Err(Error::Other("Connection closed by the server".into()));
}
_ => {}
}
}
}

fn update_configuration_on_change(
mut socket: WebSocket<MaybeTlsStream<TcpStream>>,
latest_config_snapshot: Arc<Mutex<ConfigurationSnapshot>>,
access_token: String,
region: String,
guid: String,
collection_id: String,
environment_id: String,
) -> std::sync::mpsc::Sender<()> {
let (sender, receiver) = std::sync::mpsc::channel();

thread::spawn(move || {
loop {
// If the sender has gone (AppConfiguration instance is dropped), then finish this thread
if let Err(e) = receiver.try_recv() {
if e == std::sync::mpsc::TryRecvError::Disconnected {
break;
}
}

let config_snapshot = Self::wait_for_configuration_update(
&mut socket,
&access_token,
&region,
&guid,
&collection_id,
&environment_id,
);

match config_snapshot {
Ok(config_snapshot) => *latest_config_snapshot.lock()? = config_snapshot,
Err(e) => {
println!("Waiting for configuration update failed. Stopping to monitor for changes.: {e}");
break;
}
}
}
Ok::<(), Error>(())
});

sender
}
/// AppConfiguration client for browsing, and evaluating features and properties.
pub trait AppConfigurationClient {
fn get_feature_ids(&self) -> Result<Vec<String>>;

pub fn get_feature_ids(&self) -> Result<Vec<String>> {
Ok(self
.latest_config_snapshot
.lock()?
.features
.keys()
.cloned()
.collect())
}

pub fn get_feature(&self, feature_id: &str) -> Result<FeatureSnapshot> {
let config_snapshot = self.latest_config_snapshot.lock()?;

// Get the feature from the snapshot
let feature = config_snapshot.get_feature(feature_id)?;

// Get the segment rules that apply to this feature
let segments = {
let all_segment_ids = feature
.segment_rules
.iter()
.flat_map(|targeting_rule| {
targeting_rule
.rules
.iter()
.flat_map(|segment| &segment.segments)
})
.cloned()
.collect::<HashSet<String>>();
let segments: HashMap<String, Segment> = config_snapshot
.segments
.iter()
.filter(|&(key, _)| all_segment_ids.contains(key))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

// Integrity DB check: all segment_ids should be available in the snapshot
if all_segment_ids.len() != segments.len() {
return Err(ConfigurationAccessError::MissingSegments {
resource_id: feature_id.to_string(),
}
.into());
}

segments
};

Ok(FeatureSnapshot::new(feature.clone(), segments))
}
fn get_feature(&self, feature_id: &str) -> Result<FeatureSnapshot>;

/// Searches for the feature `feature_id` inside the current configured
/// collection, and environment.
///
/// Return `Ok(feature)` if the feature exists or `Err` if it does not.
pub fn get_feature_proxy<'a>(&'a self, feature_id: &str) -> Result<FeatureProxy<'a>> {
// FIXME: there is and was no validation happening if the feature exists.
// Comments and error messages in FeatureProxy suggest that this should happen here.
// same applies for properties.
Ok(FeatureProxy::new(self, feature_id.to_string()))
}

pub fn get_property_ids(&self) -> Result<Vec<String>> {
Ok(self
.latest_config_snapshot
.lock()
.map_err(|_| ConfigurationAccessError::LockAcquisitionError)?
.properties
.keys()
.cloned()
.collect())
}
fn get_feature_proxy<'a>(&'a self, feature_id: &str) -> Result<FeatureProxy<'a>>;

pub fn get_property(&self, property_id: &str) -> Result<PropertySnapshot> {
let config_snapshot = self.latest_config_snapshot.lock()?;
fn get_property_ids(&self) -> Result<Vec<String>>;

// Get the property from the snapshot
let property = config_snapshot.get_property(property_id)?;

// Get the segment rules that apply to this property
let segments = {
let all_segment_ids = property
.segment_rules
.iter()
.flat_map(|targeting_rule| {
targeting_rule
.rules
.iter()
.flat_map(|segment| &segment.segments)
})
.cloned()
.collect::<HashSet<String>>();
let segments: HashMap<String, Segment> = config_snapshot
.segments
.iter()
.filter(|&(key, _)| all_segment_ids.contains(key))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

// Integrity DB check: all segment_ids should be available in the snapshot
if all_segment_ids.len() != segments.len() {
// FIXME: Return some kind of DBIntegrity error
return Err(ConfigurationAccessError::MissingSegments {
resource_id: property_id.to_string(),
}
.into());
}

segments
};

Ok(PropertySnapshot::new(property.clone(), segments))
}
fn get_property(&self, property_id: &str) -> Result<PropertySnapshot>;

/// Searches for the property `property_id` inside the current configured
/// collection, and environment.
///
/// Return `Ok(property)` if the feature exists or `Err` if it does not.
pub fn get_property_proxy(&self, property_id: &str) -> Result<PropertyProxy> {
Ok(PropertyProxy::new(self, property_id.to_string()))
}

fn update_cache_in_background(
latest_config_snapshot: Arc<Mutex<ConfigurationSnapshot>>,
apikey: &str,
region: &str,
guid: &str,
environment_id: &str,
collection_id: &str,
) -> Result<std::sync::mpsc::Sender<()>> {
let access_token = http::get_access_token(&apikey)?;
let (socket, _response) = http::get_configuration_monitoring_websocket(
&access_token,
&region,
&guid,
&collection_id,
&environment_id,
)?;

let sender = Self::update_configuration_on_change(
socket,
latest_config_snapshot,
access_token,
region.to_string(),
guid.to_string(),
collection_id.to_string(),
environment_id.to_string(),
);

Ok(sender)
}
fn get_property_proxy(&self, property_id: &str) -> Result<PropertyProxy>;
}
Loading
Loading