Skip to content

Commit

Permalink
problem: polkadot started to use string for subscription id; v0.2.1
Browse files Browse the repository at this point in the history
  • Loading branch information
splix committed Jun 22, 2020
1 parent a2c92dd commit 466e229
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 62 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ apply plugin: 'jacoco'

allprojects {
group = 'io.emeraldpay.polkaj'
version = "0.2.0"
version = "0.2.1"

repositories {
mavenLocal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
public class DecodeResponse {

private final ObjectMapper objectMapper;
private final TypeMapping subscriptionMapping;
private final TypeMapping rpcMapping;
private final TypeMapping<String> subscriptionMapping;
private final TypeMapping<Integer> rpcMapping;

public DecodeResponse(ObjectMapper objectMapper, TypeMapping rpcMapping, TypeMapping subscriptionMapping) {
public DecodeResponse(ObjectMapper objectMapper, TypeMapping<Integer> rpcMapping, TypeMapping<String> subscriptionMapping) {
this.objectMapper = objectMapper;
this.rpcMapping = rpcMapping;
this.subscriptionMapping = subscriptionMapping;
Expand All @@ -36,8 +36,8 @@ public <T> WsResponse decode(String json) throws IOException {
throw new IllegalStateException("Not an object");
}
String method = null;
WsResponse.IdValue value = null;
Preparsed preparsed = new Preparsed(objectMapper);
WsResponse.IdValue<String> value = null;
Preparsed<Integer> preparsed = new Preparsed<>(objectMapper);
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.currentToken() == null) {
throw new IllegalStateException("JSON finished before data received");
Expand Down Expand Up @@ -107,16 +107,16 @@ protected RpcResponseError decodeError(JsonParser parser) throws IOException {

}

protected WsResponse.IdValue decodeSubscription(TypeMapping typeMapping, JsonParser parser) throws IOException {
Preparsed preparsed = new Preparsed(objectMapper);
protected WsResponse.IdValue<String> decodeSubscription(TypeMapping<String> typeMapping, JsonParser parser) throws IOException {
Preparsed<String> preparsed = new Preparsed<>(objectMapper);
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.currentToken() == null) {
throw new IllegalStateException("JSON finished before data received");
}

String field = parser.currentName();
if ("subscription".equals(field)) {
preparsed.id = decodeNumber(parser);
preparsed.id = decodeString(parser);
preparsed.type = findType(typeMapping, preparsed.id);
if (preparsed.isReady()) {
return preparsed.build();
Expand All @@ -133,7 +133,7 @@ protected WsResponse.IdValue decodeSubscription(TypeMapping typeMapping, JsonPar
throw new IllegalStateException("Either id or result not found in JSON");
}

private JavaType findType(TypeMapping typeMapping, Integer id) {
private <T> JavaType findType(TypeMapping<T> typeMapping, T id) {
JavaType type;
type = typeMapping.get(id);
if (type == null) {
Expand All @@ -152,14 +152,24 @@ private Integer decodeNumber(JsonParser parser) throws IOException {
return parser.getIntValue();
}

public interface TypeMapping {
JavaType get(int id);
private String decodeString(JsonParser parser) throws IOException {
if (parser.currentToken() != JsonToken.VALUE_STRING) {
parser.nextToken();
}
if (!parser.currentToken().isScalarValue()) {
throw new IllegalStateException("Id is not a string");
}
return parser.getValueAsString();
}

public interface TypeMapping<T> {
JavaType get(T id);
}

private static class Preparsed {
private static class Preparsed<T> {
private final ObjectMapper objectMapper;

Integer id = null;
T id = null;
JavaType type = null;
TreeNode node = null;

Expand All @@ -174,12 +184,12 @@ public boolean isReady() {
error != null || (type != null && node != null);
}

public WsResponse.IdValue build() throws IOException {
public WsResponse.IdValue<T> build() throws IOException {
if (id == null) {
throw new IllegalStateException("Id is not set");
}
if (error != null) {
return new WsResponse.IdValue(id, error);
return new WsResponse.IdValue<T>(id, error);
}
if (type == null) {
throw new IllegalStateException("Type is not set");
Expand All @@ -188,7 +198,7 @@ public WsResponse.IdValue build() throws IOException {
Object value = objectMapper
.readerFor(type)
.readValue(node.traverse(objectMapper));
return new WsResponse.IdValue(id, value);
return new WsResponse.IdValue<T>(id, value);
}
throw new IllegalStateException("Not ready");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

public class DefaultSubscription<T> implements Subscription<T>, Consumer<Subscription.Event<? extends T>> {

private Integer id;
private String id;
private final JavaType type;
private final String unsubscribeMethod;
private final PolkadotWsApi client;
Expand All @@ -20,15 +20,15 @@ public DefaultSubscription(JavaType type, String unsubscribeMethod, PolkadotWsAp
this.client = client;
}

public Integer getId() {
public String getId() {
return id;
}

public JavaType getType() {
return type;
}

public void setId(int id) {
public void setId(String id) {
if (this.id != null) {
throw new IllegalStateException("Subscription id is already set to " + this.id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class PolkadotWsApi extends AbstractPolkadotApi implements AutoCloseable,
private final AtomicInteger id = new AtomicInteger(0);
private final AtomicReference<WebSocket> webSocket = new AtomicReference<>(null);
private final ConcurrentHashMap<Integer, RequestExpectation<?>> execution = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, DefaultSubscription<?>> subscriptions = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, DefaultSubscription<?>> subscriptions = new ConcurrentHashMap<>();
private final URI target;
private final DecodeResponse decodeResponse;
private final HttpClient httpClient;
Expand All @@ -42,19 +42,19 @@ private PolkadotWsApi(URI target, HttpClient httpClient, ObjectMapper objectMapp
this.target = target;
this.httpClient = httpClient;
this.onClose = onClose;
var rpcMapping = new DecodeResponse.TypeMapping() {
var rpcMapping = new DecodeResponse.TypeMapping<Integer>() {
@Override
public JavaType get(int id) {
public JavaType get(Integer id) {
var x = execution.get(id);
if (x == null) {
return null;
}
return x.getType();
}
};
var subMapping = new DecodeResponse.TypeMapping() {
var subMapping = new DecodeResponse.TypeMapping<String>() {
@Override
public JavaType get(int id) {
public JavaType get(String id) {
var x = subscriptions.get(id);
if (x == null) {
return null;
Expand Down Expand Up @@ -161,7 +161,7 @@ public <T> CompletableFuture<T> execute(RpcCall<T> call) {
@Override
public <T> CompletableFuture<Subscription<T>> subscribe(SubscribeCall<T> call) {
var subscription = new DefaultSubscription<T>(call.getResultType(objectMapper.getTypeFactory()), call.getUnsubscribe(), this);
var start = this.execute(RpcCall.create(Integer.class, call.getMethod(), call.getParams()));
var start = this.execute(RpcCall.create(String.class, call.getMethod(), call.getParams()));
return start.thenApply(id -> {
subscriptions.put(id, subscription);
subscription.setId(id);
Expand Down Expand Up @@ -197,7 +197,7 @@ public <T> void accept(SubscriptionResponse<T> response) {
s.accept(new Subscription.Event<T>(response.method, response.value));
}

public boolean removeSubscription(int id) {
public boolean removeSubscription(String id) {
return subscriptions.remove(id) != null;
}

Expand All @@ -217,17 +217,17 @@ public void close() throws Exception {
}

static class SubscriptionResponse<T> {
private final int id;
private final String id;
private final String method;
private final T value;

public SubscriptionResponse(int id, String method, T value) {
public SubscriptionResponse(String id, String method, T value) {
this.id = id;
this.method = method;
this.value = value;
}

public int getId() {
public String getId() {
return id;
}

Expand All @@ -244,7 +244,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof SubscriptionResponse)) return false;
SubscriptionResponse<?> that = (SubscriptionResponse<?>) o;
return id == that.id &&
return id.equals(that.id) &&
Objects.equals(method, that.method) &&
Objects.equals(value, that.value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,16 @@ public enum Type {
/**
* Pair of ID and associated Value
*/
public static class IdValue {
private final int id;
public static class IdValue<T> {
private final T id;
private final Object value;

public IdValue(int id, Object value) {
public IdValue(T id, Object value) {
this.id = id;
this.value = value;
}

public int getId() {
public T getId() {
return id;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,35 @@ class DecodeResponseSpec extends Specification {

def "Decode rpc response with number"() {
setup:
def json = '{"jsonrpc":"2.0","result":5,"id":1}'
def json = '{"jsonrpc":"2.0","result":"EsqruyKPnZvPZ6fr","id":1}'
def mapping = Mock(DecodeResponse.TypeMapping) {
1 * get(1) >> objectMapper.typeFactory.constructType(Integer.class)
1 * get(1) >> objectMapper.typeFactory.constructType(String.class)
}
def decoder = new DecodeResponse(objectMapper, mapping, Stub(DecodeResponse.TypeMapping))
when:
def act = decoder.decode(json)
then:
act.type == WsResponse.Type.RPC
with(act.asRpc()) {
result instanceof Integer
result == 5
result instanceof String
result == "EsqruyKPnZvPZ6fr"
}
}

def "Decode rpc response with number, when id comes first"() {
setup:
def json = '{"jsonrpc":"2.0","id":3,"result":5}'
def json = '{"jsonrpc":"2.0","id":3,"result":"EsqruyKPnZvPZ6fr"}'
def mapping = Mock(DecodeResponse.TypeMapping) {
1 * get(3) >> objectMapper.typeFactory.constructType(Integer.class)
1 * get(3) >> objectMapper.typeFactory.constructType(String.class)
}
def decoder = new DecodeResponse(objectMapper, mapping, Stub(DecodeResponse.TypeMapping))
when:
def act = decoder.decode(json)
then:
act.type == WsResponse.Type.RPC
with(act.asRpc()) {
result instanceof Integer
result == 5
result instanceof String
result == "EsqruyKPnZvPZ6fr"
}
}

Expand Down Expand Up @@ -138,11 +138,11 @@ class DecodeResponseSpec extends Specification {
' "parentHash":"0xbe9110f6da6a19ac645a27472e459dcca6eaf4ee4b0b12700ca5d566eea9a638",' +
' "stateRoot":"0x57059722d680b591a469937449df772b95625d4230b39a0a7d855e16d597f168"' +
' },' +
' "subscription":3' +
' "subscription":"EsqruyKPnZvPZ6fr"' +
' }' +
'}'
def mapping = Mock(DecodeResponse.TypeMapping) {
1 * get(3) >> objectMapper.typeFactory.constructType(BlockJson.Header.class)
1 * get("EsqruyKPnZvPZ6fr") >> objectMapper.typeFactory.constructType(BlockJson.Header.class)
}
def decoder = new DecodeResponse(objectMapper, Stub(DecodeResponse.TypeMapping), mapping)
when:
Expand All @@ -151,7 +151,7 @@ class DecodeResponseSpec extends Specification {
act.type == WsResponse.Type.SUBSCRIPTION
PolkadotWsApi.SubscriptionResponse event = act.asEvent()
event.method == "chain_newHead"
event.id == 3
event.id == "EsqruyKPnZvPZ6fr"
event.value instanceof BlockJson.Header
with((BlockJson.Header)event.value) {
number == 0x1d878c
Expand All @@ -167,7 +167,7 @@ class DecodeResponseSpec extends Specification {
'"jsonrpc":"2.0",' +
'"method":"chain_newHead",' +
'"params":{' +
' "subscription":3,' +
' "subscription":"EsqruyKPnZvPZ6fr",' +
' "result":{' +
' "digest":{"logs":["0x06424142453402d90000004077c40f00000000","0x05424142450101a0085dbd50d943878845263fa4d2bd8259cde78692f1e22488227843057d5a3101909f2bdfb492e6da3f63413366c9e189c7bf4bd62ae10607fe0c1550dc4d88"]},' +
' "extrinsicsRoot":"0x9869230c3cc05051ce9afef4458d2515fb2141bfd3bdcd88292f41e17ea00ae7",' +
Expand All @@ -178,7 +178,7 @@ class DecodeResponseSpec extends Specification {
' }' +
'}'
def mapping = Mock(DecodeResponse.TypeMapping) {
1 * get(3) >> objectMapper.typeFactory.constructType(BlockJson.Header.class)
1 * get("EsqruyKPnZvPZ6fr") >> objectMapper.typeFactory.constructType(BlockJson.Header.class)
}
def decoder = new DecodeResponse(objectMapper, Stub(DecodeResponse.TypeMapping), mapping)
when:
Expand All @@ -187,7 +187,7 @@ class DecodeResponseSpec extends Specification {
act.type == WsResponse.Type.SUBSCRIPTION
PolkadotWsApi.SubscriptionResponse event = act.asEvent()
event.method == "chain_newHead"
event.id == 3
event.id == "EsqruyKPnZvPZ6fr"
event.value instanceof BlockJson.Header
with((BlockJson.Header)event.value) {
number == 0x1d878c
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ class DefaultSubscriptionSpec extends Specification {
def "Id is immutable"() {
when:
def s = new DefaultSubscription(null, "test", null)
s.setId(101)
s.setId("EsqruyKPnZvPZ6fr")
then:
s.getId() == 101
s.getId() == "EsqruyKPnZvPZ6fr"

when:
s.setId(102)
s.setId("EsqruyKPnZvPZ6fr")
then:
thrown(IllegalStateException)
}
Expand Down Expand Up @@ -46,11 +46,11 @@ class DefaultSubscriptionSpec extends Specification {
def client = Mock(PolkadotWsApi)
when:
def s = new DefaultSubscription(null, "untest", client)
s.setId(10)
s.setId("EsqruyKPnZvPZ6fr")
s.close()
then:
1 * client.execute(RpcCall.create(Boolean.class, "untest", [10])) >> CompletableFuture.completedFuture(true)
1 * client.removeSubscription(10)
1 * client.execute(RpcCall.create(Boolean.class, "untest", ["EsqruyKPnZvPZ6fr"])) >> CompletableFuture.completedFuture(true)
1 * client.removeSubscription("EsqruyKPnZvPZ6fr")
}

def "Close does nothing if not initialized"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class PolkadotWsClientSpec extends Specification {
setup:
List<Map<String, Object>> received = []
when:
server.onNextReply('{"jsonrpc":"2.0","result":101,"id":0}')
server.onNextReply('{"jsonrpc":"2.0","result":"EsqruyKPnZvPZ6fr","id":0}')
def f = client.subscribe(SubscribeCall.create(BlockJson.Header.class, "chain_subscribeNewHead", "chain_unsubscribeNewHead"))
def sub = f.get(TIMEOUT, TimeUnit.SECONDS)
sub.handler({ event ->
Expand All @@ -58,7 +58,7 @@ class PolkadotWsClientSpec extends Specification {
result: event.result
])
})
server.reply('{"jsonrpc":"2.0","method":"chain_newHead","params":{"result":{"digest":{"logs":[]},"extrinsicsRoot":"0x9869230c3cc05051ce9afef4458d2515fb2141bfd3bdcd88292f41e17ea00ae7","number":"0x1d878c","parentHash":"0xbe9110f6da6a19ac645a27472e459dcca6eaf4ee4b0b12700ca5d566eea9a638","stateRoot":"0x57059722d680b591a469937449df772b95625d4230b39a0a7d855e16d597f168"},"subscription":101}}')
server.reply('{"jsonrpc":"2.0","method":"chain_newHead","params":{"result":{"digest":{"logs":[]},"extrinsicsRoot":"0x9869230c3cc05051ce9afef4458d2515fb2141bfd3bdcd88292f41e17ea00ae7","number":"0x1d878c","parentHash":"0xbe9110f6da6a19ac645a27472e459dcca6eaf4ee4b0b12700ca5d566eea9a638","stateRoot":"0x57059722d680b591a469937449df772b95625d4230b39a0a7d855e16d597f168"},"subscription":"EsqruyKPnZvPZ6fr"}}')
Thread.sleep(SLEEP)
sub.close()
Thread.sleep(SLEEP)
Expand All @@ -69,7 +69,7 @@ class PolkadotWsClientSpec extends Specification {

server.received.size() == 2
server.received[0].value == '{"jsonrpc":"2.0","id":0,"method":"chain_subscribeNewHead","params":[]}'
server.received[1].value == '{"jsonrpc":"2.0","id":1,"method":"chain_unsubscribeNewHead","params":[101]}'
server.received[1].value == '{"jsonrpc":"2.0","id":1,"method":"chain_unsubscribeNewHead","params":["EsqruyKPnZvPZ6fr"]}'
}

def "Make a request"() {
Expand Down
Loading

0 comments on commit 466e229

Please sign in to comment.