Skip to content

Commit

Permalink
Do not abort CI Visibility spans dispatch on interrupt (#6926)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikita-tkachenko-datadog authored Apr 18, 2024
1 parent 652193c commit a1463c6
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package datadog.communication.http;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -34,7 +37,7 @@
* instance.
*/
@NotThreadSafe
public class HttpRetryPolicy {
public class HttpRetryPolicy implements AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(HttpRetryPolicy.class);

Expand All @@ -47,12 +50,35 @@ public class HttpRetryPolicy {

private int retriesLeft;
private long delay;
private boolean interrupted;
private final double delayFactor;
private final boolean suppressInterrupts;

private HttpRetryPolicy(int retriesLeft, long delay, double delayFactor) {
private HttpRetryPolicy(
int retriesLeft, long delay, double delayFactor, boolean suppressInterrupts) {
this.retriesLeft = retriesLeft;
this.delay = delay;
this.delayFactor = delayFactor;
this.suppressInterrupts = suppressInterrupts;
}

/**
 * Decides whether a request that failed with the given exception should be attempted again.
 *
 * <p>A {@link ConnectException} is always handed over to the response-based check (with a
 * {@code null} response). {@link InterruptedIOException} and {@link InterruptedException} are
 * handed over only when this policy suppresses interrupts; any other exception is never retried.
 *
 * <p>NOTE(review): {@code java.net.SocketTimeoutException} is a subclass of
 * {@code InterruptedIOException}, so timeouts also take the "interrupt" path when interrupts are
 * suppressed - confirm that this is intended.
 *
 * @param e the exception thrown while executing the request
 * @return {@code true} if the caller should back off and retry, {@code false} otherwise
 */
public boolean shouldRetry(Exception e) {
  final boolean threadInterrupted = e instanceof InterruptedException;
  final boolean interruptLike = threadInterrupted || e instanceof InterruptedIOException;

  final boolean eligible = e instanceof ConnectException || (suppressInterrupts && interruptLike);
  if (!eligible) {
    return false;
  }

  if (threadInterrupted) {
    // remember interrupted status to restore the thread's interrupted flag later
    interrupted = true;
  }
  return shouldRetry((okhttp3.Response) null);
}

public boolean shouldRetry(@Nullable okhttp3.Response response) {
Expand Down Expand Up @@ -106,25 +132,52 @@ private long getRateLimitResetTime(okhttp3.Response response) {
}
}

public long backoff() {
/**
 * Returns the delay to apply before the next attempt and scales the stored delay by
 * {@code delayFactor} for the attempt after that.
 *
 * @return the delay value as it was before this call multiplied it
 */
long getBackoffDelay() {
  final long delayForThisAttempt = delay;
  // grow the stored delay so that the following call returns a longer one
  delay = (long) (delayForThisAttempt * delayFactor);
  return delayForThisAttempt;
}

/**
 * Sleeps on the current thread for the delay returned by {@link #getBackoffDelay()}.
 *
 * <p>If the sleep is interrupted and this policy suppresses interrupts, the interruption is only
 * recorded (so that {@link #close()} can re-assert it) and the method returns without sleeping
 * for the remainder of the delay. Otherwise the thread's interrupted flag is set again and the
 * interruption is reported to the caller as an {@link InterruptedIOException}.
 *
 * @throws IOException if the sleep was interrupted and interrupts are not suppressed
 */
public void backoff() throws IOException {
  final long sleepMillis = getBackoffDelay();
  try {
    Thread.sleep(sleepMillis);
  } catch (InterruptedException e) {
    if (!suppressInterrupts) {
      // Thread.sleep cleared the flag when it threw; set it again before propagating
      Thread.currentThread().interrupt();
      throw new InterruptedIOException("thread interrupted");
    }
    // remember interrupted status to restore the thread's interrupted flag later
    interrupted = true;
  }
}

/**
 * Re-asserts the calling thread's interrupted flag if an interruption was recorded by this
 * policy while interrupts were being suppressed. Does nothing otherwise.
 */
@Override
public void close() {
  if (!interrupted) {
    return;
  }
  Thread.currentThread().interrupt();
}

public static class Factory {
private final int maxRetries;
private final long initialDelay;
private final double delayFactor;
private final boolean retryInterrupts;

/**
 * Creates a factory by delegating to the four-argument constructor with {@code retryInterrupts}
 * set to {@code false}.
 *
 * @param maxRetries passed through unchanged
 * @param initialDelay passed through unchanged
 * @param delayFactor passed through unchanged
 */
public Factory(int maxRetries, int initialDelay, double delayFactor) {
this(maxRetries, initialDelay, delayFactor, false);
}

/**
 * Creates a factory that stores the given settings for the policies it will create.
 *
 * @param maxRetries stored as the retry budget handed to each created policy
 * @param initialDelay stored as the starting delay handed to each created policy
 * @param delayFactor stored as the multiplier handed to each created policy
 * @param retryInterrupts stored as-is; NOTE(review): the policy constructor names the
 *     corresponding parameter {@code suppressInterrupts} - presumably the same setting, confirm
 */
public Factory(int maxRetries, int initialDelay, double delayFactor, boolean retryInterrupts) {
this.maxRetries = maxRetries;
this.initialDelay = initialDelay;
this.delayFactor = delayFactor;
this.retryInterrupts = retryInterrupts;
}

public HttpRetryPolicy create() {
return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor);
return new HttpRetryPolicy(maxRetries, initialDelay, delayFactor, retryInterrupts);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import datadog.trace.util.AgentProxySelector;
import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -357,30 +356,27 @@ public void writeTo(BufferedSink sink) throws IOException {
}

public static Response sendWithRetries(
OkHttpClient httpClient, HttpRetryPolicy retryPolicy, Request request) throws IOException {
while (true) {
try {
okhttp3.Response response = httpClient.newCall(request).execute();
if (response.isSuccessful()) {
return response;
OkHttpClient httpClient, HttpRetryPolicy.Factory retryPolicyFactory, Request request)
throws IOException {
try (HttpRetryPolicy retryPolicy = retryPolicyFactory.create()) {
while (true) {
try {
Response response = httpClient.newCall(request).execute();
if (response.isSuccessful()) {
return response;
}
if (!retryPolicy.shouldRetry(response)) {
return response;
} else {
closeQuietly(response);
}
} catch (Exception ex) {
if (!retryPolicy.shouldRetry(ex)) {
throw ex;
}
}
if (!retryPolicy.shouldRetry(response)) {
return response;
} else {
closeQuietly(response);
}
} catch (ConnectException ex) {
if (!retryPolicy.shouldRetry(null)) {
throw ex;
}
}
// If we get here, there has been an error, and we still have retries left
long backoffMs = retryPolicy.backoff();
try {
Thread.sleep(backoffMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
// If we get here, there has been an error, and we still have retries left
retryPolicy.backoff();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ class HttpRetryPolicyTest extends Specification {

when:
while (retry <= maxRetries) {
def shouldRetry = retryPolicy.shouldRetry()
def shouldRetry = retryPolicy.shouldRetry((Response) null)
shouldRetries << shouldRetry
if (shouldRetry) {
backoffs << retryPolicy.backoff()
backoffs << retryPolicy.getBackoffDelay()
}
retry += 1
}
Expand All @@ -44,10 +44,10 @@ class HttpRetryPolicyTest extends Specification {
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0).create()

def responseBuilder = new Response.Builder()
.code(responseCode)
.request(GroovyMock(Request))
.protocol(Protocol.HTTP_1_1)
.message("")
.code(responseCode)
.request(GroovyMock(Request))
.protocol(Protocol.HTTP_1_1)
.message("")
if (rateLimitHeader != null) {
responseBuilder.header("x-ratelimit-reset", rateLimitHeader)
}
Expand All @@ -73,4 +73,58 @@ class HttpRetryPolicyTest extends Specification {
500 | null | 5
501 | null | 5
}

// Data-driven check of shouldRetry(Exception): each row creates a fresh policy with the given
// suppressInterrupts setting and compares the decision for one exception with the expected value.
def "test exceptions are retried: #exception with suppress interrupts #suppressInterrupts"() {
setup:
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, suppressInterrupts).create()

expect:
retryPolicy.shouldRetry(exception) == shouldRetry

// interrupt-type exceptions are expected to be retried only when suppressInterrupts is true
where:
exception | suppressInterrupts | shouldRetry
new NullPointerException() | false | false
new IllegalArgumentException() | false | false
new ConnectException() | false | true
new InterruptedIOException() | false | false
new InterruptedIOException() | true | true
new InterruptedException() | false | false
new InterruptedException() | true | true
}

// Feeds an InterruptedException to a policy created with suppressInterrupts = true, closes the
// policy, and expects the current thread to be flagged as interrupted afterwards.
def "test interrupt flag is preserved when suppressing interrupts"() {
setup:
def retryPolicy = new HttpRetryPolicy.Factory(5, 100, 2.0, true).create()

when:
retryPolicy.shouldRetry(new InterruptedException())
retryPolicy.close()

then:
// Thread.interrupted() also clears the flag, so the interrupt does not leak out of this test
Thread.interrupted()
}

// Runs backoff() on a dedicated thread that is interrupted right after it is started, using a
// policy created with suppressInterrupts = true, and records the thread's interrupted status
// before and after the policy is closed.
def "test interrupt flag is preserved if interrupted while backing off"() {
setup:
// b[0]: interrupted status after backoff() returns; b[1]: interrupted status after close()
boolean[] b = new boolean[2]

Runnable r = () -> {
def retryPolicy = new HttpRetryPolicy.Factory(5, 1000, 2.0, true).create()
retryPolicy.backoff()

b[0] = Thread.currentThread().isInterrupted()
retryPolicy.close()
b[1] = Thread.interrupted()
}
Thread t = new Thread(r, "test-http-retry-policy-interrupts")

when:
t.start()
t.interrupt()
// wait for the worker to finish before reading the results it wrote into b
t.join()

then:
!b[0] // before retry policy is closed, the thread should not be interrupted: interrupts are suppressed
b[1] // after retry policy is closed, the thread should be interrupted: interrupt flag should be restored
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public BackendApiFactory(Config config, SharedCommunicationObjects sharedCommuni
}

public @Nullable BackendApi createBackendApi() {
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);

if (config.isCiVisibilityAgentlessEnabled()) {
HttpUrl agentlessUrl = getAgentlessUrl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ public <T> T post(

final Request request = requestBuilder.post(requestBody).build();

HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
if (response.isSuccessful()) {
log.debug("Request to {} returned successful response: {}", uri, response.code());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,8 @@ public <T> T post(
}

Request request = requestBuilder.build();
HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
if (response.isSuccessful()) {
log.debug("Request to {} returned successful response: {}", uri, response.code());
InputStream responseBodyStream = response.body().byteStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public DDEvpProxyApi build() {
? httpClient
: OkHttpUtils.buildHttpClient(proxiedApiUrl, timeoutMillis);

final HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
final HttpRetryPolicy.Factory retryPolicyFactory =
new HttpRetryPolicy.Factory(5, 100, 2.0, true);

log.debug("proxiedApiUrl: {}", proxiedApiUrl);
return new DDEvpProxyApi(
Expand Down Expand Up @@ -141,9 +142,8 @@ public Response sendSerializedTraces(Payload payload) {
totalTraces += payload.traceCount();
receivedTraces += payload.traceCount();

HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
if (response.isSuccessful()) {
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
return Response.success(response.code());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static class DDIntakeApiBuilder {

HttpUrl hostUrl = null;
OkHttpClient httpClient = null;
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0);
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);

private String apiKey;

Expand Down Expand Up @@ -134,9 +134,8 @@ public Response sendSerializedTraces(Payload payload) {
totalTraces += payload.traceCount();
receivedTraces += payload.traceCount();

HttpRetryPolicy retryPolicy = retryPolicyFactory.create();
try (okhttp3.Response response =
OkHttpUtils.sendWithRetries(httpClient, retryPolicy, request)) {
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
if (response.isSuccessful()) {
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
return Response.success(response.code());
Expand Down

0 comments on commit a1463c6

Please sign in to comment.