Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import sys
import re
import logging
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

# from selenium import webdriver
# from selenium.webdriver.chrome.options import Options


for handler in logging.root.handlers[:]:
Expand Down Expand Up @@ -48,33 +49,20 @@ def __init__(self):
self.retry_delay = 10


def _get_chrome_options(self, extra_options=None):
    """Build standardized Chrome options for headless browsing.

    Parameters:
        extra_options (iterable of str, optional): Additional Chrome
            command-line switches to append after the baseline set.

    Returns:
        Options: A selenium ChromeOptions object configured for headless,
        sandbox-free operation that tolerates self-signed certificates.
    """
    chrome_options = Options()
    # Baseline flags for running Chrome headless inside containers/CI.
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    # Accept the deployment's self-signed / untrusted certificates.
    chrome_options.add_argument("--ignore-ssl-errors")
    chrome_options.add_argument("--ignore-certificate-errors")
    chrome_options.add_argument("--allow-running-insecure-content")
    if extra_options:
        for option in extra_options:
            chrome_options.add_argument(option)
    return chrome_options

def _execute_command(self, command, description="command", raise_on_error=True):
"""Execute shell command with proper error handling"""
try:
logging.info(f"Executing {description}: {command}")
result = subprocess.check_output(command, shell=True, executable='/bin/bash')
return result.decode('utf-8')
except subprocess.CalledProcessError as e:
error_msg = f"Failed to execute {description}: {e}"
logging.error(error_msg)
if raise_on_error:
raise Exception(error_msg)
return None
# return chrome_options


def json_reader(self, tc, JSON_PATH):
Expand Down Expand Up @@ -137,7 +125,7 @@ def docker_compose_up(self, value):
"""Execute docker compose up and verify container status"""
try:
logging.info("Starting Docker containers with docker compose up...")
self._execute_command("docker compose up -d", description='docker compose up')
subprocess.check_output("docker compose up -d", shell=True, executable='/bin/bash')
time.sleep(5)
return self._verify_container_status(value)
except Exception as e:
Expand Down Expand Up @@ -208,77 +196,81 @@ def start_pipeline_and_check(self, value):
return
logging.info("Unsupported app type for pipeline start")
return
# Check current status
status_output = subprocess.check_output("./sample_status.sh", shell=True, executable='/bin/bash').decode('utf-8')
logging.info(f"sample_status.sh output: {status_output}")
if "No running pipelines" not in status_output:
raise Exception("Pipelines are already running")
logging.info("No pipelines are currently running - ready to start new pipeline")
# Start pipelines
cmd = "./sample_start.sh"
result = subprocess.run(cmd, shell=True, executable='/bin/bash', capture_output=True, text=True)
output = result.stdout
if app == "SP":
success_message = "Pipelines initialized."
if success_message not in output:
raise Exception(f"Pipeline start failed. Expected message not found: '{success_message}'")
return None
# app == "LD": extract response IDs
response_ids = []
for line in output.split('\n'):
id_matches = re.findall(r'[0-9a-f]{32}', line)
for match in id_matches:
if match not in response_ids:
response_ids.append(match)
if response_ids:
logging.info(f"Found {len(response_ids)} response IDs for LD: {response_ids}")
return response_ids
logging.error("No response IDs found in LD pipeline start output")
raise Exception("LD pipeline start did not return any response IDs")

else:
# Check current status
status_output = subprocess.check_output("./sample_status.sh", shell=True, executable='/bin/bash').decode('utf-8')
logging.info(f"sample_status.sh output: {status_output}")
if "No running pipelines" not in status_output:
raise Exception("Pipelines are already running")
logging.info("No pipelines are currently running - ready to start new pipeline")
# Start pipelines
cmd = "./sample_start.sh"
result = subprocess.run(cmd, shell=True, executable='/bin/bash', capture_output=True, text=True)
output = result.stdout
if app == "SP" or app == "LD":
success_message = "Pipelines initialized."
if success_message not in output:
raise Exception(f"Pipeline start failed. Expected message not found: '{success_message}'")
return None
response_ids = []
for line in output.split('\n'):
id_matches = re.findall(r'[0-9a-f]{32}', line)
for match in id_matches:
if match not in response_ids:
response_ids.append(match)
if response_ids:
logging.info(f"Found {len(response_ids)} response IDs for LD: {response_ids}")
return response_ids
raise Exception


def get_pipeline_status(self, value):
    """Check pipeline status via sample_status.sh and validate FPS output.

    Streams the script's stdout for up to 15 seconds, parsing lines of the
    form "... pipelines fps: (v1 v2 ...)" into lists of floats, and stops
    early once two FPS reports have been collected.

    Parameters:
        value (dict): Test-case parameters; only the "app" key is read.

    Returns:
        The result of self._validate_fps_data(fps_reports) for supported
        apps, or None for the "SI" app (status check not implemented).

    Raises:
        Exception: Wraps any failure encountered during the status check.
    """
    try:
        os.chdir(self.metro_path)
        logging.info("Checking pipeline status with sample_status.sh")
        app = value.get("app")
        if app == "SI":
            # SI deployments have no status-script integration yet.
            logging.info("SI app - skipping pipeline status. Not yet implemented")
            return
        with subprocess.Popen("./sample_status.sh", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, executable='/bin/bash') as process:
            fps_reports = []
            start_time = time.time()
            # Monitor for up to 15 seconds or until we get sufficient data.
            while time.time() - start_time < 15:
                line = process.stdout.readline()
                if not line:
                    time.sleep(0.1)
                    continue
                line = line.strip()
                logging.info(f"Status: {line}")
                # Extract FPS values from "pipelines fps: (...)" lines.
                if "pipelines fps:" in line:
                    try:
                        start_idx = line.find('pipelines fps:')
                        open_idx = line.find('(', start_idx)
                        close_idx = line.find(')', open_idx)
                        if open_idx != -1 and close_idx != -1 and close_idx > open_idx:
                            inside = line[open_idx + 1:close_idx].strip()
                            fps_values = []
                            # str.split() already drops empty tokens.
                            for token in inside.split():
                                try:
                                    fps_values.append(float(token))
                                except ValueError:
                                    # Skip non-numeric tokens inside the parentheses.
                                    continue
                            if fps_values:
                                fps_reports.append(fps_values)
                                avg_fps = sum(fps_values) / len(fps_values)
                                logging.info(f"FPS: {fps_values} (avg: {avg_fps:.2f})")
                    except Exception as e:
                        logging.warning(f"Failed to parse FPS line: {e}")
                # Early exit if we have enough FPS data.
                if len(fps_reports) >= 2:
                    logging.info("Sufficient FPS data collected, terminating early")
                    break
            return self._validate_fps_data(fps_reports)
    except Exception as e:
        raise Exception(f"Pipeline status check failed: {e}") from e

Expand Down Expand Up @@ -381,61 +373,61 @@ def search_element(self, logFile, keyword):
return False


def verify_grafana_url(self, value):
"""Verify Grafana Dashboard at different ports based on deployment type"""
# Opens a headless Chrome session, logs into Grafana with the default
# admin/admin credentials, and asserts the dashboards page is rendering
# data ("No data" absent from the page source).
# NOTE(review): hostIP is not defined in this block — presumably a
# module-level global holding the target host address; confirm.
driver = None
try:
logging.info(f"Verifying Grafana Dashboard")
# Headless Chrome configured by the shared _get_chrome_options helper.
chrome_options = self._get_chrome_options()
driver = webdriver.Chrome(options=chrome_options)
driver.implicitly_wait(10)
# SI deployments expose Grafana directly on port 3000 over HTTP;
# other (docker) deployments serve it over HTTPS under /grafana.
if value.get("app") == "SI":
login_url = f"http://{hostIP}:3000/login"
dashboard_url = f"http://{hostIP}:3000/dashboards"
post_success_log = "Grafana Dashboard is accessible and showing data for SI"
else:
logging.info("Detected docker deployment - using standard grafana path")
login_url = f"https://{hostIP}/grafana/login"
dashboard_url = f"https://{hostIP}/grafana/dashboards"
post_success_log = "Grafana Dashboard is accessible and showing data"

# Navigate to login page and ensure it's accessible
driver.get(login_url)
assert "404" not in driver.title, "Grafana login page not accessible"
# Perform login
username_input = driver.find_element("name", "user")
password_input = driver.find_element("name", "password")
username_input.send_keys("admin")
password_input.send_keys("admin")
driver.find_element("css selector", "button[type='submit']").click()
driver.implicitly_wait(5)

# Handle docker password change prompt if it appears
if value.get("app") != "SI":
try:
if "change-password" in driver.current_url or "password" in driver.page_source.lower():
logging.info("Password change prompt detected, skipping...")
try:
skip_button = driver.find_element("xpath", "//button[contains(text(), 'Skip')]")
skip_button.click()
except:
# No Skip button found — navigate to the Grafana home page instead.
driver.get(login_url.replace('/login', ''))
except:
# Best-effort: ignore any failure while dismissing the prompt.
pass

# Verify login success and dashboard accessibility
assert "Grafana" in driver.title or "Home" in driver.page_source, "Grafana login failed"
driver.get(dashboard_url)
driver.implicitly_wait(10)
assert "No data" not in driver.page_source, "Grafana dashboard is not showing data"
logging.info(post_success_log)
return True
except Exception as e:
logging.error(f"Failed to verify Grafana URL: {e}")
raise Exception(f"Grafana URL verification failed: {e}")
finally:
# Always release the browser session, even on failure.
if driver:
driver.quit()
# def verify_grafana_url(self, value):
# """Verify Grafana Dashboard at different ports based on deployment type"""
# driver = None
# try:
# logging.info(f"Verifying Grafana Dashboard")
# chrome_options = self._get_chrome_options()
# driver = webdriver.Chrome(options=chrome_options)
# driver.implicitly_wait(10)
# if value.get("app") == "SI":
# login_url = f"http://{hostIP}:3000/login"
# dashboard_url = f"http://{hostIP}:3000/dashboards"
# post_success_log = "Grafana Dashboard is accessible and showing data for SI"
# else:
# logging.info("Detected docker deployment - using standard grafana path")
# login_url = f"https://{hostIP}/grafana/login"
# dashboard_url = f"https://{hostIP}/grafana/dashboards"
# post_success_log = "Grafana Dashboard is accessible and showing data"

# # Navigate to login page and ensure it's accessible
# driver.get(login_url)
# assert "404" not in driver.title, "Grafana login page not accessible"
# # Perform login
# username_input = driver.find_element("name", "user")
# password_input = driver.find_element("name", "password")
# username_input.send_keys("admin")
# password_input.send_keys("admin")
# driver.find_element("css selector", "button[type='submit']").click()
# driver.implicitly_wait(5)

# # Handle docker password change prompt if it appears
# if value.get("app") != "SI":
# try:
# if "change-password" in driver.current_url or "password" in driver.page_source.lower():
# logging.info("Password change prompt detected, skipping...")
# try:
# skip_button = driver.find_element("xpath", "//button[contains(text(), 'Skip')]")
# skip_button.click()
# except:
# driver.get(login_url.replace('/login', ''))
# except:
# pass

# # Verify login success and dashboard accessibility
# assert "Grafana" in driver.title or "Home" in driver.page_source, "Grafana login failed"
# driver.get(dashboard_url)
# driver.implicitly_wait(10)
# assert "No data" not in driver.page_source, "Grafana dashboard is not showing data"
# logging.info(post_success_log)
# return True
# except Exception as e:
# logging.error(f"Failed to verify Grafana URL: {e}")
# raise Exception(f"Grafana URL verification failed: {e}")
# finally:
# if driver:
# driver.quit()


def stop_pipeline_and_check(self, value):
Expand Down Expand Up @@ -485,8 +477,8 @@ def _verify_no_pipelines_running(self):
except Exception as e:
logging.error(f"Error verifying pipeline stop: {e}")
return False


def docker_compose_down(self):
"""Bring down docker-compose services for the metro project and report remaining containers.

Expand All @@ -496,14 +488,11 @@ def docker_compose_down(self):
logging.info('Stopping services with docker compose down')
os.chdir(self.metro_path)
try:
self._execute_command("docker compose down -v", description='docker compose down')
logging.info("Docker compose down executed successfully")
subprocess.check_output("docker compose down -v", shell=True, executable='/bin/bash')
time.sleep(3)
logging.info('Verifying no services are running')

docker_ps_output = self._execute_command("docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}'", description='docker ps')
if docker_ps_output is None:
docker_ps_output = ""
docker_ps_output = subprocess.check_output("docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}'", shell=True, executable='/bin/bash').decode('utf-8')
logging.info(f"Current running containers: {docker_ps_output}")
lines = docker_ps_output.strip().split('\n')[1:]
running_containers = []
Expand All @@ -527,5 +516,5 @@ def docker_compose_down(self):
else:
logging.info("No project-related containers are running")
logging.info("Services stopped successfully")
except subprocess.CalledProcessError as e:
raise Exception
except Exception as e:
logging.error(f"Error in docker_compose_down: {e}")
Loading