diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java index 764aa0bca4cd..1067d74bcfa2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import java.util.Arrays; import java.util.List; +import java.util.Map; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptor; @@ -52,6 +53,8 @@ public Coder getValueCoder() { private final Coder keyCoder; private final Coder valueCoder; + private static final MapCoder CONTEXT_CODER = + MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); private KvCoder(Coder keyCoder, Coder valueCoder) { this.keyCoder = keyCoder; @@ -69,6 +72,7 @@ public void encode(KV kv, OutputStream outStream, Context context) if (kv == null) { throw new CoderException("cannot encode a null KV"); } + CONTEXT_CODER.encode(kv.getW3cTraceContext(), outStream); keyCoder.encode(kv.getKey(), outStream); valueCoder.encode(kv.getValue(), outStream, context); } @@ -80,9 +84,12 @@ public KV decode(InputStream inStream) throws IOException, CoderException @Override public KV decode(InputStream inStream, Context context) throws IOException, CoderException { + Map traceContext = CONTEXT_CODER.decode(inStream); K key = keyCoder.decode(inStream); V value = valueCoder.decode(inStream, context); - return KV.of(key, value); + KV newKv = KV.of(key, value); + newKv.getW3cTraceContext().putAll(traceContext); + return newKv; } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/KV.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/KV.java index 0c095ade1555..992bbc521e4f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/KV.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/KV.java @@ -18,8 +18,12 @@ package org.apache.beam.sdk.values; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Objects; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.GroupByKey; @@ -38,7 +42,7 @@ * @param the type of the key * @param the type of the value */ -public class KV implements Serializable { +public class KV implements Serializable, Traceable { /** Returns a {@link KV} with the given key and value. */ public static KV of(K key, V value) { return new KV<>(key, value); @@ -56,10 +60,22 @@ public V getValue() { return value; } + @Override + public Map getW3cTraceContext() { + return w3cTraceContext; + } + + @Override + public List> getW3cLinkedContext() { + return w3cLinkedContext; + } + ///////////////////////////////////////////////////////////////////////////// final K key; final V value; + final Map w3cTraceContext = new HashMap<>(); + final List> w3cLinkedContext = new ArrayList<>(); private KV(K key, V value) { this.key = key; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Traceable.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Traceable.java new file mode 100644 index 000000000000..fe6acb7f84f3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Traceable.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import java.util.List; +import java.util.Map; + +public interface Traceable { + + Map getW3cTraceContext(); + + List> getW3cLinkedContext(); +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java index 436f4c1ae910..5d19775b11f1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java @@ -105,7 +105,7 @@ public void testCoderIsSerializableWithWellKnownCoderType() throws Exception { * org.apache.beam.sdk.coders.PrintBase64Encodings}. */ private static final List TEST_ENCODINGS = - Arrays.asList("AP____8P", "BWhlbGxvAA", "B2dvb2RieWX_____Bw"); + Arrays.asList("AAAAAAD_____Dw", "AAAAAAVoZWxsbwA", "AAAAAAdnb29kYnll_____wc"); @Test public void testWireFormatEncode() throws Exception {