Skip to content

Commit

Permalink
ref(rust): Remove unused CLI option max_bytes_before_external_group_by (
Browse files Browse the repository at this point in the history
#6651)

depends on successful rollout of
getsentry/ops#13279
  • Loading branch information
untitaker authored Jan 2, 2025
1 parent aa9f9fa commit e45fa84
Show file tree
Hide file tree
Showing 5 changed files with 0 additions and 68 deletions.
1 change: 0 additions & 1 deletion rust_snuba/benches/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ fn create_factory(
},
stop_at_timestamp: None,
batch_write_timeout: None,
max_bytes_before_external_group_by: None,
};
Box::new(factory)
}
Expand Down
4 changes: 0 additions & 4 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ pub fn consumer(
health_check_file: Option<&str>,
stop_at_timestamp: Option<i64>,
batch_write_timeout_ms: Option<u64>,
max_bytes_before_external_group_by: Option<usize>,
max_dlq_buffer_length: Option<usize>,
) -> usize {
py.allow_threads(|| {
Expand All @@ -64,7 +63,6 @@ pub fn consumer(
health_check_file,
stop_at_timestamp,
batch_write_timeout_ms,
max_bytes_before_external_group_by,
mutations_mode,
max_dlq_buffer_length,
)
Expand All @@ -87,7 +85,6 @@ pub fn consumer_impl(
health_check_file: Option<&str>,
stop_at_timestamp: Option<i64>,
batch_write_timeout_ms: Option<u64>,
max_bytes_before_external_group_by: Option<usize>,
mutations_mode: bool,
max_dlq_buffer_length: Option<usize>,
) -> usize {
Expand Down Expand Up @@ -278,7 +275,6 @@ pub fn consumer_impl(
accountant_topic_config: consumer_config.accountant_topic,
stop_at_timestamp,
batch_write_timeout,
max_bytes_before_external_group_by,
};

StreamProcessor::with_kafka(config, factory, topic, dlq_policy)
Expand Down
2 changes: 0 additions & 2 deletions rust_snuba/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ pub struct ConsumerStrategyFactory {
pub accountant_topic_config: config::TopicConfig,
pub stop_at_timestamp: Option<i64>,
pub batch_write_timeout: Option<Duration>,
pub max_bytes_before_external_group_by: Option<usize>,
}

impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
Expand Down Expand Up @@ -121,7 +120,6 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactory {
&self.storage_config.clickhouse_cluster.password,
self.async_inserts,
self.batch_write_timeout,
self.max_bytes_before_external_group_by,
);

let accumulator = Arc::new(
Expand Down
49 changes: 0 additions & 49 deletions rust_snuba/src/strategies/clickhouse/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ impl BatchFactory {
clickhouse_password: &str,
async_inserts: bool,
batch_write_timeout: Option<Duration>,
max_bytes_before_external_group_by: Option<usize>,
) -> Self {
let mut headers = HeaderMap::with_capacity(5);
headers.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
Expand Down Expand Up @@ -71,12 +70,6 @@ impl BatchFactory {
}
}

if let Some(max_bytes_before_external_group_by) = max_bytes_before_external_group_by {
let mut query_segment: String = "&max_bytes_before_external_group_by=".to_owned();
query_segment.push_str(&max_bytes_before_external_group_by.to_string());
query_params.push_str(&query_segment)
}

let url = format!("http://{hostname}:{http_port}?{query_params}");
let query = format!("INSERT INTO {table} FORMAT JSONEachRow");

Expand Down Expand Up @@ -275,7 +268,6 @@ mod tests {
"",
false,
None,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -311,43 +303,6 @@ mod tests {
"",
true,
None,
None,
);

let mut batch = factory.new_batch();

batch
.write_rows(&RowData::from_encoded_rows(vec![
br#"{"hello": "world"}"#.to_vec()
]))
.unwrap();

concurrency.handle().block_on(batch.finish()).unwrap();

mock.assert();
}

#[test]
fn test_write_with_external_groupby() {
crate::testutils::initialize_python();
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(POST).path("/").body("{\"hello\": \"world\"}\n");
then.status(200).body("hi");
});

let concurrency = ConcurrencyConfig::new(1);
let factory = BatchFactory::new(
&server.host(),
server.port(),
"testtable",
"testdb",
&concurrency,
"default",
"",
true,
None,
Some(500_000),
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -382,7 +337,6 @@ mod tests {
"",
false,
None,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -415,7 +369,6 @@ mod tests {
"",
false,
None,
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -452,7 +405,6 @@ mod tests {
// pass in an unreasonably short timeout
// which prevents the client request from reaching Clickhouse
Some(Duration::from_millis(0)),
None,
);

let mut batch = factory.new_batch();
Expand Down Expand Up @@ -487,7 +439,6 @@ mod tests {
true,
// pass in a reasonable timeout
Some(Duration::from_millis(1000)),
None,
);

let mut batch = factory.new_batch();
Expand Down
12 changes: 0 additions & 12 deletions snuba/cli/rust_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,6 @@
default=None,
help="Optional timeout for batch writer client connecting and sending request to Clickhouse",
)
@click.option(
"--max-bytes-before-external-group-by",
type=int,
default=None,
help="""
Allow batching on disk for GROUP BY queries. This is a test mitigation for whether a
materialized view is causing OOM on inserts. If successful, this should be set in storage config.
If not successful, this option should be removed.
""",
)
def rust_consumer(
*,
storage_names: Sequence[str],
Expand Down Expand Up @@ -217,7 +207,6 @@ def rust_consumer(
enforce_schema: bool,
stop_at_timestamp: Optional[int],
batch_write_timeout_ms: Optional[int],
max_bytes_before_external_group_by: Optional[int],
mutations_mode: bool,
max_dlq_buffer_length: Optional[int]
) -> None:
Expand Down Expand Up @@ -271,7 +260,6 @@ def rust_consumer(
health_check_file,
stop_at_timestamp,
batch_write_timeout_ms,
max_bytes_before_external_group_by,
max_dlq_buffer_length,
)

Expand Down

0 comments on commit e45fa84

Please sign in to comment.