Skip to content

Commit

Permalink
Fixes #78 - Revamp API to use Sender and Receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
Automated workflow committed Nov 25, 2023
1 parent f65e99a commit 3dc573f
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
import java.util.Map;

/**
* An event publisher.
* An event sender.
*
* @author Manfred Riem ([email protected])
* @param <T> the event type.
* @param <UT> the underlying event type.
*/
public interface EventPublisher<T, UT> extends AutoCloseable {
public interface EventSender<T, UT> extends AutoCloseable {

/**
* Get the delegate map.
Expand All @@ -48,11 +48,11 @@ public interface EventPublisher<T, UT> extends AutoCloseable {
Map<String, Object> getDelegate();

/**
* Publish a event.
* Send a event.
*
* @param event the event.
*/
void publish(T event);
void send(T event);

/**
* To underlying event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.manorrock.colibri.api.EventPublisher;
import com.manorrock.colibri.api.EventSender;
import java.util.HashMap;
import java.util.Map;

/**
* The Azure Event Bus implementation of an EventPublisher.
* The Azure Event Bus implementation of an EventSender.
*
* @author Manfred Riem ([email protected])
* @param <T> the type.
*/
public class AzureServiceBusEventPublisher<T> implements EventPublisher<T, byte[]> {
public class AzureServiceBusEventSender<T> implements EventSender<T, byte[]> {

/**
* Stores the client.
Expand All @@ -65,7 +65,7 @@ public class AzureServiceBusEventPublisher<T> implements EventPublisher<T, byte[
* @param connectionString the connection string.
* @param queueName the queue name.
*/
public AzureServiceBusEventPublisher(String connectionString, String queueName) {
public AzureServiceBusEventSender(String connectionString, String queueName) {
client = new ServiceBusClientBuilder()
.connectionString(connectionString)
.sender()
Expand All @@ -88,7 +88,7 @@ public Map<String, Object> getDelegate() {
}

@Override
public void publish(T event) {
public void send(T event) {
ServiceBusMessage message = new ServiceBusMessage(toUnderlyingEvent(event));
client.sendMessage(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.manorrock.colibri.api.EventPublisher;
import com.manorrock.colibri.api.EventSender;
import java.util.HashMap;
import java.util.Map;

/**
* The Azure Event Bus implementation of an EventPublisher.
* The Azure Event Bus implementation of an EventSender.
*
* @author Manfred Riem ([email protected])
* @param <T> the type.
*/
public class AzureEventHubEventPublisher<T> implements EventPublisher<T, EventData> {
public class AzureEventHubEventSender<T> implements EventSender<T, EventData> {

/**
* Stores the client.
Expand All @@ -66,7 +66,7 @@ public class AzureEventHubEventPublisher<T> implements EventPublisher<T, EventDa
* @param connectionString the connection string.
* @param eventHubName the event hub name.
*/
public AzureEventHubEventPublisher(String connectionString, String eventHubName) {
public AzureEventHubEventSender(String connectionString, String eventHubName) {
this.connectionString = connectionString;
this.eventHubName = eventHubName;
client = new EventHubClientBuilder()
Expand All @@ -89,7 +89,7 @@ public Map<String, Object> getDelegate() {
}

@Override
public void publish(T event) {
public void send(T event) {
EventDataBatch batch = client.createBatch();
batch.tryAdd(toUnderlyingEvent(event));
client.send(batch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
package com.manorrock.colibri.jms;

import com.manorrock.colibri.api.EventPublisher;
import com.manorrock.colibri.api.EventSender;
import jakarta.jms.ConnectionFactory;
import static jakarta.jms.DeliveryMode.NON_PERSISTENT;
import jakarta.jms.JMSContext;
Expand All @@ -39,12 +39,12 @@
import java.util.Map;

/**
* The JMS TextMessage implementation of an EventPublisher.
* The JMS TextMessage implementation of an EventSender.
*
* @author Manfred Riem ([email protected])
* @param <T> the type.
*/
public class JmsTextMessageEventPublisher<T> implements EventPublisher<T, String> {
public class JmsTextMessageEventSender<T> implements EventSender<T, String> {

/**
* Stores the context.
Expand All @@ -67,7 +67,7 @@ public class JmsTextMessageEventPublisher<T> implements EventPublisher<T, String
* @param connectionFactory the connection factory.
* @param destinationName the destination name.
*/
public JmsTextMessageEventPublisher(
public JmsTextMessageEventSender(
ConnectionFactory connectionFactory, String destinationName) {
context = connectionFactory.createContext();
queue = context.createQueue(destinationName);
Expand All @@ -90,7 +90,7 @@ public Map<String, Object> getDelegate() {
}

@Override
public void publish(T event) {
public void send(T event) {
producer.send(queue, (String) toUnderlyingEvent(event));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.junit.jupiter.api.Test;

/**
* The JUnit tests for the JmsTextMessageEventPublisher class.
* The JUnit tests for the JmsTextMessageEventReceiver class.
*
* @author Manfred Riem ([email protected])
*/
Expand All @@ -48,9 +48,9 @@ class JmsTextMessageEventReceiverTest {
@Test
void testReceive() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
try (JmsTextMessageEventPublisher<String> publisher
= new JmsTextMessageEventPublisher(connectionFactory, "colibri")) {
publisher.publish("Receive me");
try (JmsTextMessageEventSender<String> sender
= new JmsTextMessageEventSender(connectionFactory, "colibri")) {
sender.send("Receive me");
}
try (JmsTextMessageEventReceiver<String> receiver
= new JmsTextMessageEventReceiver(connectionFactory, "colibri")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@
import org.junit.jupiter.api.Test;

/**
* The JUnit tests for the JmsTextMessageEventPublisher class.
* The JUnit tests for the JmsTextMessageEventSender class.
*
* @author Manfred Riem ([email protected])
*/
class JmsTextMessageEventPublisherTest {
class JmsTextMessageEventSenderTest {

/**
* Test publish method.
* Test send method.
*
* @throws Exception when a serious error occurs.
*/
@Test
void testPublish() throws Exception {
void testSend() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
try (JmsTextMessageEventPublisher<String> publisher
= new JmsTextMessageEventPublisher(connectionFactory, "colibri")) {
publisher.publish("Publish me");
try (JmsTextMessageEventSender<String> sender
= new JmsTextMessageEventSender(connectionFactory, "colibri")) {
sender.send("Send me");
}
try (JmsTextMessageEventReceiver<String> receiver
= new JmsTextMessageEventReceiver(connectionFactory, "colibri")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,20 @@
*/
package com.manorrock.colibri.kafka;

import com.manorrock.colibri.api.EventPublisher;
import com.manorrock.colibri.api.EventSender;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
* The Kafka implementation of an EventPublisher.
* The Kafka implementation of an EventSender.
*
* @author Manfred Riem ([email protected])
* @param <T> the type.
*/
public class KafkaEventPublisher<T> implements EventPublisher<T, String> {
public class KafkaEventSender<T> implements EventSender<T, String> {

/**
* Stores the producer.
Expand All @@ -60,7 +60,7 @@ public class KafkaEventPublisher<T> implements EventPublisher<T, String> {
* @param properties the configuration properties.
* @param topic the topic name.
*/
public KafkaEventPublisher(Properties properties, String topic) {
public KafkaEventSender(Properties properties, String topic) {
producer = new KafkaProducer<>(properties);
this.topic = topic;
}
Expand All @@ -79,7 +79,7 @@ public Map<String, Object> getDelegate() {
}

@Override
public void publish(T event) {
public void send(T event) {
ProducerRecord record = new ProducerRecord(topic, String.valueOf(System.nanoTime()), toUnderlyingEvent(event));
try {
producer.send(record).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.testcontainers.utility.DockerImageName;

/**
* The JUnit tests for the KafkaEventPublisher class.
* The JUnit tests for the KafkaEventSender class.
*
* @author Manfred Riem ([email protected])
*/
Expand All @@ -55,10 +55,10 @@ void testReceive() throws Exception {
properties.put("group.id", "1");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaEventPublisher<String> publisher = new KafkaEventPublisher<>(
KafkaEventSender<String> sender = new KafkaEventSender<>(
properties,
"MyTopic");
publisher.publish("Hello World");
sender.send("Hello World");
properties = new Properties();
properties.put("bootstrap.servers", kafka.getBootstrapServers());
properties.put("auto.offset.reset", "earliest");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,27 @@
import org.testcontainers.utility.DockerImageName;

/**
* The JUnit tests for the KafkaEventPublisher class.
* The JUnit tests for the KafkaEventSender class.
*
* @author Manfred Riem ([email protected])
*/
@Testcontainers
class KafkaEventPublisherTest {
class KafkaEventSenderTest {

@Container
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

@Test
void testPublish() throws Exception {
void testSend() throws Exception {
Properties properties = new Properties();
properties.put("bootstrap.servers", kafka.getBootstrapServers());
properties.put("group.id", "1");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaEventPublisher<String> publisher = new KafkaEventPublisher<>(
KafkaEventSender<String> sender = new KafkaEventSender<>(
properties,
"MyTopic");
publisher.publish("Hello World");
sender.send("Hello World");
properties = new Properties();
properties.put("bootstrap.servers", kafka.getBootstrapServers());
properties.put("auto.offset.reset", "earliest");
Expand Down

0 comments on commit 3dc573f

Please sign in to comment.