Skip to content

Commit

Permalink
Load sensitive fields from secrets (#30)
Browse files Browse the repository at this point in the history
* Load sensitive fields from secrets

* Update

* Fix format

* Address comment

* Fix format
  • Loading branch information
jiangpengcheng committed Dec 8, 2023
1 parent 6c5b6ce commit c9acffa
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,73 @@
package org.apache.pulsar.ecosystem.io.activemq;

//import avro.shaded.com.google.common.base.Preconditions;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import lombok.Data;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;

/**
* ActiveMQ config.
*/
@Data
public class ActiveMQConnectorConfig implements Serializable {

@FieldDoc(
required = true,
defaultValue = "",
help = "The ActiveMQ protocol")
private String protocol;

@FieldDoc(
required = true,
defaultValue = "",
help = "The ActiveMQ host")
private String host;

@FieldDoc(
required = true,
defaultValue = "",
help = "The ActiveMQ port")
private String port;

@FieldDoc(
sensitive = true,
defaultValue = "",
help = "The username of ActiveMQ")
private String username;

@FieldDoc(
sensitive = true,
defaultValue = "",
help = "The password of ActiveMQ")
private String password;

@FieldDoc(
defaultValue = "",
help = "The queue name of ActiveMQ")
private String queueName;

@FieldDoc(
defaultValue = "",
help = "The topic name of ActiveMQ")
private String topicName;

private String activeMessageType = ActiveMQTextMessage.class.getSimpleName();

public static ActiveMQConnectorConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(new ObjectMapper().writeValueAsString(map), ActiveMQConnectorConfig.class);
public static ActiveMQConnectorConfig load(Map<String, Object> map, SourceContext sourceContext)
throws IOException {
return IOConfigUtils.loadWithSecrets(map, ActiveMQConnectorConfig.class, sourceContext);
}

public static ActiveMQConnectorConfig load(Map<String, Object> map, SinkContext sinkContext) throws IOException {
return IOConfigUtils.loadWithSecrets(map, ActiveMQConnectorConfig.class, sinkContext);
}

public void validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void open(Map<String, Object> map, SinkContext sinkContext) throws Except
throw new IllegalStateException("Connector is already open");
}

config = ActiveMQConnectorConfig.load(map);
config = ActiveMQConnectorConfig.load(map, sinkContext);
config.validate();

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getBrokerUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void open(Map<String, Object> map, SourceContext sourceContext) throws Ex
throw new IllegalStateException("Connector is already open");
}

config = ActiveMQConnectorConfig.load(map);
config = ActiveMQConnectorConfig.load(map, sourceContext);
config.validate();

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getBrokerUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import java.io.IOException;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
* connector config test.
Expand All @@ -31,7 +34,8 @@ public class ConnectorConfigTest extends ActiveMQConnectorTestBase {

@Test
public void loadBasicConfigTest() throws IOException {
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(queueConfig);
SinkContext sinkContext = Mockito.mock(SinkContext.class);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(queueConfig, sinkContext);
activeMQConnectorConfig.validate();

Assert.assertEquals("tcp", activeMQConnectorConfig.getProtocol());
Expand All @@ -42,9 +46,47 @@ public void loadBasicConfigTest() throws IOException {
Assert.assertEquals("admin", activeMQConnectorConfig.getPassword());
}

@Test
public void loadBasicConfigAndCredentialFromSecretTest() throws IOException {
SinkContext sinkContext = Mockito.mock(SinkContext.class);
Mockito.when(sinkContext.getSecret("username"))
.thenReturn("guest");
Mockito.when(sinkContext.getSecret("password"))
.thenReturn("guest");
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(queueConfig, sinkContext);
activeMQConnectorConfig.validate();

Assert.assertEquals("tcp", activeMQConnectorConfig.getProtocol());
Assert.assertEquals("localhost", activeMQConnectorConfig.getHost());
Assert.assertEquals("61616", activeMQConnectorConfig.getPort());
Assert.assertEquals("tcp://localhost:61616", activeMQConnectorConfig.getBrokerUrl());
Assert.assertEquals("guest", activeMQConnectorConfig.getUsername());
Assert.assertEquals("guest", activeMQConnectorConfig.getPassword());
}

@Test
public void loadBasicConfigAndCredentialFromSecretForSourceTest() throws IOException {
SourceContext sourceContext = Mockito.mock(SourceContext.class);
Mockito.when(sourceContext.getSecret("username"))
.thenReturn("guest");
Mockito.when(sourceContext.getSecret("password"))
.thenReturn("guest");
ActiveMQConnectorConfig activeMQConnectorConfig =
ActiveMQConnectorConfig.load(queueConfig, sourceContext);
activeMQConnectorConfig.validate();

Assert.assertEquals("tcp", activeMQConnectorConfig.getProtocol());
Assert.assertEquals("localhost", activeMQConnectorConfig.getHost());
Assert.assertEquals("61616", activeMQConnectorConfig.getPort());
Assert.assertEquals("tcp://localhost:61616", activeMQConnectorConfig.getBrokerUrl());
Assert.assertEquals("guest", activeMQConnectorConfig.getUsername());
Assert.assertEquals("guest", activeMQConnectorConfig.getPassword());
}

@Test
public void loadQueueConfigTest() throws IOException {
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(queueConfig);
SinkContext sinkContext = Mockito.mock(SinkContext.class);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(queueConfig, sinkContext);
activeMQConnectorConfig.validate();

Assert.assertEquals("test-queue", activeMQConnectorConfig.getQueueName());
Expand All @@ -53,7 +95,8 @@ public void loadQueueConfigTest() throws IOException {

@Test
public void loadTopicConfigTest() throws IOException {
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(topicConfig);
SinkContext sinkContext = Mockito.mock(SinkContext.class);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(topicConfig, sinkContext);
activeMQConnectorConfig.validate();

Assert.assertEquals("test-topic", activeMQConnectorConfig.getTopicName());
Expand All @@ -62,13 +105,14 @@ public void loadTopicConfigTest() throws IOException {

@Test
public void loadMessageTypeConfig() throws IOException {
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(topicConfig);
SinkContext sinkContext = Mockito.mock(SinkContext.class);
ActiveMQConnectorConfig activeMQConnectorConfig = ActiveMQConnectorConfig.load(topicConfig, sinkContext);
activeMQConnectorConfig.validate();

Assert.assertEquals(ActiveMQTextMessage.class.getSimpleName(), activeMQConnectorConfig.getActiveMessageType());

topicConfig.put("activeMessageType", ActiveMQBytesMessage.class.getSimpleName());
activeMQConnectorConfig = ActiveMQConnectorConfig.load(topicConfig);
activeMQConnectorConfig = ActiveMQConnectorConfig.load(topicConfig, sinkContext);
activeMQConnectorConfig.validate();

Assert.assertEquals(ActiveMQBytesMessage.class.getSimpleName(), activeMQConnectorConfig.getActiveMessageType());
Expand Down

0 comments on commit c9acffa

Please sign in to comment.