Skip to content

Commit

Permalink
[FLINK-23817][docs] Add metric description to Kafka source.
Browse files Browse the repository at this point in the history
  • Loading branch information
Arvid Heise authored and AHeise committed Sep 1, 2021
1 parent e6d2ea7 commit c42ed7e
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 61 deletions.
86 changes: 71 additions & 15 deletions docs/content/docs/connectors/datastream/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,26 +212,82 @@ Note that Kafka source does **NOT** rely on committed offsets for fault toleranc
is only for exposing the progress of consumer and consuming group for monitoring.

### Monitoring
Kafka source exposes metrics in Flink's metric group for monitoring and diagnosing.

Kafka source exposes the following metrics in the respective [scope]({{< ref "docs/ops/metrics" >}}/#scope).

#### Scope of Metric
All metrics of Kafka source reader are registered under group ```KafkaSourceReader```, which is a
child group of operator metric group. Metrics related to a specific topic partition will be registered
in the group ```KafkaSourceReader.topic.<topic_name>.partition.<partition_id>```.

For example, current consuming offset of topic "my-topic" and partition 1 will be reported in metric:
```<some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset``` ,
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 15%">Scope</th>
<th class="text-left" style="width: 18%">Metrics</th>
<th class="text-left" style="width: 18%">User Variables</th>
<th class="text-left" style="width: 39%">Description</th>
<th class="text-left" style="width: 10%">Type</th>
</tr>
</thead>
<tbody>
<tr>
<th rowspan="8">Operator</th>
<td>currentEmitEventTimeLag</td>
<td>n/a</td>
<td>The time span from the record event timestamp to the time the record is emitted by the source connector¹: <code>currentEmitEventTimeLag = EmitTime - EventTime.</code></td>
<td>Gauge</td>
</tr>
<tr>
<td>watermarkLag</td>
<td>n/a</td>
<td>The time span that the watermark lags behind the wall clock time: <code>watermarkLag = CurrentTime - Watermark</code></td>
<td>Gauge</td>
</tr>
<tr>
<td>sourceIdleTime</td>
<td>n/a</td>
<td>The time span that the source has not processed any record: <code>sourceIdleTime = CurrentTime - LastRecordProcessTime</code></td>
<td>Gauge</td>
</tr>
<tr>
<td>pendingRecords</td>
<td>n/a</td>
<td>The number of records that have not been fetched by the source. e.g. the available records after the consumer offset in a Kafka partition.</td>
<td>Gauge</td>
</tr>
<tr>
<td>KafkaSourceReader.commitsSucceeded</td>
<td>n/a</td>
<td>The total number of successful offset commits to Kafka, if offset committing is turned on and checkpointing is enabled.</td>
<td>Counter</td>
</tr>
<tr>
<td>KafkaSourceReader.commitsFailed</td>
<td>n/a</td>
<td>The total number of offset commit failures to Kafka, if offset committing is
turned on and checkpointing is enabled. Note that committing offsets back to Kafka
is only a means to expose consumer progress, so a commit failure does not affect
the integrity of Flink's checkpointed partition offsets.</td>
<td>Counter</td>
</tr>
<tr>
<td>KafkaSourceReader.committedOffsets</td>
<td>topic, partition</td>
<td>The last successfully committed offsets to Kafka, for each partition.
A particular partition's metric can be specified by topic name and partition id.</td>
<td>Gauge</td>
</tr>
<tr>
<td>KafkaSourceReader.currentOffsets</td>
<td>topic, partition</td>
<td>The consumer's current read offset, for each partition. A particular
partition's metric can be specified by topic name and partition id.</td>
<td>Gauge</td>
</tr>
</tbody>
</table>

and number of successful commits will be reported in metric:
```<some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded``` .

#### List of Metrics
¹ This metric is an instantaneous value recorded for the last processed record. This metric is provided because latency histogram could be expensive. The instantaneous latency value is usually a good enough indication of the latency.

| Metric Name | Description | Scope |
|:----------------:|:-----------------------------------------------:|:-----------------:|
| currentOffset | Current consuming offset of the topic partition | TopicPartition |
| committedOffset | Committed offset of the topic partition | TopicPartition |
| commitsSucceeded | Number of successful commits | KafkaSourceReader |
| commitsFailed | Number of failed commits | KafkaSourceReader |

#### Kafka Consumer Metrics
All metrics of Kafka consumer are also registered under group ```KafkaSourceReader.KafkaConsumer```.
Expand Down
47 changes: 1 addition & 46 deletions docs/content/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1345,52 +1345,7 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
### Connectors

#### Kafka Connectors
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 15%">Scope</th>
<th class="text-left" style="width: 18%">Metrics</th>
<th class="text-left" style="width: 18%">User Variables</th>
<th class="text-left" style="width: 39%">Description</th>
<th class="text-left" style="width: 10%">Type</th>
</tr>
</thead>
<tbody>
<tr>
<th rowspan="1">Operator</th>
<td>commitsSucceeded</td>
<td>n/a</td>
<td>The total number of successful offset commits to Kafka, if offset committing is turned on and checkpointing is enabled.</td>
<td>Counter</td>
</tr>
<tr>
<th rowspan="1">Operator</th>
<td>commitsFailed</td>
<td>n/a</td>
<td>The total number of offset commit failures to Kafka, if offset committing is
turned on and checkpointing is enabled. Note that committing offsets back to Kafka
is only a means to expose consumer progress, so a commit failure does not affect
the integrity of Flink's checkpointed partition offsets.</td>
<td>Counter</td>
</tr>
<tr>
<th rowspan="1">Operator</th>
<td>committedOffsets</td>
<td>topic, partition</td>
<td>The last successfully committed offsets to Kafka, for each partition.
A particular partition's metric can be specified by topic name and partition id.</td>
<td>Gauge</td>
</tr>
<tr>
<th rowspan="1">Operator</th>
<td>currentOffsets</td>
<td>topic, partition</td>
<td>The consumer's current read offset, for each partition. A particular
partition's metric can be specified by topic name and partition id.</td>
<td>Gauge</td>
</tr>
</tbody>
</table>
Please refer to [Kafka monitoring]({{< ref "docs/connectors/datastream/kafka" >}}/#monitoring).

#### Kinesis Connectors
<table class="table table-bordered">
Expand Down

0 comments on commit c42ed7e

Please sign in to comment.