Skip to content

Commit da75706

Browse files
authored
test(connectors): add random source liveness helper (#3377)
1 parent 10ca430 commit da75706

3 files changed

Lines changed: 69 additions & 95 deletions

File tree

core/integration/tests/connectors/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod mongodb;
3030
mod postgres;
3131
mod quickwit;
3232
mod random;
33+
mod random_source_liveness;
3334
mod runtime;
3435
mod stdout;
3536

core/integration/tests/connectors/random/random_source.rs

Lines changed: 2 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -17,107 +17,14 @@
1717
* under the License.
1818
*/
1919

20-
use iggy_common::MessageClient;
21-
use iggy_common::{Consumer, Identifier, PollingStrategy};
20+
use crate::connectors::random_source_liveness;
2221
use integration::harness::seeds;
2322
use integration::iggy_harness;
24-
use std::time::Duration;
25-
use tokio::time::sleep;
2623

2724
#[iggy_harness(
2825
server(connectors_runtime(config_path = "tests/connectors/random/source.toml")),
2926
seed = seeds::connector_stream
3027
)]
3128
async fn random_source_produces_messages(harness: &TestHarness) {
32-
sleep(Duration::from_secs(1)).await;
33-
34-
let client = harness.root_client().await.unwrap();
35-
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
36-
let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
37-
let consumer_id: Identifier = "test_consumer".try_into().unwrap();
38-
39-
let messages = client
40-
.poll_messages(
41-
&stream_id,
42-
&topic_id,
43-
None,
44-
&Consumer::new(consumer_id),
45-
&PollingStrategy::next(),
46-
10,
47-
true,
48-
)
49-
.await
50-
.expect("Failed to poll messages");
51-
52-
assert!(
53-
!messages.messages.is_empty(),
54-
"No messages received from random source"
55-
);
56-
assert!(
57-
messages.current_offset > 0,
58-
"Current offset should be greater than 0"
59-
);
60-
}
61-
62-
#[iggy_harness(
63-
server(connectors_runtime(config_path = "tests/connectors/random/source.toml")),
64-
seed = seeds::connector_stream
65-
)]
66-
async fn state_persists_across_connector_restart(harness: &mut TestHarness) {
67-
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
68-
let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
69-
let consumer_id: Identifier = "state_test_consumer".try_into().unwrap();
70-
71-
sleep(Duration::from_secs(1)).await;
72-
73-
let client = harness.root_client().await.unwrap();
74-
let offset_before = {
75-
let messages = client
76-
.poll_messages(
77-
&stream_id,
78-
&topic_id,
79-
None,
80-
&Consumer::new(consumer_id.clone()),
81-
&PollingStrategy::next(),
82-
100,
83-
true,
84-
)
85-
.await
86-
.expect("Failed to poll messages before restart");
87-
assert!(
88-
messages.current_offset > 0,
89-
"Should have messages before restart"
90-
);
91-
messages.current_offset
92-
};
93-
94-
harness
95-
.server_mut()
96-
.stop_dependents()
97-
.expect("Failed to stop connectors");
98-
harness
99-
.server_mut()
100-
.start_dependents()
101-
.await
102-
.expect("Failed to restart connectors");
103-
sleep(Duration::from_secs(1)).await;
104-
105-
let offset_after = client
106-
.poll_messages(
107-
&stream_id,
108-
&topic_id,
109-
None,
110-
&Consumer::new(consumer_id),
111-
&PollingStrategy::next(),
112-
100,
113-
true,
114-
)
115-
.await
116-
.expect("Failed to poll messages after restart")
117-
.current_offset;
118-
119-
assert!(
120-
offset_after > offset_before,
121-
"After restart, offset {offset_after} should be greater than before {offset_before}"
122-
);
29+
random_source_liveness::assert_produces_messages(harness).await;
12330
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
use std::time::Duration;
21+
22+
use iggy_common::{Consumer, Identifier, MessageClient, PollingStrategy};
23+
use integration::harness::{TestHarness, seeds};
24+
use tokio::time::{sleep, timeout};
25+
26+
const CONSUMER_NAME: &str = "random_source_liveness_consumer";
27+
const POLL_BATCH: u32 = 100;
28+
const RETRY_INTERVAL: Duration = Duration::from_millis(100);
29+
const POLL_TIMEOUT: Duration = Duration::from_secs(5);
30+
31+
pub(crate) async fn assert_produces_messages(harness: &TestHarness) {
32+
let client = harness.root_client().await.expect("root client");
33+
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
34+
let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
35+
let consumer_id: Identifier = CONSUMER_NAME.try_into().unwrap();
36+
37+
let poll = async {
38+
loop {
39+
if let Ok(polled) = client
40+
.poll_messages(
41+
&stream_id,
42+
&topic_id,
43+
None,
44+
&Consumer::new(consumer_id.clone()),
45+
&PollingStrategy::next(),
46+
POLL_BATCH,
47+
true,
48+
)
49+
.await
50+
{
51+
if !polled.messages.is_empty() {
52+
return;
53+
}
54+
}
55+
56+
sleep(RETRY_INTERVAL).await;
57+
}
58+
};
59+
60+
timeout(POLL_TIMEOUT, poll).await.unwrap_or_else(|_| {
61+
panic!(
62+
"random source liveness timed out after {:?} waiting for messages",
63+
POLL_TIMEOUT
64+
)
65+
})
66+
}

0 commit comments

Comments
 (0)