Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a new content index if it doesn't exist #1668

Merged
merged 10 commits into from
Sep 27, 2023
68 changes: 46 additions & 22 deletions connectors/es/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import time
from collections import defaultdict

from elasticsearch import NotFoundError as ElasticNotFoundError
from elasticsearch import (
NotFoundError as ElasticNotFoundError,
)
from elasticsearch.helpers import async_scan

from connectors.es import ESClient
from connectors.es import ESClient, Mappings
from connectors.es.settings import Settings
from connectors.filtering.basic_rule import BasicRuleEngine, parse
from connectors.logger import logger, tracer
from connectors.protocol import Filter, JobType
Expand Down Expand Up @@ -647,7 +650,7 @@ def __init__(self, elastic_config, logger_=None):
self._sink = None
self._sink_task = None

async def prepare_content_index(self, index, *, mappings=None):
async def prepare_content_index(self, index, language_code=None):
"""Creates the index, given a mapping if it does not exists."""
if not index.startswith("search-"):
raise ContentIndexNameInvalid(
Expand All @@ -660,29 +663,50 @@ async def prepare_content_index(self, index, *, mappings=None):
exists = await self.client.indices.exists(
index=index, expand_wildcards=expand_wildcards
)

mappings = Mappings.default_text_fields_mappings(is_connectors_index=True)

if exists:
# Update the index mappings if needed
self._logger.debug(f"{index} exists")
response = await self.client.indices.get_mapping(
index=index, expand_wildcards=expand_wildcards
await self._ensure_content_index_mappings(index, mappings, expand_wildcards)
else:
# Create a new index
self._logger.info(f"Creating content index: {index}")
await self._create_content_index(
index=index, language_code=language_code, mappings=mappings
)
existing_mappings = response[index].get("mappings", {})
if len(existing_mappings) == 0 and mappings:
self._logger.debug(
"Index %s has no mappings or it's empty. Adding mappings...", index
)
await self.client.indices.put_mapping(
index=index,
dynamic=mappings.get("dynamic", False),
dynamic_templates=mappings.get("dynamic_templates", []),
properties=mappings.get("properties", {}),
expand_wildcards=expand_wildcards,
)
self._logger.debug("Index %s mappings added", index)
else:
self._logger.debug("Index %s already has mappings. Skipping...", index)
return
self._logger.info(f"Content index successfully created: {index}")

async def _ensure_content_index_mappings(self, index, mappings, expand_wildcards):
vidok marked this conversation as resolved.
Show resolved Hide resolved
response = await self.client.indices.get_mapping(
index=index, expand_wildcards=expand_wildcards
)

existing_mappings = response[index].get("mappings", {})
if len(existing_mappings) == 0 and mappings:
self._logger.debug(
"Index %s has no mappings or it's empty. Adding mappings...", index
)
await self.client.indices.put_mapping(
index=index,
dynamic=mappings.get("dynamic", False),
dynamic_templates=mappings.get("dynamic_templates", []),
properties=mappings.get("properties", {}),
expand_wildcards=expand_wildcards,
)
self._logger.debug("Successfully added mappings for index %s", index)
else:
raise IndexMissing(f"Index {index} does not exist!")
self._logger.debug(
"Index %s already has mappings, skipping mappings creation", index
)

async def _create_content_index(self, index, mappings, language_code=None):
settings = Settings(language_code=language_code, analysis_icu=False).to_hash()

return await self.client.indices.create(
index=index, mappings=mappings, settings=settings
)

def done(self):
if self._extractor_task is not None and not self._extractor_task.done():
Expand Down
8 changes: 3 additions & 5 deletions connectors/sync_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import elasticsearch

from connectors.es import Mappings
from connectors.es.client import License, with_concurrency_control
from connectors.es.index import DocumentNotFoundError
from connectors.es.license import requires_platinum_license
Expand Down Expand Up @@ -195,12 +194,11 @@ async def _execute_content_sync_job(self, job_type, bulk_options):
sync_rules_enabled = self.connector.features.sync_rules_enabled()
if sync_rules_enabled:
await self.sync_job.validate_filtering(validator=self.data_provider)
mappings = Mappings.default_text_fields_mappings(
is_connectors_index=True,
)

logger.debug("Preparing the content index")

await self.elastic_server.prepare_content_index(
self.sync_job.index_name, mappings=mappings
index=self.sync_job.index_name, language_code=self.sync_job.language
)

content_extraction_enabled = (
Expand Down
85 changes: 65 additions & 20 deletions tests/test_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@
from unittest.mock import ANY, AsyncMock, Mock, call

import pytest
from elasticsearch import BadRequestError

from connectors.es.settings import TEXT_FIELD_MAPPING
from connectors.es import Mappings
from connectors.es.settings import Settings
from connectors.es.sink import (
OP_DELETE,
OP_INDEX,
OP_UPSERT,
AsyncBulkRunningError,
ContentIndexNameInvalid,
Extractor,
IndexMissing,
Sink,
SyncOrchestrator,
)
Expand Down Expand Up @@ -57,49 +58,93 @@ async def test_prepare_content_index_raise_error_when_index_name_invalid():


@pytest.mark.asyncio
async def test_prepare_content_index_raise_error_when_index_does_not_exist(
async def test_prepare_content_index_raise_error_when_index_creation_failed(
mock_responses,
):
index_name = "search-new-index"
config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"}
headers = {"X-Elastic-Product": "Elasticsearch"}
mock_responses.post(
"http://nowhere.com:9200/.elastic-connectors/_refresh", headers=headers
)
mock_responses.head(
"http://nowhere.com:9200/search-new-index?expand_wildcards=open",
f"http://nowhere.com:9200/{index_name}?expand_wildcards=open",
headers=headers,
status=404,
)
mock_responses.put(
"http://nowhere.com:9200/search-new-index",
f"http://nowhere.com:9200/{index_name}",
payload={"_id": "1"},
headers=headers,
)

es = SyncOrchestrator(config)

with pytest.raises(IndexMissing):
await es.prepare_content_index("search-new-index")
with mock.patch.object(
es.client.indices,
"create",
side_effect=[BadRequestError(message="test", body=None, meta=None)],
):
with pytest.raises(BadRequestError):
await es.prepare_content_index(index_name)

await es.close()

await es.close()

@pytest.mark.asyncio
async def test_prepare_content_index_create_index(
mock_responses,
):
index_name = "search-new-index"
config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"}
headers = {"X-Elastic-Product": "Elasticsearch"}
mock_responses.post(
"http://nowhere.com:9200/.elastic-connectors/_refresh", headers=headers
)
mock_responses.head(
f"http://nowhere.com:9200/{index_name}?expand_wildcards=open",
headers=headers,
status=404,
)
mock_responses.put(
f"http://nowhere.com:9200/{index_name}",
payload={"_id": "1"},
headers=headers,
)

es = SyncOrchestrator(config)

create_index_result = asyncio.Future()
create_index_result.set_result({"acknowledged": True})

mappings = Mappings.default_text_fields_mappings(is_connectors_index=True)

settings = Settings(analysis_icu=False).to_hash()

with mock.patch.object(
es.client.indices, "create", return_value=create_index_result
) as create_index_mock:
await es.prepare_content_index(index_name)

await es.close()

expected_params = {
"index": index_name,
"mappings": mappings,
"settings": settings,
}

create_index_mock.assert_called_with(**expected_params)


@pytest.mark.asyncio
async def test_prepare_content_index(mock_responses):
config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"}
headers = {"X-Elastic-Product": "Elasticsearch"}
# prepare-index, with mappings
dynamic_templates = {
"data": {
"match_mapping_type": "string",
"mapping": TEXT_FIELD_MAPPING,
}
}
mappings = {
"dynamic": True,
"dynamic_templates": dynamic_templates,
"properties": {"name": {"type": "keyword"}},
}

mappings = Mappings.default_text_fields_mappings(is_connectors_index=True)

mock_responses.head(
"http://nowhere.com:9200/search-new-index?expand_wildcards=open",
headers=headers,
Expand All @@ -124,7 +169,7 @@ async def test_prepare_content_index(mock_responses):
return_value=put_mappings_result,
) as put_mapping_mock:
index_name = "search-new-index"
await es.prepare_content_index(index_name, mappings=mappings)
await es.prepare_content_index(index_name)

await es.close()

Expand Down