KAFKA-18026: KIP-1112, document new config and update the Streams upgrade guide (#17906)

This PR covers all the docs for KIP-1112, including the new config and a note about the new APIs in the 4.0 section of the upgrade guide.

This also fixes and updates some unrelated parts of the upgrade guide that were out of date, such as the broker compatibility matrix.

Reviewers: Ismael Juma, Chia-Ping Tsai, Matthias Sax
ableegoldman authored Dec 10, 2024
1 parent 408d0f6 commit d208abb
Showing 2 changed files with 79 additions and 17 deletions.
53 changes: 39 additions & 14 deletions docs/streams/developer-guide/config-streams.html
@@ -84,6 +84,7 @@
<li><a class="reference internal" href="#probing-rebalance-interval-ms" id="id30">probing.rebalance.interval.ms</a></li>
<li><a class="reference internal" href="#processing-exception-handler" id="id41">processing.exception.handler</a></li>
<li><a class="reference internal" href="#processing-guarantee" id="id25">processing.guarantee</a></li>
<li><a class="reference internal" href="#processor-wrapper-class" id="id42">processor.wrapper.class</a></li>
<li><a class="reference internal" href="#production-exception-handler" id="id24">production.exception.handler</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-non-overlap-cost" id="id37">rack.aware.assignment.non_overlap_cost</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-strategy" id="id35">rack.aware.assignment.strategy</a></li>
@@ -408,76 +409,83 @@ <h4><a class="toc-backref" href="#id23">num.standby.replicas</a><a class="header
<code class="docutils literal"><span class="pre">"exactly_once"</span></code> (for EOS version 1) and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> (for EOS version 2, requires broker version 2.5+)</td>.
<td>See <a class="reference internal" href="#streams-developer-guide-processing-guarantee"><span class="std std-ref">Processing Guarantee</span></a></td>
</tr>
<tr class="row-odd"><td>production.exception.handler</td>
<tr class="row-odd"><td>processor.wrapper.class</td>
<td>Medium</td>
<td colspan="2">A class or class name implementing the <code class="docutils literal"><span class="pre">ProcessorWrapper</span></code> interface.
Must be passed in when creating the topology, and will not be applied unless passed in to the appropriate constructor as a TopologyConfig. You should
use the <code class="docutils literal"><span class="pre">StreamsBuilder#new(TopologyConfig)</span></code> constructor for DSL applications, and the
<code class="docutils literal"><span class="pre">Topology#new(TopologyConfig)</span></code> constructor for PAPI applications.</td>
</tr>
<tr class="row-even"><td>production.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td>
</tr>
<tr class="row-even"><td>poll.ms</td>
<tr class="row-odd"><td>poll.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to block waiting for input.</td>
<td>100 milliseconds</td>
</tr>
<tr class="row-odd"><td>rack.aware.assignment.tags</td>
<tr class="row-even"><td>rack.aware.assignment.tags</td>
<td>Medium</td>
<td colspan="2">List of tag keys used to distribute standby replicas across Kafka Streams
clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over
clients with different tag values.</td>
<td>the empty list</td>
</tr>
<tr class="row-even"><td>replication.factor</td>
<tr class="row-odd"><td>replication.factor</td>
<td>Medium</td>
<td colspan="2">The replication factor for changelog topics and repartition topics created by the application.
The default of <code>-1</code> (meaning: use broker default replication factor) requires broker version 2.4 or newer.</td>
<td><code class="docutils literal"><span class="pre">-1</span></code></td>
</tr>
<tr class="row-odd"><td>retry.backoff.ms</td>
<tr class="row-even"><td>retry.backoff.ms</td>
<td>Medium</td>
<td colspan="2">The amount of time in milliseconds, before a request is retried.</td>
<td><code class="docutils literal"><span class="pre">100</span></code></td>
</tr>
<tr class="row-even"><td>rocksdb.config.setter</td>
<tr class="row-odd"><td>rocksdb.config.setter</td>
<td>Medium</td>
<td colspan="2">The RocksDB configuration.</td>
<td></td>
</tr>
<tr class="row-odd"><td>state.cleanup.delay.ms</td>
<tr class="row-even"><td>state.cleanup.delay.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
<td>600000 milliseconds (10 minutes)</td>
</tr>
<tr class="row-even"><td>state.dir</td>
<tr class="row-odd"><td>state.dir</td>
<td>High</td>
<td colspan="2">Directory location for state stores.</td>
<td><code class="docutils literal"><span class="pre">/${java.io.tmpdir}/kafka-streams</span></code></td>
</tr>
<tr class="row-odd"><td>task.assignor.class</td>
<tr class="row-even"><td>task.assignor.class</td>
<td>Medium</td>
<td colspan="2">A task assignor class or class name implementing the <code>TaskAssignor</code> interface.</td>
<td>The high-availability task assignor.</td>
</tr>
<tr class="row-even"><td>task.timeout.ms</td>
<tr class="row-odd"><td>task.timeout.ms</td>
<td>Medium</td>
<td colspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td>
<td>300000 milliseconds (5 minutes)</td>
</tr>
<tr class="row-odd"><td>topology.optimization</td>
<tr class="row-even"><td>topology.optimization</td>
<td>Medium</td>
<td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: <code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>),
<code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code> (<code>single.store.self.join</code>). </td>
<td><code>NO_OPTIMIZATION</code></td>
</tr>
<tr class="row-even"><td>upgrade.from</td>
<tr class="row-odd"><td>upgrade.from</td>
<td>Medium</td>
<td colspan="2">The version you are upgrading from during a rolling upgrade.</td>
<td>See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td>
</tr>
<tr class="row-odd"><td>windowstore.changelog.additional.retention.ms</td>
<tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
<td>Low</td>
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td>86400000 milliseconds (1 day)</td>
</tr>
<tr class="row-even"><td>window.size.ms</td>
<tr class="row-odd"><td>window.size.ms</td>
<td>Low</td>
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
@@ -998,6 +1006,23 @@ <h4><a class="toc-backref" href="#id30">probing.rebalance.interval.ms</a><a clas
</dl>
</div></blockquote>
</div>
<div class="section" id="processor-wrapper-class">
<span id="streams-developer-guide-processor-wrapper-class"></span><h4><a class="toc-backref" href="#id42">processor.wrapper.class</a><a class="headerlink" href="#processor-wrapper-class" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
A class or class name implementing the <code class="docutils literal"><span class="pre">ProcessorWrapper</span></code> interface. This feature allows you to wrap any of the
processors in the compiled topology, including both custom processor implementations and those created by Streams for DSL operators. This can be useful for logging or tracing
implementations since it allows access to the otherwise-hidden processor context for DSL operators, and also allows for injecting additional debugging information to an entire
application topology with just a single config.
</p>
<p>
IMPORTANT: This MUST be passed in when creating the topology, and will not be applied unless passed in to the appropriate topology-building constructor. You should
use the <code class="docutils literal"><span class="pre">StreamsBuilder#new(TopologyConfig)</span></code> constructor for DSL applications, and the
<code class="docutils literal"><span class="pre">Topology#new(TopologyConfig)</span></code> constructor for PAPI applications.
</p>
</div></blockquote>
</div>
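As a rough illustration of the requirement described above, the following minimal sketch builds the properties that would carry <code>processor.wrapper.class</code>. The wrapper class name <code>com.example.LoggingProcessorWrapper</code>, the application id, and the bootstrap address are hypothetical placeholders that do not come from the documentation. The sketch only assembles the configuration; the hand-off to the topology-building constructor is shown in comments because it needs the kafka-streams library on the classpath.

```java
import java.util.Properties;

public class ProcessorWrapperConfigSketch {

    // Config key documented in the section above.
    static final String PROCESSOR_WRAPPER_CLASS = "processor.wrapper.class";

    // Builds the properties that would back the TopologyConfig.
    // The application id and bootstrap servers are hypothetical values.
    static Properties buildConfig(String wrapperClassName) {
        Properties props = new Properties();
        props.put("application.id", "wrapper-demo");
        props.put("bootstrap.servers", "localhost:9092");
        props.put(PROCESSOR_WRAPPER_CLASS, wrapperClassName);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical class implementing the ProcessorWrapper interface.
        Properties props = buildConfig("com.example.LoggingProcessorWrapper");

        // With kafka-streams available, the same properties would be passed in
        // when the topology is created, for example:
        //   TopologyConfig topologyConfig = new TopologyConfig(new StreamsConfig(props));
        //   StreamsBuilder builder = new StreamsBuilder(topologyConfig); // DSL
        //   Topology topology = new Topology(topologyConfig);            // PAPI
        System.out.println(props.getProperty(PROCESSOR_WRAPPER_CLASS));
    }
}
```

The point of the sketch is ordering: the wrapper setting has to be present in the config before the <code>StreamsBuilder</code> or <code>Topology</code> is constructed, since setting it only on the config handed to <code>KafkaStreams</code> would be too late.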
<div class="section" id="replication-factor">
<span id="replication-factor-parm"></span><h4><a class="toc-backref" href="#id13">replication.factor</a><a class="headerlink" href="#replication-factor" title="Permalink to this headline"></a></h4>
<blockquote>
43 changes: 40 additions & 3 deletions docs/streams/upgrade-guide.html
@@ -67,6 +67,12 @@ <h1>Upgrade Guide and API Changes</h1>
<p>For a table that shows Streams API compatibility with Kafka broker versions, see <a href="#streams_api_broker_compat">Broker Compatibility</a>.</p>

<h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"></a><a href="#streams_notable_changes">Notable compatibility changes in past releases</a></h3>

<p>
Starting in version 4.0.0, Kafka Streams will only be compatible when running against brokers on version 2.1
or higher. Additionally, exactly-once semantics (EOS) will require brokers to be at least version 2.5.
</p>
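The two version thresholds in the paragraph above can be expressed as a small check. This is a minimal sketch, not part of Kafka: the class and method names are hypothetical, and it assumes broker versions are given as dotted strings such as <code>2.4.1</code>.

```java
public class BrokerCompatSketch {

    // Parses "major.minor[.patch]" into {major, minor}.
    static int[] majorMinor(String version) {
        String[] parts = version.split("\\.");
        return new int[] { Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) };
    }

    // True if the version is at least major.minor.
    static boolean atLeast(String version, int major, int minor) {
        int[] v = majorMinor(version);
        return v[0] > major || (v[0] == major && v[1] >= minor);
    }

    // Kafka Streams 4.0.0 needs brokers on 2.1 or higher;
    // with exactly-once semantics enabled it needs 2.5 or higher.
    static boolean streams4Compatible(String brokerVersion, boolean eosEnabled) {
        return eosEnabled ? atLeast(brokerVersion, 2, 5) : atLeast(brokerVersion, 2, 1);
    }

    public static void main(String[] args) {
        System.out.println(streams4Compatible("2.1.0", false));
        System.out.println(streams4Compatible("2.4.1", true));
    }
}
```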

<p>
Downgrading from 3.5.x or newer version to 3.4.x or older version needs special attention:
Since 3.5.0 release, Kafka Streams uses a new serialization format for repartition topics.
@@ -155,6 +161,18 @@ <h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API
<code>TransformerSupplier</code>, <code>ValueTransformer</code>, and <code>ValueTransformerSupplier</code>.
</p>

<p>
You can now configure your topology with a <code>ProcessorWrapper</code>, which allows you to access and optionally wrap/replace
any processor in the topology by injecting an alternative <code>ProcessorSupplier</code> in its place. This can be used to peek
records and access the processor context even for DSL operators, for example to implement a logging or tracing framework, or to
aid in testing or debugging scenarios. You must implement the <code>ProcessorWrapper</code> interface and then pass the class
or class name into the configs via the new <code>StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG</code> config. NOTE: this config is
applied during the topology building phase, and therefore will not take effect unless the config is passed in when creating
the StreamsBuilder (DSL) or Topology (PAPI) objects. You MUST use the StreamsBuilder/Topology constructor overload that
accepts a TopologyConfig parameter for the <code>StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG</code> to be picked up.
See <a href="https://cwiki.apache.org/confluence/x/TZCMEw">KIP-1112</a> for more details.
</p>

<h3><a id="streams_api_changes_390" href="#streams_api_changes_390">Streams API changes in 3.9.0</a></h3>

<p>
@@ -1634,45 +1652,64 @@ <h3 class="anchor-heading"><a id="streams_api_broker_compat" class="anchor-link"
<thead>
<tr>
<th></th>
<th colspan="3">Kafka Broker (columns)</th>
<th colspan="5">Kafka Broker (columns)</th>
</tr>
</thead>
<tbody>
<tr>
<td>Kafka Streams API (rows)</td>
<td>0.10.0.x</td>
<td>0.10.1.x and 0.10.2.x</td>
<td>0.11.0.x and<br>1.0.x and<br>1.1.x and<br>2.0.x and<br>2.1.x and<br>2.2.x and<br>2.3.x and<br>2.4.x and<br>2.5.x and<br>2.6.x and<br>2.7.x and<br>2.8.x and<br>3.0.x and<br>3.1.x and<br>3.2.x and<br>3.3.x and<br>3.4.x and<br>3.5.x and<br>3.6.x and<br>3.7.x and<br>3.8.x and<br>3.9.x</td>
<td>0.11.0.x and<br>1.0.x and<br>1.1.x and<br>2.0.x</td>
<td>2.1.x and<br>2.2.x and<br>2.3.x and<br>2.4.x and<br>2.5.x and<br>2.6.x and<br>2.7.x and<br>2.8.x and<br>3.0.x and<br>3.1.x and<br>3.2.x and<br>3.3.x and<br>3.4.x and<br>3.5.x and<br>3.6.x and<br>3.7.x and<br>3.8.x and<br>3.9.x</td>
<td>4.0.x</td>
</tr>
<tr>
<td>0.10.0.x</td>
<td>compatible</td>
<td>compatible</td>
<td>compatible</td>
<td>compatible</td>
<td></td>
</tr>
<tr>
<td>0.10.1.x and 0.10.2.x</td>
<td></td>
<td>compatible</td>
<td>compatible</td>
<td>compatible</td>
<td></td>
</tr>
<tr>
<td>0.11.0.x</td>
<td></td>
<td>compatible with exactly-once turned off<br>(requires broker version 0.11.0.x or higher)</td>
<td>compatible</td>
<td></td>
</tr>
<tr>
<td>1.0.x and<br>1.1.x and<br>2.0.x and<br>2.1.x and<br>2.2.0 and<br>2.2.0</td>
<td></td>
<td>compatible with exactly-once turned off<br>(requires broker version 0.11.0.x or higher);<br>requires message format 0.10 or higher;<br>message headers are not supported<br>(requires broker version 0.11.0.x or higher<br>with message format 0.11 or higher)</td>
<td>compatible; requires message format 0.10 or higher;<br>if message headers are used, message format 0.11<br>or higher required</td>
<td>compatible</td>
<td></td>
</tr>
<tr>
<td>2.2.1 and<br>2.3.x and<br>2.4.x and<br>2.5.x and<br>2.6.x and<br>2.7.x and<br>2.8.x and<br>3.0.x and<br>3.1.x and<br>3.2.x and<br>3.3.x and<br>3.4.x and<br>3.5.x and<br>3.6.x and<br>3.7.x and<br>3.8.x and<br>3.9.x</td>
<td></td>
<td></td>
<td>compatible; requires message format 0.11 or higher;<br>enabling exactly-once v2 requires 2.4.x or higher</td>
<td>compatible; requires message format 0.11 or higher;<br>enabling exactly-once v2 requires 2.5.x or higher</td>
<td>compatible</td>
<td></td>
</tr>
<tr>
<td>4.0.x</td>
<td></td>
<td></td>
<td></td>
<td>compatible; enabling exactly-once v2 requires broker version 2.5.x or higher</td>
<td>compatible</td>
</tr>
</tbody>
</table>