Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions docs/streams/developer-guide/config-streams.html
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@
<li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a>
<ul>
<li><a class="reference internal" href="#acceptable-recovery-lag" id="id27">acceptable.recovery.lag</a></li>
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">default.deserialization.exception.handler (deprecated since 4.0)</a></li>
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">default.deserialization.exception.handler (deprecated)</a></li>
<li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li>
<li><a class="reference internal" href="#production-exception-handler" id="id24">default.production.exception.handler (deprecated since 4.0)</a></li>
<li><a class="reference internal" href="#production-exception-handler" id="id24">default.production.exception.handler (deprecated)</a></li>
<li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">deserialization.exception.handler</a></li>
Expand All @@ -100,7 +100,7 @@
<li><a class="reference internal" href="#state-dir" id="id14">state.dir</a></li>
<li><a class="reference internal" href="#task-assignor-class" id="id39">task.assignor.class</a></li>
<li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
<li><a class="reference internal" href="#windowed-inner-class-serde" id="id38">windowed.inner.class.serde</a></li>
<li><a class="reference internal" href="#windowed-inner-class-serde" id="id38">windowed.inner.class.serde (deprecated)</a></li>
</ul>
</li>
<li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a>
Expand Down Expand Up @@ -543,7 +543,7 @@ <h4><a class="toc-backref" href="#id45">num.standby.replicas</a><a class="header
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td><code class="docutils literal"><span class="pre">86400000</span></code></td> (1 day)</td>
</tr>
<tr class="row-odd"><td>window.size.ms</td>
<tr class="row-odd"><td>window.size.ms (Deprecated. See <a href="datatypes.html#window-serdes">Window Serdes</a> for alternatives.)</td>
<td>Low</td>
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
Expand Down Expand Up @@ -1225,7 +1225,7 @@ <h4><a class="toc-backref" href="#id31">topology.optimization</a><a class="heade
</div></blockquote>
</div>
<div class="section" id="windowed.inner.class.serde">
<h4><a class="toc-backref" href="#id31">windowed.inner.class.serde</a><a class="headerlink" href="#windowed.inner.class.serde" title="Permalink to this headline"></a></h4>
<h4><a class="toc-backref" href="#id31">windowed.inner.class.serde (Deprecated)</a><a class="headerlink" href="#windowed.inner.class.serde" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
Expand All @@ -1234,6 +1234,9 @@ <h4><a class="toc-backref" href="#id31">windowed.inner.class.serde</a><a class="
<p>
Note that this config is only used by plain consumer/producer clients that set a windowed de/serializer type via configs. For Kafka Streams applications that deal with windowed types, you must pass in the inner serde type when you instantiate the windowed serde object for your topology.
</p>
<p>
<strong>Note:</strong> This configuration is deprecated. Use <a href="datatypes.html#window-serdes">Window Serdes</a> directly in your topology instead.
</p>
</div></blockquote>
</div>
<div class="section" id="upgrade-from">
Expand Down
71 changes: 69 additions & 2 deletions docs/streams/developer-guide/datatypes.html
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@
<ul>
<li><a class="reference internal" href="#primitive-and-basic-types" id="id4">Primitive and basic types</a></li>
<li><a class="reference internal" href="#json" id="id6">JSON</a></li>
<li><a class="reference internal" href="#implementing-custom-serdes" id="id5">Implementing custom serdes</a></li>
<li><a class="reference internal" href="#window-serdes" id="id7">Window Serdes</a></li>
<li><a class="reference internal" href="#implementing-custom-serdes" id="id8">Implementing custom serdes</a></li>
</ul>
<li><a class="reference internal" href="#scala-dsl-serdes" id="id8">Kafka Streams DSL for Scala Implicit Serdes</a></li>
<li><a class="reference internal" href="#scala-dsl-serdes" id="id9">Kafka Streams DSL for Scala Implicit Serdes</a></li>
</ul>
<div class="section" id="configuring-serdes">
<h2>Configuring Serdes<a class="headerlink" href="#configuring-serdes" title="Permalink to this headline"></a></h2>
Expand Down Expand Up @@ -163,6 +164,72 @@ <h3>JSON<a class="headerlink" href="#json" title="Permalink to this headline"></
<p>As shown in the example, you can use JSONSerdes inner classes <code class="docutils literal"><span class="pre">Serdes.serdeFrom(&lt;serializerInstance&gt;, &lt;deserializerInstance&gt;)</span></code> to construct JSON compatible serializers and deserializers.
</p>
</div>
<div class="section" id="window-serdes">
<h3>Window Serdes<a class="headerlink" href="#window-serdes" title="Permalink to this headline"></a></h3>
<p>Apache Kafka Streams includes serde implementations for windowed keys in
its <code class="docutils literal"><span class="pre">kafka-streams</span></code> Maven artifact:</p>
<pre class="line-numbers"><code class="language-xml">&lt;dependency&gt;
&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
&lt;artifactId&gt;kafka-streams&lt;/artifactId&gt;
&lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
&lt;/dependency&gt;</code></pre>
<p>This artifact provides the following windowed serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/src/main/java/org/apache/kafka/streams/kstream">org.apache.kafka.streams.kstream</a>:</p>
<table border="1" class="docutils">
<colgroup>
<col width="17%" />
<col width="83%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Data type</th>
<th class="head">Serde</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>Windowed&lt;T&gt; (Time Windows)</td>
<td><code class="docutils literal"><span class="pre">new WindowedSerdes.TimeWindowedSerde&lt;&gt;(innerSerde, windowSize)</span></code>, <code class="docutils literal"><span class="pre">WindowedSerdes.timeWindowedSerdeFrom(Class&lt;T&gt; type, long windowSize)</span></code></td>
</tr>
<tr class="row-odd"><td>Windowed&lt;T&gt; (Session Windows)</td>
<td><code class="docutils literal"><span class="pre">new WindowedSerdes.SessionWindowedSerde&lt;&gt;(innerSerde)</span></code>, <code class="docutils literal"><span class="pre">WindowedSerdes.sessionWindowedSerdeFrom(Class&lt;T&gt; type)</span></code></td>
</tr>
<tr class="row-even"><td>TimeWindowedSerializer&lt;T&gt;</td>
<td><code class="docutils literal"><span class="pre">new TimeWindowedSerializer&lt;&gt;(innerSerializer)</span></code></td>
</tr>
<tr class="row-odd"><td>TimeWindowedDeserializer&lt;T&gt;</td>
<td><code class="docutils literal"><span class="pre">new TimeWindowedDeserializer&lt;&gt;(innerDeserializer, windowSize)</span></code></td>
</tr>
<tr class="row-even"><td>SessionWindowedSerializer&lt;T&gt;</td>
<td><code class="docutils literal"><span class="pre">new SessionWindowedSerializer&lt;&gt;(innerSerializer)</span></code></td>
</tr>
<tr class="row-odd"><td>SessionWindowedDeserializer&lt;T&gt;</td>
<td><code class="docutils literal"><span class="pre">new SessionWindowedDeserializer&lt;&gt;(innerDeserializer)</span></code></td>
</tr>
</tbody>
</table>
<h4>Migration from Deprecated Configs</h4>
<p>The following stream configuration parameters have been deprecated. For details, see the <a href="config-streams.html#windowed.inner.class.serde">windowed.inner.class.serde</a> configuration documentation:</p>
<table border="1" class="docutils">
<colgroup>
<col width="17%" />
<col width="83%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd">
<th class="head">Deprecated Config</th>
<th class="head">Replacement</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even">
<td><code class="docutils literal"><span class="pre">windowed.inner.class.serde</span></code></td>
<td>Use <code class="docutils literal"><span class="pre">TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS</span></code>, <code class="docutils literal"><span class="pre">TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS</span></code>, <code class="docutils literal"><span class="pre">SessionWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS</span></code>, or <code class="docutils literal"><span class="pre">SessionWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS</span></code></td>
</tr>
<tr class="row-odd">
<td><code class="docutils literal"><span class="pre">window.size.ms</span></code></td>
<td>Use <code class="docutils literal"><span class="pre">TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG</span></code> or pass window size directly to constructor</td>
</tr>
</tbody>
</table>
</div>
<div class="section" id="implementing-custom-serdes">
<span id="streams-developer-guide-serdes-custom"></span><h2>Implementing custom Serdes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h2>
<p>If you need to implement custom Serdes, your best starting point is to take a look at the source code references of
Expand Down