From 9b0e7a28641401344971a66a380e21e88dd754a6 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Tue, 19 Nov 2024 12:48:28 +1100 Subject: [PATCH] Switch ActiveMQ from classic to artemis --- .env | 8 +- config/artemis/artemis-roles.properties | 20 ++ config/artemis/artemis-users.properties | 23 ++ config/artemis/broker.xml | 280 ++++++++++++++++++ config/artemis/management.xml | 65 ++++ docker-compose.yml | 15 +- .../dasmon_listener/amq_consumer.py | 4 +- src/webmon_app/reporting/dasmon/view_util.py | 19 +- .../reporting/reporting_app/settings/base.py | 8 +- .../templates/dasmon/diagnostics.html | 3 +- .../tests/test_dasmon/test_view_util.py | 4 +- src/workflow_app/workflow/icat_activemq.xml | 170 ----------- 12 files changed, 424 insertions(+), 195 deletions(-) create mode 100644 config/artemis/artemis-roles.properties create mode 100644 config/artemis/artemis-users.properties create mode 100644 config/artemis/broker.xml create mode 100644 config/artemis/management.xml delete mode 100644 src/workflow_app/workflow/icat_activemq.xml diff --git a/.env b/.env index 69042564..391c9f93 100644 --- a/.env +++ b/.env @@ -13,10 +13,10 @@ WORKFLOW_USER=wkflowmgr WORKFLOW_PASS=wkflowmgr WORKER_USER=worker WORKER_PASS=worker -ACTIVEMQ_ADMIN_USER=admin -ACTIVEMQ_ADMIN_PASS=admin -ACTIVEMQ_QUEUE_QUERY_URL="http://activemq:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName={queue}/QueueSize" -ACTIVEMQ_REDUCTION_QUEUES=["CATALOG.ONCAT.DATA_READY", "REDUCTION.DATA_READY", "REDUCTION.HIMEM.DATA_READY", "REDUCTION_CATALOG.DATA_READY"] +AMQ_USER=artemis +AMQ_PASSWORD=artemis +AMQ_QUEUE_QUERY_URL="http://activemq:8161/console/jolokia/read/org.apache.activemq.artemis:broker=%22Artemis-Broker%22,address=%22{queue}%22,component=addresses/MessageCount" +AMQ_REDUCTION_QUEUES=["CATALOG.ONCAT.DATA_READY", "REDUCTION.DATA_READY", "REDUCTION.HIMEM.DATA_READY", "REDUCTION_CATALOG.DATA_READY"] AMQ_BROKER=[["activemq", 61613]] diff --git a/config/artemis/artemis-roles.properties b/config/artemis/artemis-roles.properties new file mode 100644 index 00000000..d7b0f51f --- /dev/null +++ b/config/artemis/artemis-roles.properties @@ -0,0 +1,20 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +amq = artemis,icat,user,username,wkflowmgr +view = artemis,icat,user,username,wkflowmgr +update = artemis,icat,user,username,wkflowmgr diff --git a/config/artemis/artemis-users.properties b/config/artemis/artemis-users.properties new file mode 100644 index 00000000..60c73e71 --- /dev/null +++ b/config/artemis/artemis-users.properties @@ -0,0 +1,23 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +artemis = ENC(1024:520D0FDC605E6F707FDE66B5DBFD5C314FA2D9A1EC3F8A3DD9955A26E7953742:0A28714B470DEA15664221B67B0587004DA34CA8541DC15CCB5C454BB90CC3CC6E47962F29C2F51AF59AF90F996A62515748B7C8C2BA74CC5C9AA4726C3581D5) +icat = icat +system = manager +user = password +username = worker +wkflowmgr = wkflowmgr diff --git a/config/artemis/broker.xml b/config/artemis/broker.xml new file mode 100644 index 00000000..ccb67084 --- /dev/null +++ b/config/artemis/broker.xml @@ -0,0 +1,280 @@ + + + + + + + + Artemis-Broker + + + true + + + 1 + + + ASYNCIO + + data/paging + + data/bindings + + data/journal + + data/large-messages + + + + + + + true + + 2 + + 10 + + 4096 + + 10M + + + 44000 + + + + 4096 + + + + + + + + + + + + + + + + + + + + + 5000 + + + 98 + + + true + + 120000 + + 60000 + + HALT + + + 300000 + + + + + + -1 + + + + + + + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;stompEnableMessageId=true;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false + + + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;stompEnableMessageId=true;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true + + + tcp://0.0.0.0:61613?anycastPrefix=/queue/;multicastPrefix=/topic/;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;stompEnableMessageId=true;useEpoll=true;connectionTtl=300000 + + + tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;stompEnableMessageId=true;useEpoll=true + + + tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;stompEnableMessageId=true;useEpoll=true + + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + 10 + PAGE + true + true + false + false + + ANYCAST + + + 10M + + + + -1 + -1 + + + + -1 + 20M + + + -1 + -1 + + + + +
+ + + +
+
+ + + +
+
+ + + +
+
diff --git a/config/artemis/management.xml b/config/artemis/management.xml new file mode 100644 index 00000000..2e21ba9a --- /dev/null +++ b/config/artemis/management.xml @@ -0,0 +1,65 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docker-compose.yml b/docker-compose.yml index 8134e69a..289e4bcd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -103,15 +103,20 @@ services: retries: 5 activemq: - image: apache/activemq-classic + user: root + image: apache/activemq-artemis:latest-alpine hostname: activemq + env_file: + - .env + environment: + AMQ_RESET_CONFIG: "true" ports: - - 8161:8161 - - 61613:61613 + - 8161:8161 + - 61613:61613 volumes: - - ./src/workflow_app/workflow/icat_activemq.xml:/opt/apache-activemq/conf/activemq.xml + - ./config/artemis:/var/lib/artemis-instance/etc-override healthcheck: - test: "curl --silent --show-error -u admin:admin -H 'Origin:http://localhost' 'http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,service=Health' | grep --silent -e 'Good' -e 'Getting Worried'" + test: wget --no-verbose --tries=1 --spider http://localhost:8161 || exit 1 interval: 5s timeout: 5s retries: 5 diff --git a/src/dasmon_app/dasmon_listener/amq_consumer.py b/src/dasmon_app/dasmon_listener/amq_consumer.py index 707f1a2d..cd14e228 100644 --- a/src/dasmon_app/dasmon_listener/amq_consumer.py +++ b/src/dasmon_app/dasmon_listener/amq_consumer.py @@ -574,8 +574,8 @@ def connect(self): self._connection = self.get_connection() logging.info("[%s] Subscribing to %s", self._consumer_name, str(self._queues)) - for q in self._queues: - self._connection.subscribe(destination=q, id=1, ack="auto") + for i, q in enumerate(self._queues): + self._connection.subscribe(destination=q, id=i, ack="auto") def _disconnect(self): """ diff --git a/src/webmon_app/reporting/dasmon/view_util.py b/src/webmon_app/reporting/dasmon/view_util.py index ecd612d2..52359a18 100644 --- a/src/webmon_app/reporting/dasmon/view_util.py +++ b/src/webmon_app/reporting/dasmon/view_util.py @@ -23,6 +23,7 @@ import logging import time import socket +from json.decoder import JSONDecodeError import reporting.report.view_util as report_view_util import requests import reporting.pvmon.view_util as pvmon_view_util @@ -622,20 +623,24 @@ def reduction_queue_sizes(): """Send request to activemq to get the reduction queue size using jolokia api""" logger = logging.getLogger(LOGNAME) results = [] - url_template = settings.ACTIVEMQ_QUEUE_QUERY_URL + url_template = settings.AMQ_QUEUE_QUERY_URL headers = headers = {"Origin": socket.gethostname()} - auth = (settings.ACTIVEMQ_ADMIN_USER, settings.ACTIVEMQ_ADMIN_PASS) - for queue in settings.ACTIVEMQ_REDUCTION_QUEUES: + auth = (settings.AMQ_USER, settings.AMQ_PASSWORD) + for queue in settings.AMQ_REDUCTION_QUEUES: url = url_template.format(queue=queue) try: response = requests.get(url, auth=auth, headers=headers, timeout=1) - if response.status_code == 200: - results.append({"queue": queue, "size": response.json()["value"]}) - else: - logger.error("Error getting queue size for %s: %s", queue, response.text) except requests.exceptions.RequestException as e: logger.error("Error getting queue size for %s: %s", queue, str(e)) + else: + try: + if response.status_code == 200: + results.append({"queue": queue, "size": response.json()["value"]}) + else: + logger.error("Error getting queue size for %s: %s", queue, response.text) + except (JSONDecodeError, KeyError): + logger.error("Error getting queue size for %s: %s", queue, response.text) return results diff --git a/src/webmon_app/reporting/reporting_app/settings/base.py b/src/webmon_app/reporting/reporting_app/settings/base.py index 9473336d..1570ba8c 100644 --- a/src/webmon_app/reporting/reporting_app/settings/base.py +++ b/src/webmon_app/reporting/reporting_app/settings/base.py @@ -327,10 +327,10 @@ def validate_ldap_settings(server_uri, user_dn_template): ICAT_USER = environ.get("ICAT_USER") ICAT_PASSCODE = environ.get("ICAT_PASS") -ACTIVEMQ_ADMIN_USER = environ.get("ACTIVEMQ_ADMIN_USER") -ACTIVEMQ_ADMIN_PASS = environ.get("ACTIVEMQ_ADMIN_PASS") -ACTIVEMQ_QUEUE_QUERY_URL = environ.get("ACTIVEMQ_QUEUE_QUERY_URL") -ACTIVEMQ_REDUCTION_QUEUES = json.loads(environ.get("ACTIVEMQ_REDUCTION_QUEUES", "[]")) +AMQ_USER = environ.get("AMQ_USER") +AMQ_PASSWORD = environ.get("AMQ_PASSWORD") +AMQ_QUEUE_QUERY_URL = environ.get("AMQ_QUEUE_QUERY_URL") +AMQ_REDUCTION_QUEUES = json.loads(environ.get("AMQ_REDUCTION_QUEUES", "[]")) HELPLINE_EMAIL = "adara_support@ornl.gov" diff --git a/src/webmon_app/reporting/templates/dasmon/diagnostics.html b/src/webmon_app/reporting/templates/dasmon/diagnostics.html index 587afc00..242f4b55 100644 --- a/src/webmon_app/reporting/templates/dasmon/diagnostics.html +++ b/src/webmon_app/reporting/templates/dasmon/diagnostics.html @@ -142,6 +142,7 @@ {% endif %} +{% if reduction_queue_size %}

Reduction queue length:

@@ -157,7 +158,7 @@

- +{% endif %} {% endblock %} diff --git a/src/webmon_app/reporting/tests/test_dasmon/test_view_util.py b/src/webmon_app/reporting/tests/test_dasmon/test_view_util.py index 8624db62..6f8a9b3d 100644 --- a/src/webmon_app/reporting/tests/test_dasmon/test_view_util.py +++ b/src/webmon_app/reporting/tests/test_dasmon/test_view_util.py @@ -900,8 +900,8 @@ def test_reduction_queue_sizes(self, mock_get): from reporting.dasmon.view_util import reduction_queue_sizes with self.settings( - ACTIVEMQ_QUEUE_QUERY_URL="http://activemq:8161/{queue}", - ACTIVEMQ_REDUCTION_QUEUES=[f"queue{n}" for n in range(4)], + AMQ_QUEUE_QUERY_URL="http://activemq:8161/{queue}", + AMQ_REDUCTION_QUEUES=[f"queue{n}" for n in range(4)], ): results = reduction_queue_sizes() assert len(results) == 2 diff --git a/src/workflow_app/workflow/icat_activemq.xml b/src/workflow_app/workflow/icat_activemq.xml deleted file mode 100644 index 57633c19..00000000 --- a/src/workflow_app/workflow/icat_activemq.xml +++ /dev/null @@ -1,170 +0,0 @@ - - - - - - - - file:${activemq.conf}/credentials.properties - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -