Skip to content

Commit

Permalink
fu: add database schema v1 for subscription to support future migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
bshifter committed Feb 1, 2025
1 parent 7fb6a43 commit f8134d6
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 7 deletions.
18 changes: 11 additions & 7 deletions common/src/database/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::heartbeat::{HeartbeatData, HeartbeatKey, HeartbeatValue, HeartbeatsCa
use crate::subscription::{
SubscriptionData, SubscriptionMachine, SubscriptionMachineState, SubscriptionStatsCounters
};
use crate::database::schema::redis::v1::subscription::*;
use futures_util::stream::StreamExt;

use super::schema::{Migration, MigrationBase, Version};
Expand Down Expand Up @@ -440,11 +441,11 @@ impl Database for RedisDatabase {
let mut subscriptions = Vec::new();

for key in keys {
let subscription_json: Option<String> = conn.get(&key).await.context("Failed to get subscription data")?;
let subscription_redis_data: Option<String> = conn.get(&key).await.context("Failed to get subscription data")?;

if let Some(subscription_json) = subscription_json {
match serde_json::from_str::<SubscriptionData>(&subscription_json) {
Ok(subscription) => subscriptions.push(subscription),
if let Some(subscription_redis_data) = subscription_redis_data {
match serde_json::from_str::<SubscriptionRedisData>(&subscription_redis_data) {
Ok(subscription) => subscriptions.push(subscription.into_subscription_data()),
Err(err) => {
log::warn!("Failed to deserialize subscription data for key {}: {}", key, err);
}
Expand Down Expand Up @@ -473,8 +474,8 @@ impl Database for RedisDatabase {
if !filtered.is_empty() {
let result: Option<String> = conn.get(&filtered[0]).await.context("Failed to get subscription data")?;
if result.is_some() {
let subscription: SubscriptionData = serde_json::from_str(&result.unwrap()).context("Failed to deserialize subscription data")?;
return Ok(Some(subscription));
let subscription_redis_data: SubscriptionRedisData = serde_json::from_str(&result.unwrap()).context("Failed to deserialize subscription data")?;
return Ok(Some(subscription_redis_data.into_subscription_data()));
}
}
Ok(None)
Expand All @@ -489,8 +490,11 @@ impl Database for RedisDatabase {
let _:() = conn.del(keys).await?;
}


let subscription_redis_data = SubscriptionRedisData::from_subscription_data(subscription);

let key = format!("{}:{}:{}", RedisDomain::Subscription, subscription.uuid().to_string().to_uppercase(), subscription.name());
let value = serde_json::to_string(subscription).context("Failed to serialize subscription data")?;
let value = serde_json::to_string(&subscription_redis_data).context("Failed to serialize subscription data")?;
let _ : String = conn.set(key, value).await.context("Failed to store subscription data")?;
Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions common/src/database/schema/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::database::redis::RedisDatabase;

pub mod v1;

pub fn register_migrations(_redis_db: &mut RedisDatabase) {
// for future changes
}
3 changes: 3 additions & 0 deletions common/src/database/schema/redis/v1/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod subscription;

pub const VERSION: &str = module_path!();
45 changes: 45 additions & 0 deletions common/src/database/schema/redis/v1/subscription.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use serde::{Deserialize, Serialize};
use crate::subscription::*;

use super::VERSION;

#[derive(Debug, PartialEq, Clone, Eq, Serialize, Deserialize)]
pub struct SubscriptionRedisData {
version: String,
uuid: SubscriptionUuid,
internal_version: InternalVersion,
revision: Option<String>,
uri: Option<String>,
enabled: bool,
princs_filter: PrincsFilter,
parameters: SubscriptionParameters,
outputs: Vec<SubscriptionOutput>,
}

impl SubscriptionRedisData {
pub fn from_subscription_data(from: &SubscriptionData) -> Self {
Self {
version: VERSION.to_string(),
uuid: *from.uuid(),
internal_version: from.internal_version(),
revision: from.revision().cloned(),
uri: from.uri().cloned(),
enabled: from.enabled(),
princs_filter: from.princs_filter().clone(),
parameters: from.parameters().clone(),
outputs: from.outputs().to_vec(),
}
}
pub fn into_subscription_data(self) -> SubscriptionData {
let mut sd = SubscriptionData::new(&self.parameters.name, &self.parameters.query);
sd.set_revision(self.revision).
set_uuid(self.uuid).
set_uri(self.uri).
set_enabled(self.enabled).
set_princs_filter(self.princs_filter).
set_parameters(self.parameters).
set_outputs(self.outputs);
sd.set_internal_version(self.internal_version);
sd
}
}
9 changes: 9 additions & 0 deletions common/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,15 @@ impl SubscriptionData {
Ok(PublicVersion(Uuid::from_u64_pair(result, result)))
}

pub fn parameters(&self) -> &SubscriptionParameters {
&self.parameters
}

pub fn set_parameters(&mut self, parameters: SubscriptionParameters) -> &mut SubscriptionData {
self.parameters = parameters;
self
}

/// Get a reference to the subscription's name.
pub fn name(&self) -> &str {
self.parameters.name.as_ref()
Expand Down

0 comments on commit f8134d6

Please sign in to comment.