diff --git a/Cargo.lock b/Cargo.lock
index c74e09086..b55696ba8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6365,8 +6365,10 @@ dependencies = [
  "base64 0.22.1",
  "bytes",
  "derive_builder",
+ "derive_more",
  "metrics",
  "opentelemetry",
+ "parking_lot",
  "rdkafka",
  "restate-bifrost",
  "restate-core",
diff --git a/crates/ingress-kafka/Cargo.toml b/crates/ingress-kafka/Cargo.toml
index 1623051b7..210d8383d 100644
--- a/crates/ingress-kafka/Cargo.toml
+++ b/crates/ingress-kafka/Cargo.toml
@@ -24,8 +24,10 @@ anyhow = { workspace = true }
 base64 = { workspace = true }
 bytes = { workspace = true }
 derive_builder = { workspace = true }
+derive_more = { workspace = true }
 metrics = { workspace = true }
 opentelemetry = { workspace = true }
+parking_lot = { workspace = true }
 rdkafka = { git = "https://github.com/restatedev/rust-rdkafka", rev = "4b5946309bdb669eb0c884cd9b7ad05578a0f6c6", features = ["libz-static", "cmake-build", "ssl-vendored"] }
 schemars = { workspace = true, optional = true }
 serde = { workspace = true }
diff --git a/crates/ingress-kafka/src/consumer_task.rs b/crates/ingress-kafka/src/consumer_task.rs
index fdc624da3..0680893c0 100644
--- a/crates/ingress-kafka/src/consumer_task.rs
+++ b/crates/ingress-kafka/src/consumer_task.rs
@@ -8,27 +8,28 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use std::collections::hash_map::Entry;
 use std::collections::HashMap;
-use std::fmt;
-use std::sync::Arc;
+use std::fmt::{self, Display};
+use std::sync::{Arc, OnceLock, Weak};
 
 use base64::Engine;
 use bytes::Bytes;
 use metrics::counter;
 use opentelemetry::trace::TraceContextExt;
 use rdkafka::consumer::stream_consumer::StreamPartitionQueue;
-use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
+use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance, StreamConsumer};
 use rdkafka::error::KafkaError;
 use rdkafka::message::BorrowedMessage;
-use rdkafka::{ClientConfig, Message};
-use tokio::sync::oneshot;
-use tracing::{debug, info, info_span, Instrument};
+use rdkafka::topic_partition_list::TopicPartitionListElem;
+use rdkafka::types::RDKafkaErrorCode;
+use rdkafka::{ClientConfig, ClientContext, Message};
+use tokio::sync::{mpsc, oneshot};
+use tracing::{debug, info, info_span, warn, Instrument};
 use tracing_opentelemetry::OpenTelemetrySpanExt;
 
 use crate::dispatcher::{DispatchKafkaEvent, KafkaIngressDispatcher, KafkaIngressEvent};
 use crate::metric_definitions::KAFKA_INGRESS_REQUESTS;
-use restate_core::{cancellation_watcher, TaskCenter, TaskId, TaskKind};
+use restate_core::{TaskCenter, TaskHandle, TaskKind};
 use restate_types::invocation::{Header, SpanRelation};
 use restate_types::message::MessageIndex;
 use restate_types::schema::subscriptions::{
@@ -51,11 +52,11 @@ pub enum Error {
     },
     #[error("ingress dispatcher channel is closed")]
     IngressDispatcherClosed,
-    #[error("topic {0} partition {1} queue split didn't succeed")]
-    TopicPartitionSplit(String, i32),
+    #[error("received a message on the main partition queue for topic {0} partition {1} despite partitioned queues")]
+    UnexpectedMainQueueMessage(String, i32),
 }
 
-type MessageConsumer = StreamConsumer<DefaultConsumerContext>;
+type MessageConsumer = StreamConsumer<RebalanceContext>;
 
 #[derive(Debug, Hash)]
 pub struct KafkaDeduplicationId {
@@ -241,98 +242,221 @@ impl ConsumerTask {
             self.topics, self.client_config
         );
 
-        let consumer: Arc<MessageConsumer> = Arc::new(self.client_config.create()?);
+        let (failures_tx, failures_rx) = mpsc::unbounded_channel();
+
+        let rebalance_context = RebalanceContext {
+            task_center: self.task_center.clone(),
+            consumer: OnceLock::new(),
+            topic_partition_tasks: parking_lot::Mutex::new(HashMap::new()),
+            failures_tx,
+            sender: self.sender.clone(),
+            consumer_group_id,
+        };
+        let consumer: Arc<MessageConsumer> =
+            Arc::new(self.client_config.create_with_context(rebalance_context)?);
+        // this OnceLock dance is needed because the rebalance callbacks don't get a handle on the consumer,
+        // which is strange because practically everything you'd want to do with them involves the consumer.
+        _ = consumer.context().consumer.set(Arc::downgrade(&consumer));
+
+        // ensure partitioned tasks are cancelled when this function exits/stops being polled
+        let consumer = ConsumerDrop(consumer);
+
         let topics: Vec<&str> = self.topics.iter().map(|x| &**x).collect();
         consumer.subscribe(&topics)?;
 
-        let mut topic_partition_tasks: HashMap<(String, i32), TaskId> = Default::default();
-
-        let result = loop {
-            tokio::select! {
-                res = consumer.recv() => {
-                    let msg = match res {
-                        Ok(msg) => msg,
-                        Err(e) => break Err(e.into())
-                    };
-                    let topic = msg.topic().to_owned();
-                    let partition = msg.partition();
-                    let offset = msg.offset();
-
-                    // If we didn't split the queue, let's do it and start the topic partition consumer
-                    if let Entry::Vacant(e) = topic_partition_tasks.entry((topic.clone(), partition)) {
-                        let topic_partition_consumer = match consumer
-                            .split_partition_queue(&topic, partition) {
-                            Some(q) => q,
-                            None => break Err(Error::TopicPartitionSplit(topic.clone(), partition))
-                        };
-
-                        debug!(
-                            restate.subscription.id = %self.sender.subscription.id(),
-                            messaging.consumer.group.name = consumer_group_id,
-                            "Starting topic '{topic}' partition '{partition}' consumption loop from offset '{offset}'"
-                        );
-
-                        let task = topic_partition_queue_consumption_loop(
-                            self.sender.clone(),
-                            topic.clone(), partition,
-                            topic_partition_consumer,
-                            Arc::clone(&consumer),
-                            consumer_group_id.clone()
-                        );
-
-                        if let Ok(task_id) = TaskCenter::spawn_child(TaskKind::Ingress, "partition-queue", task) {
-                            e.insert(task_id);
-                        } else {
-                            break Ok(());
-                        }
-                    }
+        let mut failures_rx = std::pin::pin!(failures_rx);
+
+        tokio::select! {
+            // we have to poll the main consumer for callbacks to be processed, but we expect to only see messages on the partitioned queues
+            res = consumer.recv() => {
+                match res {
+                    // We shouldn't see any messages on the main consumer loop, because we split the queues into partitioned queues before they
+                    // are ever assigned. Messages here should be treated as a bug in our assumptions.
+                    Ok(msg) => Err(Error::UnexpectedMainQueueMessage(msg.topic().into(), msg.partition())),
+                    Err(e) => Err(e.into()),
+                }
+            }
+            // watch for errors in the partitioned consumers - they should only ever abort, not return errors
+            Some(err) = failures_rx.recv() => {
+                Err(err)
+            }
+            _ = &mut rx => {
+                Ok(())
+            }
+        }
+    }
+}
+
+#[derive(derive_more::Deref)]
+struct ConsumerDrop(Arc<MessageConsumer>);
+
+impl Drop for ConsumerDrop {
+    fn drop(&mut self) {
+        debug!(
+            "Stopping consumer with id {}",
+            self.context().consumer_group_id
+        );
+
+        // we have to clear this because the partitioned tasks themselves hold a reference to MessageConsumer
+        self.context().topic_partition_tasks.lock().clear();
+    }
+}
+
+#[derive(Debug, Clone, Hash, PartialEq, Eq)]
+struct TopicPartition(String, i32);
+
+impl<'a> From<TopicPartitionListElem<'a>> for TopicPartition {
+    fn from(value: TopicPartitionListElem<'a>) -> Self {
+        Self(value.topic().into(), value.partition())
+    }
+}
+
+impl Display for TopicPartition {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}/{}", self.0, self.1)
+    }
+}
+
+struct RebalanceContext {
+    task_center: TaskCenter,
+    consumer: OnceLock<Weak<MessageConsumer>>,
+    topic_partition_tasks: parking_lot::Mutex<HashMap<TopicPartition, AbortOnDrop>>,
+    failures_tx: mpsc::UnboundedSender<Error>,
+    sender: MessageSender,
+    consumer_group_id: String,
+}
+
+impl ClientContext for RebalanceContext {}
+
+// This callback is called synchronously with the poll of the main queue, so we don't want to block here.
+// Once the pre-rebalance steps finish, assign() will be called. If we have not split at this point,
+// then queues will be created defaulting to forward to the main loop - which we don't want.
+// However, if we have split the partition before assign is called, the queue will be created
+// with the flag RD_KAFKA_Q_F_FWD_APP, and this flag will ensure that the queue will not be sent to the
+// main loop. Therefore it's critical that the splits happen synchronously before the pre_rebalance ends.
+//
+// On a non-cooperative rebalance, during assign all the existing partitions are revoked,
+// and their queues are destroyed. Split partition queues will stop working in this case. We should ensure
+// that they are not polled again after the assign. Then there will be a further rebalance callback after the revoke
+// and we will set up new split partition streams before the assign.
+impl ConsumerContext for RebalanceContext {
+    fn pre_rebalance(&self, rebalance: &Rebalance<'_>) {
+        let mut topic_partition_tasks = self.topic_partition_tasks.lock();
+        let consumer = self
+            .consumer
+            .get()
+            .expect("consumer must have been set in context at rebalance time");
+
+        let Some(consumer) = consumer.upgrade() else {
+            // if the consumer has been dropped, we don't need to maintain tasks any more
+            return;
+        };
+
+        match rebalance {
+            Rebalance::Assign(partitions) => {
+                for partition in partitions.elements() {
+                    let partition: TopicPartition = partition.into();
 
-                    // We got this message, let's send it through
-                    if let Err(e) = self.sender.send(&consumer_group_id, msg).await {
-                        break Err(e)
+                    if let Some(task_id) = topic_partition_tasks.remove(&partition) {
+                        // This probably implies a problem in our assumptions, because librdkafka shouldn't be assigning us a partition again without having revoked it.
+                        // However it's fair to assume that the existing partitioned consumer is now invalid.
+                        warn!("Kafka informed us of an assigned partition {partition} which we already consider assigned, cancelling the existing partitioned consumer");
+                        drop(task_id);
                     }
 
-                    // This method tells rdkafka that we have processed this message,
-                    // so its offset can be safely committed.
-                    // rdkafka periodically commits these offsets asynchronously, with a period configurable
-                    // with auto.commit.interval.ms
-                    if let Err(e) = consumer.store_offset(&topic, partition, offset) {
-                        break Err(e.into())
+                    match consumer.split_partition_queue(&partition.0, partition.1) {
+                        Some(queue) => {
+                            let task = topic_partition_queue_consumption_loop(
+                                self.sender.clone(),
+                                partition.clone(),
+                                queue,
+                                Arc::clone(&consumer),
+                                self.consumer_group_id.clone(),
+                                self.failures_tx.clone(),
+                            );
+
+                            if let Ok(task_handle) = self.task_center.spawn_unmanaged(
+                                TaskKind::Ingress,
+                                "kafka-partition-ingest",
+                                None,
+                                task,
+                            ) {
+                                topic_partition_tasks.insert(partition, AbortOnDrop(task_handle));
+                            } else {
+                                // shutting down
+                                return;
+                            }
+                        }
+                        None => {
+                            warn!("Invalid partition {partition} given to us in rebalance, ignoring it");
+                            continue;
+                        }
+                    }
+                }
+            }
+            Rebalance::Revoke(partitions) => {
+                for partition in partitions.elements() {
+                    let partition = partition.into();
+                    match topic_partition_tasks.remove(&partition)
+                    {
+                        Some(task_id) => {
+                            debug!("Stopping partitioned consumer for partition {partition} due to rebalance");
+                            // The partitioned queue will not be polled again.
+                            // It might be mid-poll right now, but if so its result will not be sent anywhere.
+                            drop(task_id);
                         }
+                        None => warn!("Kafka informed us of a revoked partition {partition} which we had no consumer task for"),
+                    }
                 }
-                _ = &mut rx => {
-                    break Ok(());
+
+                match consumer.commit_consumer_state(CommitMode::Async) {
+                    Ok(_) | Err(KafkaError::ConsumerCommit(RDKafkaErrorCode::NoOffset)) => {
+                        // Success
+                    }
+                    Err(error) => warn!("Failed to commit the current consumer state: {error}"),
                 }
             }
-        };
-        for task_id in topic_partition_tasks.into_values() {
-            TaskCenter::cancel_task(task_id);
+            Rebalance::Error(_) => {}
         }
-        result
+    }
+}
+
+struct AbortOnDrop(TaskHandle<()>);
+
+impl Drop for AbortOnDrop {
+    fn drop(&mut self) {
+        self.0.abort();
     }
 }
 
 async fn topic_partition_queue_consumption_loop(
     sender: MessageSender,
-    topic: String,
-    partition: i32,
-    topic_partition_consumer: StreamPartitionQueue<DefaultConsumerContext>,
+    partition: TopicPartition,
+    topic_partition_consumer: StreamPartitionQueue<RebalanceContext>,
     consumer: Arc<MessageConsumer>,
     consumer_group_id: String,
-) -> Result<(), anyhow::Error> {
-    let mut shutdown = std::pin::pin!(cancellation_watcher());
-
-    loop {
-        tokio::select! {
-            res = topic_partition_consumer.recv() => {
-                let msg = res?;
-                let offset = msg.offset();
-                sender.send(&consumer_group_id, msg).await?;
-                consumer.store_offset(&topic, partition, offset)?;
-            }
-            _ = &mut shutdown => {
-                return Ok(())
-            }
+    failed: mpsc::UnboundedSender<Error>,
+) {
+    debug!(
+        restate.subscription.id = %sender.subscription.id(),
+        messaging.consumer.group.name = consumer_group_id,
+        "Starting topic/partition '{partition}' consumption loop"
+    );
+    // this future will be aborted when the partition is no longer needed, so any exit is a failure
+    let err = loop {
+        let res = topic_partition_consumer.recv().await;
+        let msg = match res {
+            Ok(msg) => msg,
+            Err(err) => break err.into(),
+        };
+        let offset = msg.offset();
+        if let Err(err) = sender.send(&consumer_group_id, msg).await {
+            break err;
         }
-    }
+        if let Err(err) = consumer.store_offset(&partition.0, partition.1, offset) {
+            break err.into();
+        }
+    };
+
+    _ = failed.send(err);
 }
diff --git a/crates/ingress-kafka/src/subscription_controller.rs b/crates/ingress-kafka/src/subscription_controller.rs
index 6d51ae1c4..52ce61631 100644
--- a/crates/ingress-kafka/src/subscription_controller.rs
+++ b/crates/ingress-kafka/src/subscription_controller.rs
@@ -254,32 +254,41 @@ mod task_orchestrator {
             &mut self,
             result: Result<(task::Id, Result<(), consumer_task::Error>), JoinError>,
         ) {
-            match result {
-                Ok((id, Ok(_))) => {
-                    warn!("Consumer unexpectedly closed");
-                    self.start_retry_timer(id);
-                }
-                Ok((id, Err(e))) => {
-                    warn!("Consumer unexpectedly closed with reason: {e}");
-                    self.start_retry_timer(id);
-                }
-                Err(e) => {
-                    warn!("Consumer unexpectedly panicked with reason: {e}");
-                    self.start_retry_timer(e.id());
-                }
+            let task_id = match result {
+                Ok((id, _)) => id,
+                Err(ref err) => err.id(),
             };
-        }
 
-        fn start_retry_timer(&mut self, task_id: task::Id) {
             let subscription_id = if let Some(subscription_id) =
                 self.running_tasks_to_subscriptions.remove(&task_id)
             {
                 subscription_id
             } else {
-                // No need to do anything, as it's a correct closure
+                match result {
+                    Ok((_, Ok(_))) => {} // the normal case; a removed subscription should exit cleanly
+                    Ok((_, Err(e))) => {
+                        warn!("Consumer task for removed subscription unexpectedly returned error: {e}");
+                    }
+                    Err(e) => {
+                        warn!("Consumer task for removed subscription unexpectedly panicked: {e}");
+                    }
+                }
+                // no need to retry a subscription we don't care about any more
                 return;
            };
 
+            match result {
+                Ok((_, Ok(_))) => {
+                    warn!("Consumer task for subscription {subscription_id} unexpectedly closed");
+                }
+                Ok((_, Err(e))) => {
+                    warn!("Consumer task for subscription {subscription_id} unexpectedly returned error: {e}");
+                }
+                Err(e) => {
+                    warn!("Consumer task for subscription {subscription_id} unexpectedly panicked: {e}");
+                }
+            };
+
             let task_state = self
                 .subscription_id_to_task_state
                 .get_mut(&subscription_id)
@@ -289,7 +298,7 @@ mod task_orchestrator {
                 self.timer_queue
                     .sleep_until(SystemTime::now() + next_timer, subscription_id);
             } else {
-                warn!("Not going to retry because retry limit exhausted.");
+                warn!("Not going to retry consumer task for subscription {subscription_id} because retry limit exhausted.");
                 self.subscription_id_to_task_state.remove(&subscription_id);
             }
         }
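
A note on the "OnceLock dance" in consumer_task.rs: the RebalanceContext must exist before the consumer is created, yet its rebalance callbacks need to reach the consumer, so the change closes the cycle after construction through a OnceLock<Weak<MessageConsumer>>. The following is a minimal, self-contained sketch of that pattern, assuming illustrative Client/Context types rather than the rdkafka ones; the Weak keeps the cycle from inflating the strong count, so dropping the last Arc still tears everything down.

    use std::sync::{Arc, OnceLock, Weak};

    // Illustrative stand-ins (not rdkafka types): the context is built first,
    // handed to the client, and only afterwards learns about the client.
    struct Context {
        client: OnceLock<Weak<Client>>,
    }

    struct Client {
        context: Context,
    }

    impl Context {
        // A callback that only receives `&self` (like a rebalance callback) can
        // still reach the client by upgrading the weak handle, if it still exists.
        fn with_client(&self, f: impl FnOnce(&Client)) {
            if let Some(client) = self.client.get().and_then(Weak::upgrade) {
                f(&client);
            }
        }
    }

    fn main() {
        let client = Arc::new(Client {
            context: Context {
                client: OnceLock::new(),
            },
        });
        // Complete the cycle with a Weak reference after construction.
        let _ = client.context.client.set(Arc::downgrade(&client));

        client.context.with_client(|_client| {
            // here a real context would e.g. split a partition queue
        });
    }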
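Similarly, each per-partition consumption task is tied to its entry in topic_partition_tasks: removing the entry drops an AbortOnDrop guard, which aborts the task, whether that happens on partition revocation or when ConsumerDrop clears the whole map. A rough sketch of the same guard, assuming a plain tokio::task::JoinHandle in place of restate's TaskHandle:

    use std::time::Duration;

    use tokio::task::JoinHandle;

    // Guard that aborts its task when dropped, so removing it from a map of
    // per-partition tasks is enough to stop the corresponding consumption loop.
    struct AbortOnDrop<T>(JoinHandle<T>);

    impl<T> Drop for AbortOnDrop<T> {
        fn drop(&mut self) {
            self.0.abort();
        }
    }

    #[tokio::main]
    async fn main() {
        let guard = AbortOnDrop(tokio::spawn(async {
            // stand-in for a per-partition consumption loop
            loop {
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }));

        // Dropping the guard (e.g. on partition revocation) aborts the loop.
        drop(guard);
    }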