diff --git a/rsocket-router/build.gradle b/rsocket-router/build.gradle new file mode 100644 index 000000000..fdadefaf1 --- /dev/null +++ b/rsocket-router/build.gradle @@ -0,0 +1,43 @@ +/* + * Copyright 2015-Present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id 'java-library' + id 'maven-publish' + id 'com.jfrog.artifactory' + id 'com.jfrog.bintray' +} + +dependencies { + api project(':rsocket-core') + + implementation 'org.slf4j:slf4j-api' + + testImplementation project(':rsocket-test') + testImplementation 'org.junit.jupiter:junit-jupiter-api' + testImplementation 'org.junit.jupiter:junit-jupiter-params' + testImplementation 'org.mockito:mockito-core' + testImplementation 'org.assertj:assertj-core' + testImplementation 'io.projectreactor:reactor-test' + + // TODO: Remove after JUnit5 migration + testCompileOnly 'junit:junit' + testImplementation 'org.hamcrest:hamcrest-library' + testRuntimeOnly 'org.junit.vintage:junit-vintage-engine' + testRuntimeOnly 'ch.qos.logback:logback-classic' +} + +description = 'Transparent Load Balancer for RSocket' diff --git a/rsocket-router/src/main/java/io/rsocket/router/CompositeMetadataRouteCodec.java b/rsocket-router/src/main/java/io/rsocket/router/CompositeMetadataRouteCodec.java new file mode 100644 index 000000000..76261d690 --- /dev/null +++ b/rsocket-router/src/main/java/io/rsocket/router/CompositeMetadataRouteCodec.java @@ -0,0 +1,64 @@ +/* + * Copyright 2015-Present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.router; + +import io.netty.buffer.ByteBuf; +import io.rsocket.frame.FrameType; +import io.rsocket.metadata.CompositeMetadata; +import io.rsocket.metadata.CompositeMetadata.Entry; +import io.rsocket.metadata.CompositeMetadata.WellKnownMimeTypeEntry; +import io.rsocket.metadata.RoutingMetadata; +import io.rsocket.metadata.WellKnownMimeType; +import reactor.util.annotation.Nullable; + +public class CompositeMetadataRouteCodec implements RouteCodec { + + @Override + @Nullable + public Route decode(ByteBuf metadataByteBuf, FrameType requestType) { + final CompositeMetadata compositeMetadata = new CompositeMetadata(metadataByteBuf, false); + + String route = null; + String mimeType = null; + + for (Entry compositeMetadatum : compositeMetadata) { + if (compositeMetadatum instanceof WellKnownMimeTypeEntry) { + final WellKnownMimeTypeEntry wellKnownMimeTypeEntry = + (WellKnownMimeTypeEntry) compositeMetadatum; + final WellKnownMimeType type = wellKnownMimeTypeEntry.getType(); + + if (type == WellKnownMimeType.MESSAGE_RSOCKET_ROUTING) { + final RoutingMetadata routingMetadata = + new RoutingMetadata(compositeMetadatum.getContent()); + for (String routeEntry : routingMetadata) { + route = routeEntry; + break; + } + } else if (type == WellKnownMimeType.MESSAGE_RSOCKET_MIMETYPE) { + // FIXME: once codecs are available + // mimeType = compositeMetadatum + } + } + } + + if (route != null) { + return new Route(requestType, route, mimeType); + } + + return null; + } +} diff --git a/rsocket-router/src/main/java/io/rsocket/router/HandlerFunction.java b/rsocket-router/src/main/java/io/rsocket/router/HandlerFunction.java new file mode 100644 index 000000000..a1a1199ee --- /dev/null +++ b/rsocket-router/src/main/java/io/rsocket/router/HandlerFunction.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015-Present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.router; + +import io.rsocket.Payload; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.util.annotation.Nullable; + +public interface HandlerFunction { + + Route route(); + + @SuppressWarnings("rawtypes") + default Publisher handle(Payload payload) { + return handle(payload, null); + } + + @SuppressWarnings("rawtypes") + Publisher handle(Payload firstPayload, @Nullable Flux payloads); +} diff --git a/rsocket-router/src/main/java/io/rsocket/router/ImmutableRoutingRSocket.java b/rsocket-router/src/main/java/io/rsocket/router/ImmutableRoutingRSocket.java new file mode 100644 index 000000000..cc7dcbf7c --- /dev/null +++ b/rsocket-router/src/main/java/io/rsocket/router/ImmutableRoutingRSocket.java @@ -0,0 +1,58 @@ +/* + * Copyright 2015-Present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.router; + +import java.util.HashMap; +import java.util.Map; + +class ImmutableRoutingRSocket extends RoutingRSocket { + + private final Map mapping; + + ImmutableRoutingRSocket(Map mapping, RouteCodec routeCodec) { + super(routeCodec); + this.mapping = mapping; + } + + @Override + protected HandlerFunction handlerFor(Route route) { + return mapping.get(route); + } + + static final class ImmutableRouterBuilder + implements RoutingRSocket.Builder { + + final HashMap mapping = new HashMap<>(); + final RouteCodec routeCodec; + + ImmutableRouterBuilder(RouteCodec routeCodec) { + this.routeCodec = routeCodec; + } + + @Override + public ImmutableRouterBuilder addHandler(HandlerFunction handler) { + this.mapping.put(handler.route(), handler); + + return this; + } + + @Override + public RoutingRSocket build() { + return new ImmutableRoutingRSocket(this.mapping, routeCodec); + } + } +} diff --git a/rsocket-router/src/main/java/io/rsocket/router/Route.java b/rsocket-router/src/main/java/io/rsocket/router/Route.java new file mode 100644 index 000000000..2f9e77f73 --- /dev/null +++ b/rsocket-router/src/main/java/io/rsocket/router/Route.java @@ -0,0 +1,78 @@ +/* + * Copyright 2015-Present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.router; + +import io.rsocket.frame.FrameType; +import reactor.util.annotation.Nullable; + +public final class Route { + + final String route; + final String mimeType; + final FrameType requestType; + + public Route(FrameType requestType, String route) { + this(requestType, route, null); + } + + public Route(FrameType requestType, String route, @Nullable String mimeType) { + this.route = route; + this.mimeType = mimeType; + this.requestType = requestType; + } + + public String route() { + return this.route; + } + + @Nullable + public String mimeType() { + return this.mimeType; + } + + public FrameType requestType() { + return requestType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Route route1 = (Route) o; + + if (!route.equals(route1.route)) { + return false; + } + if (mimeType != null ? !mimeType.equals(route1.mimeType) : route1.mimeType != null) { + return false; + } + return requestType == route1.requestType; + } + + @Override + public int hashCode() { + int result = route.hashCode(); + result = 31 * result + (mimeType != null ? mimeType.hashCode() : 0); + result = 31 * result + requestType.hashCode(); + return result; + } +} diff --git a/rsocket-router/src/main/java/io/rsocket/router/RouteCodec.java b/rsocket-router/src/main/java/io/rsocket/router/RouteCodec.java new file mode 100644 index 000000000..ad43bcbfb --- /dev/null +++ b/rsocket-router/src/main/java/io/rsocket/router/RouteCodec.java @@ -0,0 +1,27 @@ +/* + * Copyright 2015-Present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.router; + +import io.netty.buffer.ByteBuf; +import io.rsocket.frame.FrameType; +import reactor.util.annotation.Nullable; + +public interface RouteCodec { + + @Nullable + Route decode(ByteBuf metadataByteBuf, FrameType requestType); +} diff --git a/rsocket-router/src/main/java/io/rsocket/router/Router.java b/rsocket-router/src/main/java/io/rsocket/router/Router.java new file mode 100644 index 000000000..b716acb1b --- /dev/null +++ b/rsocket-router/src/main/java/io/rsocket/router/Router.java @@ -0,0 +1,141 @@ +/* + * Copyright 2015-Present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.router; + +import io.rsocket.Payload; +import io.rsocket.frame.FrameType; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.annotation.Nullable; + +public final class Router { + + public static RequestSpec route(String route) { + return route(route, null); + } + + public static RequestSpec route(String route, @Nullable String mimeType) { + return new RequestSpec(route, mimeType); + } + + @SuppressWarnings("rawtypes") + static final class RequestSpec { + final String route; + final String mimeType; + + RequestSpec(String route, @Nullable String mimeType) { + this.route = route; + this.mimeType = mimeType; + } + + public HandlerFunction fireAndForget(Function> handler) { + final Route route = new Route(FrameType.REQUEST_FNF, this.route, mimeType); + return new HandlerFunction() { + @Override + public Route route() { + return route; + } + + @Override + public Publisher handle(Payload firstPayload, Flux payloads) { + return handler.apply(firstPayload); + } + }; + } + + public HandlerFunction fireAndForget(Consumer handler) { + final Route route = new Route(FrameType.REQUEST_FNF, this.route, this.mimeType); + return new HandlerFunction() { + @Override + public Route route() { + return route; + } + + @Override + public Publisher handle(Payload firstPayload, Flux payloads) { + handler.accept(firstPayload); + return Mono.empty(); + } + }; + } + + public HandlerFunction requestResponse(Function> handler) { + final Route route = new Route(FrameType.REQUEST_RESPONSE, this.route, this.mimeType); + return new HandlerFunction() { + @Override + public Route route() { + return route; + } + + @Override + public Publisher handle(Payload firstPayload, Flux payloads) { + return handler.apply(firstPayload); + } + }; + } + + public HandlerFunction requestStream(Function> handler) { + final Route route = new Route(FrameType.REQUEST_STREAM, this.route, this.mimeType); + return new HandlerFunction() { + @Override + public Route route() { + return route; + } + + @Override + public Publisher handle(Payload firstPayload, Flux payloads) { + return handler.apply(firstPayload); + } + }; + } + + public HandlerFunction requestChannel(Function, Flux> handler) { + final Route route = new Route(FrameType.REQUEST_CHANNEL, this.route, this.mimeType); + return new HandlerFunction() { + @Override + public Route route() { + return route; + } + + @Override + public Publisher handle(Payload firstPayload, Flux payloads) { + return handler.apply(payloads); + } + }; + } + + public HandlerFunction requestChannel( + BiFunction, Flux> handler) { + final Route route = new Route(FrameType.REQUEST_CHANNEL, this.route, this.mimeType); + return new HandlerFunction() { + @Override + public Route route() { + return route; + } + + @Override + public Publisher handle(Payload firstPayload, Flux payloads) { + return handler.apply(firstPayload, payloads); + } + }; + } + } +} diff --git a/rsocket-router/src/main/java/io/rsocket/router/RoutingRSocket.java b/rsocket-router/src/main/java/io/rsocket/router/RoutingRSocket.java new file mode 100644 index 000000000..7fb2fe320 --- /dev/null +++ b/rsocket-router/src/main/java/io/rsocket/router/RoutingRSocket.java @@ -0,0 +1,169 @@ +/* + * Copyright 2015-Present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.router; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.frame.FrameType; +import io.rsocket.router.ImmutableRoutingRSocket.ImmutableRouterBuilder; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.annotation.Nullable; + +public abstract class RoutingRSocket implements RSocket { + + final RouteCodec routeCodec; + + RoutingRSocket(RouteCodec routeCodec) { + this.routeCodec = routeCodec; + } + + @Nullable + protected abstract HandlerFunction handlerFor(Route route); + + @Override + @SuppressWarnings({"unchecked"}) + public Mono fireAndForget(Payload payload) { + try { + final Route route = this.routeCodec.decode(payload.sliceMetadata(), FrameType.REQUEST_FNF); + + if (route != null) { + final HandlerFunction handler = handlerFor(route); + + if (handler != null) { + return (Mono) handler.handle(payload); + } + } + + return RSocket.super.fireAndForget(payload); + } catch (Throwable t) { + return Mono.error(t); + } + } + + @Override + @SuppressWarnings({"unchecked"}) + public Mono requestResponse(Payload payload) { + try { + final Route route = + this.routeCodec.decode(payload.sliceMetadata(), FrameType.REQUEST_RESPONSE); + + if (route != null) { + final HandlerFunction handler = handlerFor(route); + + if (handler != null) { + return (Mono) handler.handle(payload); + } + } + + return RSocket.super.requestResponse(payload); + } catch (Throwable t) { + return Mono.error(t); + } + } + + @Override + @SuppressWarnings("unchecked") + public Flux requestStream(Payload payload) { + try { + final Route route = this.routeCodec.decode(payload.sliceMetadata(), FrameType.REQUEST_STREAM); + + if (route != null) { + final HandlerFunction handler = handlerFor(route); + + if (handler != null) { + return (Flux) handler.handle(payload); + } + } + + return RSocket.super.requestStream(payload); + } catch (Throwable t) { + return Flux.error(t); + } + } + + @Override + @SuppressWarnings("unchecked") + public Flux requestChannel(Publisher source) { + return Flux.from(source) + .switchOnFirst( + (firstSignal, payloads) -> { + final Payload firstPayload = firstSignal.get(); + + if (firstPayload != null) { + try { + final Route route = + this.routeCodec.decode( + firstPayload.sliceMetadata(), FrameType.REQUEST_CHANNEL); + if (route != null) { + final HandlerFunction handler = handlerFor(route); + + if (handler != null) { + return (Flux) handler.handle(firstPayload, payloads); + } + } + } catch (Throwable t) { + firstPayload.release(); + return Flux.error(t); + } + } + + return RSocket.super.requestChannel(payloads); + }, + false); + } + + public static Builder immutable(RouteCodec routeCodec) { + return new ImmutableRouterBuilder(routeCodec); + } + + public interface Builder> { + + default T addFireAndForget(String route, Function> handler) { + return addHandler(Router.route(route).fireAndForget(handler)); + } + + default T addFireAndForget(String route, Consumer handler) { + return addHandler(Router.route(route).fireAndForget(handler)); + } + + default T addRequestResponse(String route, Function> handler) { + return addHandler(Router.route(route).requestResponse(handler)); + } + + default T addRequestStream(String route, Function> handler) { + return addHandler(Router.route(route).requestStream(handler)); + } + + default T addRequestChannel(String route, Function, Flux> handler) { + return addHandler(Router.route(route).requestChannel(handler)); + } + + default T addRequestChannel( + String route, BiFunction, Flux> handler) { + return addHandler(Router.route(route).requestChannel(handler)); + } + + T addHandler(HandlerFunction handler); + + RoutingRSocket build(); + } +} diff --git a/settings.gradle b/settings.gradle index 25c3feee5..9520a25c6 100644 --- a/settings.gradle +++ b/settings.gradle @@ -21,6 +21,7 @@ rootProject.name = 'rsocket-java' include 'rsocket-core' include 'rsocket-load-balancer' +include 'rsocket-router' include 'rsocket-micrometer' include 'rsocket-test' include 'rsocket-transport-local'