diff --git a/common/src/database/redis.rs b/common/src/database/redis.rs index 58275bc..72e762a 100644 --- a/common/src/database/redis.rs +++ b/common/src/database/redis.rs @@ -41,6 +41,7 @@ use crate::heartbeat::{HeartbeatData, HeartbeatKey, HeartbeatValue, HeartbeatsCa use crate::subscription::{ SubscriptionData, SubscriptionMachine, SubscriptionMachineState, SubscriptionStatsCounters }; +use crate::database::schema::redis::v1::subscription::*; use futures_util::stream::StreamExt; use super::schema::{Migration, MigrationBase, Version}; @@ -440,11 +441,11 @@ impl Database for RedisDatabase { let mut subscriptions = Vec::new(); for key in keys { - let subscription_json: Option = conn.get(&key).await.context("Failed to get subscription data")?; + let subscription_redis_data: Option = conn.get(&key).await.context("Failed to get subscription data")?; - if let Some(subscription_json) = subscription_json { - match serde_json::from_str::(&subscription_json) { - Ok(subscription) => subscriptions.push(subscription), + if let Some(subscription_redis_data) = subscription_redis_data { + match serde_json::from_str::(&subscription_redis_data) { + Ok(subscription) => subscriptions.push(subscription.into_subscription_data()), Err(err) => { log::warn!("Failed to deserialize subscription data for key {}: {}", key, err); } @@ -473,8 +474,8 @@ impl Database for RedisDatabase { if !filtered.is_empty() { let result: Option = conn.get(&filtered[0]).await.context("Failed to get subscription data")?; if result.is_some() { - let subscription: SubscriptionData = serde_json::from_str(&result.unwrap()).context("Failed to deserialize subscription data")?; - return Ok(Some(subscription)); + let subscription_redis_data: SubscriptionRedisData = serde_json::from_str(&result.unwrap()).context("Failed to deserialize subscription data")?; + return Ok(Some(subscription_redis_data.into_subscription_data())); } } Ok(None) @@ -489,8 +490,11 @@ impl Database for RedisDatabase { let _:() = conn.del(keys).await?; } + + let subscription_redis_data = SubscriptionRedisData::from_subscription_data(subscription); + let key = format!("{}:{}:{}", RedisDomain::Subscription, subscription.uuid().to_string().to_uppercase(), subscription.name()); - let value = serde_json::to_string(subscription).context("Failed to serialize subscription data")?; + let value = serde_json::to_string(&subscription_redis_data).context("Failed to serialize subscription data")?; let _ : String = conn.set(key, value).await.context("Failed to store subscription data")?; Ok(()) } diff --git a/common/src/database/schema/redis/mod.rs b/common/src/database/schema/redis/mod.rs index 3cf4c39..8bd0bc9 100644 --- a/common/src/database/schema/redis/mod.rs +++ b/common/src/database/schema/redis/mod.rs @@ -1,5 +1,7 @@ use crate::database::redis::RedisDatabase; +pub mod v1; + pub fn register_migrations(_redis_db: &mut RedisDatabase) { // for future changes } diff --git a/common/src/database/schema/redis/v1/mod.rs b/common/src/database/schema/redis/v1/mod.rs new file mode 100644 index 0000000..8e061c6 --- /dev/null +++ b/common/src/database/schema/redis/v1/mod.rs @@ -0,0 +1,3 @@ +pub mod subscription; + +pub const VERSION: &str = module_path!(); diff --git a/common/src/database/schema/redis/v1/subscription.rs b/common/src/database/schema/redis/v1/subscription.rs new file mode 100644 index 0000000..10d5f3c --- /dev/null +++ b/common/src/database/schema/redis/v1/subscription.rs @@ -0,0 +1,45 @@ +use serde::{Deserialize, Serialize}; +use crate::subscription::*; + +use super::VERSION; + +#[derive(Debug, PartialEq, Clone, Eq, Serialize, Deserialize)] +pub struct SubscriptionRedisData { + version: String, + uuid: SubscriptionUuid, + internal_version: InternalVersion, + revision: Option, + uri: Option, + enabled: bool, + princs_filter: PrincsFilter, + parameters: SubscriptionParameters, + outputs: Vec, +} + +impl SubscriptionRedisData { + pub fn from_subscription_data(from: &SubscriptionData) -> Self { + Self { + version: VERSION.to_string(), + uuid: *from.uuid(), + internal_version: from.internal_version(), + revision: from.revision().cloned(), + uri: from.uri().cloned(), + enabled: from.enabled(), + princs_filter: from.princs_filter().clone(), + parameters: from.parameters().clone(), + outputs: from.outputs().to_vec(), + } + } + pub fn into_subscription_data(self) -> SubscriptionData { + let mut sd = SubscriptionData::new(&self.parameters.name, &self.parameters.query); + sd.set_revision(self.revision). + set_uuid(self.uuid). + set_uri(self.uri). + set_enabled(self.enabled). + set_princs_filter(self.princs_filter). + set_parameters(self.parameters). + set_outputs(self.outputs); + sd.set_internal_version(self.internal_version); + sd + } +} diff --git a/common/src/subscription.rs b/common/src/subscription.rs index fe0060c..e003f90 100644 --- a/common/src/subscription.rs +++ b/common/src/subscription.rs @@ -661,6 +661,15 @@ impl SubscriptionData { Ok(PublicVersion(Uuid::from_u64_pair(result, result))) } + pub fn parameters(&self) -> &SubscriptionParameters { + &self.parameters + } + + pub fn set_parameters(&mut self, parameters: SubscriptionParameters) -> &mut SubscriptionData { + self.parameters = parameters; + self + } + /// Get a reference to the subscription's name. pub fn name(&self) -> &str { self.parameters.name.as_ref()