diff --git a/crates/ingress-kafka/src/consumer_task.rs b/crates/ingress-kafka/src/consumer_task.rs
index 0680893c0..2f953e9b2 100644
--- a/crates/ingress-kafka/src/consumer_task.rs
+++ b/crates/ingress-kafka/src/consumer_task.rs
@@ -9,7 +9,7 @@
 // by the Apache License, Version 2.0.
 
 use std::collections::HashMap;
-use std::fmt::{self, Display};
+use std::fmt;
 use std::sync::{Arc, OnceLock, Weak};
 
 use base64::Engine;
@@ -29,7 +29,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
 
 use crate::dispatcher::{DispatchKafkaEvent, KafkaIngressDispatcher, KafkaIngressEvent};
 use crate::metric_definitions::KAFKA_INGRESS_REQUESTS;
-use restate_core::{TaskCenter, TaskHandle, TaskKind};
+use restate_core::{task_center, TaskCenter, TaskHandle, TaskKind};
 use restate_types::invocation::{Header, SpanRelation};
 use restate_types::message::MessageIndex;
 use restate_types::schema::subscriptions::{
@@ -245,7 +245,7 @@ impl ConsumerTask {
         let (failures_tx, failures_rx) = mpsc::unbounded_channel();
 
         let rebalance_context = RebalanceContext {
-            task_center: self.task_center.clone(),
+            task_center_handle: TaskCenter::current(),
             consumer: OnceLock::new(),
             topic_partition_tasks: parking_lot::Mutex::new(HashMap::new()),
             failures_tx,
@@ -311,14 +311,14 @@ impl<'a> From> for TopicPartition {
     }
 }
 
-impl Display for TopicPartition {
+impl fmt::Display for TopicPartition {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "{}/{}", self.0, self.1)
     }
 }
 
 struct RebalanceContext {
-    task_center: TaskCenter,
+    task_center_handle: task_center::Handle,
     consumer: OnceLock>,
     topic_partition_tasks: parking_lot::Mutex>,
     failures_tx: mpsc::UnboundedSender,
@@ -375,10 +375,9 @@ impl ConsumerContext for RebalanceContext {
                         self.failures_tx.clone(),
                     );
 
-                    if let Ok(task_handle) = self.task_center.spawn_unmanaged(
+                    if let Ok(task_handle) = self.task_center_handle.spawn_unmanaged(
                         TaskKind::Ingress,
                         "kafka-partition-ingest",
-                        None,
                         task,
                     ) {
                         topic_partition_tasks.insert(partition, AbortOnDrop(task_handle));
@@ -431,7 +430,7 @@ impl Drop for AbortOnDrop {
 
 async fn topic_partition_queue_consumption_loop(
     sender: MessageSender,
-    partition: TopicPartition,
+    topic_partition: TopicPartition,
     topic_partition_consumer: StreamPartitionQueue,
     consumer: Arc,
     consumer_group_id: String,
@@ -440,7 +439,9 @@ async fn topic_partition_queue_consumption_loop(
     debug!(
         restate.subscription.id = %sender.subscription.id(),
         messaging.consumer.group.name = consumer_group_id,
-        "Starting topic '{topic}' partition '{partition}' consumption loop from offset '{offset}'"
+        "Starting topic '{}' partition '{}' consumption loop",
+        topic_partition.0,
+        topic_partition.1
     );
     // this future will be aborted when the partition is no longer needed, so any exit is a failure
     let err = loop {
@@ -453,7 +454,7 @@ async fn topic_partition_queue_consumption_loop(
         if let Err(err) = sender.send(&consumer_group_id, msg).await {
            break err;
         }
-        if let Err(err) = consumer.store_offset(&partition.0, partition.1, offset) {
+        if let Err(err) = consumer.store_offset(&topic_partition.0, topic_partition.1, offset) {
            break err.into();
         }
     };