Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 8 additions & 16 deletions docs/streams/developer-guide/config-streams.html
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@
<li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a>
<ul>
<li><a class="reference internal" href="#acceptable-recovery-lag" id="id27">acceptable.recovery.lag</a></li>
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">default.deserialization.exception.handler (deprecated since 4.0)</a></li>
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">default.deserialization.exception.handler (deprecated)</a></li>
<li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li>
<li><a class="reference internal" href="#production-exception-handler" id="id24">default.production.exception.handler (deprecated since 4.0)</a></li>
<li><a class="reference internal" href="#production-exception-handler" id="id24">default.production.exception.handler (deprecated)</a></li>
<li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">deserialization.exception.handler</a></li>
Expand All @@ -100,7 +100,6 @@
<li><a class="reference internal" href="#state-dir" id="id14">state.dir</a></li>
<li><a class="reference internal" href="#task-assignor-class" id="id39">task.assignor.class</a></li>
<li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
<li><a class="reference internal" href="#windowed-inner-class-serde" id="id38">windowed.inner.class.serde</a></li>
</ul>
</li>
<li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a>
Expand Down Expand Up @@ -543,11 +542,16 @@ <h4><a class="toc-backref" href="#id45">num.standby.replicas</a><a class="header
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td><code class="docutils literal"><span class="pre">86400000</span></code></td> (1 day)</td>
</tr>
<tr class="row-odd"><td>window.size.ms</td>
<tr class="row-odd"><td>window.size.ms (Deprecated. See <a href="datatypes.html#window-serdes">Window Serdes</a> for alternatives.)</td>
<td>Low</td>
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-even"><td>windowed.inner.class.serde (Deprecated. See <a href="datatypes.html#window-serdes">Window Serdes</a> for alternatives.)</td>
<td>Low</td>
<td colspan="2">Serde for the inner class of a windowed record. Must implement the <code class="docutils literal"><span class="pre">Serde</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
</tbody>
</table>
<div class="section" id="acceptable-recovery-lag">
Expand Down Expand Up @@ -1224,18 +1228,6 @@ <h4><a class="toc-backref" href="#id31">topology.optimization</a><a class="heade
</p>
</div></blockquote>
</div>
<div class="section" id="windowed.inner.class.serde">
<h4><a class="toc-backref" href="#id31">windowed.inner.class.serde</a><a class="headerlink" href="#windowed.inner.class.serde" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface.
</p>
<p>
Note that this config is only used by plain consumer/producer clients that set a windowed de/serializer type via configs. For Kafka Streams applications that deal with windowed types, you must pass in the inner serde type when you instantiate the windowed serde object for your topology.
</p>
</div></blockquote>
</div>
<div class="section" id="upgrade-from">
<span id="streams-developer-guide-upgrade-from"></span><h4><a class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" href="#upgrade-from" title="Permalink to this headline"></a></h4>
<blockquote>
Expand Down
75 changes: 73 additions & 2 deletions docs/streams/developer-guide/datatypes.html
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@
<ul>
<li><a class="reference internal" href="#primitive-and-basic-types" id="id4">Primitive and basic types</a></li>
<li><a class="reference internal" href="#json" id="id6">JSON</a></li>
<li><a class="reference internal" href="#implementing-custom-serdes" id="id5">Implementing custom serdes</a></li>
<li><a class="reference internal" href="#window-serdes" id="id7">Window Serdes</a></li>
<li><a class="reference internal" href="#implementing-custom-serdes" id="id8">Implementing custom serdes</a></li>
</ul>
<li><a class="reference internal" href="#scala-dsl-serdes" id="id8">Kafka Streams DSL for Scala Implicit Serdes</a></li>
<li><a class="reference internal" href="#scala-dsl-serdes" id="id9">Kafka Streams DSL for Scala Implicit Serdes</a></li>
</ul>
<div class="section" id="configuring-serdes">
<h2>Configuring Serdes<a class="headerlink" href="#configuring-serdes" title="Permalink to this headline"></a></h2>
Expand Down Expand Up @@ -163,6 +164,76 @@ <h3>JSON<a class="headerlink" href="#json" title="Permalink to this headline"></
<p>As shown in the example, you can use JSONSerdes inner classes <code class="docutils literal"><span class="pre">Serdes.serdeFrom(&lt;serializerInstance&gt;, &lt;deserializerInstance&gt;)</span></code> to construct JSON compatible serializers and deserializers.
</p>
</div>
<div class="section" id="window-serdes">
<h3>Window Serdes<a class="headerlink" href="#window-serdes" title="Permalink to this headline"></a></h3>
<p>Apache Kafka Streams includes serde implementations for windowed types in
its <code class="docutils literal"><span class="pre">kafka-streams</span></code> Maven artifact:</p>
<pre class="line-numbers"><code class="language-xml">&lt;dependency&gt;
&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
&lt;artifactId&gt;kafka-streams&lt;/artifactId&gt;
&lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
&lt;/dependency&gt;</code></pre>
<p>This artifact provides the following windowed serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/src/main/java/org/apache/kafka/streams/kstream">org.apache.kafka.streams.kstream</a>:</p>

<p><strong>Serdes:</strong></p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">WindowedSerdes.TimeWindowedSerde&lt;T&gt;</span></code></li>
<li><code class="docutils literal"><span class="pre">WindowedSerdes.SessionWindowedSerde&lt;T&gt;</span></code></li>
</ul>

<p><strong>Serializers:</strong></p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">TimeWindowedSerializer&lt;T&gt;</span></code></li>
<li><code class="docutils literal"><span class="pre">SessionWindowedSerializer&lt;T&gt;</span></code></li>
</ul>

<p><strong>Deserializers:</strong></p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">TimeWindowedDeserializer&lt;T&gt;</span></code></li>
<li><code class="docutils literal"><span class="pre">SessionWindowedDeserializer&lt;T&gt;</span></code></li>
</ul>
<h4>Usage in Code</h4>
<p>When using windowed serdes in your application code, you typically create instances via constructors or factory methods:</p>
<pre class="line-numbers"><code class="language-java">// Time windowed serde - using factory method
Serde&lt;Windowed&lt;String&gt;&gt; timeWindowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);

// Time windowed serde - using constructor
Serde&lt;Windowed&lt;String&gt;&gt; timeWindowedSerde2 =
new WindowedSerdes.TimeWindowedSerde&lt;&gt;(Serdes.String(), 500L);

// Session windowed serde - using factory method
Serde&lt;Windowed&lt;String&gt;&gt; sessionWindowedSerde =
WindowedSerdes.sessionWindowedSerdeFrom(String.class);

// Session windowed serde - using constructor
Serde&lt;Windowed&lt;String&gt;&gt; sessionWindowedSerde2 =
new WindowedSerdes.SessionWindowedSerde&lt;&gt;(Serdes.String());

// Using individual serializers/deserializers
TimeWindowedSerializer&lt;String&gt; serializer = new TimeWindowedSerializer&lt;&gt;(Serdes.String().serializer());
TimeWindowedDeserializer&lt;String&gt; deserializer = new TimeWindowedDeserializer&lt;&gt;(Serdes.String().deserializer(), 500L);</code></pre>

<h4>Usage in Command Line</h4>
<p>When using command-line tools (like <code>kafka-console-consumer</code>), you can configure windowed deserializers by passing the inner class and window size via configuration properties. The property names use a prefix pattern:</p>
<pre class="line-numbers"><code class="language-bash"># Time windowed deserializer configuration
--property print.key=true \
--property key.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer \
--property key.deserializer.windowed.inner.deserializer.class=org.apache.kafka.common.serialization.StringDeserializer \
--property key.deserializer.window.size.ms=500

# Session windowed deserializer configuration
--property print.key=true \
--property key.deserializer=org.apache.kafka.streams.kstream.SessionWindowedDeserializer \
--property key.deserializer.windowed.inner.deserializer.class=org.apache.kafka.common.serialization.StringDeserializer</code></pre>

<h4>Deprecated Configs</h4>
<p>The following <code>StreamsConfig</code> parameters are deprecated in favor of passing parameters directly to serializer/deserializer constructors:</p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">StreamsConfig.WINDOWED_INNER_CLASS_SERDE</span></code> is deprecated in favor of <code class="docutils literal"><span class="pre">TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS</span></code> and <code class="docutils literal"><span class="pre">TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS</span></code></li>
<li><code class="docutils literal"><span class="pre">StreamsConfig.WINDOW_SIZE_MS_CONFIG</span></code> is deprecated in favor of <code class="docutils literal"><span class="pre">TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG</span></code></li>
</ul>
</div>
<div class="section" id="implementing-custom-serdes">
<span id="streams-developer-guide-serdes-custom"></span><h2>Implementing custom Serdes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h2>
<p>If you need to implement custom Serdes, your best starting point is to take a look at the source code references of
Expand Down