Commit dad8f83

HTTP-154 - Continue on error and metadata column support (#173)
Signed-off-by: davidradl <[email protected]>
1 parent b7fde8a commit dad8f83

22 files changed, +1059 -143 lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@
 .gitignore.swp
 .project
 .settings
+.DS_Store
 target
 bin
 /flink.http.connector.iml

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -2,6 +2,9 @@

 ## [Unreleased]

+- Added ability to continue on error, introducing new metadata columns and new configuration option
+`gid.connector.http.source.lookup.continue-on-error`
+
 ## [0.21.0] - 2025-09-16

 - optimized logging in HttpHeaderUtils.

README.md

Lines changed: 43 additions & 3 deletions
@@ -176,16 +176,55 @@ The second one is set per individual HTTP requests by HTTP client. Its default v
 Flink's current implementation of `AsyncTableFunction` does not allow specifying custom logic for handling Flink AsyncIO timeouts as it is for Java API.
 Because of that, if AsyncIO timer passes, Flink will throw TimeoutException which will cause job restart.

-#### Retries (Lookup source)
+#### Available Metadata (Lookup source)
+
+The metadata column `http-status-code`, if specified in the table definition, will get the HTTP status code.
+The metadata column `http-headers-map`, if specified in the table definition, will get a map of the HTTP headers.
+
+HTTP requests can fail either immediately or after temporary error retries. The usual behaviour after such failures is to end the job. If you would like to continue
+processing after these failures then specify `gid.connector.http.source.lookup.continue-on-error` as true. The lookup join will complete without content in the expected enrichment columns from the HTTP call;
+this means that these columns will be null for nullable columns and hold a default value for the type for non-nullable columns.
+
+When using `gid.connector.http.source.lookup.continue-on-error` as true, consider adding extra metadata columns that will surface information about failures into your stream.
+
+Metadata columns can be specified and hold HTTP information. They are optional read-only columns that must be declared VIRTUAL to exclude them during an INSERT INTO operation.
+
+| Key                   | Data Type                        | Description                            |
+|-----------------------|----------------------------------|----------------------------------------|
+| error-string          | STRING NULL                      | A message associated with the error    |
+| http-status-code      | INT NULL                         | The HTTP status code                   |
+| http-headers-map      | MAP <STRING, ARRAY<STRING>> NULL | The headers returned with the response |
+| http-completion-state | STRING NULL                      | The completion state of the HTTP call. |
+
+##### http-completion-state possible values
+
+| Value             | Description            |
+|:------------------|------------------------|
+| SUCCESS           | Success                |
+| HTTP_ERROR_STATUS | HTTP error status code |
+| EXCEPTION         | An Exception occurred  |
+
+If the `error-string` metadata column is defined on the table and the call succeeds then it will have a null value.
+
+When an HTTP lookup call fails and populates the metadata columns with the error information, the expected enrichment columns from the HTTP call
+are not populated; this means that they will be null for nullable columns and hold a default value for the type for non-nullable columns.
+
+If you are using the Table API `TableResult` and have an `await` with a timeout, the resulting timeout exception will cause the job to terminate,
+even if there are metadata columns defined.
+
+#### Retries and handling errors (Lookup source)
 Lookup source handles auto-retries for two scenarios:
 1. IOException occurs (e.g. temporary network outage)
 2. The response contains a HTTP error code that indicates a retriable error. These codes are defined in the table configuration (see `gid.connector.http.source.lookup.retry-codes`).
-Retries are executed silently, without restarting the job. After reaching max retries attempts (per request) operation will fail and restart job.
+Retries are executed silently, without restarting the job.

 Notice that HTTP codes are categorized into 3 groups:
 - successful responses - response is returned immediately for further processing
 - temporary errors - request will be retried up to the retry limit
-- error responses - unexpected responses are not retried and will fail the job. Any HTTP error code which is not configured as successful or temporary error is treated as an unretriable error.
+- error responses - unexpected responses are not retried. Any HTTP error code which is not configured as successful or temporary error is treated as an unretriable error.
+
+For temporary errors that have reached the maximum number of retry attempts (per request) and for error responses, the operation will
+succeed if `gid.connector.http.source.lookup.continue-on-error` is true, otherwise the job will fail.

 ##### Retry strategy
 User can choose retry strategy type for source table:
@@ -555,6 +594,7 @@ be requested if the current time is later than the cached token expiry time minu
 | gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff | optional | Exponential-delay initial delay. Default 1 second. |
 | gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff | optional | Exponential-delay maximum delay. Default 1 minute. Use with `lookup.max-retries` parameter. |
 | gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier | optional | Exponential-delay multiplier. Default value 1.5 |
+| gid.connector.http.source.lookup.continue-on-error | optional | When true, the flow will continue on errors, returning row content. When false (the default) the job ends on errors. |
 | gid.connector.http.source.lookup.proxy.host | optional | Specify the hostname of the proxy. |
 | gid.connector.http.source.lookup.proxy.port | optional | Specify the port of the proxy. |
 | gid.connector.http.source.lookup.proxy.username | optional | Specify the username used for proxy authentication. |
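
To make the README additions above concrete, here is a minimal sketch of a lookup table definition that declares all four metadata columns and turns on `continue-on-error`. The metadata keys, their types, and the option name are taken from this commit. The table name, the physical columns, the URL, the `json` format, and the `rest-lookup` connector identifier are assumptions that do not appear in this diff. The DDL is kept in a plain Java string so the sketch runs without Flink on the classpath; in a real job the string would presumably be handed to `TableEnvironment.executeSql`.

```java
// Illustrative only: assembles a Flink SQL DDL string and prints it. It does not call Flink.
final class ContinueOnErrorDdlSketch {

    // Table name, physical columns, URL, format and connector identifier are assumptions.
    static String ddl() {
        return String.join("\n",
            "CREATE TABLE Customers (",
            "  id STRING,",
            "  msg STRING,",
            // Metadata keys and types follow the README table added in this commit.
            "  error_string STRING METADATA FROM 'error-string' VIRTUAL,",
            "  status_code INT METADATA FROM 'http-status-code' VIRTUAL,",
            "  headers MAP<STRING, ARRAY<STRING>> METADATA FROM 'http-headers-map' VIRTUAL,",
            "  completion_state STRING METADATA FROM 'http-completion-state' VIRTUAL",
            ") WITH (",
            "  'connector' = 'rest-lookup',",
            "  'format' = 'json',",
            "  'url' = 'http://localhost:8080/client',",
            // Option introduced by this commit; it defaults to false.
            "  'gid.connector.http.source.lookup.continue-on-error' = 'true'",
            ")");
    }

    public static void main(String[] args) {
        System.out.println(ddl());
    }
}
```

With a definition along these lines, a failed lookup would leave `id` and `msg` empty while `completion_state` and `error_string` describe what went wrong, so downstream SQL can filter on them.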

src/main/java/com/getindata/connectors/http/internal/PollingClient.java

Lines changed: 5 additions & 5 deletions
@@ -1,21 +1,21 @@
 package com.getindata.connectors.http.internal;

-import java.util.Collection;
-
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.FunctionContext;

+import com.getindata.connectors.http.internal.table.lookup.HttpRowDataWrapper;
+
 /**
  * A client that is used to get enrichment data from external component.
  */
-public interface PollingClient<T> {
+public interface PollingClient {

     /**
      * Gets enrichment data from external component using provided lookup arguments.
      * @param lookupRow A {@link RowData} containing request parameters.
-     * @return an optional result of data lookup.
+     * @return an optional result of data lookup with http information.
      */
-    Collection<T> pull(RowData lookupRow);
+    HttpRowDataWrapper pull(RowData lookupRow);

     /**
      * Initialize the client.

src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java

Lines changed: 4 additions & 3 deletions
@@ -3,14 +3,15 @@
 import java.io.Serializable;

 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.ConfigurationException;

 import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;

-public interface PollingClientFactory<OUT> extends Serializable {
+public interface PollingClientFactory extends Serializable {

-    PollingClient<OUT> createPollClient(
+    PollingClient createPollClient(
         HttpLookupConfig options,
-        DeserializationSchema<OUT> schemaDecoder
+        DeserializationSchema<RowData> schemaDecoder
     ) throws ConfigurationException;
 }

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 3 additions & 0 deletions
@@ -88,6 +88,9 @@ public final class HttpConnectorConfigConstants {
     public static final String SOURCE_CONNECTION_TIMEOUT =
         SOURCE_LOOKUP_PREFIX + "connection.timeout";

+    public static final String CONTINUE_ON_ERROR =
+        SOURCE_LOOKUP_PREFIX + "continue-on-error";
+
     public static final String SOURCE_PROXY_HOST =
         SOURCE_LOOKUP_PREFIX + "proxy.host";

src/main/java/com/getindata/connectors/http/internal/retry/RetryStrategyType.java

Lines changed: 1 addition & 2 deletions
@@ -8,8 +8,7 @@
 @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
 public enum RetryStrategyType {
     FIXED_DELAY("fixed-delay"),
-    EXPONENTIAL_DELAY("exponential-delay"),
-    ;
+    EXPONENTIAL_DELAY("exponential-delay");

     private final String code;

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpCompletionState.java

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+package com.getindata.connectors.http.internal.table.lookup;
+
+public enum HttpCompletionState {
+    HTTP_ERROR_STATUS,
+    EXCEPTION,
+    SUCCESS
+}
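
One plausible reading of how these three states relate to the README's rules is sketched below. This is a hypothetical helper, not code from this commit: the names `classify` and `jobContinues`, the configurable set of success codes, and the use of a `Throwable` to signal a failed call are all assumptions. The sketch covers the final outcome only, after any retries have been exhausted.

```java
import java.util.Set;

// Hypothetical helper for illustration; the connector's real logic is not shown in this diff.
final class CompletionStateSketch {

    // Same constants as the enum added in this commit, redeclared so the sketch is self-contained.
    enum HttpCompletionState { HTTP_ERROR_STATUS, EXCEPTION, SUCCESS }

    // failure is non-null when the request threw instead of returning a response.
    static HttpCompletionState classify(int statusCode, Throwable failure, Set<Integer> successCodes) {
        if (failure != null) {
            return HttpCompletionState.EXCEPTION;
        }
        return successCodes.contains(statusCode)
            ? HttpCompletionState.SUCCESS
            : HttpCompletionState.HTTP_ERROR_STATUS;
    }

    // A non-success outcome only lets the job carry on when continue-on-error is true.
    static boolean jobContinues(HttpCompletionState state, boolean continueOnError) {
        return state == HttpCompletionState.SUCCESS || continueOnError;
    }

    public static void main(String[] args) {
        Set<Integer> successCodes = Set.of(200, 201);
        HttpCompletionState state = classify(500, null, successCodes);
        System.out.println(state + " continues=" + jobContinues(state, true));
    }
}
```

Keeping the classification separate from the continue-or-fail decision mirrors the README: the completion state is reported through metadata either way, while `continue-on-error` only decides whether the job survives.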

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java

Lines changed: 6 additions & 0 deletions
@@ -82,6 +82,12 @@ public class HttpLookupConnectorOptions {
             .noDefaultValue()
             .withDescription("Http client connection timeout.");

+    public static final ConfigOption<Boolean> SOURCE_LOOKUP_CONTINUE_ON_ERROR =
+        ConfigOptions.key(CONTINUE_ON_ERROR)
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("Continue job on error.");
+
     public static final ConfigOption<String> SOURCE_LOOKUP_PROXY_HOST =
         ConfigOptions.key(SOURCE_PROXY_HOST)
             .stringType()
