Skip to content

Commit

Permalink
Merge branch 'main' into build-ins
Browse files Browse the repository at this point in the history
  • Loading branch information
ayeshLK authored Aug 28, 2024
2 parents ee37839 + 3ee4549 commit 96e50bd
Show file tree
Hide file tree
Showing 15 changed files with 377 additions and 25 deletions.
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "ibm.ibmmq"
version = "1.0.0"
version = "1.1.0"
authors = ["Ballerina"]
keywords = ["ibm.ibmmq", "client", "messaging", "network", "pubsub"]
repository = "https://github.com/ballerina-platform/module-ballerinax-ibm.ibmmq"
Expand All @@ -12,8 +12,8 @@ distribution = "2201.9.0"
[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "ibm.ibmmq-native"
version = "1.0.0"
path = "../native/build/libs/ibm.ibmmq-native-1.0.0.jar"
version = "1.1.0"
path = "../native/build/libs/ibm.ibmmq-native-1.1.0-SNAPSHOT.jar"

[[platform.java17.dependency]]
groupId = "org.json"
Expand Down
2 changes: 1 addition & 1 deletion ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ modules = [
[[package]]
org = "ballerinax"
name = "ibm.ibmmq"
version = "1.0.0"
version = "1.1.0"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "jballerina.java"},
Expand Down
1 change: 1 addition & 0 deletions ballerina/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ publishing {

updateTomlFiles.dependsOn copyStdlibs

build.dependsOn "generatePomFileForMavenPublication"
build.dependsOn ":${packageName}-native:build"
build.dependsOn startIBMMQServer
build.finalizedBy stopIBMMQServer
Expand Down
175 changes: 175 additions & 0 deletions ballerina/tests/queue_producer_consumer_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,178 @@ function produceAndConsumerMessageWithMultipleHeaderTypesWithJsonPayloadTest() r
check queueManager.disconnect();
}

@test:Config {
groups: ["ibmmqQueue", "matchOptions"]
}
function produceConsumeWithMsgId() returns error? {
QueueManager queueManager = check new (
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
userID = "app", password = "password");
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);

byte[] providedMsgId = "msg-id-1".toBytes();
string messageContent = "This is a sample message with a message-id.";
check queue->put({
messageId: providedMsgId,
payload: messageContent.toBytes()
});

Message? message = check queue->get(matchOptions = { messageId: providedMsgId });
test:assertTrue(message is Message, "Could not retrieve a message for a valid message identifier");

byte[]? payload = message?.payload;
test:assertEquals(string:fromBytes(check payload.ensureType()), messageContent);

check queue->close();
check queueManager.disconnect();
}

@test:Config {
groups: ["ibmmqQueue", "matchOptions"]
}
function produceConsumeWithInvalidMsgId() returns error? {
QueueManager queueManager = check new (
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
userID = "app", password = "password");
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);

string messageContent = "This is a sample message with a message-id.";
check queue->put({
payload: messageContent.toBytes()
});

Message? message = check queue->get(matchOptions = { messageId: "test-msg-id-1".toBytes() });
test:assertTrue(message is (), "Retrieved a message for an invalid message identifier");

check queue->close();
check queueManager.disconnect();
}

@test:Config {
groups: ["ibmmqQueue", "matchOptions"]
}
function produceConsumeWithCorrId() returns error? {
QueueManager queueManager = check new (
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
userID = "app", password = "password");
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);

byte[] providedCorrId = "corr-id-1".toBytes();
string messageContent = "This is a sample message with a correlation-id.";
check queue->put({
correlationId: providedCorrId,
payload: messageContent.toBytes()
});

Message? message = check queue->get(matchOptions = { correlationId: providedCorrId });
test:assertTrue(message is Message, "Could not retrieve a message for a valid correlation identifier");

byte[]? correlationId = message?.correlationId;
test:assertTrue(correlationId is byte[], "Could not find the correlation identifier for the message");

byte[]? payload = message?.payload;
test:assertEquals(string:fromBytes(check payload.ensureType()), messageContent);

check queue->close();
check queueManager.disconnect();
}

@test:Config {
groups: ["ibmmqQueue", "matchOptions"]
}
function produceConsumeWithInvalidCorrId() returns error? {
QueueManager queueManager = check new (
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
userID = "app", password = "password");
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);

string messageContent = "This is a sample message with a message-id.";
check queue->put({
payload: messageContent.toBytes()
});

Message? message = check queue->get(matchOptions = { correlationId: "test-corr-id-1".toBytes() });
test:assertTrue(message is (), "Retrieved a message for an invalid correlation identifier");

check queue->close();
check queueManager.disconnect();
}

@test:Config {
groups: ["ibmmqQueue", "matchOptions"]
}
function produceConsumeWithMsgIdAndCorrId() returns error? {
QueueManager queueManager = check new (
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
userID = "app", password = "password");
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);

byte[] providedMsgId = "msg-id-1".toBytes();
byte[] providedCorrId = "corr-id-1".toBytes();
string messageContent = "This is a sample message with a message-id and a correlation-id.";
check queue->put({
messageId: providedMsgId,
correlationId: providedCorrId,
payload: messageContent.toBytes()
});

Message? message = check queue->get(matchOptions = { messageId: providedMsgId, correlationId: providedCorrId });
test:assertTrue(message is Message, "Could not retrieve a message for a valid message identifier and correlation identifier");

byte[]? payload = message?.payload;
test:assertEquals(string:fromBytes(check payload.ensureType()), messageContent);

check queue->close();
check queueManager.disconnect();
}

@test:Config {
groups: ["ibmmqQueue", "matchOptions"],
dependsOn: [produceConsumeWithMsgIdAndCorrId]
}
function produceConsumeWithInvalidMsgIdAndCorrId() returns error? {
QueueManager queueManager = check new (
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
userID = "app", password = "password");
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);

byte[] providedMsgId = "msg-id-1".toBytes();
byte[] providedCorrId = "corr-id-1".toBytes();
string messageContent = "This is a sample message with a message-id and a correlation-id.";
check queue->put({
correlationId: providedCorrId,
payload: messageContent.toBytes()
});

Message? message = check queue->get(matchOptions = { messageId: providedMsgId, correlationId: providedCorrId });
test:assertTrue(message is (), "Retrieved a message for an invalid message-id and a correct correlation identifier");

check queue->close();
check queueManager.disconnect();
}

@test:Config {
groups: ["ibmmqQueue", "matchOptions"],
dependsOn: [produceConsumeWithInvalidMsgIdAndCorrId]
}
function produceConsumeWithMsgIdAndInvalidCorrId() returns error? {
QueueManager queueManager = check new (
name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN",
userID = "app", password = "password");
Queue queue = check queueManager.accessQueue("DEV.QUEUE.2", MQOO_OUTPUT | MQOO_INPUT_AS_Q_DEF);

byte[] providedMsgId = "msg-id-1".toBytes();
byte[] providedCorrId = "corr-id-1".toBytes();
string messageContent = "This is a sample message with a message-id and a correlation-id.";
check queue->put({
messageId: providedMsgId,
payload: messageContent.toBytes()
});

Message? message = check queue->get(matchOptions = { messageId: providedMsgId, correlationId: providedCorrId });
test:assertTrue(message is (), "Retrieved a message for a correct message-id and an invalid correlation identifier");

check queue->close();
check queueManager.disconnect();
}

11 changes: 11 additions & 0 deletions ballerina/types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,20 @@ public type CertKey record {|
# + options - Get message option
# + waitInterval - The maximum time (in seconds) that a `get` call waits for a suitable message to
# arrive. It is used in conjunction with `ibmmq.MQGMO_WAIT`.
# + matchOptions - Message selection criteria
public type GetMessageOptions record {|
int options = MQGMO_NO_WAIT;
int waitInterval = 10;
MatchOptions matchOptions?;
|};

# Represents the selection criteria that determine which message is retrieved.
#
# + messageId - The message identifier of the message which needs to be retrieved
# + correlationId - The Correlation identifier of the message which needs to be retrieved
public type MatchOptions record {|
byte[] messageId?;
byte[] correlationId?;
|};

# Represents an IBM MQ message property.
Expand Down
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased]

### Added

- [Add Support for Retrieving Messages from IBM MQ by Matching Correlation ID and Message ID](https://github.com/ballerina-platform/ballerina-library/issues/6918)

## [1.0.0] - 2024-07-08

### Changed

- [Decouple IBM MQ java client jar from the IBM MQ connector](https://github.com/ballerina-platform/ballerina-library/issues/6287)
Expand Down
15 changes: 14 additions & 1 deletion docs/spec/spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,27 @@ public type Message record {|

## 4. Client Options

- GetMessageOptions record represents client options which can be used when retrieving messages from an IBM MQ destination.
- `GetMessageOptions` record represents client options which can be used when retrieving messages from an IBM MQ destination.

```ballerina
public type GetMessageOptions record {|
# Get message option
int options = MQGMO_NO_WAIT;
# The maximum time (in seconds) that a `get` call waits for a suitable message to arrive. It is used in conjunction with `ibmmq.MQGMO_WAIT`.
int waitInterval = 10;
# Message selection criteria
MatchOptions matchOptions?;
|};
```

- `MatchOptions` record represents the selection criteria that determine which message is retrieved.

```ballerina
public type MatchOptions record {|
# The message identifier of the message which needs to be retrieved
byte[] messageId?;
# The Correlation identifier of the message which needs to be retrieved
byte[] correlationId?;
|};
```

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
org.gradle.caching=true
group=io.ballerina.lib
version=1.0.1-SNAPSHOT
version=1.1.0-SNAPSHOT

ballerinaLangVersion=2201.9.0
checkstylePluginVersion=10.12.1
Expand Down
46 changes: 37 additions & 9 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPropertyDescriptor;
import com.ibm.mq.constants.MQConstants;
import com.ibm.mq.headers.MQHeaderList;
import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions;
import io.ballerina.lib.ibm.ibmmq.config.MatchOptions;
import io.ballerina.lib.ibm.ibmmq.headers.MQRFH2Header;
import io.ballerina.runtime.api.PredefinedTypes;
import io.ballerina.runtime.api.Runtime;
Expand Down Expand Up @@ -66,7 +69,6 @@
import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_PROPERTY;
import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_PROPERTIES;
import static io.ballerina.lib.ibm.ibmmq.Constants.MESSAGE_TYPE_FIELD;
import static io.ballerina.lib.ibm.ibmmq.Constants.OPTIONS;
import static io.ballerina.lib.ibm.ibmmq.Constants.PD_CONTEXT;
import static io.ballerina.lib.ibm.ibmmq.Constants.PD_COPY_OPTIONS;
import static io.ballerina.lib.ibm.ibmmq.Constants.PD_OPTIONS;
Expand All @@ -79,7 +81,6 @@
import static io.ballerina.lib.ibm.ibmmq.Constants.PUT_APPLICATION_TYPE_FIELD;
import static io.ballerina.lib.ibm.ibmmq.Constants.REPLY_TO_QM_NAME_FIELD;
import static io.ballerina.lib.ibm.ibmmq.Constants.REPLY_TO_QUEUE_NAME_FIELD;
import static io.ballerina.lib.ibm.ibmmq.Constants.WAIT_INTERVAL;
import static io.ballerina.lib.ibm.ibmmq.ModuleUtils.getModule;
import static io.ballerina.lib.ibm.ibmmq.headers.MQCIHHeader.createMQCIHHeaderFromBHeader;
import static io.ballerina.lib.ibm.ibmmq.headers.MQIIHHeader.createMQIIHHeaderFromBHeader;
Expand Down Expand Up @@ -293,13 +294,40 @@ private static BMap populateDescriptorFromMQPropertyDescriptor(MQPropertyDescrip
return descriptor;
}

public static MQGetMessageOptions getGetMessageOptions(BMap<BString, Object> bOptions) {
int waitInterval = bOptions.getIntValue(WAIT_INTERVAL).intValue();
int options = bOptions.getIntValue(OPTIONS).intValue();
MQGetMessageOptions getMessageOptions = new MQGetMessageOptions();
getMessageOptions.waitInterval = waitInterval * 1000;
getMessageOptions.options = options;
return getMessageOptions;
public static MQMessage getMqMessage(MatchOptions matchOptions) {
MQMessage message = new MQMessage();
if (Objects.isNull(matchOptions)) {
return message;
}

if (Objects.nonNull(matchOptions.messageId())) {
message.messageId = matchOptions.messageId();
}
if (Objects.nonNull(matchOptions.correlationId())) {
message.correlationId = matchOptions.correlationId();
}
return message;
}

public static MQGetMessageOptions getMqGetMsgOptions(GetMessageOptions getMsgOptions) {
MQGetMessageOptions mqGetMsgOptions = new MQGetMessageOptions();
mqGetMsgOptions.waitInterval = getMsgOptions.waitInterval();
mqGetMsgOptions.options = getMsgOptions.options();

MatchOptions matchOptions = getMsgOptions.matchOptions();
if (Objects.isNull(matchOptions)) {
return mqGetMsgOptions;
}

int matchOpt = MQConstants.MQMO_NONE;
if (Objects.nonNull(matchOptions.messageId())) {
matchOpt |= MQConstants.MQMO_MATCH_MSG_ID;
}
if (Objects.nonNull(matchOptions.correlationId())) {
matchOpt |= MQConstants.MQMO_MATCH_CORREL_ID;
}
mqGetMsgOptions.matchOptions = matchOpt;
return mqGetMsgOptions;
}

private static Object getBHeaders(Runtime runtime, MQMessage mqMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public interface Constants {
BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor");
BString WAIT_INTERVAL = StringUtils.fromString("waitInterval");
BString OPTIONS = StringUtils.fromString("options");
BString MATCH_OPTIONS = StringUtils.fromString("matchOptions");
BString FORMAT_FIELD = StringUtils.fromString("format");
BString MESSAGE_ID_FIELD = StringUtils.fromString("messageId");
BString CORRELATION_ID_FIELD = StringUtils.fromString("correlationId");
Expand Down
12 changes: 7 additions & 5 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.constants.CMQC;
import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions;
import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.Future;
import io.ballerina.runtime.api.values.BError;
Expand Down Expand Up @@ -60,15 +61,16 @@ public static Object put(Environment environment, BObject queueObject, BMap<BStr
return null;
}

public static Object get(Environment environment, BObject queueObject, BMap<BString, Object> options) {
public static Object get(Environment environment, BObject queueObject, BMap<BString, Object> bGetMsgOptions) {
MQQueue queue = (MQQueue) queueObject.getNativeData(Constants.NATIVE_QUEUE);
MQGetMessageOptions getMessageOptions = CommonUtils.getGetMessageOptions(options);
GetMessageOptions getMsgOptions = new GetMessageOptions(bGetMsgOptions);
MQMessage mqMessage = CommonUtils.getMqMessage(getMsgOptions.matchOptions());
MQGetMessageOptions mqGetMsgOptions = CommonUtils.getMqGetMsgOptions(getMsgOptions);
Future future = environment.markAsync();
QUEUE_EXECUTOR_SERVICE.execute(() -> {
try {
MQMessage message = new MQMessage();
queue.get(message, getMessageOptions);
future.complete(CommonUtils.getBMessageFromMQMessage(environment.getRuntime(), message));
queue.get(mqMessage, mqGetMsgOptions);
future.complete(CommonUtils.getBMessageFromMQMessage(environment.getRuntime(), mqMessage));
} catch (MQException e) {
if (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) {
future.complete(null);
Expand Down
Loading

0 comments on commit 96e50bd

Please sign in to comment.