From 6260f091f3c81cd350ba2a237759aa750fcb1057 Mon Sep 17 00:00:00 2001 From: Eugene_Utkin Date: Tue, 5 May 2020 13:18:11 +0300 Subject: [PATCH 1/6] ISSUE-724 Add Jdk Data Codec --- .../services/transport/api/JdkCodec.java | 33 +++++++++++++++++ ...HeadersCodec.java => JdkHeadersCodec.java} | 2 +- .../transport/api/ServiceMessageCodec.java | 4 +- ...scalecube.services.transport.api.DataCodec | 1 + .../services/transport/api/JdkCodecTest.java | 37 +++++++++++++++++++ ...odecTest.java => JdkHeadersCodecTest.java} | 4 +- 6 files changed, 76 insertions(+), 5 deletions(-) create mode 100644 services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkCodec.java rename services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/{DefaultHeadersCodec.java => JdkHeadersCodec.java} (97%) create mode 100644 services-bytebuf-codec/src/main/resources/META-INF/services/io.scalecube.services.transport.api.DataCodec create mode 100644 services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java rename services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/{DefaultHeadersCodecTest.java => JdkHeadersCodecTest.java} (94%) diff --git a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkCodec.java b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkCodec.java new file mode 100644 index 000000000..5d864206f --- /dev/null +++ b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkCodec.java @@ -0,0 +1,33 @@ +package io.scalecube.services.transport.api; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.lang.reflect.Type; + +public class JdkCodec implements DataCodec { + + @Override + public String contentType() { + return "application/octet-stream"; + } + + @Override + public void encode(OutputStream stream, Object value) throws IOException { + try (ObjectOutputStream oos = new ObjectOutputStream(stream)) { + oos.writeObject(value); + oos.flush(); + } + } + + @Override + public Object decode(InputStream stream, Type type) throws IOException { + try (ObjectInputStream is = new ObjectInputStream(stream)) { + return is.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e.getMessage(), e); + } + } +} diff --git a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/DefaultHeadersCodec.java b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkHeadersCodec.java similarity index 97% rename from services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/DefaultHeadersCodec.java rename to services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkHeadersCodec.java index c7782d338..0bcdf8e2f 100644 --- a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/DefaultHeadersCodec.java +++ b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkHeadersCodec.java @@ -11,7 +11,7 @@ import java.util.Map.Entry; /** Simple binary codec for headers service message. */ -public final class DefaultHeadersCodec implements HeadersCodec { +public final class JdkHeadersCodec implements HeadersCodec { /** * {@inheritDoc} diff --git a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java index be100d9a5..688a8bf81 100644 --- a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java +++ b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java @@ -44,12 +44,12 @@ public ServiceMessageCodec() { *

Default HeadersCodec is DefaultHeadersCodec. This is lightweight binary codec written on * vanilla java. * - * @param headersCodec codec for message headers. Default, {@link DefaultHeadersCodec} + * @param headersCodec codec for message headers. Default, {@link JdkHeadersCodec} * @param dataCodecs codecs for message body. Codec will select by Message Content Type. */ public ServiceMessageCodec( @Nullable HeadersCodec headersCodec, @Nullable Collection dataCodecs) { - this.headersCodec = headersCodec == null ? new DefaultHeadersCodec() : headersCodec; + this.headersCodec = headersCodec == null ? new JdkHeadersCodec() : headersCodec; Map defaultCodecs = DataCodec.INSTANCES; if (dataCodecs == null) { this.dataCodecs = defaultCodecs; diff --git a/services-bytebuf-codec/src/main/resources/META-INF/services/io.scalecube.services.transport.api.DataCodec b/services-bytebuf-codec/src/main/resources/META-INF/services/io.scalecube.services.transport.api.DataCodec new file mode 100644 index 000000000..a8885001a --- /dev/null +++ b/services-bytebuf-codec/src/main/resources/META-INF/services/io.scalecube.services.transport.api.DataCodec @@ -0,0 +1 @@ +io.scalecube.services.transport.api.JdkCodec \ No newline at end of file diff --git a/services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java b/services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java new file mode 100644 index 000000000..0b9477424 --- /dev/null +++ b/services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java @@ -0,0 +1,37 @@ +package io.scalecube.services.transport.api; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +class JdkCodecTest { + + private final DataCodec codec = new JdkCodec(); + + @ParameterizedTest + @MethodSource("provider") + void test(Object body) throws IOException { + Object decoded = writeAndRead(body); + assertEquals(body, decoded); + } + + static Stream provider() { + return Stream.of("hello", Arrays.asList(1,2,3)); + } + + private Object writeAndRead(Object body) throws IOException { + try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { + codec.encode(os, body); + byte[] bytes = os.toByteArray(); + try (ByteArrayInputStream is = new ByteArrayInputStream(bytes)) { + return codec.decode(is, body.getClass()); + } + } + } +} \ No newline at end of file diff --git a/services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/DefaultHeadersCodecTest.java b/services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/JdkHeadersCodecTest.java similarity index 94% rename from services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/DefaultHeadersCodecTest.java rename to services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/JdkHeadersCodecTest.java index 00e07bd24..a2275be4a 100644 --- a/services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/DefaultHeadersCodecTest.java +++ b/services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/JdkHeadersCodecTest.java @@ -12,9 +12,9 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -class DefaultHeadersCodecTest { +class JdkHeadersCodecTest { - private HeadersCodec codec = new DefaultHeadersCodec(); + private HeadersCodec codec = new JdkHeadersCodec(); @ParameterizedTest @MethodSource("provider") From 352d38c1a12b4f139b8c1027431d5ed33c5f153f Mon Sep 17 00:00:00 2001 From: Eugene_Utkin Date: Sun, 10 May 2020 21:08:13 +0300 Subject: [PATCH 2/6] ISSUE-708 merge with develop --- .../services/transport/api/JdkCodec.java | 102 ++++++++++++++++++ ...scalecube.services.transport.api.DataCodec | 0 .../services/transport/api/JdkCodecTest.java | 0 .../transport/api/JdkHeadersCodecTest.java | 2 +- .../services/transport/api/JdkCodec.java | 33 ------ .../transport/api/ServiceMessageCodec.java | 4 +- 6 files changed, 106 insertions(+), 35 deletions(-) create mode 100644 services-api/src/main/java/io/scalecube/services/transport/api/JdkCodec.java rename {services-bytebuf-codec => services-api}/src/main/resources/META-INF/services/io.scalecube.services.transport.api.DataCodec (100%) rename {services-bytebuf-codec => services-api}/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java (100%) rename {services-bytebuf-codec => services-api}/src/test/java/io/scalecube/services/transport/api/JdkHeadersCodecTest.java (96%) delete mode 100644 services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkCodec.java diff --git a/services-api/src/main/java/io/scalecube/services/transport/api/JdkCodec.java b/services-api/src/main/java/io/scalecube/services/transport/api/JdkCodec.java new file mode 100644 index 000000000..6f651557d --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/transport/api/JdkCodec.java @@ -0,0 +1,102 @@ +package io.scalecube.services.transport.api; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.lang.reflect.Type; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +public class JdkCodec implements DataCodec, HeadersCodec { + + @Override + public String contentType() { + return "application/octet-stream"; + } + + /** + * Uses Jdk Object Serialization. + * + *

{@inheritDoc} + */ + @Override + public void encode(OutputStream stream, Object value) throws IOException { + try (ObjectOutputStream oos = new ObjectOutputStream(stream)) { + oos.writeObject(value); + oos.flush(); + } + } + + /** + * Uses Jdk Object Serialization. + * + *

{@inheritDoc} + */ + @Override + public Object decode(InputStream stream, Type type) throws IOException { + try (ObjectInputStream is = new ObjectInputStream(stream)) { + return is.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e.getMessage(), e); + } + } + + /** {@inheritDoc} */ + @Override + public void encode(OutputStream stream, Map headers) throws IOException { + if (headers.isEmpty()) { + return; + } + writeInt(stream, headers.size()); + for (Entry header : headers.entrySet()) { + byte[] nameBytes = header.getKey().getBytes(UTF_8); + writeInt(stream, nameBytes.length); + stream.write(nameBytes); + byte[] valueBytes = header.getValue().getBytes(UTF_8); + writeInt(stream, valueBytes.length); + stream.write(valueBytes); + } + } + + /** {@inheritDoc} */ + @Override + public Map decode(InputStream stream) throws IOException { + if (stream.available() < 1) { + return Collections.emptyMap(); + } + int size = readInt(stream); + Map headers = new HashMap<>(size); + for (int i = 0; i < size; i++) { + int nameLength = readInt(stream); + byte[] nameBytes = new byte[nameLength]; + stream.read(nameBytes); + String name = new String(nameBytes, UTF_8); + int valueLength = readInt(stream); + byte[] valueBytes = new byte[valueLength]; + stream.read(valueBytes); + String value = new String(valueBytes, UTF_8); + headers.put(name, value); + } + return headers; + } + + private void writeInt(OutputStream stream, int number) throws IOException { + for (int i = Integer.BYTES - 1; i >= 0; i--) { + stream.write(number >>> i * Byte.SIZE); + } + } + + private Integer readInt(InputStream stream) throws IOException { + int r = 0; + for (int i = Integer.BYTES - 1; i >= 0; i--) { + r = r | ((stream.read() & 0xFF) << i * Byte.SIZE); + } + return r; + } +} diff --git a/services-bytebuf-codec/src/main/resources/META-INF/services/io.scalecube.services.transport.api.DataCodec b/services-api/src/main/resources/META-INF/services/io.scalecube.services.transport.api.DataCodec similarity index 100% rename from services-bytebuf-codec/src/main/resources/META-INF/services/io.scalecube.services.transport.api.DataCodec rename to services-api/src/main/resources/META-INF/services/io.scalecube.services.transport.api.DataCodec diff --git a/services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java b/services-api/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java similarity index 100% rename from services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java rename to services-api/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java diff --git a/services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/JdkHeadersCodecTest.java b/services-api/src/test/java/io/scalecube/services/transport/api/JdkHeadersCodecTest.java similarity index 96% rename from services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/JdkHeadersCodecTest.java rename to services-api/src/test/java/io/scalecube/services/transport/api/JdkHeadersCodecTest.java index a2275be4a..f0ab1f2ab 100644 --- a/services-bytebuf-codec/src/test/java/io/scalecube/services/transport/api/JdkHeadersCodecTest.java +++ b/services-api/src/test/java/io/scalecube/services/transport/api/JdkHeadersCodecTest.java @@ -14,7 +14,7 @@ class JdkHeadersCodecTest { - private HeadersCodec codec = new JdkHeadersCodec(); + private final HeadersCodec codec = new JdkCodec(); @ParameterizedTest @MethodSource("provider") diff --git a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkCodec.java b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkCodec.java deleted file mode 100644 index 5d864206f..000000000 --- a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkCodec.java +++ /dev/null @@ -1,33 +0,0 @@ -package io.scalecube.services.transport.api; - -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; -import java.lang.reflect.Type; - -public class JdkCodec implements DataCodec { - - @Override - public String contentType() { - return "application/octet-stream"; - } - - @Override - public void encode(OutputStream stream, Object value) throws IOException { - try (ObjectOutputStream oos = new ObjectOutputStream(stream)) { - oos.writeObject(value); - oos.flush(); - } - } - - @Override - public Object decode(InputStream stream, Type type) throws IOException { - try (ObjectInputStream is = new ObjectInputStream(stream)) { - return is.readObject(); - } catch (ClassNotFoundException e) { - throw new IOException(e.getMessage(), e); - } - } -} diff --git a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java index 688a8bf81..057b972ce 100644 --- a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java +++ b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java @@ -26,6 +26,8 @@ public final class ServiceMessageCodec { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceMessageCodec.class); + private static final HeadersCodec DEFAULT_HEADERS_CODEC = new JdkCodec(); + private final HeadersCodec headersCodec; private final Map dataCodecs; @@ -49,7 +51,7 @@ public ServiceMessageCodec() { */ public ServiceMessageCodec( @Nullable HeadersCodec headersCodec, @Nullable Collection dataCodecs) { - this.headersCodec = headersCodec == null ? new JdkHeadersCodec() : headersCodec; + this.headersCodec = headersCodec == null ? DEFAULT_HEADERS_CODEC : headersCodec; Map defaultCodecs = DataCodec.INSTANCES; if (dataCodecs == null) { this.dataCodecs = defaultCodecs; From 1518c67ebfd4c392870cd0fde6036e4b09c17434 Mon Sep 17 00:00:00 2001 From: Eugene_Utkin Date: Sun, 10 May 2020 22:28:04 +0300 Subject: [PATCH 3/6] ISSUE-708 merge with develop --- .../services/transport/api/JdkCodec.java | 34 ++++---- .../transport/api/JdkHeadersCodec.java | 80 ------------------- .../transport/api/ServiceMessageCodec.java | 3 +- 3 files changed, 22 insertions(+), 95 deletions(-) delete mode 100644 services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkHeadersCodec.java diff --git a/services-api/src/main/java/io/scalecube/services/transport/api/JdkCodec.java b/services-api/src/main/java/io/scalecube/services/transport/api/JdkCodec.java index 6f651557d..4160ccf22 100644 --- a/services-api/src/main/java/io/scalecube/services/transport/api/JdkCodec.java +++ b/services-api/src/main/java/io/scalecube/services/transport/api/JdkCodec.java @@ -13,8 +13,10 @@ import java.util.Map; import java.util.Map.Entry; +/** Simple headers and data codec based on JDK only. */ public class JdkCodec implements DataCodec, HeadersCodec { + /** {@inheritDoc} */ @Override public String contentType() { return "application/octet-stream"; @@ -34,21 +36,9 @@ public void encode(OutputStream stream, Object value) throws IOException { } /** - * Uses Jdk Object Serialization. - * - *

{@inheritDoc} + * {@inheritDoc} */ @Override - public Object decode(InputStream stream, Type type) throws IOException { - try (ObjectInputStream is = new ObjectInputStream(stream)) { - return is.readObject(); - } catch (ClassNotFoundException e) { - throw new IOException(e.getMessage(), e); - } - } - - /** {@inheritDoc} */ - @Override public void encode(OutputStream stream, Map headers) throws IOException { if (headers.isEmpty()) { return; @@ -64,7 +54,23 @@ public void encode(OutputStream stream, Map headers) throws IOEx } } - /** {@inheritDoc} */ + /** + * Uses Jdk Object Serialization. + * + *

{@inheritDoc} + */ + @Override + public Object decode(InputStream stream, Type type) throws IOException { + try (ObjectInputStream is = new ObjectInputStream(stream)) { + return is.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e.getMessage(), e); + } + } + + /** + * {@inheritDoc} + */ @Override public Map decode(InputStream stream) throws IOException { if (stream.available() < 1) { diff --git a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkHeadersCodec.java b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkHeadersCodec.java deleted file mode 100644 index 0bcdf8e2f..000000000 --- a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkHeadersCodec.java +++ /dev/null @@ -1,80 +0,0 @@ -package io.scalecube.services.transport.api; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -/** Simple binary codec for headers service message. */ -public final class JdkHeadersCodec implements HeadersCodec { - - /** - * {@inheritDoc} - */ - @Override - public String contentType() { - return "application/octet-stream"; - } - - /** - * {@inheritDoc} - */ - @Override - public void encode(OutputStream stream, Map headers) throws IOException { - if (headers.isEmpty()) { - return; - } - writeInt(stream, headers.size()); - for (Entry header : headers.entrySet()) { - byte[] nameBytes = header.getKey().getBytes(UTF_8); - writeInt(stream, nameBytes.length); - stream.write(nameBytes); - byte[] valueBytes = header.getValue().getBytes(UTF_8); - writeInt(stream, valueBytes.length); - stream.write(valueBytes); - } - } - - /** - * {@inheritDoc} - */ - @Override - public Map decode(InputStream stream) throws IOException { - if (stream.available() < 1) { - return Collections.emptyMap(); - } - int size = readInt(stream); - Map headers = new HashMap<>(size); - for (int i = 0; i < size; i++) { - int nameLength = readInt(stream); - byte[] nameBytes = new byte[nameLength]; - stream.read(nameBytes); - String name = new String(nameBytes, UTF_8); - int valueLength = readInt(stream); - byte[] valueBytes = new byte[valueLength]; - stream.read(valueBytes); - String value = new String(valueBytes, UTF_8); - headers.put(name, value); - } - return headers; - } - - private void writeInt(OutputStream stream, int number) throws IOException { - for (int i = Integer.BYTES - 1; i >= 0; i--) { - stream.write(number >>> i * Byte.SIZE); - } - } - - private Integer readInt(InputStream stream) throws IOException { - int r = 0; - for (int i = Integer.BYTES - 1; i >= 0; i--) { - r = r | ((stream.read() & 0xFF) << i * Byte.SIZE); - } - return r; - } -} diff --git a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java index 057b972ce..ad0824d38 100644 --- a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java +++ b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java @@ -31,6 +31,7 @@ public final class ServiceMessageCodec { private final HeadersCodec headersCodec; private final Map dataCodecs; + /** Message codec with default Headers/Data Codecs. */ public ServiceMessageCodec() { this(null, null); @@ -46,7 +47,7 @@ public ServiceMessageCodec() { *

Default HeadersCodec is DefaultHeadersCodec. This is lightweight binary codec written on * vanilla java. * - * @param headersCodec codec for message headers. Default, {@link JdkHeadersCodec} + * @param headersCodec codec for message headers. Default, {@link JdkCodec} * @param dataCodecs codecs for message body. Codec will select by Message Content Type. */ public ServiceMessageCodec( From 40a8052d7d28e00d39dc860fb852583416b09227 Mon Sep 17 00:00:00 2001 From: Eugene_Utkin Date: Sun, 10 May 2020 22:28:04 +0300 Subject: [PATCH 4/6] ISSUE-708 merge with develop --- .../services/transport/api/JdkCodec.java | 36 +++++---- .../transport/api/JdkHeadersCodec.java | 80 ------------------- .../transport/api/ServiceMessageCodec.java | 3 +- 3 files changed, 24 insertions(+), 95 deletions(-) delete mode 100644 services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkHeadersCodec.java diff --git a/services-api/src/main/java/io/scalecube/services/transport/api/JdkCodec.java b/services-api/src/main/java/io/scalecube/services/transport/api/JdkCodec.java index 6f651557d..9d574a646 100644 --- a/services-api/src/main/java/io/scalecube/services/transport/api/JdkCodec.java +++ b/services-api/src/main/java/io/scalecube/services/transport/api/JdkCodec.java @@ -13,8 +13,12 @@ import java.util.Map; import java.util.Map.Entry; +/** Simple headers and data codec based on JDK only. */ public class JdkCodec implements DataCodec, HeadersCodec { + /** + * {@inheritDoc} + */ @Override public String contentType() { return "application/octet-stream"; @@ -34,21 +38,9 @@ public void encode(OutputStream stream, Object value) throws IOException { } /** - * Uses Jdk Object Serialization. - * - *

{@inheritDoc} + * {@inheritDoc} */ @Override - public Object decode(InputStream stream, Type type) throws IOException { - try (ObjectInputStream is = new ObjectInputStream(stream)) { - return is.readObject(); - } catch (ClassNotFoundException e) { - throw new IOException(e.getMessage(), e); - } - } - - /** {@inheritDoc} */ - @Override public void encode(OutputStream stream, Map headers) throws IOException { if (headers.isEmpty()) { return; @@ -64,7 +56,23 @@ public void encode(OutputStream stream, Map headers) throws IOEx } } - /** {@inheritDoc} */ + /** + * Uses Jdk Object Serialization. + * + *

{@inheritDoc} + */ + @Override + public Object decode(InputStream stream, Type type) throws IOException { + try (ObjectInputStream is = new ObjectInputStream(stream)) { + return is.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e.getMessage(), e); + } + } + + /** + * {@inheritDoc} + */ @Override public Map decode(InputStream stream) throws IOException { if (stream.available() < 1) { diff --git a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkHeadersCodec.java b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkHeadersCodec.java deleted file mode 100644 index 0bcdf8e2f..000000000 --- a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/JdkHeadersCodec.java +++ /dev/null @@ -1,80 +0,0 @@ -package io.scalecube.services.transport.api; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -/** Simple binary codec for headers service message. */ -public final class JdkHeadersCodec implements HeadersCodec { - - /** - * {@inheritDoc} - */ - @Override - public String contentType() { - return "application/octet-stream"; - } - - /** - * {@inheritDoc} - */ - @Override - public void encode(OutputStream stream, Map headers) throws IOException { - if (headers.isEmpty()) { - return; - } - writeInt(stream, headers.size()); - for (Entry header : headers.entrySet()) { - byte[] nameBytes = header.getKey().getBytes(UTF_8); - writeInt(stream, nameBytes.length); - stream.write(nameBytes); - byte[] valueBytes = header.getValue().getBytes(UTF_8); - writeInt(stream, valueBytes.length); - stream.write(valueBytes); - } - } - - /** - * {@inheritDoc} - */ - @Override - public Map decode(InputStream stream) throws IOException { - if (stream.available() < 1) { - return Collections.emptyMap(); - } - int size = readInt(stream); - Map headers = new HashMap<>(size); - for (int i = 0; i < size; i++) { - int nameLength = readInt(stream); - byte[] nameBytes = new byte[nameLength]; - stream.read(nameBytes); - String name = new String(nameBytes, UTF_8); - int valueLength = readInt(stream); - byte[] valueBytes = new byte[valueLength]; - stream.read(valueBytes); - String value = new String(valueBytes, UTF_8); - headers.put(name, value); - } - return headers; - } - - private void writeInt(OutputStream stream, int number) throws IOException { - for (int i = Integer.BYTES - 1; i >= 0; i--) { - stream.write(number >>> i * Byte.SIZE); - } - } - - private Integer readInt(InputStream stream) throws IOException { - int r = 0; - for (int i = Integer.BYTES - 1; i >= 0; i--) { - r = r | ((stream.read() & 0xFF) << i * Byte.SIZE); - } - return r; - } -} diff --git a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java index 057b972ce..ad0824d38 100644 --- a/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java +++ b/services-bytebuf-codec/src/main/java/io/scalecube/services/transport/api/ServiceMessageCodec.java @@ -31,6 +31,7 @@ public final class ServiceMessageCodec { private final HeadersCodec headersCodec; private final Map dataCodecs; + /** Message codec with default Headers/Data Codecs. */ public ServiceMessageCodec() { this(null, null); @@ -46,7 +47,7 @@ public ServiceMessageCodec() { *

Default HeadersCodec is DefaultHeadersCodec. This is lightweight binary codec written on * vanilla java. * - * @param headersCodec codec for message headers. Default, {@link JdkHeadersCodec} + * @param headersCodec codec for message headers. Default, {@link JdkCodec} * @param dataCodecs codecs for message body. Codec will select by Message Content Type. */ public ServiceMessageCodec( From dd96aab7e6269c644ac8c54d1636b6214c3531c7 Mon Sep 17 00:00:00 2001 From: Eugene_Utkin Date: Tue, 12 May 2020 16:48:27 +0300 Subject: [PATCH 5/6] ISSUE-708 merge with develop --- .../services/transport/api/JdkCodecTest.java | 37 ++++++++++++- .../services/examples/codecs/Example2.java | 53 +++++++++++++++++++ .../helloworld/service/api/Greeting.java | 4 +- 3 files changed, 91 insertions(+), 3 deletions(-) create mode 100644 services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example2.java diff --git a/services-api/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java b/services-api/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java index 0b9477424..cf4307ace 100644 --- a/services-api/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java +++ b/services-api/src/test/java/io/scalecube/services/transport/api/JdkCodecTest.java @@ -5,7 +5,9 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.Serializable; import java.util.Arrays; +import java.util.Objects; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -22,7 +24,7 @@ void test(Object body) throws IOException { } static Stream provider() { - return Stream.of("hello", Arrays.asList(1,2,3)); + return Stream.of("hello", Arrays.asList(1, 2, 3), new Greeting("joe")); } private Object writeAndRead(Object body) throws IOException { @@ -34,4 +36,35 @@ private Object writeAndRead(Object body) throws IOException { } } } -} \ No newline at end of file + + static class Greeting implements Serializable { + + private final String name; + + Greeting(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Greeting greeting = (Greeting) o; + return Objects.equals(name, greeting.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + } + +} diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example2.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example2.java new file mode 100644 index 000000000..4c75b004b --- /dev/null +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example2.java @@ -0,0 +1,53 @@ +package io.scalecube.services.examples.codecs; + +import io.scalecube.net.Address; +import io.scalecube.services.Microservices; +import io.scalecube.services.discovery.ScalecubeServiceDiscovery; +import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; +import io.scalecube.services.examples.helloworld.service.api.GreetingsService; +import io.scalecube.services.transport.rsocket.RSocketServiceTransport; + +public class Example2 { + + public static final String CONTENT_TYPE = "application/octet-stream"; + + /** + * Start the example. + * + * @param args ignored + */ + public static void main(String[] args) { + // ScaleCube Node node with no members + Microservices seed = + Microservices.builder() + .discovery(ScalecubeServiceDiscovery::new) + .transport(RSocketServiceTransport::new) + .contentType(CONTENT_TYPE) // need to send with non-default data format + .startAwait(); + + final Address seedAddress = seed.discovery().address(); + + // Construct a ScaleCube node which joins the cluster hosting the Greeting Service + Microservices ms = + Microservices.builder() + .discovery( + endpoint -> + new ScalecubeServiceDiscovery(endpoint) + .membership(cfg -> cfg.seedMembers(seedAddress))) + .transport(RSocketServiceTransport::new) + .services(new GreetingServiceImpl()) + .startAwait(); + + // Create service proxy + GreetingsService service = seed.call().api(GreetingsService.class); + + // Execute the services and subscribe to service events + service + .sayHello("joe") + .subscribe(consumer -> System.out.println(consumer.message())); + + seed.onShutdown().block(); + ms.onShutdown().block(); + } + +} diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/service/api/Greeting.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/service/api/Greeting.java index 6ba8db8d5..f0fab4a58 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/service/api/Greeting.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/helloworld/service/api/Greeting.java @@ -1,6 +1,8 @@ package io.scalecube.services.examples.helloworld.service.api; -public class Greeting { +import java.io.Serializable; + +public class Greeting implements Serializable { String message; From 932ec85e53c759760debbab92d43197bf16cdeb3 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Wed, 13 May 2020 14:55:58 +0300 Subject: [PATCH 6/6] Minor --- .../java/io/scalecube/services/examples/codecs/Example2.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example2.java b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example2.java index 4c75b004b..f7349af55 100644 --- a/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example2.java +++ b/services-examples-parent/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example2.java @@ -22,7 +22,7 @@ public static void main(String[] args) { Microservices.builder() .discovery(ScalecubeServiceDiscovery::new) .transport(RSocketServiceTransport::new) - .contentType(CONTENT_TYPE) // need to send with non-default data format + .defaultContentType(CONTENT_TYPE) // need to send with non-default data format .startAwait(); final Address seedAddress = seed.discovery().address();