Merged
Changes from 6 commits
9 changes: 9 additions & 0 deletions src/coord/src/catalog/builtin.rs
@@ -549,6 +549,14 @@ pub const MZ_MESSAGE_COUNTS: BuiltinLog = BuiltinLog {
index_id: GlobalId::System(3029),
};

pub const MZ_KAFKA_CONSUMER_STATISTICS: BuiltinLog = BuiltinLog {
name: "mz_kafka_consumer_statistics",
schema: MZ_CATALOG_SCHEMA,
variant: LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo),
id: GlobalId::System(3030),
index_id: GlobalId::System(3031),
};

lazy_static! {
pub static ref MZ_VIEW_KEYS: BuiltinTable = BuiltinTable {
name: "mz_view_keys",
@@ -1265,6 +1273,7 @@ lazy_static! {
Builtin::Log(&MZ_PEEK_DURATIONS),
Builtin::Log(&MZ_SOURCE_INFO),
Builtin::Log(&MZ_MESSAGE_COUNTS),
Builtin::Log(&MZ_KAFKA_CONSUMER_STATISTICS),
Builtin::Table(&MZ_VIEW_KEYS),
Builtin::Table(&MZ_VIEW_FOREIGN_KEYS),
Builtin::Table(&MZ_KAFKA_SINKS),
15 changes: 15 additions & 0 deletions src/dataflow-types/src/logging.rs
@@ -52,6 +52,7 @@ pub enum MaterializedLog {
DataflowCurrent,
DataflowDependency,
FrontierCurrent,
KafkaConsumerInfo,
PeekCurrent,
PeekDuration,
SourceInfo,
@@ -162,6 +163,16 @@ impl LogVariant {
.with_column("worker", ScalarType::Int64.nullable(false))
.with_column("time", ScalarType::Int64.nullable(false)),

LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => RelationDesc::empty()
.with_column("source_name", ScalarType::String.nullable(false))
.with_column("source_id", ScalarType::String.nullable(false))
.with_column("consumer_name", ScalarType::String.nullable(false))
.with_column("rx", ScalarType::Int64.nullable(false))
.with_column("rx_bytes", ScalarType::Int64.nullable(false))
.with_column("tx", ScalarType::Int64.nullable(false))
.with_column("tx_bytes", ScalarType::Int64.nullable(false))
.with_key(vec![0, 1, 2]),

LogVariant::Materialized(MaterializedLog::PeekCurrent) => RelationDesc::empty()
.with_column("uuid", ScalarType::String.nullable(false))
.with_column("worker", ScalarType::Int64.nullable(false))
@@ -219,6 +230,10 @@ impl LogVariant {
LogVariant::Materialized(MaterializedLog::DataflowCurrent) => vec![],
LogVariant::Materialized(MaterializedLog::DataflowDependency) => vec![],
LogVariant::Materialized(MaterializedLog::FrontierCurrent) => vec![],
LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo) => vec![(
LogVariant::Materialized(MaterializedLog::SourceInfo),
vec![(0, 0), (1, 1)],
)],
LogVariant::Materialized(MaterializedLog::PeekCurrent) => vec![],
LogVariant::Materialized(MaterializedLog::SourceInfo) => vec![],
LogVariant::Materialized(MaterializedLog::PeekDuration) => vec![],
91 changes: 73 additions & 18 deletions src/dataflow/src/logging/materialized.rs
@@ -11,7 +11,8 @@

use std::time::Duration;

use differential_dataflow::{difference::DiffPair, operators::count::CountTotal};
use differential_dataflow::difference::{DiffPair, DiffVector};
use differential_dataflow::operators::count::CountTotal;
use log::error;
use timely::communication::Allocate;
use timely::dataflow::operators::capture::EventLink;
@@ -38,6 +39,23 @@ pub enum MaterializedEvent {
/// Globally unique identifier for the source on which the dataflow depends.
source: GlobalId,
},
/// Tracks per-consumer Kafka statistics: source name, source ID, consumer name, and message/byte counts received from and sent to brokers
KafkaConsumerInfo {
/// Materialize name of the source
source_name: String,
/// Materialize source identifier
source_id: SourceInstanceId,
Comment on lines +47 to +48

Contributor: I think that this will generate a string that has a global_id and instance_id inline, e.g. u0/1, where the global ID tracks the source and the instance ID tracks the per-worker/per-instantiation key. I think it'd be better to split these out into two columns, because then the source_id here would be directly comparable to the GlobalIds in other MaterializedEvent variants.

Contributor Author: I noticed that but didn't see any easy way to split this out. Let me look again!

Contributor: Both the fields are pub; I think just renaming this to source: GlobalId, source_instance: u64 should do it.

Contributor Author (@cirego, Mar 23, 2021): Done! I missed that SourceInfo does &id.source_id.to_string(). Updated kafka_consumer_info to do the same. I also added a test case to verify the join behavior.

/// Kafka name for the consumer
consumer_name: String,
/// Number of message sets received from Brokers
rx: i64,
/// Number of bytes received from Brokers
rx_bytes: i64,
/// Number of message sets sent to Brokers
tx: i64,
/// Number of bytes transmitted to Brokers
tx_bytes: i64,
},
/// Peek command, true for install and false for retire.
Peek(Peek, bool),
/// Tracks the source name, id, partition id, and received/ingested offsets
@@ -103,9 +121,10 @@ pub fn construct<A: Allocate>(
let mut input = demux.new_input(&logs, Pipeline);
let (mut dataflow_out, dataflow) = demux.new_output();
let (mut dependency_out, dependency) = demux.new_output();
let (mut frontier_out, frontier) = demux.new_output();
let (mut kafka_consumer_info_out, kafka_consumer_info) = demux.new_output();
let (mut peek_out, peek) = demux.new_output();
let (mut source_info_out, source_info) = demux.new_output();
let (mut frontier_out, frontier) = demux.new_output();

let mut demux_buffer = Vec::new();
demux.build(move |_capability| {
@@ -114,18 +133,20 @@
move |_frontiers| {
let mut dataflow = dataflow_out.activate();
let mut dependency = dependency_out.activate();
let mut frontier = frontier_out.activate();
let mut kafka_consumer_info = kafka_consumer_info_out.activate();
let mut peek = peek_out.activate();
let mut source_info = source_info_out.activate();
let mut frontier = frontier_out.activate();

input.for_each(|time, data| {
data.swap(&mut demux_buffer);

let mut dataflow_session = dataflow.session(&time);
let mut dependency_session = dependency.session(&time);
let mut frontier_session = frontier.session(&time);
let mut kafka_consumer_info_session = kafka_consumer_info.session(&time);
let mut peek_session = peek.session(&time);
let mut source_info_session = source_info.session(&time);
let mut frontier_session = frontier.session(&time);

for (time, worker, datum) in demux_buffer.drain(..) {
let time_ns = time.as_nanos() as Timestamp;
@@ -176,6 +197,32 @@
),
}
}
MaterializedEvent::Frontier(name, logical, delta) => {
frontier_session.give((
row_packer.pack(&[
Datum::String(&name.to_string()),
Datum::Int64(worker as i64),
Datum::Int64(logical as i64),
]),
time_ms,
delta as isize,
));
}
MaterializedEvent::KafkaConsumerInfo {
source_name,
source_id,
consumer_name,
rx,
rx_bytes,
tx,
tx_bytes,
} => {
kafka_consumer_info_session.give((
(source_name, source_id, consumer_name),
time_ms,
DiffVector::new(vec![rx, rx_bytes, tx, tx_bytes]),
Contributor: Fwiw, this would be more efficiently implemented as a bunch of DiffPair things. It's a bunch less clean, but compiles down to a 4-tuple rather than an allocation.

Contributor Author: When you say a bunch of DiffPair things, do you mean something like DiffPair::new(DiffPair::new(rx, rx_bytes), DiffPair::new(tx, tx_bytes))?

Contributor: Yeah, unfortunately.

Contributor Author: Sounds good to me! Thanks for the tip!
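
For reference, a standalone sketch of the nested-DiffPair encoding discussed in this thread (illustrative only, not code from the PR; it assumes differential-dataflow's DiffPair/Semigroup API, whose exact method names may vary by version):

use differential_dataflow::difference::{DiffPair, Semigroup};

fn main() {
    // Hypothetical per-callback deltas for (rx, rx_bytes, tx, tx_bytes).
    let updates = vec![(10i64, 2_048i64, 3i64, 512i64), (5, 1_024, 1, 128)];

    // Encode each delta as nested DiffPairs and sum them component-wise; this is
    // the flat 4-tuple layout the reviewer describes, with no Vec allocation per update.
    let mut total = DiffPair::new(DiffPair::new(0i64, 0i64), DiffPair::new(0i64, 0i64));
    for (rx, rx_bytes, tx, tx_bytes) in updates {
        total.plus_equals(&DiffPair::new(DiffPair::new(rx, rx_bytes), DiffPair::new(tx, tx_bytes)));
    }

    // Unpack as ((rx, rx_bytes), (tx, tx_bytes)).
    println!(
        "rx={} rx_bytes={} tx={} tx_bytes={}",
        total.element1.element1, total.element1.element2,
        total.element2.element1, total.element2.element2,
    );
}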

));
}
MaterializedEvent::Peek(peek, is_install) => {
peek_session.give((peek, worker, is_install, time_ns))
}
@@ -192,17 +239,6 @@
DiffPair::new(offset, timestamp),
));
}
MaterializedEvent::Frontier(name, logical, delta) => {
frontier_session.give((
row_packer.pack(&[
Datum::String(&name.to_string()),
Datum::Int64(worker as i64),
Datum::Int64(logical as i64),
]),
time_ms,
delta as isize,
));
}
}
}
});
@@ -245,6 +281,24 @@
}
});

let frontier_current = frontier.as_collection();

use differential_dataflow::operators::Count;
let kafka_consumer_info_current = kafka_consumer_info.as_collection().count().map({
let mut row_packer = repr::RowPacker::new();
move |((source_name, source_id, consumer_name), diff_vector)| {
row_packer.pack(&[
Datum::String(&source_name),
Datum::String(&source_id.to_string()),
Datum::String(&consumer_name),
Datum::Int64(diff_vector[0]),
Datum::Int64(diff_vector[1]),
Datum::Int64(diff_vector[2]),
Datum::Int64(diff_vector[3]),
])
}
});

let peek_current = peek
.map(move |(name, worker, is_install, time_ns)| {
let time_ms = (time_ns / 1_000_000) as Timestamp;
@@ -265,7 +319,6 @@
}
});

use differential_dataflow::operators::Count;
let source_info_current = source_info.as_collection().count().map({
let mut row_packer = repr::RowPacker::new();
move |((name, id, pid), pair)| {
@@ -282,8 +335,6 @@
}
});

let frontier_current = frontier.as_collection();

// Duration statistics derive from the non-rounded event times.
let peek_duration = peek
.unary(
@@ -361,6 +412,10 @@
LogVariant::Materialized(MaterializedLog::FrontierCurrent),
frontier_current,
),
(
LogVariant::Materialized(MaterializedLog::KafkaConsumerInfo),
kafka_consumer_info_current,
),
(
LogVariant::Materialized(MaterializedLog::PeekCurrent),
peek_current,
90 changes: 85 additions & 5 deletions src/dataflow/src/source/kafka.rs
@@ -34,11 +34,22 @@ use repr::{CachedRecord, CachedRecordIter, Timestamp};
use tokio::sync::mpsc;
use uuid::Uuid;

use crate::logging::materialized::{Logger, MaterializedEvent};
use crate::server::CacheMessage;
use crate::source::cache::{RecordFileMetadata, WorkerCacheData};
use crate::source::{
ConsistencyInfo, NextMessage, PartitionMetrics, SourceConstructor, SourceInfo, SourceMessage,
};
use crate::{logging::materialized::Logger, server::CacheMessage};

/// Values recorded from the last rdkafka statistics callback, used to generate a
/// diff of values for logging
pub struct PreviousStats {
Contributor: If you #[derive(Default)] you don't have to explicitly zero-init later.

Contributor Author: Nice! Looks like Option defines its default as None as well, so this should work nicely.

Contributor Author: Done!

consumer_name: Option<String>,
rx: i64,
rx_bytes: i64,
tx: i64,
tx_bytes: i64,
}
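
A minimal sketch of the #[derive(Default)] suggestion from the thread above, reusing the field names in this diff (the merged version of the PR may differ):

// With Default derived, consumer_name defaults to None and the counters to 0,
// so the explicit zero-initialization further down (previous_stats: PreviousStats {
// consumer_name: None, rx: 0, ... }) can collapse to PreviousStats::default().
#[derive(Default)]
pub struct PreviousStats {
    consumer_name: Option<String>,
    rx: i64,
    rx_bytes: i64,
    tx: i64,
    tx_bytes: i64,
}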

/// Contains all information necessary to ingest data from Kafka
pub struct KafkaSourceInfo {
@@ -62,6 +73,10 @@ pub struct KafkaSourceInfo {
cached_files: Vec<PathBuf>,
/// Timely worker logger for source events
logger: Option<Logger>,
Contributor: Hmm, I think this is fine, but maybe a better interface would be to add a function called emit_logs or something like that to the SourceReader trait, which ConsistencyInfo or something else that holds onto the logger can then actually log. Not something that needs to happen in this PR though!

Contributor: I guess another nice thing about having a separate function is that we can then amortize the logs and only emit them once per operator execution instead of once per incoming message.

Contributor Author: Perhaps this might be something to explore if we find ourselves adding more source types that want to log "source specific" things?

Contributor: Sure, yeah! As long as this passes our load tests without a regression I'm fine with it :D

Contributor Author: Cool! I'm running the avro upsert benchmark now. Previous testing didn't seem to show much of a delta with this logging enabled. Should have full results tomorrow.

/// Channel to receive Kafka statistics objects from the stats callback
stats_rx: crossbeam_channel::Receiver<Statistics>,
/// Memoized Statistics for a consumer
previous_stats: PreviousStats,
}

impl SourceConstructor<Vec<u8>> for KafkaSourceInfo {
@@ -157,6 +172,33 @@ impl SourceInfo<Vec<u8>> for KafkaSourceInfo {
}
}

// Read any statistics objects generated via the GlueConsumerContext::stats callback
while let Ok(statistics) = self.stats_rx.try_recv() {
if let Some(logger) = self.logger.as_mut() {
// If this is the first callback, initialize our consumer name
// so that we can later retract this when the source is dropped
match self.previous_stats.consumer_name {
None => self.previous_stats.consumer_name = Some(statistics.name.clone()),
_ => (),
}

logger.log(MaterializedEvent::KafkaConsumerInfo {
source_name: self.source_name.to_string(),
source_id: self.id,
consumer_name: statistics.name,
rx: statistics.rx - self.previous_stats.rx,
rx_bytes: statistics.rx_bytes - self.previous_stats.rx_bytes,
tx: statistics.tx - self.previous_stats.tx,
tx_bytes: statistics.tx_bytes - self.previous_stats.tx_bytes,
});

self.previous_stats.rx = statistics.rx;
self.previous_stats.rx_bytes = statistics.rx_bytes;
self.previous_stats.tx = statistics.tx;
self.previous_stats.tx_bytes = statistics.tx_bytes;
}
}

let mut next_message = NextMessage::Pending;
let consumer_count = self.get_partition_consumers_count();
let mut attempts = 0;
@@ -329,8 +371,12 @@ impl KafkaSourceInfo {
cluster_id,
&config_options,
);
let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
let consumer: BaseConsumer<GlueConsumerContext> = kafka_config
.create_with_context(GlueConsumerContext(consumer_activator))
.create_with_context(GlueConsumerContext {
activator: consumer_activator,
stats_tx: stats_tx,
})
.expect("Failed to create Kafka Consumer");
let cached_files = kc
.cached_files
@@ -385,6 +431,14 @@ impl KafkaSourceInfo {
worker_count,
cached_files,
logger,
stats_rx,
previous_stats: PreviousStats {
consumer_name: None,
rx: 0,
rx_bytes: 0,
tx: 0,
tx_bytes: 0,
},
}
}

@@ -493,6 +547,25 @@ impl KafkaSourceInfo {
}
}

impl Drop for KafkaSourceInfo {
fn drop(&mut self) {
// Retract any metrics logged for this source
if let Some(logger) = self.logger.as_mut() {
if let Some(consumer_name) = self.previous_stats.consumer_name.as_ref() {
logger.log(MaterializedEvent::KafkaConsumerInfo {
source_name: self.source_name.to_string(),
source_id: self.id,
consumer_name: consumer_name.to_string(),
rx: -self.previous_stats.rx,
rx_bytes: -self.previous_stats.rx_bytes,
tx: -self.previous_stats.tx,
tx_bytes: -self.previous_stats.tx_bytes,
});
}
}
}
}
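
A toy illustration of why the Drop impl above logs negated running totals (values are illustrative, not code from this PR): each statistics callback logs a delta, so the accumulated view holds the running sum, and logging the negation of previous_stats on drop cancels that sum so the dropped consumer disappears from the log.

fn main() {
    // Hypothetical rx deltas logged by successive statistics callbacks.
    let rx_deltas: Vec<i64> = vec![10, 25, 7];
    let accumulated: i64 = rx_deltas.iter().sum();
    // On drop, the source logs -previous_stats.rx (the negated running total),
    // so the accumulated count for this consumer returns to zero.
    assert_eq!(accumulated + (-accumulated), 0);
    println!("accumulated rx after retraction: {}", accumulated + (-accumulated));
}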

/// Creates a Kafka config.
fn create_kafka_config(
name: &str,
@@ -621,17 +694,24 @@ impl PartitionConsumer {

/// An implementation of [`ConsumerContext`] that unparks the wrapped thread
/// when the message queue switches from nonempty to empty.
struct GlueConsumerContext(SyncActivator);
struct GlueConsumerContext {
activator: SyncActivator,
stats_tx: crossbeam_channel::Sender<Statistics>,
}

impl ClientContext for GlueConsumerContext {
fn stats(&self, statistics: Statistics) {
info!("Client stats: {:#?}", statistics);
self.stats_tx
.send(statistics)
.expect("timely operator hung up while Kafka source active");
self.activate();
// info!("Client stats: {:#?}", statistics);
Contributor: I think this can be removed now?

Contributor Author: Yes, good catch. Thank you!

}
}
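
One hedged operational note, since it is not visible in this diff: librdkafka only invokes the stats callback when statistics.interval.ms is set to a non-zero value on the client configuration, so the source's Kafka config needs something along these lines (the interval value here is illustrative):

use rdkafka::ClientConfig;

fn main() {
    // Assumption: not part of this diff. Without statistics.interval.ms, rdkafka
    // never calls GlueConsumerContext::stats and no consumer metrics are logged.
    let mut config = ClientConfig::new();
    config.set("statistics.interval.ms", "1000");
}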

impl GlueConsumerContext {
fn activate(&self) {
self.0
self.activator
.activate()
.expect("timely operator hung up while Kafka source active");
}