diff --git a/python_examples/consumer_with_identifier.py b/python_examples/consumer_with_identifier.py new file mode 100644 index 0000000..547d438 --- /dev/null +++ b/python_examples/consumer_with_identifier.py @@ -0,0 +1,53 @@ +import asyncio + +# Assuming there's a Python module for iggy with similar functionalities. +from iggy_py import IggyClient, ReceiveMessage, Consumer, Identifier + +STREAM_ID = 1 +TOPIC_ID = 1 +PARTITION_ID = 1 + +async def main(): + client = IggyClient() # Assuming default constructor has similar functionality. + try: + client.connect() + client.login_user("iggy", "iggy") + await consume_messages(client) + except Exception as e: + print("exception: {}", e) + +async def consume_messages(client: IggyClient): + interval = 0.5 # 500 milliseconds in seconds for asyncio.sleep + print(f"Messages will be consumed from stream: {STREAM_ID}, topic: {TOPIC_ID}, partition: {PARTITION_ID} with interval {interval * 1000} ms.") + + offset = 0 + messages_per_batch = 10 + while True: + try: + polled_messages = client.poll_messages( + consumer=Consumer(Identifier(0)), + stream_id=STREAM_ID, + topic_id=TOPIC_ID, + partition_id=PARTITION_ID, + count=messages_per_batch, + auto_commit=True, + ) + if not polled_messages: + print("No messages found.") + await asyncio.sleep(interval) + continue + + offset += len(polled_messages) + for message in polled_messages: + handle_message(message) + await asyncio.sleep(interval) + except Exception as e: + print("exception: {}", e) + break + +def handle_message(message: ReceiveMessage): + payload = message.payload().decode('utf-8') + print(f"Handling message at offset: {message.offset()}, payload: {payload}...") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/client.rs b/src/client.rs index 267a6c8..955f1a7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -152,9 +152,17 @@ impl IggyClient { partition_id: u32, count: u32, auto_commit: bool, + consumer: Option>, ) -> PyResult> { + let consumer = match consumer { + Some(consumer) => RustConsumer { + kind: consumer.kind.clone().into(), + id: consumer.id.clone().into(), + }, + None => RustConsumer::default(), + }; let poll_message_cmd = PollMessages { - consumer: RustConsumer::default(), + consumer: consumer, stream_id: Identifier::numeric(stream_id).unwrap(), topic_id: Identifier::numeric(topic_id).unwrap(), partition_id: Some(partition_id), diff --git a/src/consumer.rs b/src/consumer.rs new file mode 100644 index 0000000..f2ccbec --- /dev/null +++ b/src/consumer.rs @@ -0,0 +1,45 @@ +use pyo3; +use crate::pyclass; +use crate::pymethods; +use crate::identifier; + +/// Maps to iggy::consumer::ConsumerKind +#[pyclass] +#[derive(Default, Clone)] +pub enum ConsumerKind { + #[default] + Consumer, + ConsumerGroup, +} +impl Into for ConsumerKind { + fn into(self) -> iggy::consumer::ConsumerKind { + match self { + ConsumerKind::Consumer => iggy::consumer::ConsumerKind::Consumer, + ConsumerKind::ConsumerGroup => iggy::consumer::ConsumerKind::ConsumerGroup, + } + } +} +/// Maps to iggy::consumer::Consumer +#[pyclass] +pub struct Consumer { + pub kind: ConsumerKind, + pub id: crate::identifier::Identifier, +} +impl Into for Consumer { + fn into(self) -> iggy::consumer::Consumer { + iggy::consumer::Consumer { + kind: self.kind.into(), + id: self.id.into(), + } + } +} +#[pymethods] +impl Consumer { + #[new] + pub fn new(id: pyo3::PyRef) -> Self { + Self { + kind: ConsumerKind::Consumer, + id: id.clone(), + } + } +} diff --git a/src/identifier.rs b/src/identifier.rs new file mode 100644 index 0000000..9c4d73c --- /dev/null +++ b/src/identifier.rs @@ -0,0 +1,48 @@ +use crate::pyclass; +use crate::pymethods; + +/// Maps to iggy::identifier::IdKind +#[pyclass] +#[derive(Default, Clone)] +pub enum IdKind { + #[default] + Numeric, + String, +} +impl Into for IdKind { + fn into(self) -> iggy::identifier::IdKind { + match self { + IdKind::Numeric => iggy::identifier::IdKind::Numeric, + IdKind::String => iggy::identifier::IdKind::String, + } + } +} + +/// Maps to iggy::identifier::Identifier +#[pyclass] +#[derive(Default, Clone)] +pub struct Identifier { + pub kind: IdKind, + pub length: u8, + pub value: Vec, +} +impl Into for Identifier { + fn into(self) -> iggy::identifier::Identifier { + iggy::identifier::Identifier { + kind: self.kind.into(), + length: self.length, + value: self.value, + } + } +} +#[pymethods] +impl Identifier { + #[new] + pub fn new(value: u32) -> Self { + Self { + kind: IdKind::Numeric, + length: 4, + value: value.to_le_bytes().to_vec(), + } + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 25dc692..b99b0b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,6 @@ mod client; +mod consumer; +mod identifier; mod receive_message; mod send_message; @@ -13,5 +15,7 @@ fn iggy_py(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) }