Skip to content

Commit

Permalink
revert changes for test_autoreducer_high_memory
Browse files Browse the repository at this point in the history
  • Loading branch information
backmari committed Aug 30, 2024
1 parent a3ea27a commit 222d48e
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 31 deletions.
178 changes: 148 additions & 30 deletions tests/test_autoreducer_high_memory.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""This is to test that the reduction tasks go to the correct autoreducer node
depending on if it requires high memoery or not"""

import psycopg2
import requests
import time

from tests.utils import db_utils
from dotenv import dotenv_values


class TestAutoreducerQueues:
Expand All @@ -14,16 +15,111 @@ class TestAutoreducerQueues:
IPTS = "IPTS-1234"
run_number = 12345

def set_reduction_request_queue(self, conn, queue_name):
def setup_class(cls):
config = {**dotenv_values(".env"), **dotenv_values(".env.ci")}
assert config
cls.conn = psycopg2.connect(
database=config["DATABASE_NAME"],
user=config["DATABASE_USER"],
password=config["DATABASE_PASS"],
port=config["DATABASE_PORT"],
host="localhost",
)
time.sleep(1)

def teardown_class(cls):
cls.conn.close()

def login(self, next, username, password):
# taken from test_RunPageView.py - consolidate as helper somewhere?
URL = "http://localhost/users/login?next="
client = requests.session()

# Retrieve the CSRF token first
client.get(URL) # sets the cookie
csrftoken = client.cookies["csrftoken"]

login_data = dict(username=username, password=password, csrfmiddlewaretoken=csrftoken)
return client.post(URL + next, data=login_data, timeout=None)

def create_test_data(self):
"""create the instrument, ipts and datarun if they don't already exist
returns the id for the created rundata"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()

cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
inst_id = cursor.fetchone()

if inst_id is None:
cursor.execute("INSERT INTO report_instrument (name) VALUES (%s);", (self.instrument,))
cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
inst_id = cursor.fetchone()
conn.commit()

cursor.execute("SELECT id FROM report_ipts WHERE expt_name = %s;", (self.IPTS,))
ipts_id = cursor.fetchone()
if ipts_id is None:
cursor.execute(
"INSERT INTO report_ipts (expt_name, created_on) VALUES (%s, %s);",
("IPTS-1234", "2020-05-20 13:02:52.281964;"),
)
cursor.execute("SELECT id FROM report_ipts WHERE expt_name = %s;", (self.IPTS,))
ipts_id = cursor.fetchone()
conn.commit()

cursor.execute(
"SELECT id FROM report_datarun WHERE run_number = %s AND ipts_id_id = %s AND instrument_id_id = %s;",
(self.run_number, ipts_id[0], inst_id[0]),
)
run_id = cursor.fetchone()
if run_id is None:
cursor.execute(
"INSERT INTO report_datarun (run_number, ipts_id_id, instrument_id_id, file, created_on) "
"VALUES (%s, %s, %s, %s, %s);",
(
self.run_number,
ipts_id[0],
inst_id[0],
"/SNS/VULCAN/IPTS-1234/nexus/VULCAN_12345.nxs.h5",
"2020-05-20 13:02:52.281964;",
),
)
cursor.execute(
"SELECT id FROM report_datarun WHERE run_number = %s AND ipts_id_id = %s AND instrument_id_id = %s;",
(self.run_number, ipts_id[0], inst_id[0]),
)
run_id = cursor.fetchone()
conn.commit()

return run_id

def get_status_queue_id(self, cursor, queue_name):
"""return the if for the statusqueue for the provided name"""
cursor.execute("SELECT id FROM report_statusqueue where name = %s;", (queue_name,))
queue_id = cursor.fetchone()

if queue_id is None:
cursor.execute(
"INSERT INTO report_statusqueue (name, is_workflow_input) VALUES (%s, %s);", (queue_name, False)
)
cursor.execute("SELECT id FROM report_statusqueue where name = %s;", (queue_name,))
queue_id = cursor.fetchone()

return queue_id[0]

def set_reduction_request_queue(self, queue_name):
"""create the task to send REDUCTION.REQUEST to the provided queue"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()

cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
inst_id = cursor.fetchone()[0]

queue_id = db_utils.get_status_queue_id(conn, queue_name)
success_queue_id = db_utils.get_status_queue_id(conn, "REDUCTION.COMPLETE")
reduction_request_queue_id = db_utils.get_status_queue_id(conn, "REDUCTION.REQUEST")
queue_id = self.get_status_queue_id(cursor, queue_name)
success_queue_id = self.get_status_queue_id(cursor, "REDUCTION.COMPLETE")
reduction_request_queue_id = self.get_status_queue_id(cursor, "REDUCTION.REQUEST")

cursor.execute(
"SELECT id FROM report_task where instrument_id_id = %s AND input_queue_id_id = %s;",
Expand Down Expand Up @@ -57,55 +153,77 @@ def set_reduction_request_queue(self, conn, queue_name):
)
conn.commit()

def get_autoreducer_hostname(self, conn, run_id):
def clear_previous_runstatus(self, run_id):
"""remove all previous run statuses for the given run_id"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()
# delete all information entries for runstatus
cursor.execute(
"DELETE FROM report_information WHERE run_status_id_id IN (SELECT id FROM report_runstatus "
"WHERE run_id_id = %s);",
run_id,
)
cursor.execute("DELETE FROM report_runstatus WHERE run_id_id = %s;", run_id)
conn.commit()

def get_autoreducer_hostname(self, run_id):
"""return the hostname that executed the task that is stored in the report information"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()
queue_id = db_utils.get_status_queue_id(conn, "REDUCTION.STARTED")
queue_id = self.get_status_queue_id(cursor, "REDUCTION.STARTED")
cursor.execute("SELECT id FROM report_runstatus WHERE run_id_id = %s AND queue_id_id = %s", (run_id, queue_id))
runstatus_id = cursor.fetchone()[0]
cursor.execute("SELECT description FROM report_information WHERE run_status_id_id = %s", (runstatus_id,))
description = cursor.fetchone()[0]
return description

def test_normal_reduction_queue(self, db_connection, request_page):
def check_run_status_exist(self, run_id, queue_name):
"""return if the run status was created for the given run_id and queue_name"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()
queue_id = self.get_status_queue_id(cursor, queue_name)
cursor.execute("SELECT * FROM report_runstatus WHERE run_id_id = %s AND queue_id_id = %s", (run_id, queue_id))
return cursor.fetchone() is not None

def test_normal_reduction_queue(self):
# switch to the REDUCTION.DATA_READY queue and check that the task goes to the correct node
run_id = db_utils.add_instrument_data_run(db_connection, self.instrument, self.IPTS, self.run_number)
db_utils.clear_previous_runstatus(db_connection, run_id)
run_id = self.create_test_data()
self.clear_previous_runstatus(run_id)

self.set_reduction_request_queue(db_connection, "REDUCTION.DATA_READY")
self.set_reduction_request_queue("REDUCTION.DATA_READY")

# login and send reduction request
response = request_page("/report/vulcan/12345/reduce/", self.user, self.pwd)
response = self.login("/report/vulcan/12345/reduce/", self.user, self.pwd)
assert response.status_code == 200
assert response.url.endswith("/report/vulcan/12345/")

# wait for database to get updated
time.sleep(30.0)
time.sleep(1.0)

assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.REQUEST")
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.STARTED")
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.DATA_READY")
assert not db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.HIMEM.DATA_READY")
assert self.check_run_status_exist(run_id, "REDUCTION.REQUEST")
assert self.check_run_status_exist(run_id, "REDUCTION.STARTED")
assert self.check_run_status_exist(run_id, "REDUCTION.DATA_READY")
assert not self.check_run_status_exist(run_id, "REDUCTION.HIMEM.DATA_READY")

assert self.get_autoreducer_hostname(db_connection, run_id) == "autoreducer"
assert self.get_autoreducer_hostname(run_id) == "autoreducer"

def test_himem_reduction_queue(self, db_connection, request_page):
def test_himem_reduction_queue(self):
# switch to the REDUCTION.HIMEM.DATA_READY queue and check that the task goes to the correct node
run_id = db_utils.add_instrument_data_run(db_connection, self.instrument, self.IPTS, self.run_number)
db_utils.clear_previous_runstatus(db_connection, run_id)
run_id = self.create_test_data()
self.clear_previous_runstatus(run_id)

self.set_reduction_request_queue(db_connection, "REDUCTION.HIMEM.DATA_READY")
self.set_reduction_request_queue("REDUCTION.HIMEM.DATA_READY")
# login and send reduction request
response = request_page("/report/vulcan/12345/reduce/", self.user, self.pwd)
response = self.login("/report/vulcan/12345/reduce/", self.user, self.pwd)
assert response.status_code == 200
assert response.url.endswith("/report/vulcan/12345/")

# wait for database to get updated
time.sleep(30.0)
time.sleep(1.0)

assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.REQUEST")
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.STARTED")
assert not db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.DATA_READY")
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.HIMEM.DATA_READY")
assert self.check_run_status_exist(run_id, "REDUCTION.REQUEST")
assert self.check_run_status_exist(run_id, "REDUCTION.STARTED")
assert not self.check_run_status_exist(run_id, "REDUCTION.DATA_READY")
assert self.check_run_status_exist(run_id, "REDUCTION.HIMEM.DATA_READY")

assert self.get_autoreducer_hostname(db_connection, run_id) == "autoreducer.himem"
assert self.get_autoreducer_hostname(run_id) == "autoreducer.himem"
2 changes: 1 addition & 1 deletion tests/test_autoreducer_memory_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def test_reduction_script_exceeds_memory_limit(self, db_connection, request_page
assert response.url.endswith("/report/pg3/54321/")

# wait for job to be terminated and database to get updated
time.sleep(3.0)
time.sleep(10.0)

assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.REQUEST")
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.STARTED")
Expand Down

0 comments on commit 222d48e

Please sign in to comment.