Skip to content

Commit

Permalink
[Agent] Check for soft deleted connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
jedrazb committed Jan 7, 2025
1 parent 2e22143 commit 0175fce
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 6 deletions.
4 changes: 3 additions & 1 deletion connectors/agent/connector_record_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ async def ensure_connector_records_exist(self, agent_config, connector_name=None
random_connector_name_id = generate_random_id(length=4)
connector_name = f"[Elastic-managed] {service_type} connector {random_connector_name_id}"

if not await self.connector_index.connector_exists(connector_id):
if not await self.connector_index.connector_exists(
connector_id, include_deleted=True
):
try:
await self.connector_index.connector_put(
connector_id=connector_id,
Expand Down
12 changes: 12 additions & 0 deletions connectors/es/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ class TemporaryConnectorApiWrapper(ESClient):
def __init__(self, elastic_config):
super().__init__(elastic_config)

async def connector_get(self, connector_id, deleted):
return await self.client.perform_request(
"GET",
f"/_connector/{connector_id}&deleted={deleted}",
headers={"accept": "application/json"},
)

async def connector_check_in(self, connector_id):
return await self.client.perform_request(
"PUT",
Expand Down Expand Up @@ -98,6 +105,11 @@ async def connector_check_in(self, connector_id):
partial(self._api_wrapper.connector_check_in, connector_id)
)

async def connector_get(self, connector_id, deleted=False):
return await self._retrier.execute_with_retry(
partial(self._api_wrapper.connector_get, connector_id, deleted)
)

async def connector_put(
self, connector_id, service_type, connector_name, index_name, is_native
):
Expand Down
9 changes: 7 additions & 2 deletions connectors/protocol/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ async def connector_put(
is_native=is_native,
)

async def connector_exists(self, connector_id):
async def connector_exists(self, connector_id, include_deleted=False):
try:
doc = await self.fetch_by_id(connector_id)
doc = await self.get_connector(connector_id, include_deleted)
return doc is not None
except DocumentNotFoundError:
return False
Expand All @@ -191,6 +191,11 @@ async def connector_exists(self, connector_id):
)
raise e

async def get_connector(self, connector_id, include_deleted=False):
return await self.api.connector_get(
connector_id=connector_id, deleted=include_deleted
)

async def connector_update_scheduling(
self, connector_id, full=None, incremental=None, access_control=None
):
Expand Down
6 changes: 3 additions & 3 deletions tests/protocol/test_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -1656,7 +1656,7 @@ async def test_connector_exists_returns_true_when_found():
}

index = ConnectorIndex(config)
index.fetch_by_id = AsyncMock(return_value={"id": "1"})
index.get_connector = AsyncMock(return_value={"id": "1"})

exists = await index.connector_exists("1")
assert exists is True
Expand All @@ -1671,7 +1671,7 @@ async def test_connector_exists_returns_false_when_not_found():
}

index = ConnectorIndex(config)
index.fetch_by_id = AsyncMock(side_effect=DocumentNotFoundError)
index.get_connector = AsyncMock(side_effect=DocumentNotFoundError)

exists = await index.connector_exists("1")
assert exists is False
Expand All @@ -1686,7 +1686,7 @@ async def test_connector_exists_raises_non_404_exception():
}

index = ConnectorIndex(config)
index.fetch_by_id = AsyncMock(side_effect=Exception("Fetch error"))
index.get_connector = AsyncMock(side_effect=Exception("Fetch error"))

with pytest.raises(Exception, match="Fetch error"):
await index.connector_exists("1")
Expand Down

0 comments on commit 0175fce

Please sign in to comment.