13 changes: 12 additions & 1 deletion src/coord/src/catalog/builtin.rs
@@ -27,7 +27,9 @@ use std::collections::{BTreeMap, BTreeSet};
 use lazy_static::lazy_static;
 use postgres_types::{Kind, Type};
 
-use dataflow_types::logging::{DifferentialLog, LogVariant, MaterializedLog, TimelyLog};
+use dataflow_types::logging::{
+    DifferentialLog, LogVariant, MaterializedLog, RDKafkaLog, TimelyLog,
+};
 use expr::GlobalId;
 use repr::{RelationDesc, ScalarType};
 
@@ -549,6 +551,14 @@ pub const MZ_MESSAGE_COUNTS: BuiltinLog = BuiltinLog {
     index_id: GlobalId::System(3029),
 };
 
+pub const MZ_KAFKA_CONSUMER_STATISTICS: BuiltinLog = BuiltinLog {
+    name: "mz_kafka_consumer_statistics",
+    schema: MZ_CATALOG_SCHEMA,
+    variant: LogVariant::RDKafkaLog(RDKafkaLog::ConsumerStatistics),
+    id: GlobalId::System(3030),
+    index_id: GlobalId::System(3031),
+};
+
 lazy_static! {
     pub static ref MZ_VIEW_KEYS: BuiltinTable = BuiltinTable {
         name: "mz_view_keys",
@@ -1265,6 +1275,7 @@ lazy_static! {
         Builtin::Log(&MZ_PEEK_DURATIONS),
         Builtin::Log(&MZ_SOURCE_INFO),
         Builtin::Log(&MZ_MESSAGE_COUNTS),
+        Builtin::Log(&MZ_KAFKA_CONSUMER_STATISTICS),
         Builtin::Table(&MZ_VIEW_KEYS),
         Builtin::Table(&MZ_VIEW_FOREIGN_KEYS),
         Builtin::Table(&MZ_KAFKA_SINKS),
19 changes: 19 additions & 0 deletions src/dataflow-types/src/logging.rs
@@ -28,6 +28,7 @@ pub enum LogVariant {
     Timely(TimelyLog),
     Differential(DifferentialLog),
     Materialized(MaterializedLog),
+    RDKafkaLog(RDKafkaLog),
 }
 
 #[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
@@ -56,6 +57,10 @@ pub enum MaterializedLog {
     PeekDuration,
     SourceInfo,
 }
+#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
+pub enum RDKafkaLog {
+    ConsumerStatistics,
+}
Contributor:
I think this probably just wants to add a KafkaConsumerStatistics variant to the existing Materialized enum! The existing separation for timely and differential logs is because those logs integrate with messages that are already part of the timely logging framework. As far as the timely logger is concerned, events generated by librdkafka look identical to events generated by other parts of Materialize, so I think you'll save yourself some trouble if you just combine the two.
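Roughly, i.e. one more variant in the enum this file already defines (a sketch of the suggestion, not code from this PR):

```rust
use serde::{Deserialize, Serialize};

#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum MaterializedLog {
    // ... existing variants ...
    PeekDuration,
    SourceInfo,
    // Hypothetical: fold the Kafka stats in here instead of a new enum.
    KafkaConsumerStatistics,
}
```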

Contributor Author:
Thanks for the feedback! I'm not sure if you saw the thread on Slack yesterday -- I originally tried adding this to the Materialized enum for the reasons you stated above. Unfortunately, rdkafka expects the GlueConsumerContext to be Send + Sync, which prevents passing the Logger object into the callback's context. Frank advised that I write a new MPSC Logger, as I'll need to implement one that is Send + Sync safe. By putting this into its own enum, we won't need to modify the Materialized Logger for messages that it will never see.

Do you have thoughts on this? I'd like to keep the implementation simple and coherent with the rest of the codebase. If there's a way to use the Materialized Logger in the GlueConsumerContext, I'd be very happy to do that.
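For anyone following along, here is the constraint boiled down to a stand-in sketch (simplified types, not the real rdkafka or timely signatures):

```rust
use std::rc::Rc;

// rdkafka's context trait carries Send + Sync supertraits...
trait ClientContext: Send + Sync {}

// ...while timely's Logger is Rc-based under the hood, so any context
// that stores one can never satisfy those bounds.
struct LoggerStandIn(Rc<()>);

struct GlueConsumerContext {
    logger: LoggerStandIn,
}

// Uncommenting this impl is a compile error: `Rc<()>` is neither
// Send nor Sync, so the supertrait obligations cannot be met.
// impl ClientContext for GlueConsumerContext {}

fn main() {}
```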

Contributor:
Oh, hm, I see. I think I understand what Frank is proposing, and it seems nice and general purpose, but it seems like it will require trudging through quite a bit of pain to implement. Personally I'd do something like this:

diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs
index cc567dd71..61419acc1 100644
--- a/src/dataflow/src/source/kafka.rs
+++ b/src/dataflow/src/source/kafka.rs
@@ -62,6 +62,8 @@ pub struct KafkaSourceInfo {
     cached_files: Vec<PathBuf>,
     /// Timely worker logger for source events
     logger: Option<Logger>,
+    /// Receiver for librdkafka statistics events.
+    stats_rx: crossbeam_channel::Receiver<Statistics>,
 }
 
 impl SourceConstructor<Vec<u8>> for KafkaSourceInfo {
@@ -157,6 +159,12 @@ impl SourceInfo<Vec<u8>> for KafkaSourceInfo {
             }
         }
 
+        if let Some(logger) = self.logger {
+            while let Ok(statistics) = self.stats_rx.try_recv() {
+                logger.log(MaterializedEvent::KafkaConsumerStatistics { /* whatever */});
+            }
+        }
+
         let mut next_message = NextMessage::Pending;
         let consumer_count = self.get_partition_consumers_count();
         let mut attempts = 0;
@@ -329,8 +337,12 @@ impl KafkaSourceInfo {
             cluster_id,
             &config_options,
         );
+        let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
         let consumer: BaseConsumer<GlueConsumerContext> = kafka_config
-            .create_with_context(GlueConsumerContext(consumer_activator))
+            .create_with_context(GlueConsumerContext {
+                activator: consumer_activator,
+                stats_tx,
+            })
             .expect("Failed to create Kafka Consumer");
         let cached_files = kc
             .cached_files
@@ -621,17 +633,21 @@ impl PartitionConsumer {
 
 /// An implementation of [`ConsumerContext`] that unparks the wrapped thread
 /// when the message queue switches from nonempty to empty.
-struct GlueConsumerContext(SyncActivator);
+struct GlueConsumerContext {
+    activator: SyncActivator,
+    stats_tx: crossbeam_channel::Sender<Statistics>,
+}
 
 impl ClientContext for GlueConsumerContext {
     fn stats(&self, statistics: Statistics) {
-        info!("Client stats: {:#?}", statistics);
+        self.stats_tx.send(statistics).expect("timely operator hung up while Kafka source active");
+        self.activate();
     }
 }
 
 impl GlueConsumerContext {
     fn activate(&self) {
-        self.0
+        self.activator
             .activate()
             .expect("timely operator hung up while Kafka source active");
     }

Possibly that's a terrible idea for some reason, but seems easy enough just to try out.

Contributor:
Medium term we can also make rust-rdkafka provide access to the stats events on a thread of your choice, which would also sidestep this issue nicely.

Contributor Author:
That does seem much easier than the route that I was going down -- thank you! I'll give this a shot first thing tomorrow!

Contributor:
You bet! Probably worth pinging Frank or Eli or someone else on integrations to sanity check this. Maybe there's some reason this is crazy that I'm not seeing.

Contributor:
Mind linking the Slack thread, @cirego?

Contributor Author:
It was in the Rust channel. Here's the link to Frank's comment about creating an Arc<EventLinkArc> class: https://materializeinc.slack.com/archives/CMH6PG4CW/p1615938814067100


 impl LogVariant {
     /// By which columns should the logs be indexed.
@@ -174,6 +179,16 @@ impl LogVariant {
                 .with_column("duration_ns", ScalarType::Int64.nullable(false))
                 .with_column("count", ScalarType::Int64.nullable(false))
                 .with_key(vec![0, 1]),
+
+            LogVariant::RDKafkaLog(RDKafkaLog::ConsumerStatistics) => RelationDesc::empty()
+                .with_column("source_name", ScalarType::String.nullable(false))
+                .with_column("source_id", ScalarType::String.nullable(false))
+                .with_column("consumer_name", ScalarType::String.nullable(false))
+                .with_column("rx", ScalarType::Int64.nullable(false))
+                .with_column("rx_bytes", ScalarType::Int64.nullable(false))
+                .with_column("tx", ScalarType::Int64.nullable(false))
+                .with_column("tx_bytes", ScalarType::Int64.nullable(false))
+                .with_key(vec![0, 1, 2]),
         }
     }

@@ -222,6 +237,10 @@ impl LogVariant {
             LogVariant::Materialized(MaterializedLog::PeekCurrent) => vec![],
             LogVariant::Materialized(MaterializedLog::SourceInfo) => vec![],
             LogVariant::Materialized(MaterializedLog::PeekDuration) => vec![],
+            LogVariant::RDKafkaLog(RDKafkaLog::ConsumerStatistics) => vec![(
+                LogVariant::Materialized(MaterializedLog::SourceInfo),
+                vec![(0, 0), (1, 1)],
+            )],
         }
     }
 }
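A note on reading the hunks above: `with_key(vec![0, 1, 2])` marks (source_name, source_id, consumer_name) as the unique key, and each foreign-key entry pairs this log's columns with a parent relation's, so (0, 0) and (1, 1) tie source_name and source_id to the first two columns of mz_source_info. A self-contained sketch of that convention (stand-in types; the real ones live in dataflow_types::logging):

```rust
// Stand-in for LogVariant::Materialized(MaterializedLog::SourceInfo).
#[derive(Debug)]
enum ParentLog {
    SourceInfo,
}

/// Each entry is (parent relation, list of (child column, parent column)).
fn consumer_statistics_foreign_keys() -> Vec<(ParentLog, Vec<(usize, usize)>)> {
    vec![(
        ParentLog::SourceInfo,
        vec![
            (0, 0), // source_name -> mz_source_info column 0
            (1, 1), // source_id   -> mz_source_info column 1
        ],
    )]
}

fn main() {
    println!("{:?}", consumer_statistics_foreign_keys());
}
```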
6 changes: 5 additions & 1 deletion src/dataflow/src/source/kafka.rs
@@ -625,7 +625,11 @@ struct GlueConsumerContext(SyncActivator);
 
 impl ClientContext for GlueConsumerContext {
     fn stats(&self, statistics: Statistics) {
-        info!("Client stats: {:#?}", statistics);
+        info!(
+            "Client stats: name={}, tx={}, tx_bytes={}, rx={}, rx_bytes={}",
+            statistics.name, statistics.tx, statistics.tx_bytes, statistics.rx, statistics.rx_bytes,
+        );
+        // info!("Client stats: {:#?}", statistics);
Contributor:
I think this can be removed now?

Contributor Author:
Yes, good catch. Thank you!

     }
 }

4 changes: 3 additions & 1 deletion test/testdrive/catalog.td
@@ -301,6 +301,7 @@ mz_arrangement_sizes
 mz_dataflow_channels
 mz_dataflow_operator_addresses
 mz_dataflow_operators
+mz_kafka_consumer_statistics
 mz_materialization_dependencies
 mz_materializations
 mz_message_counts
@@ -320,6 +321,7 @@ mz_arrangement_sizes system true
 mz_dataflow_channels system true
 mz_dataflow_operator_addresses system true
 mz_dataflow_operators system true
+mz_kafka_consumer_statistics system true
 mz_materialization_dependencies system true
 mz_materializations system true
 mz_message_counts system true
@@ -388,7 +390,7 @@ SHOW EXTENDED TABLES not yet supported
 
 # There is one entry in mz_indexes for each field_number/expression of the index.
 > SELECT COUNT(id) FROM mz_indexes WHERE id LIKE 's%'
-36
+37
 
 > SHOW VIEWS FROM mz_catalog
 mz_addresses_with_unit_length