From e24b9a1a885999ea3411239b1f1dde4f526beeea Mon Sep 17 00:00:00 2001 From: Ian Hellen Date: Fri, 1 Sep 2023 15:01:38 -0700 Subject: [PATCH] Fixing regular expression error for connection string Adding additional tests for connection strings Fixing connection string handling and tests for azure_monitor_driver.py --- msticpy/common/wsconfig.py | 27 +++++++++--- msticpy/data/drivers/azure_monitor_driver.py | 10 +++-- tests/common/test_wsconfig.py | 42 +++++++++++++++++++ .../data/drivers/test_azure_monitor_driver.py | 6 +++ 4 files changed, 76 insertions(+), 9 deletions(-) diff --git a/msticpy/common/wsconfig.py b/msticpy/common/wsconfig.py index 3bf3b28a8..f678b50b9 100644 --- a/msticpy/common/wsconfig.py +++ b/msticpy/common/wsconfig.py @@ -248,17 +248,34 @@ def from_settings(cls, settings: Dict[str, Any]) -> "WorkspaceConfig": @classmethod def from_connection_string(cls, connection_str: str) -> "WorkspaceConfig": """Create a WorkstationConfig from a connection string.""" - tenant_regex = r".*tenant\(\s?['\"](?P[\w]+)['\"].*" - workspace_regex = r".*workspace\(\s?['\"](?P[\w]+)['\"].*" - tenant_id = workspace_id = None - if match := re.match(tenant_regex, connection_str): + tenant_regex = r""" + .*tenant\s?=\s?['\"]\{? + (?P[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) + \}?['\"].*""" + workspace_regex = r""" + .*workspace\s?=\s?['\"]\{? + (?P[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) + \}?['\"].*""" + ws_name_regex = r".*alias\s?=\s?['\"]\{?(?P\w+)['\"].*" + + tenant_id = workspace_id = workspace_name = None + if match := re.match(tenant_regex, connection_str, re.IGNORECASE | re.VERBOSE): tenant_id = match.groupdict()["tenant_id"] - if match := re.match(workspace_regex, connection_str): + else: + raise ValueError("Could not find tenant ID in connection string.") + if match := re.match( + workspace_regex, connection_str, re.IGNORECASE | re.VERBOSE + ): workspace_id = match.groupdict()["workspace_id"] + else: + raise ValueError("Could not find workspace ID in connection string.") + if match := re.match(ws_name_regex, connection_str, re.IGNORECASE | re.VERBOSE): + workspace_name = match.groupdict()["workspace_name"] return cls( config={ cls.CONF_WS_ID_KEY: workspace_id, # type: ignore[dict-item] cls.CONF_TENANT_ID_KEY: tenant_id, # type: ignore[dict-item] + cls.CONF_WS_NAME_KEY: workspace_name, # type: ignore[dict-item] } ) diff --git a/msticpy/data/drivers/azure_monitor_driver.py b/msticpy/data/drivers/azure_monitor_driver.py index c7e2f775b..2ab6817c9 100644 --- a/msticpy/data/drivers/azure_monitor_driver.py +++ b/msticpy/data/drivers/azure_monitor_driver.py @@ -14,6 +14,7 @@ azure/monitor-query-readme?view=azure-python """ +import contextlib import logging from datetime import datetime from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast @@ -423,10 +424,11 @@ def _get_workspaces(self, connection_str: Optional[str] = None, **kwargs): ) elif isinstance(connection_str, str): self._def_connection_str = connection_str - ws_config = WorkspaceConfig.from_connection_string(connection_str) - logger.info( - "WorkspaceConfig created from connection_str %s", connection_str - ) + with contextlib.suppress(ValueError): + ws_config = WorkspaceConfig.from_connection_string(connection_str) + logger.info( + "WorkspaceConfig created from connection_str %s", connection_str + ) elif isinstance(connection_str, WorkspaceConfig): logger.info("WorkspaceConfig as parameter %s", connection_str.workspace_id) ws_config = connection_str diff --git a/tests/common/test_wsconfig.py b/tests/common/test_wsconfig.py index df35d5715..500afe501 100644 --- a/tests/common/test_wsconfig.py +++ b/tests/common/test_wsconfig.py @@ -157,3 +157,45 @@ def test_wsconfig_single_ws(): and _NAMED_WS["WorkspaceId"] in wstest_config.code_connect_str and _NAMED_WS["TenantId"] in wstest_config.code_connect_str ) + + +_TENANT = "d8d9d2f2-5d2d-4d7e-9c5c-5d6d9d1d8d9d" +_WS_ID = "f8d9d2f2-5d2d-4d7e-9c5c-5d6d9d1d8d9e" +_CLI_ID = "18d9d2f2-5d2d-4d7e-9c5c-5d6d9d1d8d9f" +_WS_NAME = "Workspace" +_CONFIG_STR_TEST_CASES = ( + ( + f"loganalytics://code;workspace='{_WS_ID}';alias='{_WS_NAME}';tenant='{_TENANT}'", + True, + ), + ( + f"loganalytics://tenant='{_TENANT}';clientid='{_CLI_ID}';clientsecret='[PLACEHOLDER]';workspace='{_WS_ID}';alias='{_WS_NAME}'", + True, + ), + ( + f"loganalytics://username='User';password='[PLACEHOLDER]';workspace='{_WS_ID}';alias='{_WS_NAME}';tenant='{_TENANT}'", + True, + ), + ( + f"loganalytics://anonymous;workspace='{_WS_ID}';alias='{_WS_NAME}';tenant='{_TENANT}'", + True, + ), + (f"loganalytics://code;workspace='{_WS_ID}';alias='{_WS_NAME}'", False), + (f"loganalytics://code;alias='{_WS_NAME}';tenant='{_TENANT}'", False), +) + + +@pytest.mark.parametrize("config_str, is_valid", _CONFIG_STR_TEST_CASES) +def test_wsconfig_config_str(config_str, is_valid): + """Test capture of config from connections strings.""" + if is_valid: + ws = WorkspaceConfig.from_connection_string(config_str) + if "workspace" in config_str: + check.equal(ws["workspace_id"], _WS_ID) + if "tenant" in config_str: + check.equal(ws["tenant_id"], _TENANT) + if "alias" in config_str: + check.equal(ws["workspace_name"], _WS_NAME) + else: + with pytest.raises(ValueError): + WorkspaceConfig.from_connection_string(config_str) diff --git a/tests/data/drivers/test_azure_monitor_driver.py b/tests/data/drivers/test_azure_monitor_driver.py index 50a6586dc..a4df6150e 100644 --- a/tests/data/drivers/test_azure_monitor_driver.py +++ b/tests/data/drivers/test_azure_monitor_driver.py @@ -103,6 +103,11 @@ def get_token(self, *args, **kwargs): "a927809c-8142-43e1-96b3-4ad87cfe95a4", ] +_VALID_CONN_STR = ( + f"loganalytics://tenant='{_WS_IDS[0]}';workspace='{_WS_IDS[1]}'" + f";alias='wksp';clientid='{_WS_IDS[0]}';client_secret='{_WS_IDS[1]}'" +) + _TEST_CONNECT_PARAMS = ( ( {"auth_types": ["cli", "environment", "msi"]}, @@ -110,6 +115,7 @@ def get_token(self, *args, **kwargs): ), ({"auth_types": "cli"}, [("_connect_auth_types", ["cli"])]), ({"tenant_id": "test"}, [("_az_tenant_id", "test")]), + ({"connection_str": _VALID_CONN_STR}, [("_workspace_id", _WS_IDS[1])]), ({"connection_str": "test"}, [(None, MsticpyKqlConnectionError)]), ( {"mp_az_auth": ["cli", "environment", "msi"]},