Skip to content

Commit

Permalink
perf: Use OkHttp Client to completely replace Apache Client and resol…
Browse files Browse the repository at this point in the history
…ve code conflicts.
  • Loading branch information
cnzakii committed Jun 26, 2024
1 parent f5bb0dd commit e83abc5
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.eventmesh.common.config.connector.http;

import org.apache.eventmesh.connector.http.source.protocol.impl.CloudEventProtocol;

import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -53,7 +51,7 @@ public class SourceConnectorConfig {
private int batchSize = 10;

// protocol, default CloudEvent
private String protocol = CloudEventProtocol.PROTOCOL_NAME;
private String protocol = "CloudEvent";

// extra config, e.g. GitHub secret
private Map<String, String> extraConfig = new HashMap<>();
Expand Down
1 change: 0 additions & 1 deletion eventmesh-connectors/eventmesh-connector-http/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ dependencies {
implementation 'io.vertx:vertx-web-client:4.5.8'
implementation 'dev.failsafe:failsafe:3.3.2'

testImplementation "org.apache.httpcomponents:httpclient"
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
testImplementation 'com.squareup.okhttp3:okhttp:4.12.0'
compileOnly 'org.projectlombok:lombok'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.eventmesh.connector.http.source.connector;

import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.config.HttpSourceConfig;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
import org.apache.eventmesh.connector.http.source.protocol.ProtocolFactory;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.eventmesh.connector.http.source.protocol;

import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.config.SourceConnectorConfig;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import io.vertx.ext.web.Route;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.connector.http.source.protocol;

import org.apache.eventmesh.connector.http.source.config.SourceConnectorConfig;
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
import org.apache.eventmesh.connector.http.source.protocol.impl.CloudEventProtocol;
import org.apache.eventmesh.connector.http.source.protocol.impl.CommonProtocol;
import org.apache.eventmesh.connector.http.source.protocol.impl.GitHubProtocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.eventmesh.connector.http.source.protocol.impl;

import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.config.SourceConnectorConfig;
import org.apache.eventmesh.connector.http.source.data.CommonResponse;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.eventmesh.connector.http.source.protocol.impl;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.config.SourceConnectorConfig;
import org.apache.eventmesh.connector.http.source.data.CommonResponse;
import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.eventmesh.connector.http.source.protocol.impl;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.source.config.SourceConnectorConfig;
import org.apache.eventmesh.connector.http.source.data.CommonResponse;
import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
import org.apache.eventmesh.connector.http.source.protocol.Protocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,36 @@

package org.apache.eventmesh.connector.http.source.connector;


import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig;
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.ConfigUtil;

import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import java.net.URL;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;


import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

class HttpSourceConnectorTest {

private HttpSourceConnector connector;
private SourceConnectorConfig config;
private CloseableHttpClient httpClient;
private String uri;
private OkHttpClient httpClient;
private String url;
private final String expectedMessage = "testHttpMessage";

@BeforeEach
Expand All @@ -56,19 +57,18 @@ void setUp() throws Exception {
connector.init(sourceConfig);
connector.start();

uri = new URIBuilder().setScheme("http").setHost("127.0.0.1").setPort(config.getPort()).setPath(config.getPath()).build().toString();

httpClient = HttpClients.createDefault();
url = new URL("http", "127.0.0.1", config.getPort(), config.getPath()).toString();
httpClient = new OkHttpClient();
}

@Test
void testPoll() throws Exception {
final int batchSize = 10;
// test binary content mode
for (int i = 0; i < batchSize; i++) {
HttpResponse resp = mockBinaryRequest();
Assertions.assertEquals(resp.getStatusLine().getStatusCode(), HttpStatus.SC_OK);

try (Response resp = mockBinaryRequest()) {
Assertions.assertEquals(200, resp.code());
}
}
List<ConnectRecord> res = connector.poll();
Assertions.assertEquals(batchSize, res.size());
Expand All @@ -78,8 +78,9 @@ void testPoll() throws Exception {

// test structured content mode
for (int i = 0; i < batchSize; i++) {
HttpResponse resp = mockStructuredRequest();
Assertions.assertEquals(resp.getStatusLine().getStatusCode(), HttpStatus.SC_OK);
try (Response resp = mockStructuredRequest()) {
Assertions.assertEquals(200, resp.code());
}
}
res = connector.poll();
Assertions.assertEquals(batchSize, res.size());
Expand All @@ -88,30 +89,39 @@ void testPoll() throws Exception {
}

// test invalid requests
HttpPost invalidPost = new HttpPost(uri);
invalidPost.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
invalidPost.setHeader("ce-id", String.valueOf(UUID.randomUUID()));
HttpResponse resp = httpClient.execute(invalidPost);
Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, resp.getStatusLine().getStatusCode());
Request request = new Request.Builder()
.url(url)
.addHeader("Content-Type", "text/plain")
.addHeader("ce-id", String.valueOf(UUID.randomUUID()))
.build();

try (Response resp = httpClient.newCall(request).execute()) {
// verify the response code
Assertions.assertEquals(405, resp.code());
}

}

HttpResponse mockBinaryRequest() throws Exception {
HttpPost httpPost = new HttpPost(uri);
httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
httpPost.setHeader("ce-id", String.valueOf(UUID.randomUUID()));
httpPost.setHeader("ce-specversion", "1.0");
httpPost.setHeader("ce-type", "com.example.someevent");
httpPost.setHeader("ce-source", "/mycontext");
httpPost.setHeader("ce-subject", "test");
httpPost.setEntity(new StringEntity(expectedMessage));

return httpClient.execute(httpPost);
Response mockBinaryRequest() throws Exception {

RequestBody body = RequestBody.create(expectedMessage, MediaType.parse("text/plain"));

Request request = new Request.Builder()
.url(url)
.addHeader("Content-Type", "text/plain")
.addHeader("ce-id", String.valueOf(UUID.randomUUID()))
.addHeader("ce-specversion", "1.0")
.addHeader("ce-type", "com.example.someevent")
.addHeader("ce-source", "/mycontext")
.addHeader("ce-subject", "test")
.post(body)
.build();

return httpClient.newCall(request).execute();
}

HttpResponse mockStructuredRequest() throws Exception {
HttpPost httpPost = new HttpPost(uri);
// according to the CloudEvent specification, a json format event MUST use the media type `application/cloudevents+json`
httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/cloudevents+json");
Response mockStructuredRequest() throws Exception {
// create a CloudEvent
TestEvent event = new TestEvent();
event.id = String.valueOf(UUID.randomUUID());
event.specversion = "1.0";
Expand All @@ -120,15 +130,23 @@ HttpResponse mockStructuredRequest() throws Exception {
event.subject = "test";
event.datacontenttype = "text/plain";
event.data = expectedMessage;
httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event)));

return httpClient.execute(httpPost);
RequestBody body = RequestBody.create(Objects.requireNonNull(JsonUtils.toJSONString(event)), MediaType.parse("application/cloudevents+json"));

Request request = new Request.Builder()
.url(url)
.addHeader("Content-Type", "application/cloudevents+json")
.post(body)
.build();

return httpClient.newCall(request).execute();

}

@AfterEach
void tearDown() throws Exception {
void tearDown() {
connector.stop();
httpClient.close();
httpClient.dispatcher().executorService().shutdown();
}

class TestEvent {
Expand Down

0 comments on commit e83abc5

Please sign in to comment.