diff --git a/.env b/.env
index 69042564..391c9f93 100644
--- a/.env
+++ b/.env
@@ -13,10 +13,10 @@ WORKFLOW_USER=wkflowmgr
WORKFLOW_PASS=wkflowmgr
WORKER_USER=worker
WORKER_PASS=worker
-ACTIVEMQ_ADMIN_USER=admin
-ACTIVEMQ_ADMIN_PASS=admin
-ACTIVEMQ_QUEUE_QUERY_URL="http://activemq:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName={queue}/QueueSize"
-ACTIVEMQ_REDUCTION_QUEUES=["CATALOG.ONCAT.DATA_READY", "REDUCTION.DATA_READY", "REDUCTION.HIMEM.DATA_READY", "REDUCTION_CATALOG.DATA_READY"]
+AMQ_USER=artemis
+AMQ_PASSWORD=artemis
+AMQ_QUEUE_QUERY_URL="http://activemq:8161/console/jolokia/read/org.apache.activemq.artemis:broker=%22Artemis-Broker%22,address=%22{queue}%22,component=addresses/MessageCount"
+AMQ_REDUCTION_QUEUES=["CATALOG.ONCAT.DATA_READY", "REDUCTION.DATA_READY", "REDUCTION.HIMEM.DATA_READY", "REDUCTION_CATALOG.DATA_READY"]
AMQ_BROKER=[["activemq", 61613]]
diff --git a/config/artemis/artemis-roles.properties b/config/artemis/artemis-roles.properties
new file mode 100644
index 00000000..d7b0f51f
--- /dev/null
+++ b/config/artemis/artemis-roles.properties
@@ -0,0 +1,20 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+amq = artemis,icat,user,username,wkflowmgr
+view = artemis,icat,user,username,wkflowmgr
+update = artemis,icat,user,username,wkflowmgr
diff --git a/config/artemis/artemis-users.properties b/config/artemis/artemis-users.properties
new file mode 100644
index 00000000..60c73e71
--- /dev/null
+++ b/config/artemis/artemis-users.properties
@@ -0,0 +1,23 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+artemis = ENC(1024:520D0FDC605E6F707FDE66B5DBFD5C314FA2D9A1EC3F8A3DD9955A26E7953742:0A28714B470DEA15664221B67B0587004DA34CA8541DC15CCB5C454BB90CC3CC6E47962F29C2F51AF59AF90F996A62515748B7C8C2BA74CC5C9AA4726C3581D5)
+icat = icat
+system = manager
+user = password
+username = worker
+wkflowmgr = wkflowmgr
diff --git a/config/artemis/broker.xml b/config/artemis/broker.xml
new file mode 100644
index 00000000..ccb67084
--- /dev/null
+++ b/config/artemis/broker.xml
@@ -0,0 +1,280 @@
+
+
+
+
+
+
+
+ Artemis-Broker
+
+
+ true
+
+
+ 1
+
+
+ ASYNCIO
+
+ data/paging
+
+ data/bindings
+
+ data/journal
+
+ data/large-messages
+
+
+
+
+
+
+ true
+
+ 2
+
+ 10
+
+ 4096
+
+ 10M
+
+
+ 44000
+
+
+
+ 4096
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 5000
+
+
+ 98
+
+
+ true
+
+ 120000
+
+ 60000
+
+ HALT
+
+
+ 300000
+
+
+
+
+
+ -1
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;stompEnableMessageId=true;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false
+
+
+ tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;stompEnableMessageId=true;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true
+
+
+ tcp://0.0.0.0:61613?anycastPrefix=/queue/;multicastPrefix=/topic/;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;stompEnableMessageId=true;useEpoll=true;connectionTtl=300000
+
+
+ tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;stompEnableMessageId=true;useEpoll=true
+
+
+ tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;stompEnableMessageId=true;useEpoll=true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ DLQ
+ ExpiryQueue
+ 0
+
+ -1
+ 10
+ PAGE
+ true
+ true
+
+
+
+ DLQ
+ ExpiryQueue
+ 0
+
+ 10
+ PAGE
+ true
+ true
+ false
+ false
+
+ ANYCAST
+
+
+ 10M
+
+
+
+ -1
+ -1
+
+
+
+ -1
+ 20M
+
+
+ -1
+ -1
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/config/artemis/management.xml b/config/artemis/management.xml
new file mode 100644
index 00000000..2e21ba9a
--- /dev/null
+++ b/config/artemis/management.xml
@@ -0,0 +1,65 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/docker-compose.yml b/docker-compose.yml
index 8134e69a..289e4bcd 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -103,15 +103,20 @@ services:
retries: 5
activemq:
- image: apache/activemq-classic
+ user: root
+ image: apache/activemq-artemis:latest-alpine
hostname: activemq
+ env_file:
+ - .env
+ environment:
+ AMQ_RESET_CONFIG: "true"
ports:
- - 8161:8161
- - 61613:61613
+ - 8161:8161
+ - 61613:61613
volumes:
- - ./src/workflow_app/workflow/icat_activemq.xml:/opt/apache-activemq/conf/activemq.xml
+ - ./config/artemis:/var/lib/artemis-instance/etc-override
healthcheck:
- test: "curl --silent --show-error -u admin:admin -H 'Origin:http://localhost' 'http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,service=Health' | grep --silent -e 'Good' -e 'Getting Worried'"
+ test: wget --no-verbose --tries=1 --spider http://localhost:8161 || exit 1
interval: 5s
timeout: 5s
retries: 5
diff --git a/src/dasmon_app/dasmon_listener/amq_consumer.py b/src/dasmon_app/dasmon_listener/amq_consumer.py
index 707f1a2d..cd14e228 100644
--- a/src/dasmon_app/dasmon_listener/amq_consumer.py
+++ b/src/dasmon_app/dasmon_listener/amq_consumer.py
@@ -574,8 +574,8 @@ def connect(self):
self._connection = self.get_connection()
logging.info("[%s] Subscribing to %s", self._consumer_name, str(self._queues))
- for q in self._queues:
- self._connection.subscribe(destination=q, id=1, ack="auto")
+ for i, q in enumerate(self._queues):
+ self._connection.subscribe(destination=q, id=i, ack="auto")
def _disconnect(self):
"""
diff --git a/src/webmon_app/reporting/dasmon/view_util.py b/src/webmon_app/reporting/dasmon/view_util.py
index ecd612d2..52359a18 100644
--- a/src/webmon_app/reporting/dasmon/view_util.py
+++ b/src/webmon_app/reporting/dasmon/view_util.py
@@ -23,6 +23,7 @@
import logging
import time
import socket
+from json.decoder import JSONDecodeError
import reporting.report.view_util as report_view_util
import requests
import reporting.pvmon.view_util as pvmon_view_util
@@ -622,20 +623,24 @@ def reduction_queue_sizes():
"""Send request to activemq to get the reduction queue size using jolokia api"""
logger = logging.getLogger(LOGNAME)
results = []
- url_template = settings.ACTIVEMQ_QUEUE_QUERY_URL
+ url_template = settings.AMQ_QUEUE_QUERY_URL
headers = headers = {"Origin": socket.gethostname()}
- auth = (settings.ACTIVEMQ_ADMIN_USER, settings.ACTIVEMQ_ADMIN_PASS)
- for queue in settings.ACTIVEMQ_REDUCTION_QUEUES:
+ auth = (settings.AMQ_USER, settings.AMQ_PASSWORD)
+ for queue in settings.AMQ_REDUCTION_QUEUES:
url = url_template.format(queue=queue)
try:
response = requests.get(url, auth=auth, headers=headers, timeout=1)
- if response.status_code == 200:
- results.append({"queue": queue, "size": response.json()["value"]})
- else:
- logger.error("Error getting queue size for %s: %s", queue, response.text)
except requests.exceptions.RequestException as e:
logger.error("Error getting queue size for %s: %s", queue, str(e))
+ else:
+ try:
+ if response.status_code == 200:
+ results.append({"queue": queue, "size": response.json()["value"]})
+ else:
+ logger.error("Error getting queue size for %s: %s", queue, response.text)
+ except (JSONDecodeError, KeyError):
+ logger.error("Error getting queue size for %s: %s", queue, response.text)
return results
diff --git a/src/webmon_app/reporting/reporting_app/settings/base.py b/src/webmon_app/reporting/reporting_app/settings/base.py
index 9473336d..1570ba8c 100644
--- a/src/webmon_app/reporting/reporting_app/settings/base.py
+++ b/src/webmon_app/reporting/reporting_app/settings/base.py
@@ -327,10 +327,10 @@ def validate_ldap_settings(server_uri, user_dn_template):
ICAT_USER = environ.get("ICAT_USER")
ICAT_PASSCODE = environ.get("ICAT_PASS")
-ACTIVEMQ_ADMIN_USER = environ.get("ACTIVEMQ_ADMIN_USER")
-ACTIVEMQ_ADMIN_PASS = environ.get("ACTIVEMQ_ADMIN_PASS")
-ACTIVEMQ_QUEUE_QUERY_URL = environ.get("ACTIVEMQ_QUEUE_QUERY_URL")
-ACTIVEMQ_REDUCTION_QUEUES = json.loads(environ.get("ACTIVEMQ_REDUCTION_QUEUES", "[]"))
+AMQ_USER = environ.get("AMQ_USER")
+AMQ_PASSWORD = environ.get("AMQ_PASSWORD")
+AMQ_QUEUE_QUERY_URL = environ.get("AMQ_QUEUE_QUERY_URL")
+AMQ_REDUCTION_QUEUES = json.loads(environ.get("AMQ_REDUCTION_QUEUES", "[]"))
HELPLINE_EMAIL = "adara_support@ornl.gov"
diff --git a/src/webmon_app/reporting/templates/dasmon/diagnostics.html b/src/webmon_app/reporting/templates/dasmon/diagnostics.html
index 587afc00..242f4b55 100644
--- a/src/webmon_app/reporting/templates/dasmon/diagnostics.html
+++ b/src/webmon_app/reporting/templates/dasmon/diagnostics.html
@@ -142,6 +142,7 @@
{% endif %}
+{% if reduction_queue_size %}
Reduction queue length:
-
+{% endif %}
{% endblock %}
diff --git a/src/webmon_app/reporting/tests/test_dasmon/test_view_util.py b/src/webmon_app/reporting/tests/test_dasmon/test_view_util.py
index 8624db62..6f8a9b3d 100644
--- a/src/webmon_app/reporting/tests/test_dasmon/test_view_util.py
+++ b/src/webmon_app/reporting/tests/test_dasmon/test_view_util.py
@@ -900,8 +900,8 @@ def test_reduction_queue_sizes(self, mock_get):
from reporting.dasmon.view_util import reduction_queue_sizes
with self.settings(
- ACTIVEMQ_QUEUE_QUERY_URL="http://activemq:8161/{queue}",
- ACTIVEMQ_REDUCTION_QUEUES=[f"queue{n}" for n in range(4)],
+ AMQ_QUEUE_QUERY_URL="http://activemq:8161/{queue}",
+ AMQ_REDUCTION_QUEUES=[f"queue{n}" for n in range(4)],
):
results = reduction_queue_sizes()
assert len(results) == 2
diff --git a/src/workflow_app/workflow/icat_activemq.xml b/src/workflow_app/workflow/icat_activemq.xml
deleted file mode 100644
index 57633c19..00000000
--- a/src/workflow_app/workflow/icat_activemq.xml
+++ /dev/null
@@ -1,170 +0,0 @@
-
-
-
-
-
-
-
- file:${activemq.conf}/credentials.properties
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-