Skip to content

Commit

Permalink
subscription: implement ClientFilterType and ClientFilterFlags
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAnno committed Dec 11, 2024
1 parent 07ed61c commit 183c7b5
Show file tree
Hide file tree
Showing 12 changed files with 276 additions and 67 deletions.
7 changes: 6 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cli/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ async fn edit_filter(subscription: &mut SubscriptionData, matches: &ArgMatches)
warn!("'{}' filter has been set without principals making this subscription apply to nothing.", op)
}

subscription.set_client_filter(Some(ClientFilter::new(op, princs)));
subscription.set_client_filter(Some(ClientFilter::new_legacy(op, princs)));
return Ok(());
}

Expand Down
4 changes: 3 additions & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ deadpool-sqlite = "0.5.0"
openssl = "0.10.66"
postgres-openssl = "0.5.0"
strum = { version = "0.26.1", features = ["derive"] }
bitflags = { version = "2.6.0", features = ["serde"] }
glob = "0.3.1"

[dev-dependencies]
tempfile = "3.14.0"
Expand Down Expand Up @@ -68,4 +70,4 @@ assets = [
{ source = "../openwec.conf.sample.toml", dest = "/usr/share/doc/openwec/", mode = "0644", doc = true },
{ source = "../README.md", dest = "/usr/share/doc/openwec/", mode = "0644", doc = true },
{ source = "../doc/*", dest = "/usr/share/doc/openwec/doc/", mode = "0644", doc = true },
]
]
38 changes: 20 additions & 18 deletions common/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub mod tests {
assert_eq!(toto.ignore_channel_error(), DEFAULT_IGNORE_CHANNEL_ERROR);
assert_eq!(toto.client_filter(), None);
assert_eq!(toto.is_active(), false);
assert_eq!(toto.is_active_for("couscous"), false);
assert_eq!(toto.is_active_for("couscous", None), false);
assert_eq!(toto.revision(), None);
assert_eq!(toto.data_locale(), None);
assert_eq!(toto.locale(), None);
Expand All @@ -196,6 +196,8 @@ pub mod tests {
.set_ignore_channel_error(false)
.set_client_filter(Some(ClientFilter::from(
"Only".to_string(),
"KerberosPrinc".to_string(),
None,
Some("couscous,boulette".to_string()),
)?))
.set_outputs(vec![
Expand Down Expand Up @@ -232,7 +234,7 @@ pub mod tests {
);
assert_eq!(
tata.client_filter().unwrap().targets(),
&HashSet::from(["couscous".to_string(), "boulette".to_string()])
HashSet::from(["couscous", "boulette"])
);

assert_eq!(
Expand All @@ -251,10 +253,10 @@ pub mod tests {
],
);
assert_eq!(tata.is_active(), true);
assert_eq!(tata.is_active_for("couscous"), true);
assert_eq!(tata.is_active_for("couscous", None), true);
// Filter is case-sensitive
assert_eq!(tata.is_active_for("Couscous"), false);
assert_eq!(tata.is_active_for("semoule"), false);
assert_eq!(tata.is_active_for("Couscous", None), false);
assert_eq!(tata.is_active_for("semoule", None), false);
assert_eq!(tata.revision(), Some("1472".to_string()).as_ref());
assert_eq!(tata.locale(), Some("fr-FR".to_string()).as_ref());
assert_eq!(tata.data_locale(), Some("en-US".to_string()).as_ref());
Expand Down Expand Up @@ -308,14 +310,14 @@ pub mod tests {
);
assert_eq!(
tata2.client_filter().unwrap().targets(),
&HashSet::from([
"couscous".to_string(),
"boulette".to_string(),
"semoule".to_string()
HashSet::from([
"couscous",
"boulette",
"semoule"
])
);
assert_eq!(tata2.is_active_for("couscous"), true);
assert_eq!(tata2.is_active_for("semoule"), true);
assert_eq!(tata2.is_active_for("couscous", None), true);
assert_eq!(tata2.is_active_for("semoule", None), true);
assert_eq!(tata2.revision(), Some("1890".to_string()).as_ref());
assert_eq!(tata2.locale(), Some("fr-FR".to_string()).as_ref()); // Unchanged
assert_eq!(tata2.data_locale(), Some("fr-FR".to_string()).as_ref());
Expand All @@ -342,12 +344,12 @@ pub mod tests {
);
assert_eq!(
tata2_clone.client_filter().unwrap().targets(),
&HashSet::from(["boulette".to_string(), "semoule".to_string()])
HashSet::from(["boulette", "semoule"])
);

assert_eq!(tata2_clone.is_active_for("couscous"), true);
assert_eq!(tata2_clone.is_active_for("semoule"), false);
assert_eq!(tata2_clone.is_active_for("boulette"), false);
assert_eq!(tata2_clone.is_active_for("couscous", None), true);
assert_eq!(tata2_clone.is_active_for("semoule", None), false);
assert_eq!(tata2_clone.is_active_for("boulette", None), false);

tata2_clone.set_client_filter(None);

Expand All @@ -358,9 +360,9 @@ pub mod tests {
.await?
.unwrap();
assert_eq!(tata2_clone_clone.client_filter(), None);
assert_eq!(tata2_clone_clone.is_active_for("couscous"), true);
assert_eq!(tata2_clone_clone.is_active_for("semoule"), true);
assert_eq!(tata2_clone_clone.is_active_for("boulette"), true);
assert_eq!(tata2_clone_clone.is_active_for("couscous", None), true);
assert_eq!(tata2_clone_clone.is_active_for("semoule", None), true);
assert_eq!(tata2_clone_clone.is_active_for("boulette", None), true);

db.delete_subscription(&toto3.uuid_string()).await?;
ensure!(
Expand Down
16 changes: 13 additions & 3 deletions common/src/database/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,11 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
let client_filter_op: Option<String> = row.try_get("client_filter_op")?;

let client_filter = match client_filter_op {
Some(op) => Some(ClientFilter::from(op, row.try_get("client_filter_value")?)?),
Some(op) => {
let client_filter_type: Option<_> = row.try_get("client_filter_type").unwrap();
let client_filter_type = client_filter_type.unwrap_or("KerberosPrinc".to_owned());
Some(ClientFilter::from(op, client_filter_type, row.try_get("client_filter_flags")?, row.try_get("client_filter_value")?)?)
},
None => None
};

Expand Down Expand Up @@ -628,6 +632,8 @@ impl Database for PostgresDatabase {

let max_envelope_size: i32 = subscription.max_envelope_size().try_into()?;
let client_filter_op: Option<String> = subscription.client_filter().map(|f| f.operation().to_string());
let client_filter_type = subscription.client_filter().map(|f| f.kind().to_string());
let client_filter_flags = subscription.client_filter().map(|f| f.flags().to_string());
let client_filter_value = subscription.client_filter().and_then(|f| f.targets_to_opt_string());

let count = self
Expand All @@ -638,9 +644,9 @@ impl Database for PostgresDatabase {
r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query,
heartbeat_interval, connection_retry_count, connection_retry_interval,
max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format,
ignore_channel_error, client_filter_op, client_filter_value, outputs, locale,
ignore_channel_error, client_filter_op, client_filter_type, client_filter_flags, client_filter_value, outputs, locale,
data_locale)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)
ON CONFLICT (uuid) DO UPDATE SET
version = excluded.version,
revision = excluded.revision,
Expand All @@ -658,6 +664,8 @@ impl Database for PostgresDatabase {
content_format = excluded.content_format,
ignore_channel_error = excluded.ignore_channel_error,
client_filter_op = excluded.client_filter_op,
client_filter_type = excluded.client_filter_type,
client_filter_flags = excluded.client_filter_flags,
client_filter_value = excluded.client_filter_value,
outputs = excluded.outputs,
locale = excluded.locale,
Expand All @@ -680,6 +688,8 @@ impl Database for PostgresDatabase {
&subscription.content_format().to_string(),
&subscription.ignore_channel_error(),
&client_filter_op,
&client_filter_type,
&client_filter_flags,
&client_filter_value,
&serde_json::to_string(subscription.outputs())?.as_str(),
&subscription.locale(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ impl PostgresMigration for AlterClientFilterInSubscriptionsTable {
async fn up(&self, tx: &mut Transaction) -> Result<()> {
tx.execute("ALTER TABLE subscriptions RENAME COLUMN princs_filter_op TO client_filter_op", &[]).await?;
tx.execute("ALTER TABLE subscriptions RENAME COLUMN princs_filter_value TO client_filter_value", &[]).await?;
tx.execute("ALTER TABLE subscriptions ADD COLUMN client_filter_type TEXT", &[]).await?;
tx.execute("ALTER TABLE subscriptions ADD COLUMN client_filter_flags TEXT", &[]).await?;
Ok(())
}

async fn down(&self, tx: &mut Transaction) -> Result<()> {
tx.execute("ALTER TABLE subscriptions RENAME COLUMN client_filter_op TO princs_filter_op", &[]).await?;
tx.execute("ALTER TABLE subscriptions RENAME COLUMN client_filter_value TO princs_filter_value", &[]).await?;
tx.execute("ALTER TABLE subscriptions DROP COLUMN client_filter_type", &[]).await?;
tx.execute("ALTER TABLE subscriptions DROP COLUMN client_filter_flags", &[]).await?;
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ impl SQLiteMigration for AlterClientFilterInSubscriptionsTable {
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
conn.execute("ALTER TABLE subscriptions RENAME COLUMN princs_filter_value TO client_filter_value", [])
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
conn.execute("ALTER TABLE subscriptions ADD COLUMN client_filter_type TEXT", [])
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
conn.execute("ALTER TABLE subscriptions ADD COLUMN client_filter_flags TEXT", [])
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
Ok(())
}

Expand All @@ -25,6 +29,10 @@ impl SQLiteMigration for AlterClientFilterInSubscriptionsTable {
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
conn.execute("ALTER TABLE subscriptions RENAME COLUMN client_filter_value TO princs_filter_value", [])
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
conn.execute("ALTER TABLE subscriptions DROP COLUMN client_filter_type", [])
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
conn.execute("ALTER TABLE subscriptions DROP COLUMN client_filter_flags", [])
.map_err(|err| anyhow!("SQLiteError: {}", err))?;
Ok(())
}
}
16 changes: 13 additions & 3 deletions common/src/database/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,11 @@ fn row_to_subscription(row: &Row) -> Result<SubscriptionData> {
let client_filter_op: Option<String> = row.get("client_filter_op")?;

let client_filter = match client_filter_op {
Some(op) => Some(ClientFilter::from(op, row.get("client_filter_value")?)?),
Some(op) => {
let client_filter_type: Option<_> = row.get("client_filter_type")?;
let client_filter_type = client_filter_type.unwrap_or("KerberosPrinc".to_owned());
Some(ClientFilter::from(op, client_filter_type, row.get("client_filter_flags")?, row.get("client_filter_value")?)?)
},
None => None
};

Expand Down Expand Up @@ -553,6 +557,8 @@ impl Database for SQLiteDatabase {
async fn store_subscription(&self, subscription: &SubscriptionData) -> Result<()> {
let subscription = subscription.clone();
let client_filter_op: Option<String> = subscription.client_filter().map(|f| f.operation().to_string());
let client_filter_type = subscription.client_filter().map(|f| f.kind().to_string());
let client_filter_flags = subscription.client_filter().map(|f| f.flags().to_string());
let client_filter_value = subscription.client_filter().and_then(|f| f.targets_to_opt_string());

let count = self
Expand All @@ -564,12 +570,12 @@ impl Database for SQLiteDatabase {
r#"INSERT INTO subscriptions (uuid, version, revision, name, uri, query,
heartbeat_interval, connection_retry_count, connection_retry_interval,
max_time, max_elements, max_envelope_size, enabled, read_existing_events, content_format,
ignore_channel_error, client_filter_op, client_filter_value, outputs, locale,
ignore_channel_error, client_filter_op, client_filter_type, client_filter_flags, client_filter_value, outputs, locale,
data_locale)
VALUES (:uuid, :version, :revision, :name, :uri, :query,
:heartbeat_interval, :connection_retry_count, :connection_retry_interval,
:max_time, :max_elements, :max_envelope_size, :enabled, :read_existing_events, :content_format,
:ignore_channel_error, :client_filter_op, :client_filter_value, :outputs,
:ignore_channel_error, :client_filter_op, :client_filter_type, :client_filter_flags, :client_filter_value, :outputs,
:locale, :data_locale)
ON CONFLICT (uuid) DO UPDATE SET
version = excluded.version,
Expand All @@ -588,6 +594,8 @@ impl Database for SQLiteDatabase {
content_format = excluded.content_format,
ignore_channel_error = excluded.ignore_channel_error,
client_filter_op = excluded.client_filter_op,
client_filter_type = excluded.client_filter_type,
client_filter_flags = excluded.client_filter_flags,
client_filter_value = excluded.client_filter_value,
outputs = excluded.outputs,
locale = excluded.locale,
Expand All @@ -610,6 +618,8 @@ impl Database for SQLiteDatabase {
":content_format": subscription.content_format().to_string(),
":ignore_channel_error": subscription.ignore_channel_error(),
":client_filter_op": client_filter_op,
":client_filter_type": client_filter_type,
":client_filter_flags": client_filter_flags,
":client_filter_value": client_filter_value,
":outputs": serde_json::to_string(subscription.outputs())?,
":locale": subscription.locale(),
Expand Down
10 changes: 8 additions & 2 deletions common/src/models/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ impl From<ClientFilterOperation> for crate::subscription::ClientFilterOperation
#[serde(deny_unknown_fields)]
struct ClientFilter {
pub operation: ClientFilterOperation,
#[serde(rename = "type", default)]
pub kind: crate::subscription::ClientFilterType,
#[serde(default)]
pub flags: crate::subscription::ClientFilterFlags,
#[serde(alias = "cert_subjects", alias = "princs")]
pub targets: HashSet<String>,
}
Expand All @@ -186,7 +190,7 @@ impl TryFrom<ClientFilter> for crate::subscription::ClientFilter {
type Error = anyhow::Error;

fn try_from(value: ClientFilter) -> std::prelude::v1::Result<Self, Self::Error> {
Ok(crate::subscription::ClientFilter::new(value.operation.into(), value.targets))
crate::subscription::ClientFilter::try_new(value.operation.into(), value.kind, value.flags, value.targets)
}
}

Expand Down Expand Up @@ -506,7 +510,9 @@ path = "/whatever/you/{ip}/want/{principal}/{ip:2}/{node}/end"
let mut targets = HashSet::new();
targets.insert("[email protected]".to_string());
targets.insert("[email protected]".to_string());
let filter = crate::subscription::ClientFilter::new(crate::subscription::ClientFilterOperation::Only, targets);
let kind = crate::subscription::ClientFilterType::KerberosPrinc;
let flags = crate::subscription::ClientFilterFlags::empty();
let filter = crate::subscription::ClientFilter::try_new(crate::subscription::ClientFilterOperation::Only, kind, flags, targets)?;

expected.set_client_filter(Some(filter));

Expand Down
12 changes: 7 additions & 5 deletions common/src/models/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ mod v1 {
return None;
};

Some(crate::subscription::ClientFilter::new(op.into(), self.princs))
Some(crate::subscription::ClientFilter::new_legacy(op.into(), self.princs))
}
}

Expand Down Expand Up @@ -549,15 +549,15 @@ pub mod v2 {
return None;
};

Some(crate::subscription::ClientFilter::new(op.into(), self.princs))
Some(crate::subscription::ClientFilter::new_legacy(op.into(), self.princs))
}
}

impl From<Option<crate::subscription::ClientFilter>> for PrincsFilter {
fn from(value: Option<crate::subscription::ClientFilter>) -> Self {
Self {
operation: value.as_ref().and_then(|f| Some(f.operation().clone().into())),
princs: value.map_or(HashSet::new(), |f| f.targets().clone()),
princs: value.map_or(HashSet::new(), |f| f.targets().iter().cloned().map(String::from).collect()),
}
}
}
Expand Down Expand Up @@ -712,10 +712,12 @@ mod tests {
.set_max_elements(Some(100))
.set_read_existing_events(false)
.set_uri(Some("toto".to_string()))
.set_client_filter(Some(crate::subscription::ClientFilter::new(
.set_client_filter(Some(crate::subscription::ClientFilter::try_new(
crate::subscription::ClientFilterOperation::Except,
crate::subscription::ClientFilterType::KerberosPrinc,
crate::subscription::ClientFilterFlags::CaseSensitive,
targets,
)))
)?))
.set_outputs(vec![crate::subscription::SubscriptionOutput::new(
crate::subscription::SubscriptionOutputFormat::Json,
crate::subscription::SubscriptionOutputDriver::Tcp(
Expand Down
Loading

0 comments on commit 183c7b5

Please sign in to comment.