Skip to content

Commit

Permalink
Implement a new, high-level SDK for Consumer and Producer (#1095)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Aug 4, 2024
1 parent 3473419 commit 12f1b1a
Show file tree
Hide file tree
Showing 106 changed files with 4,062 additions and 1,176 deletions.
8 changes: 8 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
.config
.github
/assets
/local_data
/certs
/scripts
target
Dockerfile
docker-compose.yml
.dockerignore
.git
.gitignore
25 changes: 21 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 3 additions & 15 deletions bench/src/benchmarks/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
use async_trait::async_trait;
use futures::Future;
use iggy::client::{StreamClient, TopicClient};
use iggy::clients::client::{IggyClient, IggyClientBackgroundConfig};
use iggy::clients::client::IggyClient;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::error::IggyError;
use iggy::utils::expiry::IggyExpiry;
Expand Down Expand Up @@ -64,13 +64,7 @@ pub trait Benchmarkable {
let topic_id: u32 = 1;
let partitions_count: u32 = self.args().number_of_partitions();
let client = self.client_factory().create_client().await;
let client = IggyClient::create(
client,
IggyClientBackgroundConfig::default(),
None,
None,
None,
);
let client = IggyClient::create(client, None, None);
login_root(&client).await;
let streams = client.get_streams().await?;
for i in 1..=number_of_streams {
Expand Down Expand Up @@ -106,13 +100,7 @@ pub trait Benchmarkable {
let start_stream_id = self.args().start_stream_id();
let number_of_streams = self.args().number_of_streams();
let client = self.client_factory().create_client().await;
let client = IggyClient::create(
client,
IggyClientBackgroundConfig::default(),
None,
None,
None,
);
let client = IggyClient::create(client, None, None);
login_root(&client).await;
let streams = client.get_streams().await?;
for i in 1..=number_of_streams {
Expand Down
12 changes: 2 additions & 10 deletions bench/src/benchmarks/consumer_group_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::{
};
use async_trait::async_trait;
use iggy::{
client::ConsumerGroupClient,
clients::client::{IggyClient, IggyClientBackgroundConfig},
error::IggyError,
client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError,
utils::byte_size::IggyByteSize,
};
use integration::test_server::{login_root, ClientFactory};
Expand All @@ -33,13 +31,7 @@ impl ConsumerGroupBenchmark {
let start_stream_id = self.args().start_stream_id();
let topic_id: u32 = 1;
let client = self.client_factory().create_client().await;
let client = IggyClient::create(
client,
IggyClientBackgroundConfig::default(),
None,
None,
None,
);
let client = IggyClient::create(client, None, None);
login_root(&client).await;
for i in 1..=consumer_groups_count {
let consumer_group_id = CONSUMER_GROUP_BASE_ID + i;
Expand Down
10 changes: 2 additions & 8 deletions bench/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::args::simple::BenchmarkKind;
use crate::benchmark_result::{BenchmarkResult, LatencyPercentiles};
use iggy::client::{ConsumerGroupClient, MessageClient};
use iggy::clients::client::{IggyClient, IggyClientBackgroundConfig};
use iggy::clients::client::IggyClient;
use iggy::consumer::Consumer as IggyConsumer;
use iggy::error::IggyError;
use iggy::messages::poll_messages::PollingStrategy;
Expand Down Expand Up @@ -48,13 +48,7 @@ impl Consumer {
let default_partition_id: u32 = 1;
let total_messages = (self.messages_per_batch * self.message_batches) as u64;
let client = self.client_factory.create_client().await;
let client = IggyClient::create(
client,
IggyClientBackgroundConfig::default(),
None,
None,
None,
);
let client = IggyClient::create(client, None, None);
login_root(&client).await;
let stream_id = self.stream_id.try_into().unwrap();
let topic_id = topic_id.try_into().unwrap();
Expand Down
10 changes: 2 additions & 8 deletions bench/src/producer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::args::simple::BenchmarkKind;
use crate::benchmark_result::{BenchmarkResult, LatencyPercentiles};
use iggy::client::MessageClient;
use iggy::clients::client::{IggyClient, IggyClientBackgroundConfig};
use iggy::clients::client::IggyClient;
use iggy::error::IggyError;
use iggy::messages::send_messages::{Message, Partitioning};
use iggy::utils::duration::IggyDuration;
Expand Down Expand Up @@ -52,13 +52,7 @@ impl Producer {
let default_partition_id: u32 = 1;
let total_messages = (self.messages_per_batch * self.message_batches) as u64;
let client = self.client_factory.create_client().await;
let client = IggyClient::create(
client,
IggyClientBackgroundConfig::default(),
None,
None,
None,
);
let client = IggyClient::create(client, None, None);
login_root(&client).await;
info!(
"Producer #{} → preparing the test messages...",
Expand Down
2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cli"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
authors = ["[email protected]"]
repository = "https://github.com/iggy-rs/iggy"
Expand Down
14 changes: 4 additions & 10 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use iggy::cli::{
};
use iggy::cli_command::{CliCommand, PRINT_TARGET};
use iggy::client_provider::{self, ClientProviderConfig};
use iggy::clients::client::{IggyClient, IggyClientBackgroundConfig};
use iggy::clients::client::IggyClient;
use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor};
use iggy::utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
use std::sync::Arc;
Expand Down Expand Up @@ -312,9 +312,9 @@ async fn main() -> Result<(), IggyCmdError> {
// Create credentials based on command line arguments and command
let mut credentials = IggyCredentials::new(&cli_options, &iggy_args, command.login_required())?;

let encryptor: Option<Box<dyn Encryptor>> = match iggy_args.encryption_key.is_empty() {
let encryptor: Option<Arc<dyn Encryptor>> = match iggy_args.encryption_key.is_empty() {
true => None,
false => Some(Box::new(
false => Some(Arc::new(
Aes256GcmEncryptor::from_base64_key(&iggy_args.encryption_key).unwrap(),
)),
};
Expand All @@ -323,13 +323,7 @@ async fn main() -> Result<(), IggyCmdError> {
let client =
client_provider::get_raw_client(client_provider_config, command.connection_required())
.await?;
let client = IggyClient::create(
client,
IggyClientBackgroundConfig::default(),
None,
None,
encryptor,
);
let client = IggyClient::create(client, None, encryptor);

credentials.set_iggy_client(&client);
credentials.login_user().await?;
Expand Down
3 changes: 3 additions & 0 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@
"path": "compatibility"
}
},
"state": {
"enforce_fsync": false
},
"runtime": {
"path": "runtime"
},
Expand Down
6 changes: 6 additions & 0 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ path = "compatibility"
## Specifies the directory where database files are stored, relative to `system.path`.
#path = "database"

[system.state]
# Determines whether to enforce file synchronization on state updates (boolean).
# `true` ensures immediate writing of data to disk for durability.
# `false` allows the OS to manage write operations, which can improve performance.
enforce_fsync = false

# Runtime configuration.
[system.runtime]
# Path for storing runtime data.
Expand Down
17 changes: 17 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,27 @@ path = "src/message-headers/consumer/main.rs"
name = "message-headers-producer"
path = "src/message-headers/producer/main.rs"

[[example]]
name = "multi-tenant-consumer"
path = "src/multi-tenant/consumer/main.rs"

[[example]]
name = "multi-tenant-producer"
path = "src/multi-tenant/producer/main.rs"

[[example]]
name = "new-sdk-consumer"
path = "src/new-sdk/consumer/main.rs"

[[example]]
name = "new-sdk-producer"
path = "src/new-sdk/producer/main.rs"

[dependencies]
anyhow = "1.0.86"
bytes = "1.6.0"
clap = { version = "4.5.4", features = ["derive"] }
futures-util = "0.3.30"
iggy = { path = "../sdk" }
rand = "0.8.5"
serde = { version = "1.0.203", features = ["derive", "rc"] }
Expand Down
1 change: 0 additions & 1 deletion examples/src/basic/consumer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?);
let client = client_provider::get_raw_connected_client(client_provider_config).await?;
let client = client.as_ref();
system::login_root(client).await;
system::init_by_consumer(&args, client).await;
system::consume_messages(&args, client, &handle_message).await
}
Expand Down
26 changes: 15 additions & 11 deletions examples/src/basic/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,23 @@ async fn main() -> Result<(), Box<dyn Error>> {
let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?);
let client = client_provider::get_raw_connected_client(client_provider_config).await?;
let client = client.as_ref();
system::login_root(client).await;
system::init_by_producer(&args, client).await?;
produce_messages(&args, client).await
}

async fn produce_messages(args: &Args, client: &dyn Client) -> Result<(), Box<dyn Error>> {
let interval = args.get_interval();

info!(
"Messages will be sent to stream: {}, topic: {}, partition: {} with interval {} ms.",
args.stream_id, args.topic_id, args.partition_id, args.interval
"Messages will be sent to stream: {}, topic: {}, partition: {} with interval {}.",
args.stream_id,
args.topic_id,
args.partition_id,
interval.map_or("none".to_string(), |i| i.as_human_time_string())
);
let mut interval = tokio::time::interval(std::time::Duration::from_millis(args.interval));
let stream_id = args.stream_id.clone().try_into()?;
let topic_id = args.topic_id.clone().try_into()?;
let mut interval = interval.map(|interval| tokio::time::interval(interval.get_duration()));
let mut current_id = 0u64;
let mut sent_batches = 0;
let partitioning = Partitioning::partition_id(args.partition_id);
Expand All @@ -41,6 +47,10 @@ async fn produce_messages(args: &Args, client: &dyn Client) -> Result<(), Box<dy
return Ok(());
}

if let Some(interval) = &mut interval {
interval.tick().await;
}

let mut messages = Vec::new();
let mut sent_messages = Vec::new();
for _ in 0..args.messages_per_batch {
Expand All @@ -51,15 +61,9 @@ async fn produce_messages(args: &Args, client: &dyn Client) -> Result<(), Box<dy
sent_messages.push(payload);
}
client
.send_messages(
&args.stream_id.try_into()?,
&args.topic_id.try_into()?,
&partitioning,
&mut messages,
)
.send_messages(&stream_id, &topic_id, &partitioning, &mut messages)
.await?;
sent_batches += 1;
info!("Sent messages: {:#?}", sent_messages);
interval.tick().await;
}
}
Loading

0 comments on commit 12f1b1a

Please sign in to comment.