Commit 8688514

HTTP183: consolidate http configuration between sink and lookup
1 parent 9604080 commit 8688514

21 files changed: 456 additions and 439 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -2,6 +2,7 @@

 ## [Unreleased]

+- Added new HTTP Sink configuration options: `gid.connector.http.sink.success-codes`, `gid.connector.http.sink.retry-codes`, and `gid.connector.http.sink.ignored-response-codes`.
 - Amend to not log HTTP request response and header values by default.
 - Added http 2 support.

README.md

Lines changed: 17 additions & 9 deletions
@@ -452,16 +452,21 @@ is provided.
 ## HTTP status code handler
 ### Sink table
 You can configure a list of HTTP status codes that should be treated as errors for HTTP sink table.
-By default all 400 and 500 response codes will be interpreted as error code.
+By default all 400 and 500 response codes will be interpreted as error code. 500, 503, and 504 response codes will be interpreted as retry.
+
+The sink categorizes HTTP responses into groups:
+- Success codes (`gid.connector.http.sink.success-codes`): Expected successful responses. `1XX, 2XX, 3XX` are defaults
+- Retry codes (`gid.connector.http.sink.retry-codes`): Transient errors that trigger automatic retries when using `at-least-once` delivery guarantee. `500, 503, 504` are defaults
+- Ignored responses (`gid.connector.http.sink.ignored-response-codes`): Responses whose content is ignored but treated as successful.
+- Error codes: Any response code not classified in the above groups.
+
+Parameters support whitelisting and blacklisting: `2XX,404,!203` means all codes from 200-299, plus 404, except 203.
+
+#### Legacy error code configuration
+For backward compatibility, you can use the legacy properties:
+- `gid.connector.http.sink.error.code` - HTTP status codes treated as errors (supports masks like `3XX, 4XX, 5XX`).
+- `gid.connector.http.sink.error.code.exclude` - HTTP codes to exclude from the error list.

-This behavior can be changed by using below properties in table definition (DDL) or passing it via `setProperty' method from Sink's builder. The property name are:
-- `gid.connector.http.sink.error.code` used to defined HTTP status code value that should be treated as error for example 404.
-Many status codes can be defined in one value, where each code should be separated with comma, for example:
-`401, 402, 403`. User can use this property also to define a type code mask. In that case, all codes from given HTTP response type will be treated as errors.
-An example of such a mask would be `3XX, 4XX, 5XX`. In this case, all 300s, 400s and 500s status codes will be treated as errors.
-- `gid.connector.http.sink.error.code.exclude` used to exclude a HTTP code from error list.
-Many status codes can be defined in one value, where each code should be separated with comma, for example:
-`401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes.

 ### Source table
 The source table categorizes HTTP responses into three groups based on status codes:
@@ -619,6 +624,9 @@ be requested if the current time is later than the cached token expiry time minu
 | gid.connector.http.sink.request-callback | optional | Specify which `HttpPostRequestCallback` implementation to use. By default, it is set to `slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`. |
 | gid.connector.http.sink.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Sink, separated with comma. |
 | gid.connector.http.sink.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.sink.error.code` list, separated with comma. |
+| gid.connector.http.sink.success-codes | optional | Comma separated http codes considered as success response. Use [1-5]XX for groups and '!' character for excluding. |
+| gid.connector.http.sink.retry-codes | optional | Comma separated http codes considered as transient errors that will trigger retries. Use [1-5]XX for groups and '!' character for excluding. Only used when `sink.delivery-guarantee` is set to `at-least-once`. |
+| gid.connector.http.sink.ignored-response-codes | optional | Comma separated http codes. Content for these responses will be ignored. Use [1-5]XX for groups and '!' character for excluding. |
 | gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. |
 | gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
 | gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
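
The README change above describes status-code expressions such as `2XX,404,!203`. As an illustration only, the following sketch shows one way such an expression could be expanded into a set of codes. The class `StatusCodeMaskSketch` and its `parse` method are hypothetical and are not the connector's parser. The sketch also assumes that exclusions apply regardless of token order, which the README does not specify.

```java
import java.util.Locale;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper, not part of the connector: expands a status-code expression. */
final class StatusCodeMaskSketch {

    /** Expands an expression such as "2XX,404,!203" into the matching status codes. */
    static Set<Integer> parse(String expression) {
        Set<Integer> included = new TreeSet<>();
        Set<Integer> excluded = new TreeSet<>();
        for (String raw : expression.split(",")) {
            String token = raw.trim().toUpperCase(Locale.ROOT);
            if (token.isEmpty()) {
                continue;
            }
            // A leading '!' marks an exclusion.
            boolean negated = token.startsWith("!");
            if (negated) {
                token = token.substring(1).trim();
            }
            Set<Integer> target = negated ? excluded : included;
            if (token.matches("[1-5]XX")) {
                // Group mask: 2XX covers 200 through 299.
                int base = (token.charAt(0) - '0') * 100;
                for (int code = base; code < base + 100; code++) {
                    target.add(code);
                }
            } else if (token.matches("\\d{3}")) {
                target.add(Integer.parseInt(token));
            } else {
                throw new IllegalArgumentException("Unsupported status code token: " + raw);
            }
        }
        // Assumption: exclusions win over inclusions, independent of position.
        included.removeAll(excluded);
        return included;
    }

    public static void main(String[] args) {
        Set<Integer> codes = parse("2XX,404,!203");
        System.out.println(codes.contains(200)); // true
        System.out.println(codes.contains(404)); // true
        System.out.println(codes.contains(203)); // false
    }
}
```

With the README's example, the result holds 200 through 299 without 203, plus 404.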
Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+package com.getindata.connectors.http;
+
+import java.util.List;
+
+import lombok.Getter;
+
+import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
+
+@Getter
+public class BatchHttpStatusCodeValidationFailedException extends Exception {
+    List<HttpRequest> failedRequests;
+
+    public BatchHttpStatusCodeValidationFailedException(String message, List<HttpRequest> failedRequests) {
+        super(message);
+        this.failedRequests = failedRequests;
+    }
+}
Lines changed: 39 additions & 8 deletions
@@ -1,31 +1,62 @@
 package com.getindata.connectors.http.internal;

 import java.util.List;
+import java.util.stream.Collectors;

 import lombok.Data;
 import lombok.NonNull;
 import lombok.ToString;

+import com.getindata.connectors.http.internal.config.ResponseItemStatus;
 import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
 import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;

 /**
  * Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted
- * to write, divided into two lists &mdash; successful and failed ones.
+ * to write.
  */
 @Data
 @ToString
 public class SinkHttpClientResponse {

     /**
-     * A list of successfully written requests.
+     * A list of requests along with write status.
      */
     @NonNull
-    private final List<HttpRequest> successfulRequests;
+    private final List<ResponseItem> requests;

-    /**
-     * A list of requests that {@link SinkHttpClient} failed to write.
-     */
-    @NonNull
-    private final List<HttpRequest> failedRequests;
+    public List<HttpRequest> getSuccessfulRequests() {
+        return requests.stream()
+            .filter(r -> r.getStatus().equals(ResponseItemStatus.SUCCESS))
+            .map(ResponseItem::getRequest)
+            .collect(Collectors.toList());
+    }
+
+    public List<HttpRequest> getFailedRequests() {
+        return requests.stream()
+            .filter(r -> r.getStatus().equals(ResponseItemStatus.FAILURE))
+            .map(ResponseItem::getRequest)
+            .collect(Collectors.toList());
+    }
+
+    public List<HttpRequest> getTemporalRequests() {
+        return requests.stream()
+            .filter(r -> r.getStatus().equals(ResponseItemStatus.TEMPORAL))
+            .map(ResponseItem::getRequest)
+            .collect(Collectors.toList());
+    }
+
+    public List<HttpRequest> getIgnoredRequests() {
+        return requests.stream()
+            .filter(r -> r.getStatus().equals(ResponseItemStatus.IGNORE))
+            .map(ResponseItem::getRequest)
+            .collect(Collectors.toList());
+    }
+
+    @Data
+    @ToString
+    public static class ResponseItem {
+        private final HttpRequest request;
+        private final ResponseItemStatus status;
+    }
 }
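
Each of the four getters above streams over the full `requests` list and filters on a single status. For comparison, here is a minimal sketch of a single-pass alternative that groups items by status with `Collectors.groupingBy`. It is not what the commit does. `GroupByStatusSketch`, `Item`, and the nested `Status` enum are hypothetical stand-ins for `SinkHttpClientResponse`, `ResponseItem`, and `ResponseItemStatus`, with a plain `String` in place of `HttpRequest`.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical stand-in types; a String replaces the connector's HttpRequest. */
final class GroupByStatusSketch {

    enum Status { SUCCESS, TEMPORAL, IGNORE, FAILURE }

    static final class Item {
        final String request;
        final Status status;

        Item(String request, Status status) {
            this.request = request;
            this.status = status;
        }
    }

    /** Groups request payloads by status in one pass over the list. */
    static Map<Status, List<String>> groupByStatus(List<Item> items) {
        return items.stream().collect(Collectors.groupingBy(
            (Item item) -> item.status,
            () -> new EnumMap<Status, List<String>>(Status.class),
            Collectors.mapping((Item item) -> item.request, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
            new Item("a", Status.SUCCESS),
            new Item("b", Status.FAILURE),
            new Item("c", Status.SUCCESS));
        Map<Status, List<String>> grouped = groupByStatus(items);
        System.out.println(grouped.get(Status.SUCCESS)); // [a, c]
        // Statuses with no items are absent from the map, so callers need a default.
        System.out.println(grouped.containsKey(Status.TEMPORAL)); // false
    }
}
```

The per-status getters in the commit are simpler to call and always return a list. The grouped form trades that for a single traversal, and callers must handle missing keys.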

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 7 additions & 2 deletions
@@ -19,6 +19,7 @@ public final class HttpConnectorConfigConstants {
      */
     public static final String GID_CONNECTOR_HTTP = "gid.connector.http.";
     private static final String SOURCE_LOOKUP_PREFIX = GID_CONNECTOR_HTTP + "source.lookup.";
+    private static final String SINK_PREFIX = GID_CONNECTOR_HTTP + "sink.";

     /**
      * A property prefix for http connector header properties
@@ -45,9 +46,13 @@ public final class HttpConnectorConfigConstants {
     public static final String RESULT_TYPE = SOURCE_LOOKUP_PREFIX + "result-type";

     // --------- Error code handling configuration ---------
-    public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = GID_CONNECTOR_HTTP + "sink.error.code.exclude";
+    public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = SINK_PREFIX + "error.code.exclude";
+    public static final String HTTP_ERROR_SINK_CODES_LIST = SINK_PREFIX + "error.code";
+
+    public static final String SINK_SUCCESS_CODES = SINK_PREFIX + "success-codes";
+    public static final String SINK_RETRY_CODES = SINK_PREFIX + "retry-codes";
+    public static final String SINK_IGNORE_RESPONSE_CODES = SINK_PREFIX + "ignored-response-codes";

-    public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";
     // -----------------------------------------------------

     public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+package com.getindata.connectors.http.internal.config;
+
+public enum ResponseItemStatus {
+    SUCCESS("success"),
+    TEMPORAL("temporal"),
+    IGNORE("ignore"),
+    FAILURE("failure");
+
+    private final String status;
+
+    ResponseItemStatus(String status) {
+        this.status = status;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+}
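
The four enum values appear to line up with the README's response groups: success codes, retry codes (`TEMPORAL`), ignored responses (`IGNORE`), and everything else (`FAILURE`). The following is a rough sketch of how a status code might be mapped to one of them. `ResponseClassifierSketch` and its nested `Status` enum are hypothetical, and the precedence used when a code sits in more than one set (ignored, then retry, then success) is an assumption; the diff shown here does not state the actual order.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical classifier; the precedence between overlapping sets is an assumption. */
final class ResponseClassifierSketch {

    enum Status { SUCCESS, TEMPORAL, IGNORE, FAILURE }

    private final Set<Integer> successCodes;
    private final Set<Integer> retryCodes;
    private final Set<Integer> ignoredCodes;

    ResponseClassifierSketch(Set<Integer> successCodes,
                             Set<Integer> retryCodes,
                             Set<Integer> ignoredCodes) {
        this.successCodes = successCodes;
        this.retryCodes = retryCodes;
        this.ignoredCodes = ignoredCodes;
    }

    /** Maps an HTTP status code to a group; unclassified codes are failures. */
    Status classify(int httpStatus) {
        if (ignoredCodes.contains(httpStatus)) {
            return Status.IGNORE;
        }
        if (retryCodes.contains(httpStatus)) {
            return Status.TEMPORAL;
        }
        if (successCodes.contains(httpStatus)) {
            return Status.SUCCESS;
        }
        return Status.FAILURE;
    }

    public static void main(String[] args) {
        ResponseClassifierSketch classifier = new ResponseClassifierSketch(
            new HashSet<>(Arrays.asList(200, 201, 204)),
            new HashSet<>(Arrays.asList(500, 503, 504)),
            new HashSet<>(Arrays.asList(404)));
        for (int code : new int[] {200, 404, 503, 400}) {
            System.out.println(code + " -> " + classifier.classify(code));
        }
    }
}
```

The fall-through to `FAILURE` mirrors the README's rule that any code outside the configured groups counts as an error.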

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java

Lines changed: 33 additions & 7 deletions
@@ -16,8 +16,10 @@
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;

+import com.getindata.connectors.http.BatchHttpStatusCodeValidationFailedException;
 import com.getindata.connectors.http.internal.SinkHttpClient;
 import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
+import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
 import com.getindata.connectors.http.internal.utils.ThreadUtils;

 /**
@@ -89,20 +91,44 @@ protected void submitRequestEntries(
             if (err != null) {
                 int failedRequestsNumber = requestEntries.size();
                 log.error(
-                    "Http Sink fatally failed to write all {} requests",
-                    failedRequestsNumber);
+                    "Http Sink fatally failed to write {} requests",
+                    failedRequestsNumber,
+                    err
+                );
                 numRecordsSendErrorsCounter.inc(failedRequestsNumber);

                 // TODO: Make `HttpSinkInternal` retry the failed requests.
                 //  Currently, it does not retry those at all, only adds their count
                 //  to the `numRecordsSendErrors` metric. It is due to the fact we do not have
                 //  a clear image how we want to do it, so it would be both efficient and correct.
                 //requestResult.accept(requestEntries);
-            } else if (response.getFailedRequests().size() > 0) {
-                int failedRequestsNumber = response.getFailedRequests().size();
-                log.error("Http Sink failed to write and will retry {} requests",
-                    failedRequestsNumber);
-                numRecordsSendErrorsCounter.inc(failedRequestsNumber);
+            } else {
+                List<HttpRequest> failedRequests = response.getFailedRequests();
+                List<HttpRequest> ignoredRequests = response.getIgnoredRequests();
+                List<HttpRequest> temporalRequests = response.getTemporalRequests();
+
+                if (!failedRequests.isEmpty()) {
+                    numRecordsSendErrorsCounter.inc(failedRequests.size());
+                    log.error(
+                        "failed requests: {}, throwing BatchHttpStatusCodeValidationFailedException from sink",
+                        failedRequests
+                    );
+                    getFatalExceptionCons().accept(new BatchHttpStatusCodeValidationFailedException(
+                        String.format("Received %d fatal response codes", failedRequests.size()), failedRequests)
+                    );
+                }
+
+                if (!ignoredRequests.isEmpty()) {
+                    log.info("Ignoring {} requests", ignoredRequests.size());
+                }
+
+                if (!temporalRequests.isEmpty()) {
+                    numRecordsSendErrorsCounter.inc(temporalRequests.size());
+                    log.error(
+                        "Http Sink failed to write {} temporal requests",
+                        temporalRequests.size()
+                    );
+                }

                 // TODO: Make `HttpSinkInternal` retry the failed requests. Currently,
                 //  it does not retry those at all, only adds their count to the