Skip to content

Commit eb1ffde

Browse files
committed
Allowing to connect to RabbitMQ brokers that require SSL
Amended doc accordingly.
1 parent d0fde39 commit eb1ffde

File tree

5 files changed

+14
-0
lines changed

5 files changed

+14
-0
lines changed

config.properties.example

+1
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ kafka.acks=1
239239
#rabbitmq_routing_key_template=%db%.%table%
240240
#rabbitmq_message_persistent=false
241241
#rabbitmq_declare_exchange=true
242+
#rabbitmq_use_ssl=false
242243

243244
# *** redis ***
244245

docs/docs/config.md

+1
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ rabbitmq_exchange_autodelete | BOOLEAN | If set, the exchange is deleted wh
156156
rabbitmq_routing_key_template | STRING | A string template for the routing key, `%db%` and `%table%` will be substituted. | `%db%.%table%`.
157157
rabbitmq_message_persistent | BOOLEAN | Eanble message persistence. | false
158158
rabbitmq_declare_exchange | BOOLEAN | Should declare the exchange for rabbitmq publisher | true
159+
rabbitmq_use_ssl | BOOLEAN | If true, will connect to the server using SSL. | false
159160

160161
_See also:_ [RabbitMQ Producer Documentation](/producers#rabbitmq)
161162

docs/docs/producers.md

+1
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ The remaining configurable properties are:
308308
- This config controls the routing key, where `%db%` and `%table%` are placeholders that will be substituted at runtime
309309
- `rabbitmq_message_persistent` - defaults to **false**
310310
- `rabbitmq_declare_exchange` - defaults to **true**
311+
- `rabbitmq_use_ssl` - defaults to **false**
311312

312313
For more details on these options, you are encouraged to the read official RabbitMQ documentation here: [https://www.rabbitmq.com/documentation.html](https://www.rabbitmq.com/documentation.html)
313314

src/main/java/com/zendesk/maxwell/MaxwellConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,10 @@ public class MaxwellConfig extends AbstractConfig {
545545
*/
546546
public boolean rabbitmqDeclareExchange;
547547

548+
/**
549+
* {@link com.zendesk.maxwell.producer.RabbitmqProducer} use SSL
550+
*/
551+
public boolean rabbitmqUseSSL;
548552

549553
/**
550554
* {@link com.zendesk.maxwell.producer.NatsProducer} URL
@@ -955,6 +959,7 @@ protected MaxwellOptionParser buildOptionParser() {
955959
parser.accepts( "rabbitmq_routing_key_template", "A string template for the routing key, '%db%' and '%table%' will be substituted. Default is '%db%.%table%'." ).withRequiredArg();
956960
parser.accepts( "rabbitmq_message_persistent", "Message persistence. Defaults to false" ).withOptionalArg();
957961
parser.accepts( "rabbitmq_declare_exchange", "Should declare the exchange for rabbitmq publisher. Defaults to true" ).withOptionalArg();
962+
parser.accepts( "rabbitmq_use_ssl", "If true, will connect to the server using SSL. Defaults to false" ).withOptionalArg();
958963

959964
parser.section( "redis" );
960965

@@ -1107,6 +1112,7 @@ private void setup(OptionSet options, Properties properties) {
11071112
this.rabbitmqRoutingKeyTemplate = fetchStringOption("rabbitmq_routing_key_template", options, properties, "%db%.%table%");
11081113
this.rabbitmqMessagePersistent = fetchBooleanOption("rabbitmq_message_persistent", options, properties, false);
11091114
this.rabbitmqDeclareExchange = fetchBooleanOption("rabbitmq_declare_exchange", options, properties, true);
1115+
this.rabbitmqUseSSL = fetchBooleanOption("rabbitmq_use_ssl", options, properties, false);
11101116

11111117
this.natsUrl = fetchStringOption("nats_url", options, properties, "nats://localhost:4222");
11121118
this.natsSubject = fetchStringOption("nats_subject", options, properties, "%{database}.%{table}");

src/main/java/com/zendesk/maxwell/producer/RabbitmqProducer.java

+5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.slf4j.Logger;
1111
import org.slf4j.LoggerFactory;
1212

13+
import javax.net.ssl.SSLContext;
1314
import java.io.IOException;
1415
import java.net.URISyntaxException;
1516
import java.security.KeyManagementException;
@@ -61,6 +62,10 @@ public RabbitmqProducer(MaxwellContext context) {
6162
factory.setHandshakeTimeout(config.rabbitmqHandshakeTimeout);
6263
}
6364

65+
if ( config.rabbitmqUseSSL ) {
66+
factory.useSslProtocol(SSLContext.getDefault());
67+
}
68+
6469
this.channel = factory.newConnection().createChannel();
6570
if(context.getConfig().rabbitmqDeclareExchange) {
6671
this.channel.exchangeDeclare(exchangeName, context.getConfig().rabbitmqExchangeType, context.getConfig().rabbitMqExchangeDurable, context.getConfig().rabbitMqExchangeAutoDelete, null);

0 commit comments

Comments
 (0)