Skip to content

Commit

Permalink
Merge pull request #57 from ayeshLK/pmo-dev
Browse files Browse the repository at this point in the history
Introduce support to provide `MQPutMessageOptions` when invoking `put` operation on `ibmmq:Queue` or `ibmmq:Topic`
  • Loading branch information
ayeshLK authored Sep 4, 2024
2 parents f2ff85f + 6a90f8b commit fe71a5c
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 13 deletions.
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "ibm.ibmmq"
version = "1.1.0"
version = "1.1.1"
authors = ["Ballerina"]
keywords = ["ibm.ibmmq", "client", "messaging", "network", "pubsub"]
repository = "https://github.com/ballerina-platform/module-ballerinax-ibm.ibmmq"
Expand All @@ -12,8 +12,8 @@ distribution = "2201.9.0"
[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "ibm.ibmmq-native"
version = "1.1.0"
path = "../native/build/libs/ibm.ibmmq-native-1.1.0.jar"
version = "1.1.1"
path = "../native/build/libs/ibm.ibmmq-native-1.1.1-SNAPSHOT.jar"

[[platform.java17.dependency]]
groupId = "org.json"
Expand Down
2 changes: 1 addition & 1 deletion ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ modules = [
[[package]]
org = "ballerinax"
name = "ibm.ibmmq"
version = "1.1.0"
version = "1.1.1"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "jballerina.java"},
Expand Down
44 changes: 44 additions & 0 deletions ballerina/constants.bal
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,50 @@ public const int MQSO_CREATE = 2;

const string DEFAULT_BLANK_VALUE = " ";

# The request is to operate within the normal unit-of-work protocols. The message is not visible outside the
# unit of work until the unit of work is committed. If the unit of work is backed out, the message is deleted.
public const int MQPMO_SYNCPOINT = 2;

# The request is to operate outside the normal unit-of-work protocols. The message is available immediately,
# and it cannot be deleted by backing out a unit of work.
public const int MQPMO_NO_SYNCPOINT = 4;

# Both identity and origin context are set to indicate no context.
public const int MQPMO_NO_CONTEXT = 16384;

# The message is to have default context information associated with it, for both identity and origin.
public const int MQPMO_DEFAULT_CONTEXT = 32;

# The message is to have context information associated with it.
public const int MQPMO_SET_IDENTITY_CONTEXT = 1024;

# The message is to have context information associated with it.
public const int MQPMO_SET_ALL_CONTEXT = 2048;

# This option forces the MQPUT or MQPUT1 call to fail if the queue manager is in the quiescing state.
public const int MQPMO_FAIL_IF_QUIESCING = 8192;

# The queue manager replaces the contents of the MsgId field in MQMD with a new message identifier.
public const int MQPMO_NEW_MSG_ID = 64;

# The queue manager replaces the contents of the CorrelId field in MQMD with a new correlation identifier.
public const int MQPMO_NEW_CORREL_ID = 128;

# This option tells the queue manager how the application puts messages in groups and segments of logical messages.
public const int MQPMO_LOGICAL_ORDER = 32768;

# This indicates that the AlternateUserId field in the ObjDesc parameter of the MQPUT1 call contains a user identifier
# that is to be used to validate authority to put messages on the queue.
public const int MQPMO_ALTERNATE_USER_AUTHORITY = 4096;

# Use this option to fill ResolvedQName in the MQPMO structure with the name of the local queue to which
# the message is put, and ResolvedQMgrName with the name of the local queue manager that hosts the local queue.
public const int MQPMO_RESOLVE_LOCAL_Q = 262144;

# The MQPMO_ASYNC_RESPONSE option requests that an MQPUT or MQPUT1 operation is completed without the
# application waiting for the queue manager to complete the call.
public const int MQPMO_ASYNC_RESPONSE = 65536;

// SSL cipher suite related constants

# SSL cipher suite using ECDHE-ECDSA for key exchange with 3DES encryption and SHA integrity.
Expand Down
10 changes: 7 additions & 3 deletions ballerina/destination.bal
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import ballerina/jballerina.java;

# IBM MQ destination client type.
public type Destination distinct client object {
remote function put(Message message) returns Error?;
remote function put(Message message, int options = 0) returns Error?;

remote function get(*GetMessageOptions getMessageOptions) returns Message|Error?;

Expand All @@ -35,8 +35,10 @@ public isolated client class Queue {
# ```
#
# + message - IBM MQ message
# + options - Options controlling the action of the put operation. Can be a combination of
# one or more `ibmmq:MQPMO_*` options and values can combined using either '+' or '|'
# + return - An `ibmmq:Error` if the operation fails or else `()`
isolated remote function put(Message message) returns Error? =
isolated remote function put(Message message, int options = MQPMO_NO_SYNCPOINT) returns Error? =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.Queue"
} external;
Expand Down Expand Up @@ -76,8 +78,10 @@ public isolated client class Topic {
#```
#
# + message - IBM MQ message
# + options - Options controlling the action of the put operation. Can be a combination of
# one or more `ibmmq:MQPMO_*` options and values can combined using either '+' or '|'
# + return - An `ibmmq:Error` if the operation fails or else `()`
isolated remote function put(Message message) returns Error? =
isolated remote function put(Message message, int options = MQPMO_NO_SYNCPOINT) returns Error? =
@java:Method {
'class: "io.ballerina.lib.ibm.ibmmq.Topic"
} external;
Expand Down
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased]

### Added

- [Introduce support to provide `MQPutMessageOptions` when invoking `put` operation on `ibmmq:Queue` or `ibmmq:Topic`](https://github.com/ballerina-platform/ballerina-library/issues/6966)

## [1.1.0] - 2024-08-28

### Added

- [Add Support for Retrieving Messages from IBM MQ by Matching Correlation ID and Message ID](https://github.com/ballerina-platform/ballerina-library/issues/6918)
Expand Down
8 changes: 6 additions & 2 deletions docs/spec/spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,10 @@ An IBM MQ Queue enables applications to interact with an IBM MQ queue to exchang
# ```
#
# + message - IBM MQ message
# + options - Options controlling the action of the put operation. Can be a combination of
one or more `ibmmq:MQPMO_*` options and values can combined using either '+' or '|'
# + return - An `ibmmq:Error` if the operation fails or else `()`
isolated remote function put(ibmmq:Message message) returns ibmmq:Error?;
isolated remote function put(ibmmq:Message message, int options = ibmmq:MQPMO_NO_SYNCPOINT) returns ibmmq:Error?;
```
- To receive a message `get` function can be used.
Expand Down Expand Up @@ -492,8 +494,10 @@ An IBM MQ Topic enables applications to interact with an IBM MQ Topic to exchang
#```
#
# + message - IBM MQ message
# + options - Options controlling the action of the put operation. Can be a combination of
one or more `ibmmq:MQPMO_*` options and values can combined using either '+' or '|'
# + return - An `ibmmq:Error` if the operation fails or else `()`
isolated remote function put(ibmmq:Message message) returns ibmmq:Error?;
isolated remote function put(ibmmq:Message message, int options = ibmmq:MQPMO_NO_SYNCPOINT) returns ibmmq:Error?;
```
- To receive a message `get` function can be used.
Expand Down
8 changes: 6 additions & 2 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.constants.CMQC;
import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions;
Expand All @@ -44,13 +45,16 @@ public class Queue {
private static final ExecutorService QUEUE_EXECUTOR_SERVICE = Executors.newCachedThreadPool(
new MQThreadFactory("balx-ibmmq-queue-client-network-thread"));

public static Object put(Environment environment, BObject queueObject, BMap<BString, Object> message) {
public static Object put(Environment environment, BObject queueObject, BMap<BString, Object> message,
long options) {
MQQueue queue = (MQQueue) queueObject.getNativeData(Constants.NATIVE_QUEUE);
MQMessage mqMessage = CommonUtils.getMqMessageFromBMessage(message);
Future future = environment.markAsync();
QUEUE_EXECUTOR_SERVICE.execute(() -> {
try {
queue.put(mqMessage);
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = (int) options;
queue.put(mqMessage, pmo);
future.complete(null);
} catch (MQException e) {
BError bError = createError(IBMMQ_ERROR,
Expand Down
7 changes: 5 additions & 2 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQTopic;
import com.ibm.mq.constants.CMQC;
import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions;
Expand All @@ -44,13 +45,15 @@ public class Topic {
private static final ExecutorService topicExecutorService = Executors.newCachedThreadPool(
new MQThreadFactory("balx-ibmmq-topic-client-network-thread"));

public static Object put(Environment environment, BObject topicObject, BMap message) {
public static Object put(Environment environment, BObject topicObject, BMap message, long options) {
MQTopic topic = (MQTopic) topicObject.getNativeData(Constants.NATIVE_TOPIC);
MQMessage mqMessage = CommonUtils.getMqMessageFromBMessage(message);
Future future = environment.markAsync();
topicExecutorService.execute(() -> {
try {
topic.put(mqMessage);
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = (int) options;
topic.put(mqMessage, pmo);
future.complete(null);
} catch (Exception e) {
BError bError = createError(IBMMQ_ERROR,
Expand Down

0 comments on commit fe71a5c

Please sign in to comment.