From 0175fceafb1059fc65ac63c296c9748856119458 Mon Sep 17 00:00:00 2001 From: Jedr Blaszyk Date: Tue, 7 Jan 2025 15:06:14 +0100 Subject: [PATCH] [Agent] Check for soft deleted connectors --- connectors/agent/connector_record_manager.py | 4 +++- connectors/es/index.py | 12 ++++++++++++ connectors/protocol/connectors.py | 9 +++++++-- tests/protocol/test_connectors.py | 6 +++--- 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/connectors/agent/connector_record_manager.py b/connectors/agent/connector_record_manager.py index 686025ecc..18de334b0 100644 --- a/connectors/agent/connector_record_manager.py +++ b/connectors/agent/connector_record_manager.py @@ -50,7 +50,9 @@ async def ensure_connector_records_exist(self, agent_config, connector_name=None random_connector_name_id = generate_random_id(length=4) connector_name = f"[Elastic-managed] {service_type} connector {random_connector_name_id}" - if not await self.connector_index.connector_exists(connector_id): + if not await self.connector_index.connector_exists( + connector_id, include_deleted=True + ): try: await self.connector_index.connector_put( connector_id=connector_id, diff --git a/connectors/es/index.py b/connectors/es/index.py index 4e6b753a2..61a979c4f 100644 --- a/connectors/es/index.py +++ b/connectors/es/index.py @@ -27,6 +27,13 @@ class TemporaryConnectorApiWrapper(ESClient): def __init__(self, elastic_config): super().__init__(elastic_config) + async def connector_get(self, connector_id, deleted): + return await self.client.perform_request( + "GET", + f"/_connector/{connector_id}&deleted={deleted}", + headers={"accept": "application/json"}, + ) + async def connector_check_in(self, connector_id): return await self.client.perform_request( "PUT", @@ -98,6 +105,11 @@ async def connector_check_in(self, connector_id): partial(self._api_wrapper.connector_check_in, connector_id) ) + async def connector_get(self, connector_id, deleted=False): + return await self._retrier.execute_with_retry( + partial(self._api_wrapper.connector_get, connector_id, deleted) + ) + async def connector_put( self, connector_id, service_type, connector_name, index_name, is_native ): diff --git a/connectors/protocol/connectors.py b/connectors/protocol/connectors.py index c69cd9dea..e170897f2 100644 --- a/connectors/protocol/connectors.py +++ b/connectors/protocol/connectors.py @@ -179,9 +179,9 @@ async def connector_put( is_native=is_native, ) - async def connector_exists(self, connector_id): + async def connector_exists(self, connector_id, include_deleted=False): try: - doc = await self.fetch_by_id(connector_id) + doc = await self.get_connector(connector_id, include_deleted) return doc is not None except DocumentNotFoundError: return False @@ -191,6 +191,11 @@ async def connector_exists(self, connector_id): ) raise e + async def get_connector(self, connector_id, include_deleted=False): + return await self.api.connector_get( + connector_id=connector_id, deleted=include_deleted + ) + async def connector_update_scheduling( self, connector_id, full=None, incremental=None, access_control=None ): diff --git a/tests/protocol/test_connectors.py b/tests/protocol/test_connectors.py index d77f762c5..29b2e9c5b 100644 --- a/tests/protocol/test_connectors.py +++ b/tests/protocol/test_connectors.py @@ -1656,7 +1656,7 @@ async def test_connector_exists_returns_true_when_found(): } index = ConnectorIndex(config) - index.fetch_by_id = AsyncMock(return_value={"id": "1"}) + index.get_connector = AsyncMock(return_value={"id": "1"}) exists = await index.connector_exists("1") assert exists is True @@ -1671,7 +1671,7 @@ async def test_connector_exists_returns_false_when_not_found(): } index = ConnectorIndex(config) - index.fetch_by_id = AsyncMock(side_effect=DocumentNotFoundError) + index.get_connector = AsyncMock(side_effect=DocumentNotFoundError) exists = await index.connector_exists("1") assert exists is False @@ -1686,7 +1686,7 @@ async def test_connector_exists_raises_non_404_exception(): } index = ConnectorIndex(config) - index.fetch_by_id = AsyncMock(side_effect=Exception("Fetch error")) + index.get_connector = AsyncMock(side_effect=Exception("Fetch error")) with pytest.raises(Exception, match="Fetch error"): await index.connector_exists("1")