Skip to content

Commit

Permalink
Adding initial redis output support
Browse files Browse the repository at this point in the history
  • Loading branch information
dvas0004 authored Sep 29, 2023
1 parent fd35e09 commit 8cb71d2
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 3 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
/.vscode
/.idea
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add Kerberos principals filter to subscriptions (#18)
- Add a setting to configure `heartbeats_queue_size` (#37)
- Add Tls support for encryption and authentication (#36)
- Add support for output events to redis list (#45)

### Changed

- Server log responses payload in TRACE level (#37)
- Remove `OperationID` from responses because we don't support "Robust Connection" (#37)
- Clear in-memory subscriptions when a SIGHUP signal is received, resulting in all file descriptors used by subscriptions being closed (#37)
- `heartbeats_queue_size` now defauts to 2048 instead of 32 (#37)
- `heartbeats_queue_size` now defaults to 2048 instead of 32 (#37)

## [0.1.0] - 2023-05-30

Expand Down
72 changes: 72 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ async fn main() {
.arg(arg!(<addr> "Remote IP address"))
.arg(arg!(<port> "TCP port").value_parser(value_parser!(u16)))
)
.subcommand(
Command::new("redis")
.about("Redis output")
.arg(arg!(<addr> "Redis address "))
.arg(arg!(<list> "Redis list"))
)
.subcommand(
Command::new("kafka")
.about("Kafka output")
Expand Down
16 changes: 15 additions & 1 deletion cli/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use common::{
subscription::{
ContentFormat, FileConfiguration, KafkaConfiguration, PrincsFilter, PrincsFilterOperation,
SubscriptionData, SubscriptionMachineState, SubscriptionOutput, SubscriptionOutputFormat,
TcpConfiguration,
TcpConfiguration, RedisConfiguration,
},
};
use roxmltree::{Document, Node};
Expand Down Expand Up @@ -630,6 +630,7 @@ async fn outputs_add(subscription: &mut SubscriptionData, matches: &ArgMatches)
};
let output = match matches.subcommand() {
Some(("tcp", matches)) => SubscriptionOutput::Tcp(format, outputs_add_tcp(matches)?, true),
Some(("redis", matches)) => SubscriptionOutput::Redis(format, outputs_add_redis(matches)?, true),
Some(("kafka", matches)) => {
SubscriptionOutput::Kafka(format, outputs_add_kafka(matches)?, true)
}
Expand All @@ -656,6 +657,19 @@ fn outputs_add_tcp(matches: &ArgMatches) -> Result<TcpConfiguration> {
Ok(TcpConfiguration::new(addr.clone(), *port))
}

fn outputs_add_redis(matches: &ArgMatches) -> Result<RedisConfiguration> {
let addr = matches
.get_one::<String>("addr")
.ok_or_else(|| anyhow!("Missing Redis server address"))?;

let list = matches
.get_one::<String>("list")
.ok_or_else(|| anyhow!("Missing Redis list"))?;

info!("Adding Redis output : address: {}, list {}", addr, list);
Ok(RedisConfiguration::new(addr.clone(), list.clone()))
}

fn outputs_add_kafka(matches: &ArgMatches) -> Result<KafkaConfiguration> {
let topic = matches
.get_one::<String>("topic")
Expand Down
36 changes: 36 additions & 0 deletions common/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,29 @@ impl KafkaConfiguration {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct RedisConfiguration {
addr: String,
list: String,
}

impl RedisConfiguration {
pub fn new(addr:String, list: String) -> Self {
RedisConfiguration { addr, list }
}

/// Get a reference to the redis configuration's list.
pub fn list(&self) -> &str {
self.list.as_ref()
}

/// Get a reference to the redis configuration's server address.
pub fn addr(&self) -> &str {
self.addr.as_ref()
}

}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct TcpConfiguration {
addr: String,
Expand Down Expand Up @@ -104,6 +127,7 @@ pub enum SubscriptionOutput {
Files(SubscriptionOutputFormat, FileConfiguration, bool),
Kafka(SubscriptionOutputFormat, KafkaConfiguration, bool),
Tcp(SubscriptionOutputFormat, TcpConfiguration, bool),
Redis(SubscriptionOutputFormat, RedisConfiguration, bool),
}

impl SubscriptionOutput {
Expand All @@ -112,6 +136,7 @@ impl SubscriptionOutput {
SubscriptionOutput::Files(format, _, _) => format,
SubscriptionOutput::Kafka(format, _, _) => format,
SubscriptionOutput::Tcp(format, _, _) => format,
SubscriptionOutput::Redis(format, _, _) => format,
}
}

Expand All @@ -120,6 +145,7 @@ impl SubscriptionOutput {
SubscriptionOutput::Files(_, _, enabled) => *enabled,
SubscriptionOutput::Kafka(_, _, enabled) => *enabled,
SubscriptionOutput::Tcp(_, _, enabled) => *enabled,
SubscriptionOutput::Redis(_, _, enabled) => *enabled,
}
}

Expand All @@ -134,6 +160,9 @@ impl SubscriptionOutput {
SubscriptionOutput::Tcp(format, config, _) => {
SubscriptionOutput::Tcp(format.clone(), config.clone(), value)
}
SubscriptionOutput::Redis(format, config, _) => {
SubscriptionOutput::Redis(format.clone(), config.clone(), value)
}
}
}
}
Expand Down Expand Up @@ -162,6 +191,13 @@ impl Display for SubscriptionOutput {
enabled, format, config.addr, config.port
)
}
SubscriptionOutput::Redis(format, config, enabled) => {
write!(
f,
"Enabled: {:?}, Format: {}, Output: Redis({:?})",
enabled, format, config
)
}
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions doc/outputs.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,29 @@ You must provide an IP address or a hostname and a port to connect to.
$ openwec subscriptions edit my-subscription outputs add --format <format> tcp my.server.windomain.local 12000
```

### Redis

This output type sends events to a Redis list using the [LPUSH command](https://redis.io/commands/lpush/)

You must provide:
- a redis server address containing the IP and port to connect to.
- a list name

TODO:

- [ ] implement TLS connections to redis
- [ ] support redis auth
- [ ] ...

#### Examples

* Send events to a redis server into a list named "wec":

```
$ openwec subscriptions edit my-test-subscription outputs add --format <format> redis 127.0.0.1:6377 wec
```


## How to add a new output type ?

TODO
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ tokio-rustls = "0.24.1"
x509-parser = "0.15.1"
sha1 = "0.10.5"
hex = "0.4.3"
redis = { version = "0.23.3", features = ["tokio-comp", "aio"]}
6 changes: 5 additions & 1 deletion server/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use common::subscription::{FileConfiguration, KafkaConfiguration, SubscriptionOutput};
use common::subscription::{FileConfiguration, KafkaConfiguration, SubscriptionOutput, RedisConfiguration};

use crate::{event::EventMetadata, formatter::Format};

#[derive(Debug, Clone)]
pub enum OutputType {
Files(Format, FileConfiguration, bool),
Kafka(Format, KafkaConfiguration, bool),
Redis(Format, RedisConfiguration, bool),
Tcp(Format, String, u16, bool),
}

Expand All @@ -22,6 +23,9 @@ impl From<&SubscriptionOutput> for OutputType {
SubscriptionOutput::Kafka(sof, config, enabled) => {
OutputType::Kafka(sof.into(), config.clone(), *enabled)
}
SubscriptionOutput::Redis(sof, config, enabled) => {
OutputType::Redis(sof.into(), config.clone(), *enabled)
}
SubscriptionOutput::Tcp(sof, config, enabled) => OutputType::Tcp(
sof.into(),
config.addr().to_string(),
Expand Down
1 change: 1 addition & 0 deletions server/src/outputs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod file;
pub mod kafka;
pub mod tcp;
pub mod redis;
Loading

0 comments on commit 8cb71d2

Please sign in to comment.