Skip to content

Commit

Permalink
🐛✨ Fix cannot get stream error and add support for connection string
Browse files Browse the repository at this point in the history
  • Loading branch information
lsabi authored Oct 26, 2024
1 parent 519ba21 commit 97b4ca1
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::str::FromStr;
use iggy::client::TopicClient;
use iggy::client::{Client, MessageClient, StreamClient, UserClient};
use iggy::clients::client::IggyClient as RustIggyClient;
use iggy::clients::builder::IggyClientBuilder;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::consumer::Consumer as RustConsumer;
use iggy::identifier::Identifier;
Expand Down Expand Up @@ -50,15 +51,21 @@ impl IggyClient {
/// This initializes a new runtime for asynchronous operations.
/// Future versions might utilize asyncio for more Pythonic async.
#[new]
fn new() -> Self {
#[pyo3(signature = (conn=None))]
fn new(conn: Option<String>) -> Self {
// TODO: use asyncio
let runtime = Builder::new_multi_thread()
.worker_threads(4) // number of worker threads
.enable_all() // enables all available Tokio features
.build()
.unwrap();
let client = IggyClientBuilder::new()
.with_tcp()
.with_server_address(conn.unwrap_or("127.0.0.1:8090".to_string()))
.build()
.unwrap();
IggyClient {
inner: RustIggyClient::default(),
inner: client,
runtime,
}
}
Expand Down Expand Up @@ -103,23 +110,27 @@ impl IggyClient {
///
/// Returns Ok(()) on successful topic creation or a PyRuntimeError on failure.
#[pyo3(
signature = (stream_id, name, partitions_count, compression_algorithm, topic_id = None, replication_factor = None)
signature = (stream_name, name, partitions_count, compression_algorithm = None, topic_id = None, replication_factor = None)
)]
fn create_topic(
&self,
stream_id: PyIdentifier,
stream: PyIdentifier,
name: String,
partitions_count: u32,
compression_algorithm: String,
compression_algorithm: Option<String>,
topic_id: Option<u32>,
replication_factor: Option<u8>,
) -> PyResult<()> {
let stream_id = Identifier::from(stream_id);
let compression_algorithm = CompressionAlgorithm::from_str(&compression_algorithm)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?;

let compression_algorithm = match compression_algorithm {
Some(algo) => CompressionAlgorithm::from_str(&algo)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?,
None => CompressionAlgorithm::default()
};

let stream = stream.try_into().unwrap();
let create_topic_future = self.inner.create_topic(
&stream_id,
&stream,
&name,
partitions_count,
compression_algorithm,
Expand All @@ -132,6 +143,7 @@ impl IggyClient {
.runtime
.block_on(async move { create_topic_future.await })
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?;
println!("Topic created with {:?}", _create_topic);
PyResult::Ok(())
}

Expand All @@ -140,8 +152,8 @@ impl IggyClient {
/// Returns Ok(()) on successful sending or a PyRuntimeError on failure.
fn send_messages(
&self,
stream_id: PyIdentifier,
topic_id: PyIdentifier,
stream: PyIdentifier,
topic: PyIdentifier,
partitioning: u32,
messages: &Bound<'_, PyList>,
) -> PyResult<()> {
Expand All @@ -154,13 +166,15 @@ impl IggyClient {
.map(|message| message.inner)
.collect::<Vec<_>>();

let stream_id = Identifier::from(stream_id);
let topic_id = Identifier::from(topic_id);
// let stream_id = Identifier::from(stream_id);
// let topic_id = Identifier::from(topic_id);
let stream = stream_id.try_into().unwrap();
let topic = topic_id.try_into().unwrap();
let partitioning = Partitioning::partition_id(partitioning);

let send_message_future =
self.inner
.send_messages(&stream_id, &topic_id, &partitioning, messages.as_mut());
.send_messages(&stream, &topic, &partitioning, messages.as_mut());
self.runtime
.block_on(async move { send_message_future.await })
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?;
Expand Down

0 comments on commit 97b4ca1

Please sign in to comment.