Commit 96fafb2

Fix BaseProducer::flush behavior and tests (#777)

The flush function now correctly calls poll until all messages have been processed. Also:

* Make it easier to pass extra arguments to the test suite (`./test_suite.sh -- --nocapture`, for example).
* Implement fmt::Debug for Metadata.
* Set up logging during producer tests.
* Fix docs warnings and format.
* Apply suggestions from code review.
* Fix the version number format in the CI config.

1 parent 5f36b93 commit 96fafb2

File tree: 17 files changed, +140 −73 lines

.github/workflows/ci.yml

Lines changed: 4 additions & 9 deletions

@@ -75,14 +75,10 @@ jobs:
       max-parallel: 1
       matrix:
         include:
-          - confluent-version: 7.9.1
-            kafka-version: 3.9
-          - confluent-version: 7.7.0
-            kafka-version: 3.7
-          - confluent-version: 7.5.1
-            kafka-version: 3.6
-          - confluent-version: 7.5.1
-            kafka-version: 3.5
+          - kafka-version: "4.0"
+          - kafka-version: "3.9"
+          - kafka-version: "3.8"
+          - kafka-version: "3.7"
     runs-on: ubuntu-24.04
     steps:
       - uses: actions/checkout@v4
@@ -94,6 +90,5 @@ jobs:
       # - run: sudo apt-get install -qy valgrind # Valgrind currently disabled in testing
       - run: ./test_suite.sh
         env:
-          CONFLUENT_VERSION: ${{ matrix.confluent-version }}
          KAFKA_VERSION: ${{ matrix.kafka-version }}
          TERM: xterm-256color

docker-compose.yaml

Lines changed: 19 additions & 14 deletions

@@ -1,18 +1,23 @@
 services:
   kafka:
-    image: confluentinc/cp-kafka:${CONFLUENT_VERSION:-7.9.1}
+    image: bitnami/kafka:${KAFKA_VERSION:-4.0}
     environment:
-      CLUSTER_ID: "test-cluster"
-      KAFKA_NODE_ID: 0
-      KAFKA_PROCESS_ROLES: broker,controller
-      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
-      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
-      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
-      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
-      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
-      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
-      KAFKA_NUM_PARTITIONS: 3
+      # Enable KRaft mode (combined broker and controller)
+      - KAFKA_CFG_NODE_ID=0
+      - KAFKA_CFG_BROKER_ID=0 # In KRaft, this should be the same as the node ID
+      - KAFKA_CFG_PROCESS_ROLES=broker,controller
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093
+
+      # Bitnami defaults
+      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
+      - KAFKA_CFG_NUM_PARTITIONS=3
+
+      # This is a Bitnami-specific variable to disable ZooKeeper
+      - KAFKA_KRAFT_ENABLED=true
     ports: ["9092:9092"]
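
As a quick smoke test of the single-node KRaft broker this compose file starts, a sketch like the following could create a topic against the advertised localhost:9092 listener. The topic name, partition count, and error handling are illustrative, not part of this commit:

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;

// Hypothetical smoke test against the broker from docker-compose.yaml.
async fn create_test_topic() -> Result<(), Box<dyn std::error::Error>> {
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;
    // Replication factor 1 because the compose file runs a single broker;
    // three partitions mirrors KAFKA_CFG_NUM_PARTITIONS above.
    let topic = NewTopic::new("smoke-test", 3, TopicReplication::Fixed(1));
    admin.create_topics(&[topic], &AdminOptions::new()).await?;
    Ok(())
}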

rdkafka-sys/build.rs

Lines changed: 7 additions & 7 deletions

@@ -79,13 +79,13 @@ fn main() {
        println!("cargo:rustc-link-lib=static=rdkafka");
        println!("cargo:root={}", rdkafka_dir);
    } else {
-        eprintln!("Path to DEP_LIBRDKAFKA_STATIC_ROOT not set. Static linking failed. Exiting.");
+        eprintln!(
+            "Path to DEP_LIBRDKAFKA_STATIC_ROOT not set. Static linking failed. Exiting."
+        );
        process::exit(1);
    }
    eprintln!("librdkafka will be linked statically using prebuilt binaries");
-
-    }
-    else {
+    } else {
    // Ensure that we are in the right directory
    let rdkafkasys_root = Path::new("rdkafka-sys");
    if rdkafkasys_root.exists() {
@@ -226,9 +226,9 @@ fn build_librdkafka() {
        // Since we're not actually installing to /usr or /usr/local, there's no
        // harm to always using "lib" here.
        .define("CMAKE_INSTALL_LIBDIR", "lib")
-            // CMake 4.0.0 drops support for 3.2 compatibility, which is
-            // required by librdkafka 2.3.0.
-            .define("CMAKE_POLICY_VERSION_MINIMUM", "3.5");
+        // CMake 4.0.0 drops support for 3.2 compatibility, which is
+        // required by librdkafka 2.3.0.
+        .define("CMAKE_POLICY_VERSION_MINIMUM", "3.5");

    if env::var("CARGO_FEATURE_LIBZ").is_ok() {
        config.define("WITH_ZLIB", "1");

src/admin.rs

Lines changed: 1 addition & 1 deletion

@@ -224,7 +224,7 @@ impl<C: ClientContext> AdminClient<C> {
     /// The provided `offsets` is a topic partition list specifying which
     /// records to delete from a list of topic partitions. For each entry in the
     /// list, the messages at offsets before the specified offsets (exclusive)
-    /// in the specified partition will be deleted. Use offset [`Offset::End`]
+    /// in the specified partition will be deleted. Use offset [`crate::Offset::End`]
     /// to delete all records in the partition.
     ///
     /// Returns a topic partition list describing the result of the deletion. If
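
A minimal sketch of how the documented call might be used, assuming an already-constructed AdminClient; the topic name and exact return handling are placeholders:

use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;
use rdkafka::{Offset, TopicPartitionList};

// Hypothetical call site for delete_records: Offset::End requests deletion
// of every record currently in partition 0 of "my-topic".
async fn truncate_partition(
    admin: &AdminClient<DefaultClientContext>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut offsets = TopicPartitionList::new();
    offsets.add_partition_offset("my-topic", 0, Offset::End)?;
    let result = admin.delete_records(&offsets, &AdminOptions::new()).await?;
    println!("{:?}", result); // per-partition outcome of the deletion
    Ok(())
}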

src/consumer/stream_consumer.rs

Lines changed: 4 additions & 4 deletions

@@ -1,4 +1,4 @@
-//! High-level consumers with a [`Stream`](futures_util::Stream) interface.
+//! High-level consumers with a [`Stream`] interface.

 use std::marker::PhantomData;
 use std::os::raw::c_void;
@@ -172,16 +172,16 @@ impl<'a, C: ConsumerContext> Stream for MessageStream<'a, C> {
     }
 }

-impl<'a, C: ConsumerContext> Drop for MessageStream<'a, C> {
+impl<C: ConsumerContext> Drop for MessageStream<'_, C> {
     fn drop(&mut self) {
         self.wakers.unregister(self.slot);
     }
 }

-/// A high-level consumer with a [`Stream`](futures_util::Stream) interface.
+/// A high-level consumer with a [`Stream`] interface.
 ///
 /// This consumer doesn't need to be polled explicitly. Extracting an item from
-/// the stream returned by the [`stream`](StreamConsumer::stream) will
+/// the stream returned by [`stream`](StreamConsumer::stream) will
 /// implicitly poll the underlying Kafka consumer.
 ///
 /// If you activate the consumer group protocol by calling
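
To make the implicit-polling contract concrete, a hedged sketch of a typical receive loop (broker address, group id, and topic are placeholders):

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;

// Hypothetical consumer loop: awaiting recv() drives the underlying
// librdkafka event loop, so no explicit poll() call is needed.
async fn run_consumer() -> Result<(), Box<dyn std::error::Error>> {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "example-group")
        .create()?;
    consumer.subscribe(&["my-topic"])?;
    loop {
        let msg = consumer.recv().await?;
        println!("offset {}: {:?}", msg.offset(), msg.payload_view::<str>());
    }
}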

src/message.rs

Lines changed: 4 additions & 4 deletions

@@ -320,7 +320,7 @@ unsafe impl KafkaDrop for RDKafkaMessage {
     const DROP: unsafe extern "C" fn(*mut Self) = no_op;
 }

-impl<'a> fmt::Debug for BorrowedMessage<'a> {
+impl fmt::Debug for BorrowedMessage<'_> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(
             f,
@@ -421,7 +421,7 @@ impl<'a> BorrowedMessage<'a> {
     }
 }

-impl<'a> Message for BorrowedMessage<'a> {
+impl Message for BorrowedMessage<'_> {
     type Headers = BorrowedHeaders;

     fn key(&self) -> Option<&[u8]> {
@@ -488,8 +488,8 @@ impl<'a> Message for BorrowedMessage<'a> {
     }
 }

-unsafe impl<'a> Send for BorrowedMessage<'a> {}
-unsafe impl<'a> Sync for BorrowedMessage<'a> {}
+unsafe impl Send for BorrowedMessage<'_> {}
+unsafe impl Sync for BorrowedMessage<'_> {}

 //
 // ********** OWNED MESSAGE **********

src/metadata.rs

Lines changed: 49 additions & 0 deletions

@@ -1,6 +1,7 @@
 //! Cluster metadata.

 use std::ffi::CStr;
+use std::fmt;
 use std::slice;

 use rdkafka_sys as rdsys;
@@ -33,6 +34,16 @@ impl MetadataBroker {
     }
 }

+impl fmt::Debug for MetadataBroker {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("MetadataBroker")
+            .field("id", &self.id())
+            .field("host", &self.host())
+            .field("port", &self.port())
+            .finish()
+    }
+}
+
 /// Partition metadata information.
 pub struct MetadataPartition(RDKafkaMetadataPartition);

@@ -69,6 +80,21 @@ impl MetadataPartition {
     }
 }

+impl fmt::Debug for MetadataPartition {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut debug_struct = f.debug_struct("MetadataPartition");
+        debug_struct.field("id", &self.id());
+        if let Some(err) = self.error() {
+            debug_struct.field("error", &err);
+        }
+        debug_struct
+            .field("leader", &self.leader())
+            .field("replicas", &self.replicas())
+            .field("isr", &self.isr()) // In-Sync Replicas
+            .finish()
+    }
+}
+
 /// Topic metadata information.
 pub struct MetadataTopic(RDKafkaMetadataTopic);

@@ -103,6 +129,18 @@ impl MetadataTopic {
     }
 }

+impl fmt::Debug for MetadataTopic {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut debug_struct = f.debug_struct("MetadataTopic");
+        debug_struct.field("name", &self.name());
+        if let Some(err) = self.error() {
+            debug_struct.field("error", &err);
+        }
+        debug_struct.field("partitions", &self.partitions());
+        debug_struct.finish()
+    }
+}
+
 /// Metadata container.
 ///
 /// This structure wraps the metadata pointer returned by rdkafka-sys, and
@@ -159,5 +197,16 @@ impl Metadata {
     }
 }

+impl fmt::Debug for Metadata {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Metadata")
+            .field("orig_broker_name", &self.orig_broker_name())
+            .field("orig_broker_id", &self.orig_broker_id())
+            .field("brokers", &self.brokers())
+            .field("topics", &self.topics())
+            .finish()
+    }
+}
+
 unsafe impl Send for Metadata {}
 unsafe impl Sync for Metadata {}
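
With these impls in place, a whole Metadata snapshot can be printed directly; a sketch assuming a reachable broker (the address is a placeholder):

use std::time::Duration;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};

// Hypothetical usage of the new Debug impls: fetch metadata for all topics
// and pretty-print it with the formatters defined above.
fn dump_metadata() -> Result<(), Box<dyn std::error::Error>> {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;
    let metadata = consumer.fetch_metadata(None, Duration::from_secs(5))?;
    println!("{:#?}", metadata);
    Ok(())
}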

src/mocking.rs

Lines changed: 1 addition & 1 deletion

@@ -383,7 +383,7 @@ where
     }
 }

-impl<'c, C> Drop for MockCluster<'c, C>
+impl<C> Drop for MockCluster<'_, C>
 where
     C: ClientContext,
 {

src/producer/base_producer.rs

Lines changed: 14 additions & 13 deletions

@@ -507,23 +507,24 @@ where

     // As this library uses the rdkafka Event API, flush will not call rd_kafka_poll() but instead wait for
     // the librdkafka-handled message count to reach zero. Runs until value reaches zero or timeout.
+    // https://github.com/confluentinc/librdkafka/blob/c024ac13daf98667de2b8724986e97f489644c15/src/rdkafka.c#L4542-L4551
     fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
         let deadline: Deadline = timeout.into().into();
-        while self.in_flight_count() > 0 && !deadline.elapsed() {
-            let ret: RDKafkaRespErr = unsafe {
-                rdsys::rd_kafka_flush(self.native_ptr(), deadline.remaining_millis_i32())
-            };
-            if let Deadline::Never = &deadline {
-                self.poll(Timeout::After(Duration::ZERO));
-            } else {
-                self.poll(&deadline);
-            }
-
-            if ret.is_error() {
-                return Err(KafkaError::Flush(ret.into()));
+        loop {
+            match unsafe { rdsys::rd_kafka_flush(self.native_ptr(), 0) } {
+                rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR_NO_ERROR => {
+                    // Flush completed
+                    return Ok(());
+                }
+                to @ rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__TIMED_OUT => {
+                    if deadline.elapsed() {
+                        return Err(KafkaError::Flush(to.into()));
+                    }
+                    self.poll(deadline.remaining().min(Duration::from_millis(100)));
+                }
+                e => return Err(KafkaError::Flush(e.into())),
             };
         }
-        Ok(())
     }

     fn purge(&self, flags: PurgeConfig) {
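
From the caller's side the contract is unchanged: enqueue, then flush with a deadline. A hedged sketch of typical usage (broker and topic are placeholders):

use std::time::Duration;
use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};

// Hypothetical call site: with the fix, flush() polls internally until the
// queue drains or the deadline passes, so delivery callbacks keep firing.
fn produce_and_flush() -> Result<(), Box<dyn std::error::Error>> {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;
    producer
        .send(BaseRecord::to("my-topic").key("k").payload("hello"))
        .map_err(|(err, _record)| err)?;
    producer.flush(Duration::from_secs(10))?;
    Ok(())
}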

src/producer/mod.rs

Lines changed: 1 addition & 1 deletion

@@ -205,7 +205,7 @@ pub trait ProducerContext<Part: Partitioner = NoCustomPartitioner>: ClientContext
     /// If custom partitioner is not used then `partitioner` configuration property is used (or its default).
     ///
     /// sticky.partitioning.linger.ms must be 0 to run custom partitioner for messages with null key.
-    /// See https://github.com/confluentinc/librdkafka/blob/081fd972fa97f88a1e6d9a69fc893865ffbb561a/src/rdkafka_msg.c#L1192-L1196
+    /// See <https://github.com/confluentinc/librdkafka/blob/081fd972fa97f88a1e6d9a69fc893865ffbb561a/src/rdkafka_msg.c#L1192-L1196>
     fn get_custom_partitioner(&self) -> Option<&Part> {
         None
    }
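
For reference, that property is set like any other librdkafka config key; a sketch with illustrative values (a real custom partitioner would additionally need a ProducerContext whose get_custom_partitioner returns Some):

use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaResult;
use rdkafka::producer::BaseProducer;

// Hypothetical config: with sticky.partitioning.linger.ms set to 0, a custom
// partitioner is consulted even for messages with a null key.
fn sticky_free_producer() -> KafkaResult<BaseProducer> {
    ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("sticky.partitioning.linger.ms", "0")
        .create()
}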
