Skip to content

Commit

Permalink
Finish connectors migration to SEDA (RedHatInsights#2333)
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg authored Nov 13, 2023
1 parent 23db38f commit 5177d83
Show file tree
Hide file tree
Showing 11 changed files with 14 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public class ConnectorConfig {
private static final String REDELIVERY_DELAY = "notifications.connector.redelivery.delay";
private static final String REDELIVERY_MAX_ATTEMPTS = "notifications.connector.redelivery.max-attempts";
private static final String SEDA_CONCURRENT_CONSUMERS = "notifications.connector.seda.concurrent-consumers";
private static final String SEDA_ENABLED = "notifications.connector.seda.enabled";
private static final String SEDA_QUEUE_SIZE = "notifications.connector.seda.queue-size";

@ConfigProperty(name = ENDPOINT_CACHE_MAX_SIZE, defaultValue = "100")
Expand Down Expand Up @@ -69,9 +68,6 @@ public class ConnectorConfig {
@ConfigProperty(name = SEDA_CONCURRENT_CONSUMERS, defaultValue = "1")
int sedaConcurrentConsumers;

@ConfigProperty(name = SEDA_ENABLED, defaultValue = "false")
boolean sedaEnabled;

// https://camel.apache.org/components/4.0.x/seda-component.html#_component_option_queueSize
@ConfigProperty(name = SEDA_QUEUE_SIZE, defaultValue = "1000")
int sedaQueueSize;
Expand All @@ -95,7 +91,6 @@ protected void log(Map<String, Object> additionalConfig) {
config.put(REDELIVERY_DELAY, redeliveryDelay);
config.put(REDELIVERY_MAX_ATTEMPTS, redeliveryMaxAttempts);
config.put(SEDA_CONCURRENT_CONSUMERS, sedaConcurrentConsumers);
config.put(SEDA_ENABLED, sedaEnabled);
config.put(SEDA_QUEUE_SIZE, sedaQueueSize);
config.putAll(additionalConfig);

Expand Down Expand Up @@ -153,10 +148,6 @@ public int getSedaConcurrentConsumers() {
return sedaConcurrentConsumers;
}

public boolean isSedaEnabled() {
return sedaEnabled;
}

public int getSedaQueueSize() {
return sedaQueueSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ public void configure() throws Exception {
.handled(true)
.process(exceptionProcessor);

if (connectorConfig.isSedaEnabled()) {
configureSedaComponent();
}
configureSedaComponent();

from(buildKafkaEndpoint())
.routeId(ENGINE_TO_CONNECTOR)
Expand All @@ -58,14 +56,7 @@ public void configure() throws Exception {
.removeHeaders("*")
.process(incomingCloudEventProcessor)
.to(log(getClass().getName()).level("DEBUG").showProperties(true))
// TODO The following lines should be removed when all connectors are migrated to SEDA.
.choice()
.when(exchange -> connectorConfig.isSedaEnabled())
.to(seda(ENGINE_TO_CONNECTOR))
.endChoice()
.otherwise()
.to(direct(ENGINE_TO_CONNECTOR))
.end();
.to(seda(ENGINE_TO_CONNECTOR));

configureRoute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,18 @@ public void process(Exchange exchange) {

process(t, exchange);

if (connectorConfig.isSedaEnabled()) {
/*
* There is currently a bug in Camel that will cause a NullPointerException throw when SEDA is used and the
* exchange is passed to producerTemplate#send. To work around that bug, we're cloning the current exchange
* and removing all Camel internal properties related to the exception that is being processed.
*/
Exchange exchangeCopy = exchange.copy();
exchangeCopy.removeProperty(ERRORHANDLER_BRIDGE);
exchangeCopy.removeProperty(EXCEPTION_CAUGHT);
exchangeCopy.removeProperty(FAILURE_ENDPOINT);
exchangeCopy.removeProperty(FAILURE_ROUTE_ID);
exchangeCopy.removeProperty(FATAL_FALLBACK_ERROR_HANDLER);
producerTemplate.send("direct:" + CONNECTOR_TO_ENGINE, exchangeCopy);
} else {
producerTemplate.send("direct:" + CONNECTOR_TO_ENGINE, exchange);
}
/*
* There is currently a bug in Camel that will cause a NullPointerException throw when SEDA is used and the
* exchange is passed to producerTemplate#send. To work around that bug, we're cloning the current exchange
* and removing all Camel internal properties related to the exception that is being processed.
*/
Exchange exchangeCopy = exchange.copy();
exchangeCopy.removeProperty(ERRORHANDLER_BRIDGE);
exchangeCopy.removeProperty(EXCEPTION_CAUGHT);
exchangeCopy.removeProperty(FAILURE_ENDPOINT);
exchangeCopy.removeProperty(FAILURE_ROUTE_ID);
exchangeCopy.removeProperty(FATAL_FALLBACK_ERROR_HANDLER);
producerTemplate.send("direct:" + CONNECTOR_TO_ENGINE, exchangeCopy);
}

protected final void logDefault(Throwable t, Exchange exchange) {
Expand Down
1 change: 0 additions & 1 deletion connector-drawer/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ notifications.connector.redelivery.counter-name=camel.drawer.retry.counter

# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.concurrent-consumers=20
notifications.connector.seda.enabled=true
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.queue-size=20

Expand Down
1 change: 0 additions & 1 deletion connector-email/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ notifications.connector.name=email_subscription
notifications.connector.redelivery.counter-name=camel.email.retry.counter
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.concurrent-consumers=20
notifications.connector.seda.enabled=true
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.queue-size=20

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ notifications.connector.name=google_chat
notifications.connector.redelivery.counter-name=camel.google.chat.retry.counter
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.concurrent-consumers=20
notifications.connector.seda.enabled=true
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.queue-size=20

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ notifications.connector.name=teams
notifications.connector.redelivery.counter-name=camel.teams.retry.counter
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.concurrent-consumers=20
notifications.connector.seda.enabled=true
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.queue-size=20

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ notifications.connector.name=servicenow
notifications.connector.redelivery.counter-name=camel.servicenow.retry.counter
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.concurrent-consumers=20
notifications.connector.seda.enabled=true
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.queue-size=20

Expand Down
1 change: 0 additions & 1 deletion connector-slack/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ notifications.connector.name=slack
notifications.connector.redelivery.counter-name=camel.slack.retry.counter
# The following value matches the default size of the OkHttp connection pool.
notifications.connector.seda.concurrent-consumers=5
notifications.connector.seda.enabled=true
# The following value matches the default size of the OkHttp connection pool.
notifications.connector.seda.queue-size=5

Expand Down
1 change: 0 additions & 1 deletion connector-splunk/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ notifications.connector.name=splunk
notifications.connector.redelivery.counter-name=camel.splunk.retry.counter
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.concurrent-consumers=20
notifications.connector.seda.enabled=true
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.queue-size=20

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ notifications.connector.redelivery.counter-name=camel.webhook.retry.counter
notifications.connector.alternative.names=ansible
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.concurrent-consumers=20
notifications.connector.seda.enabled=true
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.queue-size=20

Expand Down

0 comments on commit 5177d83

Please sign in to comment.