From e6ff07f0fa1321dba09e3b79d50adba51a5707ed Mon Sep 17 00:00:00 2001 From: Shriyansh Agnihotri Date: Tue, 10 Dec 2024 16:08:44 +0530 Subject: [PATCH] Adding skip browser close to carry forward context from one test to another, not recommended feature --- testzeus_hercules/__main__.py | 16 +++++-- testzeus_hercules/config.py | 55 ++++++++++++++++++----- testzeus_hercules/core/runner.py | 50 ++++++++++++++++----- testzeus_hercules/utils/gherkin_helper.py | 30 ++++++++++--- 4 files changed, 117 insertions(+), 34 deletions(-) diff --git a/testzeus_hercules/__main__.py b/testzeus_hercules/__main__.py index 48dccaf..b851244 100644 --- a/testzeus_hercules/__main__.py +++ b/testzeus_hercules/__main__.py @@ -8,6 +8,7 @@ get_junit_xml_base_path, get_source_log_folder_path, set_default_test_id, + get_dont_close_browser, ) from testzeus_hercules.core.runner import SingleCommandInputRunner from testzeus_hercules.telemetry import EventData, EventType, add_event @@ -40,13 +41,16 @@ def sequential_process() -> None: 6. Merges all JUnit XML results into a single file. 7. Logs the location of the final result file. """ - list_of_feats = process_feature_file() + dont_close_browser = get_dont_close_browser() + list_of_feats = process_feature_file(pass_background_to_all=dont_close_browser) input_gherkin_file_path = get_input_gherkin_file_path() # get name of the feature file using os package feature_file_name = os.path.basename(input_gherkin_file_path) result_of_tests = [] - final_result_file_name = f"{get_junit_xml_base_path()}/{feature_file_name}_result.xml" + final_result_file_name = ( + f"{get_junit_xml_base_path()}/{feature_file_name}_result.xml" + ) add_event(EventType.RUN, EventData(detail="Total Runs: " + str(len(list_of_feats)))) for feat in list_of_feats: file_path = feat["output_file"] @@ -65,6 +69,7 @@ def sequential_process() -> None: runner = SingleCommandInputRunner( stake_id=stake_id, command=cmd, + dont_terminate_browser_after_run=dont_close_browser, ) asyncio.run(runner.start()) @@ -94,14 +99,17 @@ def sequential_process() -> None: proofs_video_path=runner.browser_manager.get_latest_video_path(), network_logs_path=runner.browser_manager.request_response_log_file, logs_path=get_source_log_folder_path(stake_id), - planner_thoughts_path=get_source_log_folder_path(stake_id) + "/chat_messages.json", + planner_thoughts_path=get_source_log_folder_path(stake_id) + + "/chat_messages.json", ) ) JUnitXMLGenerator.merge_junit_xml(result_of_tests, final_result_file_name) logger.info(f"Results published in junitxml file: {final_result_file_name}") # building html from junitxml - final_result_html_file_name = f"{get_junit_xml_base_path()}/{feature_file_name}_result.html" + final_result_html_file_name = ( + f"{get_junit_xml_base_path()}/{feature_file_name}_result.html" + ) prepare_html([final_result_file_name, final_result_html_file_name]) logger.info(f"Results published in html file: {final_result_html_file_name}") diff --git a/testzeus_hercules/config.py b/testzeus_hercules/config.py index 396790a..f839ab1 100644 --- a/testzeus_hercules/config.py +++ b/testzeus_hercules/config.py @@ -16,9 +16,15 @@ def arguments() -> None: - parser = argparse.ArgumentParser(description="Hercules: The World's First Open-Source AI Agent for End-to-End Testing") - parser.add_argument("--input-file", type=str, help="Path to the input file.", required=False) - parser.add_argument("--output-path", type=str, help="Path to the output directory.", required=False) + parser = argparse.ArgumentParser( + description="Hercules: The World's First Open-Source AI Agent for End-to-End Testing" + ) + parser.add_argument( + "--input-file", type=str, help="Path to the input file.", required=False + ) + parser.add_argument( + "--output-path", type=str, help="Path to the output directory.", required=False + ) parser.add_argument( "--test-data-path", type=str, @@ -76,11 +82,17 @@ def arguments() -> None: agents_llm_config_file_ref_key = os.environ.get("AGENTS_LLM_CONFIG_FILE_REF_KEY") -if (llm_model_name and llm_model_api_key) and (agents_llm_config_file or agents_llm_config_file_ref_key): - logger.error("Provide either LLM_MODEL_NAME and LLM_MODEL_API_KEY together, or AGENTS_LLM_CONFIG_FILE and AGENTS_LLM_CONFIG_FILE_REF_KEY together, not both.") +if (llm_model_name and llm_model_api_key) and ( + agents_llm_config_file or agents_llm_config_file_ref_key +): + logger.error( + "Provide either LLM_MODEL_NAME and LLM_MODEL_API_KEY together, or AGENTS_LLM_CONFIG_FILE and AGENTS_LLM_CONFIG_FILE_REF_KEY together, not both." + ) exit(1) -if (not llm_model_name or not llm_model_api_key) and (not agents_llm_config_file or not agents_llm_config_file_ref_key): +if (not llm_model_name or not llm_model_api_key) and ( + not agents_llm_config_file or not agents_llm_config_file_ref_key +): logger.error( "Either LLM_MODEL_NAME and LLM_MODEL_API_KEY must be set together, or AGENTS_LLM_CONFIG_FILE and AGENTS_LLM_CONFIG_FILE_REF_KEY must be set together. user --llm-model and --llm-model-api-key in hercules command" ) @@ -97,12 +109,18 @@ def arguments() -> None: PROJECT_TEMP_PATH = os.path.join(PROJECT_ROOT, "temp") PROJECT_TEST_ROOT = os.path.join(PROJECT_ROOT, "test") -INPUT_GHERKIN_FILE_PATH = os.environ.get("INPUT_GHERKIN_FILE_PATH") or os.path.join(PROJECT_ROOT, "input/test.feature") +INPUT_GHERKIN_FILE_PATH = os.environ.get("INPUT_GHERKIN_FILE_PATH") or os.path.join( + PROJECT_ROOT, "input/test.feature" +) TMP_GHERKIN_PATH = os.path.join(PROJECT_ROOT, "gherkin_files") -JUNIT_XML_BASE_PATH = os.environ.get("JUNIT_XML_BASE_PATH") or os.path.join(PROJECT_ROOT, "output") +JUNIT_XML_BASE_PATH = os.environ.get("JUNIT_XML_BASE_PATH") or os.path.join( + PROJECT_ROOT, "output" +) SOURCE_LOG_FOLDER_PATH = os.path.join(PROJECT_ROOT, "log_files") -TEST_DATA_PATH = os.environ.get("TEST_DATA_PATH") or os.path.join(PROJECT_ROOT, "test_data") +TEST_DATA_PATH = os.environ.get("TEST_DATA_PATH") or os.path.join( + PROJECT_ROOT, "test_data" +) SCREEN_SHOT_PATH = os.path.join(PROJECT_ROOT, "proofs") if "HF_HOME" not in os.environ: @@ -112,6 +130,13 @@ def arguments() -> None: os.environ["TOKENIZERS_PARALLELISM"] = "false" +def get_dont_close_browser() -> bool: + """ + Check if the system should close the browser after running the test. + """ + return os.environ.get("DONT_CLOSE_BROWSER", "false").lower().strip() == "true" + + def get_cdp_config() -> dict | None: """ Get the CDP config. @@ -192,7 +217,9 @@ def get_input_gherkin_file_path() -> str: base_path = os.path.dirname(INPUT_GHERKIN_FILE_PATH) if not os.path.exists(base_path): os.makedirs(base_path) - logger.info(f"Created INPUT_GHERKIN_FILE_PATH folder at: {INPUT_GHERKIN_FILE_PATH}") + logger.info( + f"Created INPUT_GHERKIN_FILE_PATH folder at: {INPUT_GHERKIN_FILE_PATH}" + ) return INPUT_GHERKIN_FILE_PATH @@ -236,7 +263,9 @@ def get_source_log_folder_path(test_id: Optional[str] = None) -> str: source_log_folder_path = os.path.join(SOURCE_LOG_FOLDER_PATH, test_id) if not os.path.exists(source_log_folder_path): os.makedirs(source_log_folder_path) - logger.info(f"Created source_log_folder_path folder at: {source_log_folder_path}") + logger.info( + f"Created source_log_folder_path folder at: {source_log_folder_path}" + ) return source_log_folder_path @@ -299,4 +328,6 @@ def get_project_temp_path(test_id: Optional[str] = None) -> str: "BROWSER_TYPE": get_browser_type(), "CAPTURE_NETWORK": should_capture_network(), } -add_event(EventType.CONFIG, EventData(detail="General Config", additional_data=config_brief)) +add_event( + EventType.CONFIG, EventData(detail="General Config", additional_data=config_brief) +) diff --git a/testzeus_hercules/core/runner.py b/testzeus_hercules/core/runner.py index 879f961..3a8e3c4 100644 --- a/testzeus_hercules/core/runner.py +++ b/testzeus_hercules/core/runner.py @@ -27,6 +27,7 @@ def __init__( planner_max_chat_round: int = 50, browser_nav_max_chat_round: int = 10, stake_id: str | None = None, + dont_terminate_browser_after_run: bool = False, ): self.planner_number_of_rounds = planner_max_chat_round self.browser_number_of_rounds = browser_nav_max_chat_round @@ -34,8 +35,11 @@ def __init__( self.autogen_wrapper = None self.is_running = False self.stake_id = stake_id + self.dont_terminate_browser_after_run = dont_terminate_browser_after_run - self.save_chat_logs_to_files = os.getenv("SAVE_CHAT_LOGS_TO_FILE", "True").lower() in ["true", "1"] + self.save_chat_logs_to_files = os.getenv( + "SAVE_CHAT_LOGS_TO_FILE", "True" + ).lower() in ["true", "1"] self.planner_agent_name = "planner_agent" self.shutdown_event = asyncio.Event() @@ -56,7 +60,9 @@ async def initialize(self) -> None: browser_nav_max_chat_round=self.browser_number_of_rounds, ) - self.browser_manager = PlaywrightManager(gui_input_mode=False, stake_id=self.stake_id) + self.browser_manager = PlaywrightManager( + gui_input_mode=False, stake_id=self.stake_id + ) await self.browser_manager.async_initialize() async def process_command(self, command: str) -> tuple[Any, float]: @@ -80,13 +86,19 @@ async def process_command(self, command: str) -> tuple[Any, float]: if command: self.is_running = True start_time = time.time() - current_url = await self.browser_manager.get_current_url() if self.browser_manager else None + current_url = ( + await self.browser_manager.get_current_url() + if self.browser_manager + else None + ) self.browser_manager.log_user_message(command) # type: ignore result = None logger.info(f"Processing command: {command}") if self.autogen_wrapper: await self.browser_manager.update_processing_state("processing") # type: ignore - result = await self.autogen_wrapper.process_command(command, current_url) + result = await self.autogen_wrapper.process_command( + command, current_url + ) await self.browser_manager.update_processing_state("done") # type: ignore end_time = time.time() elapsed_time = round(end_time - start_time, 2) @@ -95,7 +107,11 @@ async def process_command(self, command: str) -> tuple[Any, float]: if result is not None: chat_history = result.chat_history # type: ignore last_message = chat_history[-1] if chat_history else None # type: ignore - if last_message and "terminate" in last_message and last_message["terminate"] == "yes": + if ( + last_message + and "terminate" in last_message + and last_message["terminate"] == "yes" + ): await self.browser_manager.notify_user(last_message, "answer") # type: ignore await self.browser_manager.notify_user(f"Task Completed ({elapsed_time}s).", "info") # type: ignore @@ -107,7 +123,9 @@ async def save_planner_chat_messages(self) -> None: """ Saves chat messages to a file or logs them based on configuration. """ - messages = self.autogen_wrapper.agents_map[self.planner_agent_name].chat_messages + messages = self.autogen_wrapper.agents_map[ + self.planner_agent_name + ].chat_messages messages_str_keys = {str(key): value for key, value in messages.items()} res_output_thoughts_logs_di = {} for key, value in messages_str_keys.items(): @@ -126,7 +144,9 @@ async def save_planner_chat_messages(self) -> None: try: res_content = json.loads(content) except json.JSONDecodeError: - logger.debug(f"Failed to decode JSON: {content}, keeping as multiline string") + logger.debug( + f"Failed to decode JSON: {content}, keeping as multiline string" + ) res_content = content res_output_thoughts_logs_di[key][idx]["content"] = res_content @@ -180,7 +200,9 @@ async def start(self) -> None: """ await self.initialize() while not self.is_running: - command: str = await async_input("Enter your command (or type 'exit' to quit): ") + command: str = await async_input( + "Enter your command (or type 'exit' to quit): " + ) await self.process_command(command) if self.shutdown_event.is_set(): break @@ -192,7 +214,12 @@ class SingleCommandInputRunner(BaseRunner): A runner that handles input command and return the result. """ - def __init__(self, command: str, *args: Any, **kwargs: Any) -> None: + def __init__( + self, + command: str, + *args: Any, + **kwargs: Any, + ) -> None: super().__init__(*args, **kwargs) self.command = command self.result = None @@ -204,5 +231,6 @@ async def start(self) -> None: """ await self.initialize() self.result, self.execution_time = await self.process_command(self.command) - _ = await self.process_command("exit") - await self.wait_for_exit() + if not self.dont_terminate_browser_after_run: + _ = await self.process_command("exit") + await self.wait_for_exit() diff --git a/testzeus_hercules/utils/gherkin_helper.py b/testzeus_hercules/utils/gherkin_helper.py index d29be66..dbce740 100644 --- a/testzeus_hercules/utils/gherkin_helper.py +++ b/testzeus_hercules/utils/gherkin_helper.py @@ -9,7 +9,9 @@ input_gherkin_file_path = get_input_gherkin_file_path() -def split_feature_file(input_file: str, output_dir: str) -> List[Dict[str, str]]: +def split_feature_file( + input_file: str, output_dir: str, dont_append_header: bool = False +) -> List[Dict[str, str]]: """ Splits a single BDD feature file into multiple feature files, with each file containing a single scenario. The script preserves the feature-level content that should be shared across all scenario files. @@ -17,6 +19,7 @@ def split_feature_file(input_file: str, output_dir: str) -> List[Dict[str, str]] Parameters: input_file (str): Path to the input BDD feature file. output_dir (str): Path to the directory where the split feature files will be saved. + dont_append_header (bool): If True, the Feature header is only added to the first extracted scenario file. Returns: list: A list of dictionaries containing feature, scenario, and output file path. @@ -67,13 +70,22 @@ def split_feature_file(input_file: str, output_dir: str) -> List[Dict[str, str]] f_scenario = f_scenario.replace(comment_line, "") if already_visited_scenarios[scenario_title] > 0: - scenario_title = f"{scenario_title} - {already_visited_scenarios[scenario_title]}" + scenario_title = ( + f"{scenario_title} - {already_visited_scenarios[scenario_title]}" + ) scenario_filename = f"{scenario_title.replace(' ', '_')}_{already_visited_scenarios[scenario_title]}.feature" output_file = os.path.join(output_dir, scenario_filename) already_visited_scenarios[scenario_title] += 1 + if dont_append_header and i > 0: + file_content = ( + f"{prev_comment_lines}\n{all_scenarios[i]}{scenario_title}{f_scenario}" + ) + else: + file_content = f"{feature_header}\n\n{prev_comment_lines}\n{all_scenarios[i]}{scenario_title}{f_scenario}" + with open(output_file, "w") as f: - f.write(f"{feature_header}\n\n{prev_comment_lines}\n{all_scenarios[i]}{scenario_title}{f_scenario}") + f.write(file_content) prev_comment_lines = comment_lines scenario_di = { @@ -105,17 +117,21 @@ def serialize_feature_file(file_path: str) -> str: return feature_content -def process_feature_file() -> List[Dict[str, str]]: +def process_feature_file(pass_background_to_all: bool = True) -> List[Dict[str, str]]: """ Processes a Gherkin feature file by splitting it into smaller parts. Returns: List[Dict[str, str]]: A list of dictionaries containing the split parts of the feature file. """ - return split_feature_file(input_gherkin_file_path, tmp_gherkin_path) + return split_feature_file( + input_gherkin_file_path, + tmp_gherkin_path, + dont_append_header=not pass_background_to_all, + ) -def split_test() -> None: +def split_test(pass_background_to_all: bool = True) -> None: """ Parses command line arguments and splits the feature file into individual scenario files. """ @@ -135,7 +151,7 @@ def split_test() -> None: # feature_file_path = args.feature_file_path # output_dir = args.output_dir # list_of_feats = split_feature_file(feature_file_path, output_dir) - list_of_feats = process_feature_file() + list_of_feats = process_feature_file(pass_background_to_all=pass_background_to_all) for feat in list_of_feats: file_path = feat["output_file"] print(serialize_feature_file(file_path))