Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions scylla-server/src/controllers/rule_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,12 @@ pub async fn get_all_rules_with_client_info(
.await,
))
}

#[debug_handler]
pub async fn check_rule(
Extension(rules_manager): Extension<Arc<RuleManager>>,
Json(rule): Json<Rule>,
) -> Json<bool> {
debug!("Checking if rule exists: {} - {}", rule.topic, rule.expr());
Json(rules_manager.check_rule(&rule.topic, &rule.expr).await)
}
12 changes: 11 additions & 1 deletion scylla-server/src/rule_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ impl Rule {
}
}

pub fn expr(&self) -> &str {
&self.expr
}

/// process an event of seeing this topic, with the given values
fn process_seen(&self, values: &[f32]) -> Option<bool> {
let mut context: HashMapContext<DefaultNumericTypes> =
Expand Down Expand Up @@ -485,7 +489,13 @@ impl RuleManager {
}
}
}

pub async fn check_rule(&self, topic: &Topic, expr: &str) -> bool {
self.rules
.read()
.await
.values()
.any(|rule| rule.topic == *topic && rule.expr() == expr)
}
pub async fn get_all_rules(&self) -> Vec<Rule> {
self.rules
.read()
Expand Down
60 changes: 60 additions & 0 deletions scylla-server/tests/rule_structs_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,66 @@ async fn test_concurrent_high_frequency_messages() -> Result<(), RuleManagerErro
}
}
}
#[tokio::test]
async fn test_check_rule_exists() -> Result<(), RuleManagerError> {
let rule_manager = RuleManager::new();
let client = ClientId("test_client".to_string());

let rule = Rule::new(
RuleId("rule_1".to_string()),
Topic("test/topic".to_string()),
core::time::Duration::from_secs(60),
"a > 10".to_owned(),
);

rule_manager.add_rule(client, rule).await?;

assert!(rule_manager.check_rule(&Topic("test/topic".to_string()), "a > 10").await);
Ok(())
}

#[tokio::test]
async fn test_check_rule_wrong_expr() -> Result<(), RuleManagerError> {
let rule_manager = RuleManager::new();
let client = ClientId("test_client".to_string());

let rule = Rule::new(
RuleId("rule_1".to_string()),
Topic("test/topic".to_string()),
core::time::Duration::from_secs(60),
"a > 10".to_owned(),
);

rule_manager.add_rule(client, rule).await?;

assert!(!rule_manager.check_rule(&Topic("test/topic".to_string()), "a > 20").await);
Ok(())
}

#[tokio::test]
async fn test_check_rule_wrong_topic() -> Result<(), RuleManagerError> {
let rule_manager = RuleManager::new();
let client = ClientId("test_client".to_string());

let rule = Rule::new(
RuleId("rule_1".to_string()),
Topic("test/topic".to_string()),
core::time::Duration::from_secs(60),
"a > 10".to_owned(),
);

rule_manager.add_rule(client, rule).await?;

assert!(!rule_manager.check_rule(&Topic("wrong/topic".to_string()), "a > 10").await);
Ok(())
}

#[tokio::test]
async fn test_check_rule_empty_manager() {
let rule_manager = RuleManager::new();

assert!(!rule_manager.check_rule(&Topic("test/topic".to_string()), "a > 10").await);
}

// Verify system state is unchanged
assert_eq!(rule_manager.get_all_rules().await.len(), num_rules);
Expand Down
Loading