Skip to content

Commit 39d8bda

Browse files
dolfinusebyhr
authored andcommitted
Add compression option to OpenLineage HttpTransport
1 parent 8bee139 commit 39d8bda

File tree

5 files changed

+37
-4
lines changed

5 files changed

+37
-4
lines changed

docs/src/main/sphinx/admin/event-listeners-openlineage.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,11 @@ event-listener.config-files=etc/openlineage-event-listener.properties,...
210210
- List of custom url params to be added to final HTTP Request. See
211211
[](openlineage-event-listener-custom-url-params) for more details.
212212
- Empty
213+
*
214+
- openlineage-event-listener.transport.compression
215+
- Compression codec used for reducing size of HTTP body.
216+
Allowed values: `none`, `gzip`.
217+
- `none`
213218

214219
:::
215220

plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/http/OpenLineageHttpTransport.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.net.URI;
2424
import java.util.Map;
2525

26+
import static io.trino.plugin.openlineage.transport.http.OpenLineageHttpTransportConfig.Compression;
2627
import static java.lang.Math.toIntExact;
2728

2829
public class OpenLineageHttpTransport
@@ -34,6 +35,7 @@ public class OpenLineageHttpTransport
3435
private final TokenProvider tokenProvider;
3536
private final Map<String, String> urlParams;
3637
private final Map<String, String> headers;
38+
private final Compression compression;
3739

3840
@Inject
3941
public OpenLineageHttpTransport(OpenLineageHttpTransportConfig config)
@@ -44,6 +46,7 @@ public OpenLineageHttpTransport(OpenLineageHttpTransportConfig config)
4446
this.tokenProvider = config.getApiKey().map(OpenLineageHttpTransport::createTokenProvider).orElse(null);
4547
this.urlParams = config.getUrlParams();
4648
this.headers = config.getHeaders();
49+
this.compression = config.getCompression();
4750
}
4851

4952
@Override
@@ -57,7 +60,7 @@ public HttpTransport buildTransport()
5760
this.tokenProvider,
5861
this.urlParams,
5962
this.headers,
60-
null,
63+
this.compression == Compression.GZIP ? HttpConfig.Compression.GZIP : null,
6164
new HttpSslContextConfig()));
6265
}
6366

plugin/trino-openlineage/src/main/java/io/trino/plugin/openlineage/transport/http/OpenLineageHttpTransportConfig.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,20 @@
3333

3434
public class OpenLineageHttpTransportConfig
3535
{
36+
public enum Compression
37+
{
38+
NONE, GZIP
39+
}
40+
3641
private URI url;
3742
private String endpoint;
3843
private Optional<String> apiKey = Optional.empty();
3944
private Duration timeout = new Duration(5000, TimeUnit.MILLISECONDS);
4045
private Map<String, String> headers = new HashMap<>();
4146
private Map<String, String> urlParams = new HashMap<>();
47+
// Most of OpenLineage HTTP servers support GZIP compression,
48+
// but users may use custom servers that don't. Use NONE for compatibility
49+
private Compression compression = Compression.NONE;
4250

4351
@NotNull
4452
public URI getUrl()
@@ -137,4 +145,18 @@ public OpenLineageHttpTransportConfig setUrlParams(List<String> urlParas)
137145
}
138146
return this;
139147
}
148+
149+
@NotNull
150+
public Compression getCompression()
151+
{
152+
return compression;
153+
}
154+
155+
@Config("openlineage-event-listener.transport.compression")
156+
@ConfigDescription("Compression codec using for HTTP body")
157+
public OpenLineageHttpTransportConfig setCompression(Compression compression)
158+
{
159+
this.compression = compression;
160+
return this;
161+
}
140162
}

plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TestOpenLineageEventListenerMarquezIntegration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ protected QueryRunner createQueryRunner()
4242
return OpenLineageListenerQueryRunner.builder()
4343
.addListenerProperty("openlineage-event-listener.transport.type", "HTTP")
4444
.addListenerProperty("openlineage-event-listener.transport.url", server.getMarquezUri().toString())
45+
.addListenerProperty("openlineage-event-listener.transport.compression", "gzip")
4546
.addListenerProperty("openlineage-event-listener.trino.uri", TRINO_URI)
4647
.build();
4748
}

plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/http/TestOpenLineageHttpTransportConfig.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ void testDefaults()
3737
.setTimeout(Duration.valueOf("5s"))
3838
.setApiKey(null)
3939
.setHeaders(ImmutableList.of())
40-
.setUrlParams(ImmutableList.of()));
40+
.setUrlParams(ImmutableList.of())
41+
.setCompression(OpenLineageHttpTransportConfig.Compression.NONE));
4142
}
4243

4344
@Test
@@ -51,7 +52,7 @@ void testExplicitPropertyMappings()
5152
.put("openlineage-event-listener.transport.timeout", "30s")
5253
.put("openlineage-event-listener.transport.headers", "header1:value1,header2:value2")
5354
.put("openlineage-event-listener.transport.url-params", "urlParam1:urlVal1,urlParam2:urlVal2")
54-
55+
.put("openlineage-event-listener.transport.compression", "gzip")
5556
.buildOrThrow();
5657

5758
OpenLineageHttpTransportConfig expected = new OpenLineageHttpTransportConfig()
@@ -60,7 +61,8 @@ void testExplicitPropertyMappings()
6061
.setApiKey("dummy")
6162
.setTimeout(Duration.valueOf("30s"))
6263
.setHeaders(ImmutableList.of("header1:value1", "header2:value2"))
63-
.setUrlParams(ImmutableList.of("urlParam1:urlVal1", "urlParam2:urlVal2"));
64+
.setUrlParams(ImmutableList.of("urlParam1:urlVal1", "urlParam2:urlVal2"))
65+
.setCompression(OpenLineageHttpTransportConfig.Compression.GZIP);
6466

6567
assertFullMapping(properties, expected);
6668
}

0 commit comments

Comments
 (0)