Skip to content

Commit

Permalink
Add polling strategy parameter to poll_messages (#28)
Browse files Browse the repository at this point in the history
Closes #23
  • Loading branch information
mmodzelewski authored Nov 1, 2024
1 parent 59a5ac3 commit 825717e
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 9 deletions.
3 changes: 2 additions & 1 deletion python_examples/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from loguru import logger

# Assuming there's a Python module for iggy with similar functionalities.
from iggy_py import IggyClient, ReceiveMessage
from iggy_py import IggyClient, ReceiveMessage, PollingStrategy

STREAM_NAME = "sample-stream"
TOPIC_NAME = "sample-topic"
Expand Down Expand Up @@ -36,6 +36,7 @@ async def consume_messages(client: IggyClient):
stream=STREAM_NAME,
topic=TOPIC_NAME,
partition_id=PARTITION_ID,
polling_strategy=PollingStrategy.Next(),
count=messages_per_batch,
auto_commit=True
)
Expand Down
13 changes: 9 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use iggy::clients::client::IggyClient as RustIggyClient;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::consumer::Consumer as RustConsumer;
use iggy::identifier::Identifier;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::messages::poll_messages::PollingStrategy as RustPollingStrategy;
use iggy::messages::send_messages::{Message as RustMessage, Partitioning};
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use pyo3::prelude::*;
use pyo3::types::PyList;
use tokio::runtime::{Builder, Runtime};

use crate::receive_message::ReceiveMessage;
use crate::receive_message::{PollingStrategy, ReceiveMessage};
use crate::send_message::SendMessage;
use crate::stream::StreamDetails;
use crate::topic::TopicDetails;
Expand Down Expand Up @@ -164,7 +164,11 @@ impl IggyClient {
/// Gets topic by stream and id.
///
/// Returns Option of topic details or a PyRuntimeError on failure.
fn get_topic(&self, stream_id: PyIdentifier, topic_id: PyIdentifier) -> PyResult<Option<TopicDetails>> {
fn get_topic(
&self,
stream_id: PyIdentifier,
topic_id: PyIdentifier,
) -> PyResult<Option<TopicDetails>> {
let stream_id = Identifier::from(stream_id);
let topic_id = Identifier::from(topic_id);
let topic_future = self.inner.get_topic(&stream_id, &topic_id);
Expand Down Expand Up @@ -215,13 +219,14 @@ impl IggyClient {
stream: PyIdentifier,
topic: PyIdentifier,
partition_id: u32,
polling_strategy: &PollingStrategy,
count: u32,
auto_commit: bool,
) -> PyResult<Vec<ReceiveMessage>> {
let consumer = RustConsumer::default();
let stream = Identifier::from(stream);
let topic = Identifier::from(topic);
let strategy = PollingStrategy::next();
let strategy: RustPollingStrategy = (*polling_strategy).into();

let poll_messages = self.inner.poll_messages(
&stream,
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod topic;

use client::IggyClient;
use pyo3::prelude::*;
use receive_message::ReceiveMessage;
use receive_message::{PollingStrategy, ReceiveMessage};
use send_message::SendMessage;
use stream::StreamDetails;
use topic::TopicDetails;
Expand All @@ -19,5 +19,6 @@ fn iggy_py(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<IggyClient>()?;
m.add_class::<StreamDetails>()?;
m.add_class::<TopicDetails>()?;
m.add_class::<PollingStrategy>()?;
Ok(())
}
23 changes: 23 additions & 0 deletions src/receive_message.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use iggy::messages::poll_messages::PollingStrategy as RustPollingStrategy;
use iggy::models::messages::PolledMessage as RustReceiveMessage;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
Expand Down Expand Up @@ -35,3 +36,25 @@ impl ReceiveMessage {
self.inner.offset
}
}

#[derive(Clone, Copy)]
#[pyclass]
pub enum PollingStrategy {
Offset { value: u64 },
Timestamp { value: u64 },
First {},
Last {},
Next {},
}

impl From<PollingStrategy> for RustPollingStrategy {
fn from(value: PollingStrategy) -> Self {
match value {
PollingStrategy::Offset { value } => RustPollingStrategy::offset(value),
PollingStrategy::Timestamp { value } => RustPollingStrategy::timestamp(value.into()),
PollingStrategy::First {} => RustPollingStrategy::first(),
PollingStrategy::Last {} => RustPollingStrategy::last(),
PollingStrategy::Next {} => RustPollingStrategy::next(),
}
}
}
2 changes: 1 addition & 1 deletion src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub struct StreamDetails {

impl From<RustStreamDetails> for StreamDetails {
fn from(stream_details: RustStreamDetails) -> Self {
StreamDetails {
Self {
inner: stream_details,
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub struct TopicDetails {

impl From<RustTopicDetails> for TopicDetails {
fn from(topic_details: RustTopicDetails) -> Self {
TopicDetails {
Self {
inner: topic_details,
}
}
Expand Down Expand Up @@ -36,4 +36,3 @@ impl TopicDetails {
self.inner.partitions_count
}
}

0 comments on commit 825717e

Please sign in to comment.