Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The rsocket object supports IP acquisition #1013

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/DuplexConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ public interface DuplexConnection extends Availability, Closeable {
*/
ByteBufAllocator alloc();

/**
* Return the local address that this connection is connected to. The returned {@link
* SocketAddress} varies by transport type and should be downcast to obtain more detailed
* information. For TCP and WebSocket, the address type is {@link java.net.InetSocketAddress}. For
* local transport, it is {@link io.rsocket.transport.local.LocalSocketAddress}.
*
* @return the address
* @since 1.1.1
*/
SocketAddress localAddress();

/**
* Return the remote address that this connection is connected to. The returned {@link
* SocketAddress} varies by transport type and should be downcast to obtain more detailed
Expand Down
33 changes: 33 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/RSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.SocketAddress;

/**
* A contract providing different interaction models for <a
Expand Down Expand Up @@ -79,6 +80,38 @@ default Mono<Void> metadataPush(Payload payload) {
return RSocketAdapter.metadataPush(payload);
}

/**
* Returns the local address where this channel is bound to. The returned
* {@link SocketAddress} is supposed to be down-cast into more concrete
* type such as {@link java.net.InetSocketAddress} to retrieve the detailed
* information.
*
* @return the local address of this channel.
* {@code null} if this channel is not bound.
* @since 1.1.1
*/
default SocketAddress localAddress() {
return null;
}

/**
* Returns the remote address where this channel is connected to. The
* returned {@link SocketAddress} is supposed to be down-cast into more
* concrete type such as {@link java.net.InetSocketAddress} to retrieve the detailed
* information.
*
* @return the remote address of this channel.
* {@code null} if this channel is not connected.
* If this channel is not connected but it can receive messages
* from arbitrary remote addresses to determine
* the origination of the received message as this method will
* return {@code null}.
* @since 1.1.1
*/
default SocketAddress remoteAddress() {
return null;
}

@Override
default double availability() {
return isDisposed() ? 0.0 : 1.0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ public ByteBufAllocator alloc() {
return source.alloc();
}

@Override
public SocketAddress localAddress() {
return source.localAddress();
}

@Override
public SocketAddress remoteAddress() {
return source.remoteAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public ByteBufAllocator alloc() {
return source.alloc();
}

@Override
public SocketAddress localAddress() {
return source.localAddress();
}

@Override
public SocketAddress remoteAddress() {
return source.remoteAddress();
Expand Down
11 changes: 11 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.keepalive.KeepAliveSupport;
import io.rsocket.plugins.RequestInterceptor;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
Expand Down Expand Up @@ -174,6 +175,16 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
return nextStreamId;
}

@Override
public SocketAddress localAddress() {
return getDuplexConnection().localAddress();
}

@Override
public SocketAddress remoteAddress() {
return getDuplexConnection().remoteAddress();
}

@Override
public double availability() {
final RequesterLeaseTracker requesterLeaseTracker = this.requesterLeaseTracker;
Expand Down
11 changes: 11 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.rsocket.frame.RequestStreamFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.RequestInterceptor;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
Expand Down Expand Up @@ -151,6 +152,16 @@ public Mono<Void> metadataPush(Payload payload) {
}
}

@Override
public SocketAddress localAddress() {
return getDuplexConnection().localAddress();
}

@Override
public SocketAddress remoteAddress() {
return getDuplexConnection().remoteAddress();
}

@Override
public void dispose() {
tryTerminate(() -> new CancellationException("Disposed"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public Flux<ByteBuf> receive() {
return this;
}

@Override
public SocketAddress localAddress() {
return source.localAddress();
}

@Override
public SocketAddress remoteAddress() {
return source.remoteAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ResumableDuplexConnection extends Flux<ByteBuf>
final UnboundedProcessor savableFramesSender;
final Disposable framesSaverDisposable;
final Sinks.Empty<Void> onClose;
final SocketAddress localAddress;
final SocketAddress remoteAddress;
final Sinks.Many<Integer> onConnectionClosedSink;

Expand All @@ -73,6 +74,7 @@ public ResumableDuplexConnection(
this.savableFramesSender = new UnboundedProcessor();
this.framesSaverDisposable = resumableFramesStore.saveFrames(savableFramesSender).subscribe();
this.onClose = Sinks.empty();
this.localAddress = initialConnection.localAddress();
this.remoteAddress = initialConnection.remoteAddress();

ACTIVE_CONNECTION.lazySet(this, initialConnection);
Expand Down Expand Up @@ -219,6 +221,11 @@ public boolean isDisposed() {
return onClose.scan(Scannable.Attr.TERMINATED) || onClose.scan(Scannable.Attr.CANCELLED);
}

@Override
public SocketAddress localAddress() {
return localAddress;
}

@Override
public SocketAddress remoteAddress() {
return remoteAddress;
Expand Down Expand Up @@ -278,6 +285,11 @@ public ByteBufAllocator alloc() {
return ByteBufAllocator.DEFAULT;
}

@Override
public SocketAddress localAddress() {
return null;
}

@Override
@SuppressWarnings("ConstantConditions")
public SocketAddress remoteAddress() {
Expand Down
11 changes: 11 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/util/RSocketProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.SocketAddress;

/** Wrapper/Proxy for a RSocket. This is useful when we want to override a specific method. */
public class RSocketProxy implements RSocket {
Expand Down Expand Up @@ -55,6 +56,16 @@ public Mono<Void> metadataPush(Payload payload) {
return source.metadataPush(payload);
}

@Override
public SocketAddress localAddress() {
return source.localAddress();
}

@Override
public SocketAddress remoteAddress() {
return source.remoteAddress();
}

@Override
public double availability() {
return source.availability();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public ByteBufAllocator alloc() {
return allocator;
}

@Override
public SocketAddress localAddress() {
return new TestLocalSocketAddress(name);
}

@Override
public SocketAddress remoteAddress() {
return new TestLocalSocketAddress(name);
Expand Down
11 changes: 11 additions & 0 deletions rsocket-core/src/test/java/io/rsocket/test/util/MockRSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.rsocket.Payload;
import io.rsocket.RSocket;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -71,6 +72,16 @@ public final Mono<Void> metadataPush(Payload payload) {
return delegate.metadataPush(payload).doOnSubscribe(s -> pushCount.incrementAndGet());
}

@Override
public SocketAddress localAddress() {
return delegate.localAddress();
}

@Override
public SocketAddress remoteAddress() {
return delegate.remoteAddress();
}

@Override
public double availability() {
return delegate.availability();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public LeaksTrackingByteBufAllocator alloc() {
return allocator;
}

@Override
public SocketAddress localAddress() {
return new TestLocalSocketAddress("TestDuplexConnection");
}

@Override
public SocketAddress remoteAddress() {
return new TestLocalSocketAddress("TestDuplexConnection");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.rsocket.stat.Median;
import io.rsocket.stat.Quantile;
import io.rsocket.util.Clock;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -518,6 +519,16 @@ public Mono<Void> metadataPush(Payload payload) {
return errorVoid;
}

@Override
public SocketAddress localAddress() {
throw new RuntimeException(NoAvailableRSocketException.INSTANCE);
}

@Override
public SocketAddress remoteAddress() {
throw new RuntimeException(NoAvailableRSocketException.INSTANCE);
}

@Override
public double availability() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.rsocket.stat.FrugalQuantile;
import io.rsocket.stat.Quantile;
import io.rsocket.util.Clock;
import java.net.SocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -84,6 +85,16 @@ public Mono<Void> metadataPush(Payload payload) {
return child.metadataPush(payload);
}

@Override
public SocketAddress localAddress() {
return child.localAddress();
}

@Override
public SocketAddress remoteAddress() {
return child.remoteAddress();
}

@Override
public double availability() {
return child.availability();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.rsocket.stat.Ewma;
import io.rsocket.util.Clock;
import io.rsocket.util.RSocketProxy;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -149,6 +150,16 @@ public Mono<Void> metadataPush(Payload payload) {
.doOnSuccess(v -> updateErrorPercentage(1.0));
}

@Override
public SocketAddress localAddress() {
return source.localAddress();
}

@Override
public SocketAddress remoteAddress() {
return source.remoteAddress();
}

@Override
public double availability() {
// If the window is expired set success and failure to zero and return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public ByteBufAllocator alloc() {
return delegate.alloc();
}

@Override
public SocketAddress localAddress() {
return delegate.localAddress();
}

@Override
public SocketAddress remoteAddress() {
return delegate.remoteAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.micrometer.core.instrument.Timer.Sample;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -95,6 +96,16 @@ public Mono<Void> metadataPush(Payload payload) {
return delegate.metadataPush(payload).doFinally(metadataPush);
}

@Override
public SocketAddress localAddress() {
return delegate.localAddress();
}

@Override
public SocketAddress remoteAddress() {
return delegate.remoteAddress();
}

@Override
public Mono<Void> onClose() {
return delegate.onClose();
Expand Down
Loading