Skip to content

Commit

Permalink
Merge pull request #726 from scalecube/feature/issue-724-codec-zepo-3…
Browse files Browse the repository at this point in the history
…dparty

ISSUE-724 Add Jdk Data Codec
  • Loading branch information
artem-v committed May 13, 2020
2 parents d0c8414 + 932ec85 commit f07c7dd
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

/** Simple binary codec for headers service message. */
public final class DefaultHeadersCodec implements HeadersCodec {
/** Simple headers and data codec based on JDK only. */
public class JdkCodec implements DataCodec, HeadersCodec {

/**
* {@inheritDoc}
Expand All @@ -21,6 +24,19 @@ public String contentType() {
return "application/octet-stream";
}

/**
* Uses Jdk Object Serialization.
*
* <p>{@inheritDoc}
*/
@Override
public void encode(OutputStream stream, Object value) throws IOException {
try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
oos.writeObject(value);
oos.flush();
}
}

/**
* {@inheritDoc}
*/
Expand All @@ -40,6 +56,20 @@ public void encode(OutputStream stream, Map<String, String> headers) throws IOEx
}
}

/**
* Uses Jdk Object Serialization.
*
* <p>{@inheritDoc}
*/
@Override
public Object decode(InputStream stream, Type type) throws IOException {
try (ObjectInputStream is = new ObjectInputStream(stream)) {
return is.readObject();
} catch (ClassNotFoundException e) {
throw new IOException(e.getMessage(), e);
}
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.scalecube.services.transport.api.JdkCodec
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.scalecube.services.transport.api;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class JdkCodecTest {

private final DataCodec codec = new JdkCodec();

@ParameterizedTest
@MethodSource("provider")
void test(Object body) throws IOException {
Object decoded = writeAndRead(body);
assertEquals(body, decoded);
}

static Stream<Object> provider() {
return Stream.of("hello", Arrays.<Object>asList(1, 2, 3), new Greeting("joe"));
}

private Object writeAndRead(Object body) throws IOException {
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
codec.encode(os, body);
byte[] bytes = os.toByteArray();
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes)) {
return codec.decode(is, body.getClass());
}
}
}

static class Greeting implements Serializable {

private final String name;

Greeting(String name) {
this.name = name;
}

public String getName() {
return name;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Greeting greeting = (Greeting) o;
return Objects.equals(name, greeting.name);
}

@Override
public int hashCode() {
return Objects.hash(name);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class DefaultHeadersCodecTest {
class JdkHeadersCodecTest {

private HeadersCodec codec = new DefaultHeadersCodec();
private final HeadersCodec codec = new JdkCodec();

@ParameterizedTest
@MethodSource("provider")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ public final class ServiceMessageCodec {

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceMessageCodec.class);

private static final HeadersCodec DEFAULT_HEADERS_CODEC = new JdkCodec();

private final HeadersCodec headersCodec;
private final Map<String, DataCodec> dataCodecs;


/** Message codec with default Headers/Data Codecs. */
public ServiceMessageCodec() {
this(null, null);
Expand All @@ -44,12 +47,12 @@ public ServiceMessageCodec() {
* <p>Default HeadersCodec is DefaultHeadersCodec. This is lightweight binary codec written on
* vanilla java.
*
* @param headersCodec codec for message headers. Default, {@link DefaultHeadersCodec}
* @param headersCodec codec for message headers. Default, {@link JdkCodec}
* @param dataCodecs codecs for message body. Codec will select by Message Content Type.
*/
public ServiceMessageCodec(
@Nullable HeadersCodec headersCodec, @Nullable Collection<DataCodec> dataCodecs) {
this.headersCodec = headersCodec == null ? new DefaultHeadersCodec() : headersCodec;
this.headersCodec = headersCodec == null ? DEFAULT_HEADERS_CODEC : headersCodec;
Map<String, DataCodec> defaultCodecs = DataCodec.INSTANCES;
if (dataCodecs == null) {
this.dataCodecs = defaultCodecs;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.scalecube.services.examples.codecs;

import io.scalecube.net.Address;
import io.scalecube.services.Microservices;
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl;
import io.scalecube.services.examples.helloworld.service.api.GreetingsService;
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;

public class Example2 {

public static final String CONTENT_TYPE = "application/octet-stream";

/**
* Start the example.
*
* @param args ignored
*/
public static void main(String[] args) {
// ScaleCube Node node with no members
Microservices seed =
Microservices.builder()
.discovery(ScalecubeServiceDiscovery::new)
.transport(RSocketServiceTransport::new)
.defaultContentType(CONTENT_TYPE) // need to send with non-default data format
.startAwait();

final Address seedAddress = seed.discovery().address();

// Construct a ScaleCube node which joins the cluster hosting the Greeting Service
Microservices ms =
Microservices.builder()
.discovery(
endpoint ->
new ScalecubeServiceDiscovery(endpoint)
.membership(cfg -> cfg.seedMembers(seedAddress)))
.transport(RSocketServiceTransport::new)
.services(new GreetingServiceImpl())
.startAwait();

// Create service proxy
GreetingsService service = seed.call().api(GreetingsService.class);

// Execute the services and subscribe to service events
service
.sayHello("joe")
.subscribe(consumer -> System.out.println(consumer.message()));

seed.onShutdown().block();
ms.onShutdown().block();
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.scalecube.services.examples.helloworld.service.api;

public class Greeting {
import java.io.Serializable;

public class Greeting implements Serializable {

String message;

Expand Down

0 comments on commit f07c7dd

Please sign in to comment.