Skip to content

Commit

Permalink
RHCLOUD-27242 Introduce SEDA in webhook connector (RedHatInsights#2196)
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg authored Sep 25, 2023
1 parent ae5b49c commit 8edf38d
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 27 deletions.
32 changes: 26 additions & 6 deletions .rhcicd/clowdapp-connector-webhook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,14 @@ objects:
value: ${ENV_NAME}
- name: NOTIFICATIONS_CONNECTOR_ENDPOINT_CACHE_MAX_SIZE
value: ${NOTIFICATIONS_CONNECTOR_ENDPOINT_CACHE_MAX_SIZE}
- name: NOTIFICATIONS_CONNECTOR_HTTPS_CONNECT_TIMEOUT_MS
value: ${NOTIFICATIONS_CONNECTOR_HTTPS_CONNECT_TIMEOUT_MS}
- name: NOTIFICATIONS_CONNECTOR_HTTPS_SOCKET_TIMEOUT_MS
value: ${NOTIFICATIONS_CONNECTOR_HTTPS_SOCKET_TIMEOUT_MS}
- name: NOTIFICATIONS_CONNECTOR_HTTP_CONNECT_TIMEOUT_MS
value: ${NOTIFICATIONS_CONNECTOR_HTTP_CONNECT_TIMEOUT_MS}
- name: NOTIFICATIONS_CONNECTOR_HTTP_CONNECTIONS_PER_ROUTE
value: ${NOTIFICATIONS_CONNECTOR_HTTP_CONNECTIONS_PER_ROUTE}
- name: NOTIFICATIONS_CONNECTOR_HTTP_MAX_TOTAL_CONNECTIONS
value: ${NOTIFICATIONS_CONNECTOR_HTTP_MAX_TOTAL_CONNECTIONS}
- name: NOTIFICATIONS_CONNECTOR_HTTP_SOCKET_TIMEOUT_MS
value: ${NOTIFICATIONS_CONNECTOR_HTTP_SOCKET_TIMEOUT_MS}
- name: NOTIFICATIONS_CONNECTOR_KAFKA_INCOMING_MAX_POLL_INTERVAL_MS
value: ${KAFKA_MAX_POLL_INTERVAL_MS}
- name: NOTIFICATIONS_CONNECTOR_KAFKA_INCOMING_MAX_POLL_RECORDS
Expand All @@ -71,6 +75,10 @@ objects:
value: ${NOTIFICATIONS_CONNECTOR_REDELIVERY_DELAY}
- name: NOTIFICATIONS_CONNECTOR_REDELIVERY_MAX_ATTEMPTS
value: ${NOTIFICATIONS_CONNECTOR_REDELIVERY_MAX_ATTEMPTS}
- name: NOTIFICATIONS_CONNECTOR_SEDA_CONCURRENT_CONSUMERS
value: ${NOTIFICATIONS_CONNECTOR_SEDA_CONCURRENT_CONSUMERS}
- name: NOTIFICATIONS_CONNECTOR_SEDA_QUEUE_SIZE
value: ${NOTIFICATIONS_CONNECTOR_SEDA_QUEUE_SIZE}
- name: QUARKUS_HTTP_PORT
value: ${QUARKUS_HTTP_PORT}
- name: QUARKUS_LOG_CATEGORY__COM_REDHAT_CLOUD_NOTIFICATIONS__LEVEL
Expand Down Expand Up @@ -135,10 +143,16 @@ parameters:
- name: NOTIFICATIONS_CONNECTOR_ENDPOINT_CACHE_MAX_SIZE
description: Maximum size of the Camel endpoints cache
value: "100"
- name: NOTIFICATIONS_CONNECTOR_HTTPS_CONNECT_TIMEOUT_MS
- name: NOTIFICATIONS_CONNECTOR_HTTP_CONNECT_TIMEOUT_MS
description: Maximum time in milliseconds allowed to establish an HTTPS connection
value: "30000"
- name: NOTIFICATIONS_CONNECTOR_HTTPS_SOCKET_TIMEOUT_MS
- name: NOTIFICATIONS_CONNECTOR_HTTP_CONNECTIONS_PER_ROUTE
description: Maximum number of HTTP connections per route
value: "20"
- name: NOTIFICATIONS_CONNECTOR_HTTP_MAX_TOTAL_CONNECTIONS
description: Maximum number of HTTP connections
value: "200"
- name: NOTIFICATIONS_CONNECTOR_HTTP_SOCKET_TIMEOUT_MS
description: Maximum time in milliseconds allowed to wait for HTTPS data
value: "30000"
- name: NOTIFICATIONS_CONNECTOR_REDELIVERY_DELAY
Expand All @@ -147,6 +161,12 @@ parameters:
- name: NOTIFICATIONS_CONNECTOR_REDELIVERY_MAX_ATTEMPTS
description: Maximum number of redelivery attempts (initial call not included)
value: "2"
- name: NOTIFICATIONS_CONNECTOR_SEDA_CONCURRENT_CONSUMERS
description: Number of concurrent threads processing exchanges with SEDA
value: "20"
- name: NOTIFICATIONS_CONNECTOR_SEDA_QUEUE_SIZE
description: Maximum capacity of the SEDA queue
value: "20"
- name: NOTIFICATIONS_LOG_LEVEL
description: Log level of Notifications
value: INFO
Expand Down
7 changes: 5 additions & 2 deletions connector-webhook/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,17 @@
</dependency>

<!-- Camel -->
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-bean</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-http</artifactId>
</dependency>

<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-bean</artifactId>
<artifactId>camel-quarkus-seda</artifactId>
</dependency>

<!-- Scope: test -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,38 +10,74 @@
@ApplicationScoped
public class WebhookConnectorConfig extends ConnectorConfig {

private static final String HTTPS_CONNECT_TIMEOUT_MS = "notifications.connector.https.connect-timeout-ms";
private static final String HTTPS_SOCKET_TIMEOUT_MS = "notifications.connector.https.socket-timeout-ms";
private static final String HTTP_CONNECT_TIMEOUT_MS = "notifications.connector.http.connect-timeout-ms";
private static final String HTTP_CONNECTIONS_PER_ROUTE = "notifications.connector.http.connections-per-route";
private static final String HTTP_MAX_TOTAL_CONNECTIONS = "notifications.connector.http.max-total-connections";
private static final String HTTP_SOCKET_TIMEOUT_MS = "notifications.connector.http.socket-timeout-ms";
private static final String ALTERNATIVE_NAMES = "notifications.connector.alternative.names";
private static final String SEDA_CONCURRENT_CONSUMERS = "notifications.connector.seda.concurrent-consumers";
private static final String SEDA_QUEUE_SIZE = "notifications.connector.seda.queue-size";

@ConfigProperty(name = HTTPS_CONNECT_TIMEOUT_MS, defaultValue = "2500")
int httpsConnectTimeout;
@ConfigProperty(name = HTTP_CONNECT_TIMEOUT_MS, defaultValue = "2500")
int httpConnectTimeout;

@ConfigProperty(name = HTTPS_SOCKET_TIMEOUT_MS, defaultValue = "2500")
int httpsSocketTimeout;
@ConfigProperty(name = HTTP_CONNECTIONS_PER_ROUTE, defaultValue = "20")
int httpConnectionsPerRoute;

@ConfigProperty(name = HTTP_MAX_TOTAL_CONNECTIONS, defaultValue = "200")
int httpMaxTotalConnections;

@ConfigProperty(name = HTTP_SOCKET_TIMEOUT_MS, defaultValue = "2500")
int httpSocketTimeout;

@ConfigProperty(name = ALTERNATIVE_NAMES, defaultValue = "ansible")
List<String> alternativeNames;

@ConfigProperty(name = SEDA_CONCURRENT_CONSUMERS, defaultValue = "20")
int sedaConcurrentConsumers;

@ConfigProperty(name = SEDA_QUEUE_SIZE, defaultValue = "20")
int sedaQueueSize;

@Override
public void log() {
Map<String, Object> additionalEntries = Map.of(
HTTPS_CONNECT_TIMEOUT_MS, httpsConnectTimeout,
HTTPS_SOCKET_TIMEOUT_MS, httpsSocketTimeout,
ALTERNATIVE_NAMES, alternativeNames
HTTP_CONNECT_TIMEOUT_MS, httpConnectTimeout,
HTTP_CONNECTIONS_PER_ROUTE, httpConnectionsPerRoute,
HTTP_MAX_TOTAL_CONNECTIONS, httpMaxTotalConnections,
HTTP_SOCKET_TIMEOUT_MS, httpSocketTimeout,
ALTERNATIVE_NAMES, alternativeNames,
SEDA_CONCURRENT_CONSUMERS, sedaConcurrentConsumers,
SEDA_QUEUE_SIZE, sedaQueueSize
);
log(additionalEntries);
}

public int getHttpsConnectTimeout() {
return httpsConnectTimeout;
public int getHttpConnectTimeout() {
return httpConnectTimeout;
}

public int getHttpConnectionsPerRoute() {
return httpConnectionsPerRoute;
}

public int getHttpsSocketTimeout() {
return httpsSocketTimeout;
public int getHttpMaxTotalConnections() {
return httpMaxTotalConnections;
}

public int getHttpSocketTimeout() {
return httpSocketTimeout;
}

public List<String> getAlternativeNames() {
return alternativeNames;
}

public int getSedaConcurrentConsumers() {
return sedaConcurrentConsumers;
}

public int getSedaQueueSize() {
return sedaQueueSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import jakarta.inject.Inject;
import org.apache.camel.builder.endpoint.dsl.HttpEndpointBuilderFactory;
import org.apache.camel.component.http.HttpComponent;
import org.apache.camel.component.seda.SedaComponent;
import org.apache.hc.core5.util.Timeout;
import org.apache.http.conn.ssl.NoopHostnameVerifier;

Expand All @@ -31,17 +32,22 @@ public class WebhookRouteBuilder extends EngineToConnectorRouteBuilder {

public static final String CLOUD_EVENT_TYPE_PREFIX = "com.redhat.console.notification.toCamel.";
private static final String APPLICATION_JSON = "application/json";
private static final String SEDA = "seda";

@Inject
WebhookConnectorConfig webhookConnectorConfig;

@Override
public void configureRoute() {

configureTimeout(getContext().getComponent("http", HttpComponent.class));
configureTimeout(getContext().getComponent("https", HttpComponent.class));
configureHttpComponent("http");
configureHttpComponent("https");
configureSedaComponent();

from(direct(ENGINE_TO_CONNECTOR))
.to(seda(SEDA));

from(seda(SEDA))
.setHeader(CONTENT_TYPE, constant(APPLICATION_JSON))
.routeId(webhookConnectorConfig.getConnectorName())
.choice()
Expand All @@ -68,10 +74,20 @@ public void configureRoute() {
.to(direct(SUCCESS));
}

private void configureTimeout(HttpComponent httpComponent) {
httpComponent.setConnectTimeout(Timeout.ofMilliseconds(webhookConnectorConfig.getHttpsConnectTimeout()));
httpComponent.setSoTimeout(Timeout.ofMilliseconds(webhookConnectorConfig.getHttpsSocketTimeout()));
httpComponent.setFollowRedirects(true);
private void configureHttpComponent(String componentName) {
HttpComponent component = getContext().getComponent(componentName, HttpComponent.class);
component.setConnectTimeout(Timeout.ofMilliseconds(webhookConnectorConfig.getHttpConnectTimeout()));
component.setSoTimeout(Timeout.ofMilliseconds(webhookConnectorConfig.getHttpSocketTimeout()));
component.setConnectionsPerRoute(webhookConnectorConfig.getHttpConnectionsPerRoute());
component.setMaxTotalConnections(webhookConnectorConfig.getHttpMaxTotalConnections());
component.setFollowRedirects(true);
}

private void configureSedaComponent() {
SedaComponent component = getContext().getComponent("seda", SedaComponent.class);
component.setConcurrentConsumers(webhookConnectorConfig.getSedaConcurrentConsumers());
component.setQueueSize(webhookConnectorConfig.getSedaQueueSize());
component.setDefaultBlockWhenFull(true);
}

private HttpEndpointBuilderFactory.HttpEndpointBuilder buildUnsecureSslEndpoint() {
Expand Down

0 comments on commit 8edf38d

Please sign in to comment.