
Commit

Rebase changes
slinkydeveloper committed Dec 19, 2024
1 parent f5350d2 commit 51d0c63
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions crates/ingress-kafka/src/consumer_task.rs
@@ -9,7 +9,7 @@
// by the Apache License, Version 2.0.

use std::collections::HashMap;
-use std::fmt::{self, Display};
+use std::fmt;
use std::sync::{Arc, OnceLock, Weak};

use base64::Engine;
@@ -29,7 +29,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::dispatcher::{DispatchKafkaEvent, KafkaIngressDispatcher, KafkaIngressEvent};
use crate::metric_definitions::KAFKA_INGRESS_REQUESTS;
-use restate_core::{TaskCenter, TaskHandle, TaskKind};
+use restate_core::{task_center, TaskCenter, TaskHandle, TaskKind};
use restate_types::invocation::{Header, SpanRelation};
use restate_types::message::MessageIndex;
use restate_types::schema::subscriptions::{
@@ -245,7 +245,7 @@ impl ConsumerTask {
let (failures_tx, failures_rx) = mpsc::unbounded_channel();

let rebalance_context = RebalanceContext {
-task_center: self.task_center.clone(),
+task_center_handle: TaskCenter::current(),
consumer: OnceLock::new(),
topic_partition_tasks: parking_lot::Mutex::new(HashMap::new()),
failures_tx,
@@ -311,14 +311,14 @@ impl<'a> From<TopicPartitionListElem<'a>> for TopicPartition {
}
}

-impl Display for TopicPartition {
+impl fmt::Display for TopicPartition {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.0, self.1)
}
}

struct RebalanceContext {
-task_center: TaskCenter,
+task_center_handle: task_center::Handle,
consumer: OnceLock<Weak<MessageConsumer>>,
topic_partition_tasks: parking_lot::Mutex<HashMap<TopicPartition, AbortOnDrop>>,
failures_tx: mpsc::UnboundedSender<Error>,
@@ -375,10 +375,9 @@ impl ConsumerContext for RebalanceContext {
self.failures_tx.clone(),
);

-if let Ok(task_handle) = self.task_center.spawn_unmanaged(
+if let Ok(task_handle) = self.task_center_handle.spawn_unmanaged(
TaskKind::Ingress,
"kafka-partition-ingest",
-None,
task,
) {
topic_partition_tasks.insert(partition, AbortOnDrop(task_handle));
@@ -431,7 +430,7 @@ impl Drop for AbortOnDrop {

async fn topic_partition_queue_consumption_loop(
sender: MessageSender,
-partition: TopicPartition,
+topic_partition: TopicPartition,
topic_partition_consumer: StreamPartitionQueue<impl ConsumerContext>,
consumer: Arc<MessageConsumer>,
consumer_group_id: String,
@@ -440,7 +439,9 @@ async fn topic_partition_queue_consumption_loop(
debug!(
restate.subscription.id = %sender.subscription.id(),
messaging.consumer.group.name = consumer_group_id,
"Starting topic '{topic}' partition '{partition}' consumption loop from offset '{offset}'"
"Starting topic '{}' partition '{}' consumption loop",
topic_partition.0,
topic_partition.1
);
// this future will be aborted when the partition is no longer needed, so any exit is a failure
let err = loop {
@@ -453,7 +454,7 @@
if let Err(err) = sender.send(&consumer_group_id, msg).await {
break err;
}
-if let Err(err) = consumer.store_offset(&partition.0, partition.1, offset) {
+if let Err(err) = consumer.store_offset(&topic_partition.0, topic_partition.1, offset) {
break err.into();
}
};
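
A minimal sketch of the task-spawning pattern this diff moves to: the context captures a task_center::Handle via TaskCenter::current() and spawns the per-partition ingest future with a three-argument spawn_unmanaged call (the extra None argument is gone). The PartitionSpawner name, the future's () output type, and anything about spawn_unmanaged beyond what the diff itself shows are assumptions for illustration, not the crate's documented API.

use restate_core::{task_center, TaskCenter, TaskKind};

// Hypothetical stand-in for RebalanceContext: the handle is resolved once from
// the ambient task center and reused for every spawn, instead of cloning a
// TaskCenter value into the struct.
struct PartitionSpawner {
    task_center_handle: task_center::Handle,
}

impl PartitionSpawner {
    fn new() -> Self {
        Self {
            task_center_handle: TaskCenter::current(),
        }
    }

    fn spawn_ingest(&self, task: impl std::future::Future<Output = ()> + Send + 'static) {
        // Same call shape as in the diff: task kind, task name, future.
        if let Ok(task_handle) = self.task_center_handle.spawn_unmanaged(
            TaskKind::Ingress,
            "kafka-partition-ingest",
            task,
        ) {
            // consumer_task.rs wraps the handle in AbortOnDrop so the task is
            // aborted when the partition is revoked; this sketch simply drops it.
            drop(task_handle);
        }
    }
}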
