Skip to content

Commit

Permalink
KAFKA-16863 : Deprecate default exception handlers (#17005)
Browse files Browse the repository at this point in the history
Implements KIP-1056:
 - deprecates default.deserialization.exception.handler in favor of deserialization.exception.handler
 - deprecates default.production.exception.handler in favor of production.exception.handler

Reviewers: Matthias J. Sax <[email protected]>
  • Loading branch information
muralibasani authored Sep 8, 2024
1 parent be3ab8b commit 72e16cb
Show file tree
Hide file tree
Showing 16 changed files with 243 additions and 49 deletions.
60 changes: 36 additions & 24 deletions docs/streams/developer-guide/config-streams.html
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@
<li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a>
<ul>
<li><a class="reference internal" href="#acceptable-recovery-lag" id="id27">acceptable.recovery.lag</a></li>
<li><a class="reference internal" href="#default-deserialization-exception-handler" id="id7">default.deserialization.exception.handler</a></li>
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">default.deserialization.exception.handler (deprecated since 4.0)</a></li>
<li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li>
<li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li>
<li><a class="reference internal" href="#production-exception-handler" id="id24">default.production.exception.handler (deprecated since 4.0)</a></li>
<li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">deserialization.exception.handler</a></li>
<li><a class="reference internal" href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li>
<li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li>
<li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li>
Expand All @@ -83,6 +84,7 @@
<li><a class="reference internal" href="#probing-rebalance-interval-ms" id="id30">probing.rebalance.interval.ms</a></li>
<li><a class="reference internal" href="#processing-exception-handler" id="id41">processing.exception.handler</a></li>
<li><a class="reference internal" href="#processing-guarantee" id="id25">processing.guarantee</a></li>
<li><a class="reference internal" href="#production-exception-handler" id="id24">production.exception.handler</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-non-overlap-cost" id="id37">rack.aware.assignment.non_overlap_cost</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-strategy" id="id35">rack.aware.assignment.strategy</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-tags" id="id34">rack.aware.assignment.tags</a></li>
Expand Down Expand Up @@ -281,7 +283,7 @@ <h4><a class="toc-backref" href="#id23">num.standby.replicas</a><a class="header
<td colspan="2">The frequency in milliseconds with which to save the position (offsets in source topics) of tasks.</td>
<td>30000 milliseconds (30 seconds)</td>
</tr>
<tr class="row-odd"><td>default.deserialization.exception.handler</td>
<tr class="row-odd"><td>default.deserialization.exception.handler (Deprecated. Use deserialization.exception.handler instead.)</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndContinueExceptionHandler</span></code></td>
Expand All @@ -292,7 +294,7 @@ <h4><a class="toc-backref" href="#id23">num.standby.replicas</a><a class="header
set by the user or all serdes must be passed in explicitly (see also default.value.serde).</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-odd"><td>default.production.exception.handler</td>
<tr class="row-odd"><td>default.production.exception.handler (Deprecated. Use production.exception.handler instead.)</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td>
Expand All @@ -316,7 +318,12 @@ <h4><a class="toc-backref" href="#id23">num.standby.replicas</a><a class="header
</td>
<td><code>ROCKS_DB</code></td>
</tr>
<tr class="row-odd"><td>dsl.store.suppliers.class</td>
<tr class="row-odd"><td>deserialization.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndContinueExceptionHandler</span></code></td>
</tr>
<tr class="row-even"><td>dsl.store.suppliers.class</td>
<td>Low</td>
<td colspan="2">
Defines a default state store implementation to be used by any stateful DSL operator
Expand All @@ -325,12 +332,12 @@ <h4><a class="toc-backref" href="#id23">num.standby.replicas</a><a class="header
</td>
<td><code>BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers</code></td>
</tr>
<tr class="row-even"><td>log.summary.interval.ms</td>
<tr class="row-odd"><td>log.summary.interval.ms</td>
<td>Low</td>
<td colspan="2">The output interval in milliseconds for logging summary information (disabled if negative).</td>
<td>120000 milliseconds (2 minutes)</td>
</tr>
<tr class="row-odd"><td>max.task.idle.ms</td>
<tr class="row-even"><td>max.task.idle.ms</td>
<td>Medium</td>
<td colspan="2">
<p>
Expand All @@ -349,58 +356,63 @@ <h4><a class="toc-backref" href="#id23">num.standby.replicas</a><a class="header
</td>
<td>0 milliseconds</td>
</tr>
<tr class="row-even"><td>max.warmup.replicas</td>
<tr class="row-odd"><td>max.warmup.replicas</td>
<td>Medium</td>
<td colspan="2">The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once.</td>
<td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr>
<tr class="row-odd"><td>metric.reporters</td>
<tr class="row-even"><td>metric.reporters</td>
<td>Low</td>
<td colspan="2">A list of classes to use as metrics reporters.</td>
<td>the empty list</td>
</tr>
<tr class="row-even"><td>metrics.num.samples</td>
<tr class="row-odd"><td>metrics.num.samples</td>
<td>Low</td>
<td colspan="2">The number of samples maintained to compute metrics.</td>
<td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr>
<tr class="row-odd"><td>metrics.recording.level</td>
<tr class="row-even"><td>metrics.recording.level</td>
<td>Low</td>
<td colspan="2">The highest recording level for metrics.</td>
<td><code class="docutils literal"><span class="pre">INFO</span></code></td>
</tr>
<tr class="row-even"><td>metrics.sample.window.ms</td>
<tr class="row-odd"><td>metrics.sample.window.ms</td>
<td>Low</td>
<td colspan="2">The window of time in milliseconds a metrics sample is computed over.</td>
<td>30000 milliseconds (30 seconds)</td>
</tr>
<tr class="row-odd"><td>num.standby.replicas</td>
<tr class="row-even"><td>num.standby.replicas</td>
<td>High</td>
<td colspan="2">The number of standby replicas for each task.</td>
<td><code class="docutils literal"><span class="pre">0</span></code></td>
</tr>
<tr class="row-even"><td>num.stream.threads</td>
<tr class="row-odd"><td>num.stream.threads</td>
<td>Medium</td>
<td colspan="2">The number of threads to execute stream processing.</td>
<td><code class="docutils literal"><span class="pre">1</span></code></td>
</tr>
<tr class="row-odd"><td>probing.rebalance.interval.ms</td>
<tr class="row-even"><td>probing.rebalance.interval.ms</td>
<td>Low</td>
<td colspan="2">The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td>
<td>600000 milliseconds (10 minutes)</td>
</tr>
<tr class="row-even"><td>processing.exception.handler</td>
<tr class="row-odd"><td>processing.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProcessingExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndFailProcessingExceptionHandler</span></code></td>
</tr>
<tr class="row-odd"><td>processing.guarantee</td>
<tr class="row-even"><td>processing.guarantee</td>
<td>Medium</td>
<td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default)
or <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker version 2.5+). Deprecated config options are
<code class="docutils literal"><span class="pre">"exactly_once"</span></code> (for EOS version 1) and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> (for EOS version 2, requires broker version 2.5+)</td>.
<td>See <a class="reference internal" href="#streams-developer-guide-processing-guarantee"><span class="std std-ref">Processing Guarantee</span></a></td>
</tr>
<tr class="row-odd"><td>production.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td>
</tr>
<tr class="row-even"><td>poll.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to block waiting for input.</td>
Expand Down Expand Up @@ -488,10 +500,10 @@ <h4><a class="toc-backref" href="#id27">acceptable.recovery.lag</a><a class="hea
</div>
</blockquote>
</div>
<div class="section" id="default-deserialization-exception-handler">
<span id="streams-developer-guide-deh"></span><h4><a class="toc-backref" href="#id7">default.deserialization.exception.handler</a><a class="headerlink" href="#default-deserialization-exception-handler" title="Permalink to this headline"></a></h4>
<div class="section" id="deserialization-exception-handler">
<span id="streams-developer-guide-deh"></span><h4><a class="toc-backref" href="#id7">deserialization.exception.handler (deprecated: default.deserialization.exception.handler)</a><a class="headerlink" href="#deserialization-exception-handler" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default deserialization exception handler allows you to manage record exceptions that fail to deserialize. This
<div><p>The deserialization exception handler allows you to manage record exceptions that fail to deserialize. This
can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception
handler needs to return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning
<code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams should ignore the issue
Expand Down Expand Up @@ -540,10 +552,10 @@ <h4><a class="toc-backref" href="#id27">acceptable.recovery.lag</a><a class="hea

</div></blockquote>
</div>
<div class="section" id="default-production-exception-handler">
<span id="streams-developer-guide-peh"></span><h4><a class="toc-backref" href="#id24">default.production.exception.handler</a><a class="headerlink" href="#default-production-exception-handler" title="Permalink to this headline"></a></h4>
<div class="section" id="production-exception-handler">
<span id="streams-developer-guide-peh"></span><h4><a class="toc-backref" href="#id24">production.exception.handler (deprecated: default.production.exception.handler)</a><a class="headerlink" href="#production-exception-handler" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default production exception handler allows you to manage exceptions triggered when trying to interact with a broker
<div><p>The production exception handler allows you to manage exceptions triggered when trying to interact with a broker
such as attempting to produce a record that is too large. By default, Kafka provides and uses the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a>
that always fails when these exceptions occur.</p>

Expand Down Expand Up @@ -574,7 +586,7 @@ <h4><a class="toc-backref" href="#id27">acceptable.recovery.lag</a><a class="hea

// other various kafka streams settings, e.g. bootstrap servers, application id, etc

settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
settings.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
IgnoreRecordTooLargeHandler.class);</code></pre></div>
</blockquote>
</div>
Expand Down
6 changes: 6 additions & 0 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ <h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"><

<h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>

<p>
In this release two configs <code>default.deserialization.exception.handler</code> and <code>default.production.exception.handler</code> are deprecated, as they don't have any overwrites, which is described in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig">KIP-1056</a>
You can refer to new configs via <code>deserialization.exception.handler</code> and <code>production.exception.handler</code>.
</p>

<p>
In previous release, a new version of the Processor API was introduced and the old Processor API was
incrementally replaced and deprecated.
Expand Down
Loading

0 comments on commit 72e16cb

Please sign in to comment.