Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public synchronized void shutdownNow(Status reason) {
@Override
@GuardedBy("this")
void notifyShutdown(Status status) {
clientTransportListener.transportShutdown(status, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
clientTransportListener.transportShutdown(status, SimpleDisconnectError.UNKNOWN);
}

@Override
Expand Down

This file was deleted.

30 changes: 28 additions & 2 deletions core/src/main/java/io/grpc/internal/KeepAliveManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;

/**
* Manages keepalive pings.
Expand Down Expand Up @@ -262,9 +263,34 @@ public interface KeepAlivePinger {
* Default client side {@link KeepAlivePinger}.
*/
public static final class ClientKeepAlivePinger implements KeepAlivePinger {
private final ClientTransportWithDisconnectReason transport;

public ClientKeepAlivePinger(ClientTransportWithDisconnectReason transport) {

/**
* A {@link ClientTransport} that has life-cycle management.
*
*/
@ThreadSafe
public interface TransportWithDisconnectReason extends ClientTransport {

/**
* Initiates an orderly shutdown of the transport. Existing streams continue, but the
* transport will not own any new streams. New streams will either fail (once
* {@link ManagedClientTransport.Listener#transportShutdown} callback called), or be
* transferred off this transport (in which case they may succeed). This method may only be
* called once.
*/
void shutdown(Status reason, DisconnectError disconnectError);

/**
* Initiates a forceful shutdown in which preexisting and new calls are closed. Existing calls
* should be closed with the provided {@code reason} and {@code disconnectError}.
*/
void shutdownNow(Status reason, DisconnectError disconnectError);
}

private final TransportWithDisconnectReason transport;

public ClientKeepAlivePinger(TransportWithDisconnectReason transport) {
this.transport = transport;
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ public void keepAlivePingDelayedByIncomingData() {

@Test
public void clientKeepAlivePinger_pingTimeout() {
ClientTransportWithDisconnectReason transport =
mock(ClientTransportWithDisconnectReason.class);
ClientKeepAlivePinger.TransportWithDisconnectReason transport =
mock(ClientKeepAlivePinger.TransportWithDisconnectReason.class);
ClientKeepAlivePinger pinger = new ClientKeepAlivePinger(transport);

pinger.onPingTimeout();
Expand All @@ -122,8 +122,8 @@ public void clientKeepAlivePinger_pingTimeout() {

@Test
public void clientKeepAlivePinger_pingFailure() {
ClientTransportWithDisconnectReason transport =
mock(ClientTransportWithDisconnectReason.class);
ClientKeepAlivePinger.TransportWithDisconnectReason transport =
mock(ClientKeepAlivePinger.TransportWithDisconnectReason.class);
ClientKeepAlivePinger pinger = new ClientKeepAlivePinger(transport);
pinger.ping();
ArgumentCaptor<ClientTransport.PingCallback> pingCallbackCaptor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ public void shutdownNowKillsClientStream() throws Exception {
client = null;

verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class),
eq(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN));
any(DisconnectError.class));
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Expand Down
10 changes: 7 additions & 3 deletions netty/src/main/java/io/grpc/netty/NettyClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientTransportWithDisconnectReason;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.DisconnectError;
import io.grpc.internal.FailingClientStream;
Expand Down Expand Up @@ -72,7 +71,7 @@
* A Netty-based {@link ConnectionClientTransport} implementation.
*/
class NettyClientTransport implements ConnectionClientTransport,
ClientTransportWithDisconnectReason {
ClientKeepAlivePinger.TransportWithDisconnectReason {

private final InternalLogId logId;
private final Map<ChannelOption<?>, ?> channelOptions;
Expand Down Expand Up @@ -351,6 +350,11 @@ public void operationComplete(ChannelFuture future) throws Exception {

@Override
public void shutdown(Status reason) {
shutdown(reason, SimpleDisconnectError.UNKNOWN);
}

@Override
public void shutdown(Status reason, DisconnectError disconnectError) {
// start() could have failed
if (channel == null) {
return;
Expand All @@ -373,7 +377,7 @@ public void shutdownNow(final Status reason, DisconnectError disconnectError) {
handler.getWriteQueue().enqueue(new Runnable() {
@Override
public void run() {
lifecycleManager.notifyShutdown(reason, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
lifecycleManager.notifyShutdown(reason, disconnectError);
channel.write(new ForcefulCloseCommand(reason));
}
}, true);
Expand Down
16 changes: 14 additions & 2 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import io.grpc.TlsChannelCredentials;
import io.grpc.internal.CertificateUtils;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.ClientTransportWithDisconnectReason;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.DisconnectError;
import io.grpc.internal.GoAwayDisconnectError;
Expand Down Expand Up @@ -133,7 +132,7 @@
* A okhttp-based {@link ConnectionClientTransport} implementation.
*/
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
OutboundFlowController.Transport, ClientTransportWithDisconnectReason {
OutboundFlowController.Transport, ClientKeepAlivePinger.TransportWithDisconnectReason {
private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
private static final String GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK =
Expand Down Expand Up @@ -999,6 +998,19 @@ public void shutdown(Status reason) {
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this method should just be shutdown(reason, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN). I don't know why it was made to have a duplicate.

}

@Override
public void shutdown(Status reason, DisconnectError disconnectError) {
synchronized (lock) {
if (goAwayStatus != null) {
return;
}

goAwayStatus = reason;
listener.transportShutdown(goAwayStatus, disconnectError);
stopIfNecessary();
}
}

@Override
public void shutdownNow(Status reason) {
shutdownNow(reason, SimpleDisconnectError.UNKNOWN);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SUBCHANNEL_SHUTDOWN

Expand Down
69 changes: 41 additions & 28 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1835,12 +1835,16 @@ public void unreachableServer() throws Exception {

ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
clientTransport.start(listener);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture(),
any(DisconnectError.class));
Status status = captor.getValue();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<DisconnectError> errorCaptor = ArgumentCaptor.forClass(DisconnectError.class);

verify(listener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(),
errorCaptor.capture());
Status status = statusCaptor.getValue();
DisconnectError error = errorCaptor.getValue();
assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
assertTrue(status.getCause().toString(), status.getCause() instanceof IOException);
assertEquals(new GoAwayDisconnectError(GrpcUtil.Http2Error.INTERNAL_ERROR), error);

MockStreamListener streamListener = new MockStreamListener();
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers)
Expand All @@ -1867,11 +1871,14 @@ public void customSocketFactory() throws Exception {

ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
clientTransport.start(listener);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture(),
any(DisconnectError.class));
Status status = captor.getValue();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<DisconnectError> errorCaptor = ArgumentCaptor.forClass(DisconnectError.class);
verify(listener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(),
errorCaptor.capture());
Status status = statusCaptor.getValue();
DisconnectError error = errorCaptor.getValue();
assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
assertEquals(new GoAwayDisconnectError(GrpcUtil.Http2Error.INTERNAL_ERROR), error);
assertSame(exception, status.getCause());
}

Expand Down Expand Up @@ -1960,18 +1967,21 @@ public void proxy_500() throws Exception {

assertEquals(-1, sock.getInputStream().read());

ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture(),
any(DisconnectError.class));
Status error = captor.getValue();
assertTrue("Status didn't contain error code: " + captor.getValue(),
error.getDescription().contains("500"));
assertTrue("Status didn't contain error description: " + captor.getValue(),
error.getDescription().contains("OH NO"));
assertTrue("Status didn't contain error text: " + captor.getValue(),
error.getDescription().contains(errorText));
assertEquals("Not UNAVAILABLE: " + captor.getValue(),
Status.UNAVAILABLE.getCode(), error.getCode());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<DisconnectError> errorCaptor = ArgumentCaptor.forClass(DisconnectError.class);
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(),
errorCaptor.capture());
Status status = statusCaptor.getValue();
DisconnectError error = errorCaptor.getValue();
assertTrue("Status didn't contain error code: " + statusCaptor.getValue(),
status.getDescription().contains("500"));
assertTrue("Status didn't contain error description: " + statusCaptor.getValue(),
status.getDescription().contains("OH NO"));
assertTrue("Status didn't contain error text: " + statusCaptor.getValue(),
status.getDescription().contains(errorText));
assertEquals("Not UNAVAILABLE: " + statusCaptor.getValue(),
Status.UNAVAILABLE.getCode(), status.getCode());
assertEquals(new GoAwayDisconnectError(GrpcUtil.Http2Error.INTERNAL_ERROR), error);
sock.close();
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
}
Expand All @@ -1998,14 +2008,17 @@ public void proxy_immediateServerClose() throws Exception {
serverSocket.close();
sock.close();

ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture(),
any(DisconnectError.class));
Status error = captor.getValue();
assertTrue("Status didn't contain proxy: " + captor.getValue(),
error.getDescription().contains("proxy"));
assertEquals("Not UNAVAILABLE: " + captor.getValue(),
Status.UNAVAILABLE.getCode(), error.getCode());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<DisconnectError> errorCaptor = ArgumentCaptor.forClass(DisconnectError.class);
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture(),
errorCaptor.capture());
Status status = statusCaptor.getValue();
DisconnectError error = errorCaptor.getValue();
assertTrue("Status didn't contain proxy: " + statusCaptor.getValue(),
status.getDescription().contains("proxy"));
assertEquals("Not UNAVAILABLE: " + statusCaptor.getValue(),
Status.UNAVAILABLE.getCode(), status.getCode());
assertEquals(SimpleDisconnectError.SUBCHANNEL_SHUTDOWN, error);
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
}

Expand Down