Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion smallrye-reactive-messaging-gcp-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>16.3.0</version>
<version>26.55.0</version>
<scope>import</scope>
<type>pom</type>
</dependency>
Expand All @@ -30,6 +30,11 @@
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-otel</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
Expand All @@ -56,6 +61,16 @@
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-connector-attribute-processor</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,30 @@ public class PubSubConfig {
private final String host;
private final Integer port;

private final boolean otelEnabled;

public PubSubConfig(final String projectId, final String topic, final Path credentialPath, final boolean mockPubSubTopics,
final String host, final Integer port) {
final String host, final Integer port, boolean otelEnabled) {
this.projectId = Objects.requireNonNull(projectId, msg.mustNotBeNull("projectId"));
this.topic = Objects.requireNonNull(topic, msg.mustNotBeNull("topic"));
this.credentialPath = credentialPath;
this.subscription = null;
this.mockPubSubTopics = mockPubSubTopics;
this.host = host;
this.port = port;
this.otelEnabled = otelEnabled;
}

public PubSubConfig(final String projectId, final String topic, final Path credentialPath, final String subscription,
final boolean mockPubSubTopics, final String host, final Integer port) {
final boolean mockPubSubTopics, final String host, final Integer port, boolean otelEnabled) {
this.projectId = Objects.requireNonNull(projectId, msg.mustNotBeNull("projectId"));
this.topic = Objects.requireNonNull(topic, msg.mustNotBeNull("topic"));
this.credentialPath = credentialPath;
this.subscription = subscription;
this.mockPubSubTopics = mockPubSubTopics;
this.host = host;
this.port = port;
this.otelEnabled = otelEnabled;
}

public String getProjectId() {
Expand Down Expand Up @@ -68,6 +72,10 @@ public Integer getPort() {
return port;
}

public boolean isOtelEnabled() {
return otelEnabled;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public class PubSubConnector implements InboundConnector, OutboundConnector {
@ConfigProperty(name = "mock-pubsub-port")
private Optional<Integer> port;

@Inject
@ConfigProperty(name = "gcp-pubsub-otel-enabled", defaultValue = "false")
private boolean otelEnabled;

@Inject
private PubSubManager pubSubManager;

Expand All @@ -81,7 +85,7 @@ public void destroy(@Observes @Destroyed(ApplicationScoped.class) final Object c
@Override
public Flow.Publisher<? extends Message<?>> getPublisher(final Config config) {
final PubSubConfig pubSubConfig = new PubSubConfig(getProjectId(config), getTopic(config), getCredentialPath(config),
getSubscription(config), mockPubSubTopics, host.orElse(null), port.orElse(null));
getSubscription(config), mockPubSubTopics, host.orElse(null), port.orElse(null), getOtelEnabled(config));

return Multi.createFrom().uni(Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> {
if (isUseAdminClient(config)) {
Expand All @@ -97,7 +101,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(final Config config) {
@Override
public Flow.Subscriber<? extends Message<?>> getSubscriber(final Config config) {
final PubSubConfig pubSubConfig = new PubSubConfig(getProjectId(config), getTopic(config), getCredentialPath(config),
mockPubSubTopics, host.orElse(null), port.orElse(null));
mockPubSubTopics, host.orElse(null), port.orElse(null), getOtelEnabled(config));

return MultiUtils.via(m -> m.onItem()
.transformToUniAndConcatenate(message -> Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> {
Expand All @@ -114,6 +118,11 @@ private String getProjectId(Config config) {
.orElse(projectId);
}

private boolean getOtelEnabled(Config config) {
return config.getOptionalValue("otel-enabled", Boolean.class)
.orElse(otelEnabled);
}

boolean isUseAdminClient(Config config) {
return config.getOptionalValue("use-admin-client", Boolean.class).orElse(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Message;

Expand All @@ -36,7 +38,9 @@

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.reactive.messaging.tracing.TracingUtils;

@ApplicationScoped
public class PubSubManager {
Expand All @@ -48,6 +52,9 @@ public class PubSubManager {
private final List<MultiEmitter<? super Message<?>>> emitters = new CopyOnWriteArrayList<>();
private final List<ManagedChannel> channels = new CopyOnWriteArrayList<>();

@Inject
private Instance<OpenTelemetry> openTelemetryInstance;
Comment thread
ozangunalp marked this conversation as resolved.

public Publisher publisher(final PubSubConfig config) {
return publishers.computeIfAbsent(config, this::buildPublisher);
}
Expand Down Expand Up @@ -142,6 +149,10 @@ private Publisher buildPublisher(final PubSubConfig config) {
buildCredentialsProvider(config).ifPresent(publisherBuilder::setCredentialsProvider);
buildTransportChannelProvider(config).ifPresent(publisherBuilder::setChannelProvider);

final var openTelemetry = TracingUtils.getOpenTelemetry(openTelemetryInstance);
publisherBuilder.setOpenTelemetry(openTelemetry);
publisherBuilder.setEnableOpenTelemetryTracing(config.isOtelEnabled());

return publisherBuilder.build();
} catch (final IOException e) {
throw ex.illegalStateUnableToBuildPublisher(e);
Expand All @@ -157,6 +168,10 @@ private Subscriber buildSubscriber(final PubSubConfig config, final PubSubMessag
buildCredentialsProvider(config).ifPresent(subscriberBuilder::setCredentialsProvider);
buildTransportChannelProvider(config).ifPresent(subscriberBuilder::setChannelProvider);

final var openTelemetry = TracingUtils.getOpenTelemetry(openTelemetryInstance);
subscriberBuilder.setOpenTelemetry(openTelemetry);
subscriberBuilder.setEnableOpenTelemetryTracing(config.isOtelEnabled());

return subscriberBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,65 +3,71 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.Flow;

import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral;
import org.jboss.weld.environment.se.Weld;
import org.jboss.weld.environment.se.WeldContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.TopicName;

import io.smallrye.config.SmallRyeConfigProviderResolver;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class PubSubTest extends PubSubTestBase {

private WeldContainer container;

private static final String TOPIC = "pubsub-test";
private String topic;

@BeforeEach
public void initTest() {
initConfiguration(TOPIC);
public void initTest(TestInfo testInfo) {
topic = testInfo.getTestMethod().map(Method::getName).orElse("") + "_" + UUID.randomUUID();
initConfiguration(topic);
final Weld weld = baseWeld();
weld.addBeanClass(ConsumptionBean.class);
addConfig(createSourceConfig(TOPIC, SUBSCRIPTION, PUBSUB_CONTAINER.getFirstMappedPort()));
addConfig(createSourceConfig(topic, SUBSCRIPTION, PUBSUB_CONTAINER.getFirstMappedPort()));
container = weld.initialize();
}

@AfterEach
public void afterEach() {
if (container != null) {
PubSubManager manager = container.select(PubSubManager.class).get();
deleteTopicIfExists(manager, topic);
container.shutdown();
}
SmallRyeConfigProviderResolver.instance().releaseConfig(ConfigProvider.getConfig());
clear();
PubSubManager manager = container.select(PubSubManager.class).get();
deleteTopicIfExists(manager, TOPIC);
container.shutdown();
}

@Test
@Disabled("Failing on CI - to be investigated")
public void testSourceAndSink() {
final ConsumptionBean consumptionBean = container.select(ConsumptionBean.class).get();

// wait until the subscription is ready
final PubSubManager manager = container.select(PubSubManager.class).get();
await().until(() -> manager
.topicAdminClient(config)
.listTopicSubscriptions((TopicName) ProjectTopicName.of(PROJECT_ID, TOPIC))
.listTopicSubscriptions((TopicName) ProjectTopicName.of(PROJECT_ID, topic))
.getPage()
.getPageElementCount() > 0);

send("Hello-0", TOPIC);
send("Hello-0", topic);
await().until(() -> consumptionBean.getMessages().size() == 1);
assertThat(consumptionBean.getMessages().get(0)).isEqualTo("Hello-0");
for (int i = 1; i < 11; i++) {
send("Hello-" + i, TOPIC);
send("Hello-" + i, topic);
}
await().until(() -> consumptionBean.getMessages().size() == 11);
assertThat(consumptionBean.getMessages()).allSatisfy(s -> assertThat(s).startsWith("Hello-"));
Expand All @@ -84,17 +90,7 @@ private Flow.Subscriber<? extends Message<?>> createSinkSubscriber(final String
}

private PubSubConnector getConnector() {
return container.select(PubSubConnector.class, new Connector() {
@Override
public Class<? extends Annotation> annotationType() {
return Connector.class;
}

@Override
public String value() {
return PubSubConnector.CONNECTOR_NAME;
}
}).get();
return container.select(PubSubConnector.class, ConnectorLiteral.of(PubSubConnector.CONNECTOR_NAME)).get();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.smallrye.reactive.messaging.gcp.pubsub;

import static io.smallrye.reactive.messaging.gcp.pubsub.i18n.PubSubLogging.log;
import static org.awaitility.Awaitility.await;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -15,6 +18,8 @@
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.pubsub.v1.TopicName;

import io.smallrye.config.inject.ConfigExtension;
Expand All @@ -27,6 +32,7 @@
import io.smallrye.reactive.messaging.providers.extension.MediatorManager;
import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension;
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
Expand Down Expand Up @@ -71,12 +77,32 @@ public static void stopPubSubContainer() {

public void initConfiguration(String topic) {
config = new PubSubConfig(PROJECT_ID, topic, null, true, "localhost",
PUBSUB_CONTAINER.getFirstMappedPort());
PUBSUB_CONTAINER.getFirstMappedPort(), false);
}

protected MapBasedConfig createSinkConfig(String channel, String topic, final int containerPort) {
final String prefix = "mp.messaging.outgoing." + channel + ".";
final Map<String, Object> config = new HashMap<>();
config.put(prefix.concat("connector"), PubSubConnector.CONNECTOR_NAME);
config.put(prefix.concat("topic"), topic);

// connector properties
config.put("gcp-pubsub-project-id", PROJECT_ID);
config.put("mock-pubsub-topics", true);
config.put("mock-pubsub-host", "localhost");
config.put("mock-pubsub-port", containerPort);

return new MapBasedConfig(config);
}

protected MapBasedConfig createSourceConfig(final String topic, final String subscription,
final int containerPort) {
final String prefix = "mp.messaging.incoming.source.";
return createSourceConfig("source", topic, subscription, containerPort);
}

protected MapBasedConfig createSourceConfig(String channel, final String topic, final String subscription,
final int containerPort) {
final String prefix = "mp.messaging.incoming." + channel + ".";
final Map<String, Object> config = new HashMap<>();
config.put(prefix.concat("connector"), PubSubConnector.CONNECTOR_NAME);
config.put(prefix.concat("topic"), topic);
Expand All @@ -100,6 +126,7 @@ static Weld baseWeld() {
weld.addExtension(new ConfigExtension());
weld.addBeanClass(MediatorFactory.class);
weld.addBeanClass(MediatorManager.class);
weld.addBeanClass(ConnectorFactories.class);
weld.addBeanClass(InternalChannelRegistry.class);
weld.addBeanClass(ConfiguredChannelFactory.class);
weld.addBeanClass(ChannelProducer.class);
Expand Down Expand Up @@ -133,14 +160,41 @@ static void clear() {
}
}

public void createTopicIfNotExists(PubSubManager manager, String topic) {
final TopicName topicName = TopicName.of(config.getProjectId(), topic);
try (var client = manager.topicAdminClient(config)) {
try {
client.getTopic(topicName);
} catch (final NotFoundException nf) {
try {
client.createTopic(topicName);
} catch (final AlreadyExistsException ae) {
log.topicExistAlready(topicName, ae);
}
}
}
}

void deleteTopicIfExists(PubSubManager manager, String topic) {
System.out.println("Deleting topic " + TopicName.of(PROJECT_ID, topic));
try {
manager.topicAdminClient(config)
.deleteTopic(TopicName.of(PROJECT_ID, topic));
try (var client = manager.topicAdminClient(config)) {
client.deleteTopic(TopicName.of(PROJECT_ID, topic));
} catch (com.google.api.gax.rpc.NotFoundException notFoundException) {
// The topic didn't exist.
}
}

void waitUntilSubscription(PubSubManager manager, String topic, String subscription) {
try (var client = manager.topicAdminClient(this.config)) {
await().until(() -> {
for (String sub : client.listTopicSubscriptions(TopicName.of(PROJECT_ID, topic)).iterateAll()) {
if (sub.endsWith(subscription)) {
return true;
}
}
return false;
});
}
}

}
Loading