diff --git a/connectors/sources/azure_blob_storage.py b/connectors/sources/azure_blob_storage.py index d99838887..79430900e 100644 --- a/connectors/sources/azure_blob_storage.py +++ b/connectors/sources/azure_blob_storage.py @@ -16,7 +16,6 @@ "size": "size", "container": "container", } -DEFAULT_FILE_SIZE_LIMIT = 10485760 DEFAULT_RETRY_COUNT = 3 MAX_CONCURRENT_DOWNLOADS = ( 100 # Max concurrent download supported by Azure Blob Storage diff --git a/connectors/sources/confluence.py b/connectors/sources/confluence.py index 7a0d10520..186735922 100644 --- a/connectors/sources/confluence.py +++ b/connectors/sources/confluence.py @@ -30,7 +30,6 @@ ssl_context, ) -FILE_SIZE_LIMIT = 10485760 RETRY_INTERVAL = 2 SPACE = "space" BLOGPOST = "blogpost" @@ -55,7 +54,6 @@ } PING_URL = "rest/api/space?limit=1" MAX_CONCURRENT_DOWNLOADS = 50 # Max concurrent download supported by confluence -CHUNK_SIZE = 1024 MAX_CONCURRENCY = 50 QUEUE_SIZE = 1024 QUEUE_MEM_SIZE = 25 * 1024 * 1024 # Size in Megabytes diff --git a/connectors/sources/dropbox.py b/connectors/sources/dropbox.py index 1fc91ee74..6b82a7504 100644 --- a/connectors/sources/dropbox.py +++ b/connectors/sources/dropbox.py @@ -34,8 +34,6 @@ RETRY_COUNT = 3 DEFAULT_RETRY_AFTER = 300 # seconds RETRY_INTERVAL = 2 -CHUNK_SIZE = 1024 -FILE_SIZE_LIMIT = 10485760 # ~10 Megabytes MAX_CONCURRENT_DOWNLOADS = 100 LIMIT = 300 # Limit for fetching shared files per call PAPER = "paper" diff --git a/connectors/sources/github.py b/connectors/sources/github.py index 022c5964e..8e5b1417b 100644 --- a/connectors/sources/github.py +++ b/connectors/sources/github.py @@ -4,16 +4,12 @@ # you may not use this file except in compliance with the Elastic License 2.0. # """GitHub source module responsible to fetch documents from GitHub Cloud and Server.""" -import asyncio import time from enum import Enum from functools import cached_property, partial -import aiofiles import aiohttp import fastjsonschema -from aiofiles.os import remove -from aiofiles.tempfile import NamedTemporaryFile from aiohttp.client_exceptions import ClientResponseError from gidgethub.aiohttp import GitHubAPI @@ -26,7 +22,6 @@ from connectors.utils import ( CancellableSleeps, RetryStrategy, - convert_to_b64, decode_base64_value, retryable, ssl_context, @@ -41,7 +36,6 @@ RETRIES = 3 RETRY_INTERVAL = 2 -FILE_SIZE_LIMIT = 10485760 # ~ 10 Megabytes FORBIDDEN = 403 NODE_SIZE = 100 REVIEWS_COUNT = 45 @@ -920,6 +914,14 @@ def get_default_configuration(cls): "ui_restrictions": ["advanced"], "value": RETRIES, }, + "use_text_extraction_service": { + "display": "toggle", + "label": "Use text extraction service", + "order": 8, + "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.", + "type": "bool", + "value": False, + }, } async def get_invalid_repos(self): @@ -1320,29 +1322,6 @@ async def _fetch_files(self, repo_name, default_branch): f"Something went wrong while fetching the files of {repo_name}. Exception: {exception}" ) - async def _get_document_with_content(self, url, attachment_name, document): - file_data = await self.github_client.get_github_item(resource=url) - temp_filename = "" - async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer: - await async_buffer.write( - decode_base64_value(content=file_data["content"]) # pyright: ignore - ) - temp_filename = str(async_buffer.name) - - self._logger.debug(f"Calling convert_to_b64 for file : {attachment_name}") - await asyncio.to_thread(convert_to_b64, source=temp_filename) - async with aiofiles.open(file=temp_filename, mode="r") as async_buffer: - # base64 on macOS will add a EOL, so we strip() here - document["_attachment"] = (await async_buffer.read()).strip() - - try: - await remove(temp_filename) - except Exception as exception: - self._logger.warning( - f"Could not remove file from: {temp_filename}. Error: {exception}" - ) - return document - async def get_content(self, attachment, timestamp=None, doit=False): """Extracts the content for Apache TIKA supported file types. @@ -1354,31 +1333,36 @@ async def get_content(self, attachment, timestamp=None, doit=False): Returns: dictionary: Content document with _id, _timestamp and attachment content """ - attachment_size = int(attachment["size"]) - if not (doit and attachment_size > 0): + file_size = int(attachment["size"]) + if not (doit and file_size > 0): return - attachment_name = attachment["name"] - - if attachment_size > FILE_SIZE_LIMIT: - self._logger.warning( - f"File size {attachment_size} of file {attachment_name} is larger than {FILE_SIZE_LIMIT} bytes. Discarding file content" - ) + filename = attachment["name"] + file_extension = self.get_file_extension(filename) + if not self.can_file_be_downloaded(file_extension, filename, file_size): return - self._logger.debug(f"Downloading {attachment_name}") - document = { - "_id": f"{attachment['repo_name']}/{attachment['name']}", + "_id": f"{attachment['repo_name']}/{filename}", "_timestamp": attachment["_timestamp"], } - - return await self._get_document_with_content( - url=attachment["url"], - attachment_name=attachment_name, - document=document, + return await self.download_and_extract_file( + document, + filename, + file_extension, + partial( + self.download_func, + attachment["url"], + ), ) + async def download_func(self, url): + file_data = await self.github_client.get_github_item(resource=url) + if file_data: + yield decode_base64_value(content=file_data["content"]) + else: + yield + def _filter_rule_query(self, repo, query, query_type): """ Filters a query based on the query type. diff --git a/connectors/sources/google.py b/connectors/sources/google.py index 371721820..81e0ee018 100644 --- a/connectors/sources/google.py +++ b/connectors/sources/google.py @@ -22,7 +22,6 @@ RETRIES = 3 RETRY_INTERVAL = 2 -FILE_SIZE_LIMIT = 10485760 # ~ 10 Megabytes DEFAULT_TIMEOUT = 1 * 60 # 1 min DEFAULT_PAGE_SIZE = 100 diff --git a/connectors/sources/google_cloud_storage.py b/connectors/sources/google_cloud_storage.py index c17acbbb5..eb5ae8754 100644 --- a/connectors/sources/google_cloud_storage.py +++ b/connectors/sources/google_cloud_storage.py @@ -44,7 +44,6 @@ } DEFAULT_RETRY_COUNT = 3 DEFAULT_WAIT_MULTIPLIER = 2 -DEFAULT_FILE_SIZE_LIMIT = 10485760 STORAGE_EMULATOR_HOST = os.environ.get("STORAGE_EMULATOR_HOST") RUNNING_FTEST = ( "RUNNING_FTEST" in os.environ diff --git a/connectors/sources/google_drive.py b/connectors/sources/google_drive.py index 95d2911a3..bd6f54cd8 100644 --- a/connectors/sources/google_drive.py +++ b/connectors/sources/google_drive.py @@ -769,7 +769,7 @@ async def resolve_paths(self): return folders - async def _download_content(self, file, download_func): + async def _download_content(self, file, file_extension, download_func): """Downloads the file from Google Drive and returns the encoded file content. Args: @@ -781,7 +781,6 @@ async def _download_content(self, file, download_func): """ file_name = file["name"] - file_extension = self.get_file_extension(file_name) attachment, body, file_size = None, None, 0 async with self.create_temp_file(file_extension) as async_buffer: @@ -813,7 +812,12 @@ async def get_google_workspace_content(self, file, timestamp=None): dict: Content document with id, timestamp & text """ - file_name, file_id, file_mime_type = file["name"], file["id"], file["mime_type"] + file_name, file_id, file_mime_type, file_extension = ( + file["name"], + file["id"], + file["mime_type"], + f".{file['file_extension']}", + ) document = { "_id": file_id, @@ -821,6 +825,7 @@ async def get_google_workspace_content(self, file, timestamp=None): } attachment, body, file_size = await self._download_content( file=file, + file_extension=file_extension, download_func=partial( self.google_drive_client.api_call, resource="files", @@ -876,6 +881,7 @@ async def get_generic_file_content(self, file, timestamp=None): } attachment, body, _ = await self._download_content( file=file, + file_extension=file_extension, download_func=partial( self.google_drive_client.api_call, resource="files", diff --git a/connectors/sources/jira.py b/connectors/sources/jira.py index 373e190fc..cdf2795a1 100644 --- a/connectors/sources/jira.py +++ b/connectors/sources/jira.py @@ -38,10 +38,8 @@ RETRIES = 3 RETRY_INTERVAL = 2 DEFAULT_RETRY_SECONDS = 30 -FILE_SIZE_LIMIT = 10485760 FETCH_SIZE = 100 -CHUNK_SIZE = 1024 QUEUE_MEM_SIZE = 5 * 1024 * 1024 # Size in Megabytes MAX_CONCURRENCY = 5 MAX_CONCURRENT_DOWNLOADS = 100 # Max concurrent download supported by jira diff --git a/connectors/sources/onedrive.py b/connectors/sources/onedrive.py index 5adf9b041..c5bfbc2c0 100644 --- a/connectors/sources/onedrive.py +++ b/connectors/sources/onedrive.py @@ -5,17 +5,13 @@ # """OneDrive source module responsible to fetch documents from OneDrive. """ -import asyncio import os from datetime import datetime, timedelta from functools import cached_property, partial from urllib import parse -import aiofiles import aiohttp import fastjsonschema -from aiofiles.os import remove -from aiofiles.tempfile import NamedTemporaryFile from aiohttp.client_exceptions import ClientResponseError, ServerConnectionError from wcmatch import glob @@ -34,7 +30,6 @@ CacheWithTimeout, CancellableSleeps, RetryStrategy, - convert_to_b64, iso_utc, retryable, ) @@ -42,8 +37,6 @@ RETRIES = 3 RETRY_INTERVAL = 2 DEFAULT_RETRY_SECONDS = 30 -CHUNK_SIZE = 1024 -FILE_SIZE_LIMIT = 10485760 # ~10 Megabytes FETCH_SIZE = 999 DEFAULT_PARALLEL_CONNECTION_COUNT = 15 REQUEST_TIMEOUT = 300 @@ -486,33 +479,6 @@ async def ping(self): self._logger.exception("Error while connecting to OneDrive") raise - async def _get_document_with_content(self, attachment_name, document, url): - temp_filename = "" - - async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer: - async for response in self.client.get(url=url): - async for data in response.content.iter_chunked(n=CHUNK_SIZE): - await async_buffer.write(data) - temp_filename = str(async_buffer.name) - - self._logger.debug( - f"Download completed for file: {attachment_name}. Calling convert_to_b64" - ) - await asyncio.to_thread( - convert_to_b64, - source=temp_filename, - ) - async with aiofiles.open(file=temp_filename, mode="r") as target_file: - # base64 on macOS will add a EOL, so we strip() here - document["_attachment"] = (await target_file.read()).strip() - try: - await remove(temp_filename) - except Exception as exception: - self._logger.warning( - f"Could not remove file: {temp_filename}. Error: {exception}" - ) - return document - async def get_content(self, file, download_url, timestamp=None, doit=False): """Extracts the content for allowed file types. diff --git a/connectors/sources/outlook.py b/connectors/sources/outlook.py index 0a35cbecc..a4e976e64 100644 --- a/connectors/sources/outlook.py +++ b/connectors/sources/outlook.py @@ -32,12 +32,10 @@ from connectors.logger import logger from connectors.source import BaseDataSource from connectors.utils import ( - TIKA_SUPPORTED_FILETYPES, CancellableSleeps, ConcurrentTasks, MemQueue, RetryStrategy, - get_base64_value, get_pem_format, html_to_text, retryable, @@ -45,7 +43,6 @@ RETRIES = 3 RETRY_INTERVAL = 2 -FILE_SIZE_LIMIT = 10485760 QUEUE_MEM_SIZE = 5 * 1024 * 1024 # Size in Megabytes MAX_CONCURRENCY = 10 @@ -700,14 +697,12 @@ def get_default_configuration(cls): "label": "Tenant ID", "order": 2, "type": "str", - "value": "", }, "client_id": { "depends_on": [{"field": "data_source", "value": OUTLOOK_CLOUD}], "label": "Client ID", "order": 3, "type": "str", - "value": "", }, "client_secret": { "depends_on": [{"field": "data_source", "value": OUTLOOK_CLOUD}], @@ -715,7 +710,6 @@ def get_default_configuration(cls): "order": 4, "sensitive": True, "type": "str", - "value": "", }, "exchange_server": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], @@ -723,7 +717,6 @@ def get_default_configuration(cls): "order": 5, "tooltip": "Exchange server's IP address. E.g. 127.0.0.1", "type": "str", - "value": "", }, "active_directory_server": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], @@ -731,14 +724,12 @@ def get_default_configuration(cls): "order": 6, "tooltip": "Active Directory server's IP address. E.g. 127.0.0.1", "type": "str", - "value": "", }, "username": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], "label": "Exchange server username", "order": 7, "type": "str", - "value": "", }, "password": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], @@ -746,7 +737,6 @@ def get_default_configuration(cls): "order": 8, "sensitive": True, "type": "str", - "value": "", }, "domain": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], @@ -754,7 +744,6 @@ def get_default_configuration(cls): "order": 9, "tooltip": "Domain name such as gmail.com, outlook.com", "type": "str", - "value": "", }, "ssl_enabled": { "depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}], @@ -772,35 +761,20 @@ def get_default_configuration(cls): "label": "SSL certificate", "order": 11, "type": "str", - "value": "", + }, + "use_text_extraction_service": { + "display": "toggle", + "label": "Use text extraction service", + "order": 7, + "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.", + "type": "bool", + "value": False, }, } async def close(self): await self.client._get_user_instance.close() - def _pre_checks_for_get_content( - self, attachment_extension, attachment_name, attachment_size - ): - if attachment_extension == "": - self._logger.debug( - f"Files without extension are not supported, skipping {attachment_name}." - ) - return - - if attachment_extension.lower() not in TIKA_SUPPORTED_FILETYPES: - self._logger.debug( - f"Files with the extension {attachment_extension} are not supported, skipping {attachment_name}." - ) - return - - if attachment_size > FILE_SIZE_LIMIT: - self._logger.warning( - f"File size {attachment_size} of file {attachment_name} is larger than {FILE_SIZE_LIMIT} bytes. Discarding file content" - ) - return - return True - async def get_content(self, attachment, timezone, timestamp=None, doit=False): """Extracts the content for allowed file types. @@ -813,35 +787,40 @@ async def get_content(self, attachment, timezone, timestamp=None, doit=False): Returns: dictionary: Content document with _id, _timestamp and attachment content """ - attachment_size = attachment.size - if not (doit and attachment_size > 0): + file_size = attachment.size + if not (doit and file_size > 0): return - attachment_name = attachment.name - attachment_extension = ( - attachment_name[attachment_name.rfind(".") :] # noqa - if "." in attachment_name - else "" - ) - - if not self._pre_checks_for_get_content( - attachment_extension=attachment_extension, - attachment_name=attachment_name, - attachment_size=attachment_size, + filename = attachment.name + file_extension = self.get_file_extension(filename) + if not self.can_file_be_downloaded( + file_extension, + filename, + file_size, ): return - self._logger.debug(f"Downloading {attachment_name}") - document = { "_id": attachment.attachment_id.id, "_timestamp": ews_format_to_datetime( source_datetime=attachment.last_modified_time, timezone=timezone ), - "_attachment": get_base64_value(attachment.content), } + return await self.download_and_extract_file( + document, + filename, + file_extension, + partial(self.download_func, attachment.content), + ) - return document + async def download_func(self, content): + """This is a fake-download function + Its only purpose is to allow attachment content to be + written to a temp file. + This is because outlook doesn't download files, + it instead contains a key with bytes in its response. + """ + yield content async def _fetch_attachments(self, attachment_type, outlook_object, timezone): for attachment in outlook_object.attachments: diff --git a/connectors/sources/s3.py b/connectors/sources/s3.py index 0c45254c1..3f7638b84 100644 --- a/connectors/sources/s3.py +++ b/connectors/sources/s3.py @@ -18,8 +18,6 @@ from connectors.logger import logger, set_extra_logger from connectors.source import BaseDataSource -MAX_CHUNK_SIZE = 1048576 -DEFAULT_MAX_FILE_SIZE = 10485760 DEFAULT_PAGE_SIZE = 100 DEFAULT_MAX_RETRY_ATTEMPTS = 5 DEFAULT_CONNECTION_TIMEOUT = 90 diff --git a/connectors/sources/salesforce.py b/connectors/sources/salesforce.py index 840453c0c..575cae704 100644 --- a/connectors/sources/salesforce.py +++ b/connectors/sources/salesforce.py @@ -26,7 +26,6 @@ RETRIES = 3 RETRY_INTERVAL = 1 -CHUNK_SIZE = 1024 BASE_URL = "https://.my.salesforce.com" API_VERSION = "v58.0" diff --git a/connectors/sources/servicenow.py b/connectors/sources/servicenow.py index 60351ed27..eb7f01b30 100644 --- a/connectors/sources/servicenow.py +++ b/connectors/sources/servicenow.py @@ -33,9 +33,7 @@ RETRIES = 3 RETRY_INTERVAL = 2 -CHUNK_SIZE = 1024 QUEUE_MEM_SIZE = 25 * 1024 * 1024 # Size in Megabytes -FILE_SIZE_LIMIT = 10485760 # Size in Bytes CONCURRENT_TASKS = 1000 # Depends on total number of services and size of each service MAX_CONCURRENT_CLIENT_SUPPORT = 10 TABLE_FETCH_SIZE = 50 diff --git a/connectors/sources/sharepoint_server.py b/connectors/sources/sharepoint_server.py index 2e600ca42..7875fa102 100644 --- a/connectors/sources/sharepoint_server.py +++ b/connectors/sources/sharepoint_server.py @@ -5,7 +5,6 @@ # """SharePoint source module responsible to fetch documents from SharePoint Server. """ -import asyncio import os from functools import partial from urllib.parse import quote @@ -13,7 +12,6 @@ import aiofiles import aiohttp from aiofiles.os import remove -from aiofiles.tempfile import NamedTemporaryFile from aiohttp.client_exceptions import ServerDisconnectedError from connectors.logger import logger @@ -21,17 +19,12 @@ from connectors.utils import ( TIKA_SUPPORTED_FILETYPES, CancellableSleeps, - RetryStrategy, - convert_to_b64, - retryable, ssl_context, ) RETRY_INTERVAL = 2 DEFAULT_RETRY_SECONDS = 30 RETRIES = 3 -FILE_SIZE_LIMIT = 10485760 -CHUNK_SIZE = 1024 TOP = 5000 PING = "ping" SITES = "sites" @@ -168,60 +161,6 @@ async def close_session(self): await self.session.close() # pyright: ignore self.session = None - async def get_content( - self, document, file_relative_url, site_url, timestamp=None, doit=False - ): - """Get content of list items and drive items - - Args: - document (dictionary): Modified document. - file_relative_url (str): Relative url of file - site_url (str): Site path of SharePoint - timestamp (timestamp, optional): Timestamp of item last modified. Defaults to None. - doit (boolean, optional): Boolean value for whether to get content or not. Defaults to False. - - Returns: - dictionary: Content document with id, timestamp & text. - """ - document_size = int(document["size"]) - filename = ( - document["title"] if document["type"] == "File" else document["file_name"] - ) - if not (doit and document_size): - return - - if document_size > FILE_SIZE_LIMIT: - self._logger.warning( - f"File size {document_size} of file {filename} is larger than {FILE_SIZE_LIMIT} bytes. Discarding file content" - ) - return - - source_file_name = "" - - async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer: - async for response in self._api_call( - url_name=ATTACHMENT, - host_url=self.host_url, - value=site_url, - file_relative_url=file_relative_url, - ): - async for data in response.content.iter_chunked( # pyright: ignore - CHUNK_SIZE - ): - await async_buffer.write(data) - - source_file_name = async_buffer.name - - await asyncio.to_thread( - convert_to_b64, - source=source_file_name, - ) - return { - "_id": document.get("id"), - "_timestamp": document.get("_timestamp"), - "_attachment": await self.convert_file_to_b64(source_file_name), - } - async def convert_file_to_b64(self, source_file_name): """This method converts the file content into b64 Args: @@ -240,61 +179,6 @@ async def convert_file_to_b64(self, source_file_name): ) return attachment_content - @retryable( - retries=RETRIES, - interval=RETRY_INTERVAL, - strategy=RetryStrategy.EXPONENTIAL_BACKOFF, - ) - async def get_site_pages_content( - self, document, list_response, timestamp=None, doit=False - ): - """Get content of site pages for SharePoint - - Args: - document (dictionary): Modified document. - list_response (dict): Dictionary of list item response - timestamp (timestamp, optional): Timestamp of item last modified. Defaults to None. - doit (boolean, optional): Boolean value for whether to get content or not. Defaults to False. - - Returns: - dictionary: Content document with id, timestamp & text. - """ - document_size = int(document["size"]) - if not (doit and document_size): - return - - filename = ( - document["title"] if document["type"] == "File" else document["file_name"] - ) - - if document_size > FILE_SIZE_LIMIT: - self._logger.warning( - f"File size {document_size} of file {filename} is larger than {FILE_SIZE_LIMIT} bytes. Discarding file content" - ) - return - - source_file_name = "" - - response_data = list_response["WikiField"] - - if response_data is None: - return - - async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer: - await async_buffer.write(bytes(response_data, "utf-8")) - - source_file_name = async_buffer.name - - await asyncio.to_thread( - convert_to_b64, - source=source_file_name, - ) - return { - "_id": document.get("id"), - "_timestamp": document.get("_timestamp"), - "_attachment": await self.convert_file_to_b64(source_file_name), - } - async def _api_call(self, url_name, url="", **url_kwargs): """Make an API call to the SharePoint Server @@ -636,6 +520,14 @@ def get_default_configuration(cls): "type": "int", "ui_restrictions": ["advanced"], }, + "use_text_extraction_service": { + "display": "toggle", + "label": "Use text extraction service", + "order": 8, + "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.", + "type": "bool", + "value": False, + }, } async def close(self): @@ -856,14 +748,112 @@ async def get_docs(self, filtering=None): else: if is_site_page: yield document, partial( - self.sharepoint_client.get_site_pages_content, + self.get_site_pages_content, document, item, ) else: yield document, partial( - self.sharepoint_client.get_content, + self.get_content, document, file_relative_url, site_url, ) + + async def get_content( + self, document, file_relative_url, site_url, timestamp=None, doit=False + ): + """Get content of list items and drive items + + Args: + document (dictionary): Modified document. + file_relative_url (str): Relative url of file + site_url (str): Site path of SharePoint + timestamp (timestamp, optional): Timestamp of item last modified. Defaults to None. + doit (boolean, optional): Boolean value for whether to get content or not. Defaults to False. + + Returns: + dictionary: Content document with id, timestamp & text. + """ + file_size = int(document["size"]) + if not (doit and file_size): + return + + filename = ( + document["title"] if document["type"] == "File" else document["file_name"] + ) + file_extension = self.get_file_extension(filename) + if not self.can_file_be_downloaded(file_extension, filename, file_size): + return + + file_doc = { + "_id": document.get("id"), + "_timestamp": document.get("_timestamp"), + } + return await self.download_and_extract_file( + file_doc, + filename, + file_extension, + partial( + self.generic_chunked_download_func, + partial( + self.sharepoint_client._api_call, + url_name=ATTACHMENT, + host_url=self.sharepoint_client.host_url, + value=site_url, + file_relative_url=file_relative_url, + ), + ), + ) + + async def get_site_pages_content( + self, document, list_response, timestamp=None, doit=False + ): + """Get content of site pages for SharePoint + + Args: + document (dictionary): Modified document. + list_response (dict): Dictionary of list item response + timestamp (timestamp, optional): Timestamp of item last modified. Defaults to None. + doit (boolean, optional): Boolean value for whether to get content or not. Defaults to False. + + Returns: + dictionary: Content document with id, timestamp & text. + """ + file_size = int(document["size"]) + if not (doit and file_size): + return + + filename = ( + document["title"] if document["type"] == "File" else document["file_name"] + ) + file_extension = self.get_file_extension(filename) + if not self.can_file_be_downloaded(file_extension, filename, file_size): + return + + response_data = list_response["WikiField"] + if response_data is None: + return + + file_doc = { + "_id": document.get("id"), + "_timestamp": document.get("_timestamp"), + } + return await self.download_and_extract_file( + file_doc, + filename, + file_extension, + partial( + self.download_func, + response_data, + ), + ) + + async def download_func(self, response_data): + """This is a fake-download function + Its only purpose is to allow response_data to be + written to a temp file. + This is because sharepoint server page content aren't download files, + it instead contains a key with bytes in its response. + """ + yield bytes(response_data, "utf-8") diff --git a/connectors/sources/zoom.py b/connectors/sources/zoom.py index 6c71eefeb..bfb7f0fd1 100644 --- a/connectors/sources/zoom.py +++ b/connectors/sources/zoom.py @@ -4,26 +4,20 @@ # you may not use this file except in compliance with the Elastic License 2.0. # """Zoom source module responsible to fetch documents from Zoom.""" -import asyncio import os from contextlib import asynccontextmanager from datetime import datetime, timedelta from functools import cached_property, partial -import aiofiles import aiohttp -from aiofiles.os import remove -from aiofiles.tempfile import NamedTemporaryFile from aiohttp.client_exceptions import ClientResponseError from connectors.logger import logger from connectors.source import BaseDataSource from connectors.utils import ( - TIKA_SUPPORTED_FILETYPES, CacheWithTimeout, CancellableSleeps, RetryStrategy, - convert_to_b64, get_base64_value, iso_utc, retryable, @@ -33,7 +27,6 @@ RETRY_INTERVAL = 2 CHAT_PAGE_SIZE = 50 MEETING_PAGE_SIZE = 300 -FILE_SIZE_LIMIT = 10485760 if "OVERRIDE_URL" in os.environ: override_url = os.environ["OVERRIDE_URL"] @@ -304,7 +297,8 @@ async def get_chats(self, user_id, chat_type): yield chat async def get_file_content(self, download_url): - return await self.api_client.content(url=download_url) + content = await self.api_client.content(url=download_url) + yield content.encode("utf-8") class ZoomDataSource(BaseDataSource): @@ -357,6 +351,14 @@ def get_default_configuration(cls): "type": "int", "validations": [{"type": "greater_than", "constraint": -1}], }, + "use_text_extraction_service": { + "display": "toggle", + "label": "Use text extraction service", + "order": 6, + "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.", + "type": "bool", + "value": False, + }, } async def validate_config(self): @@ -384,85 +386,33 @@ def _format_doc(self, doc, doc_time): ) return doc - def _pre_checks_for_get_content( - self, attachment_extension, attachment_name, attachment_size - ): - if attachment_extension == "": - self._logger.warning( - f"Files without extension are not supported, skipping {attachment_name}." - ) - return False - - if attachment_extension.lower() not in TIKA_SUPPORTED_FILETYPES: - self._logger.warning( - f"Files with the extension {attachment_extension} are not supported, skipping {attachment_name}." - ) - return False - - if attachment_size > FILE_SIZE_LIMIT: - self._logger.warning( - f"File size {attachment_size} of file {attachment_name} is larger than {FILE_SIZE_LIMIT} bytes. Discarding file content" - ) - return False - return True - - async def _get_document_with_content(self, doc): - document = { - "_id": doc["id"], - "_timestamp": doc["date_time"], - } - - temp_filename = "" - - try: - async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer: - temp_filename = str(async_buffer.name) - response = await self.client.get_file_content( - download_url=doc["download_url"] - ) - - if not response: - return - await async_buffer.write(response.encode("utf-8")) - - await asyncio.to_thread( - convert_to_b64, - source=temp_filename, - ) - async with aiofiles.open(file=temp_filename, mode="r") as target_file: - # base64 on macOS will add a EOL, so we strip() here - document["_attachment"] = (await target_file.read()).strip() - return document - finally: - try: - await remove(temp_filename) - except Exception as exception: - self._logger.warning( - f"Error while deleting the file: {temp_filename} from disk. Error: {exception}" - ) - - async def get_content(self, doc, timestamp=None, doit=False): - attachment_size = doc["file_size"] - if not (doit and attachment_size > 0): + async def get_content(self, chat_file, timestamp=None, doit=False): + file_size = chat_file["file_size"] + if not (doit and file_size > 0): return - attachment_name = doc["file_name"] - - attachment_extension = ( - attachment_name[attachment_name.rfind(".") :] - if "." in attachment_name - else "" - ) - - if not self._pre_checks_for_get_content( - attachment_extension=attachment_extension, - attachment_name=attachment_name, - attachment_size=attachment_size, + filename = chat_file["file_name"] + file_extension = self.get_file_extension(filename) + if not self.can_file_be_downloaded( + file_extension, + filename, + file_size, ): return - self._logger.debug(f"Downloading {attachment_name}") - return await self._get_document_with_content(doc) + document = { + "_id": chat_file["id"], + "_timestamp": chat_file["date_time"], + } + return await self.download_and_extract_file( + document, + filename, + file_extension, + partial( + self.client.get_file_content, + download_url=chat_file["download_url"], + ), + ) async def fetch_previous_meeting_details(self, meeting_id): previous_meeting = await self.client.get_past_meeting(meeting_id=meeting_id) @@ -544,4 +494,4 @@ async def get_docs(self, filtering=None): chat_file["id"] = chat_file.get("file_id") yield self._format_doc( doc=chat_file, doc_time=chat_file.get("date_time") - ), partial(self.get_content, doc=chat_file.copy()) + ), partial(self.get_content, chat_file=chat_file.copy()) diff --git a/tests/sources/fixtures/github/connector.json b/tests/sources/fixtures/github/connector.json index 287a64df5..293eb90ab 100644 --- a/tests/sources/fixtures/github/connector.json +++ b/tests/sources/fixtures/github/connector.json @@ -73,6 +73,21 @@ "ui_restrictions": ["advanced"], "value": 3, "validations": [{"type": "less_than", "constraint": 10}] + }, + "use_text_extraction_service": { + "default_value": null, + "depends_on": [], + "display": "toggle", + "label": "Use text extraction service", + "options": [], + "order": 8, + "required": true, + "sensitive": false, + "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.", + "type": "bool", + "ui_restrictions": [], + "validations": [], + "value": false } }, "filtering": [ diff --git a/tests/sources/fixtures/sharepoint_server/connector.json b/tests/sources/fixtures/sharepoint_server/connector.json index 9f5d5353d..fb3de84d4 100644 --- a/tests/sources/fixtures/sharepoint_server/connector.json +++ b/tests/sources/fixtures/sharepoint_server/connector.json @@ -50,6 +50,21 @@ "type": "int", "ui_restrictions": ["advanced"], "value": null + }, + "use_text_extraction_service": { + "default_value": null, + "depends_on": [], + "display": "toggle", + "label": "Use text extraction service", + "options": [], + "order": 7, + "required": true, + "sensitive": false, + "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.", + "type": "bool", + "ui_restrictions": [], + "validations": [], + "value": false } }, "custom_scheduling": {}, diff --git a/tests/sources/fixtures/zoom/connector.json b/tests/sources/fixtures/zoom/connector.json index 461235cf8..acec03d4f 100644 --- a/tests/sources/fixtures/zoom/connector.json +++ b/tests/sources/fixtures/zoom/connector.json @@ -75,6 +75,21 @@ "value": 4, "order": 5, "ui_restrictions": [] + }, + "use_text_extraction_service": { + "default_value": null, + "depends_on": [], + "display": "toggle", + "label": "Use text extraction service", + "options": [], + "order": 6, + "required": true, + "sensitive": false, + "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.", + "type": "bool", + "ui_restrictions": [], + "validations": [], + "value": false } }, "custom_scheduling": {}, diff --git a/tests/sources/test_github.py b/tests/sources/test_github.py index 0e0d02d1c..418af5d05 100644 --- a/tests/sources/test_github.py +++ b/tests/sources/test_github.py @@ -413,13 +413,14 @@ @asynccontextmanager -async def create_github_source(): +async def create_github_source(use_text_extraction_service=False): async with create_source( GitHubDataSource, data_source="github-server", token="changeme", repositories="*", ssl_enabled=False, + use_text_extraction_service=use_text_extraction_service, ) as source: yield source @@ -673,6 +674,32 @@ async def test_get_content_with_md_file(): assert actual_response == expected_response +@pytest.mark.asyncio +async def test_get_content_with_md_file_with_extraction_service(): + with patch( + "connectors.content_extraction.ContentExtraction.extract_text", + return_value="Test File !!! U+1F602", + ), patch( + "connectors.content_extraction.ContentExtraction.get_extraction_config", + return_value={"host": "http://localhost:8090"}, + ): + expected_response = { + "_id": "demo_repo/source.md", + "_timestamp": "2023-04-17T12:55:01Z", + "body": "Test File !!! U+1F602", + } + async with create_github_source(use_text_extraction_service=True) as source: + with patch.object( + source.github_client._get_client, + "getitem", + side_effect=[MOCK_RESPONSE_ATTACHMENTS[1]], + ): + actual_response = await source.get_content( + attachment=MOCK_ATTACHMENT, doit=True + ) + assert actual_response == expected_response + + @pytest.mark.asyncio @pytest.mark.parametrize( "size, expected_content", diff --git a/tests/sources/test_outlook.py b/tests/sources/test_outlook.py index 8cad303b9..7f2d7a6d6 100644 --- a/tests/sources/test_outlook.py +++ b/tests/sources/test_outlook.py @@ -4,6 +4,7 @@ # you may not use this file except in compliance with the Elastic License 2.0. # """Tests the Outlook source class methods""" +from contextlib import asynccontextmanager from unittest import mock from unittest.mock import AsyncMock, MagicMock, patch @@ -29,6 +30,11 @@ "_timestamp": "2023-12-12T01:01:01Z", "_attachment": "IyBUaGlzIGlzIHRoZSBkdW1teSBmaWxl", } +EXPECTED_CONTENT_EXTRACTED = { + "_id": "attachment_id_1", + "_timestamp": "2023-12-12T01:01:01Z", + "body": str(RESPONSE_CONTENT), +} TIMEZONE = "Asia/Kolkata" MAIL = "mail" @@ -349,6 +355,39 @@ def __init__(self, *args, **kwargs): self.content = StreamReader +@asynccontextmanager +async def create_outlook_source( + data_source=OUTLOOK_CLOUD, + tenant_id="foo", + client_id="bar", + client_secret="faa", + exchange_server="127.0.0.1", + active_directory_server="127.0.0.1", + username="fee", + password="fuu", + domain="outlook.com", + ssl_enabled=False, + ssl_ca="", + use_text_extraction_service=False, +): + async with create_source( + OutlookDataSource, + data_source=data_source, + tenant_id=tenant_id, + client_id=client_id, + client_secret=client_secret, + exchange_server=exchange_server, + active_directory_server=active_directory_server, + username=username, + password=password, + domain=domain, + ssl_enabled=ssl_enabled, + ssl_ca=ssl_ca, + use_text_extraction_service=use_text_extraction_service, + ) as source: + yield source + + def get_json_mock(mock_response, status): async_mock = AsyncMock() async_mock.__aenter__ = AsyncMock( @@ -415,7 +454,7 @@ async def test_validate_configuration_with_invalid_dependency_fields_raises_erro extras, ): # Setup - async with create_source(OutlookDataSource, **extras) as source: + async with create_outlook_source(**extras) as source: # Execute with pytest.raises(ConfigurableFieldValueError): await source.validate_config() @@ -452,7 +491,7 @@ async def test_validate_configuration_with_invalid_dependency_fields_raises_erro async def test_validate_config_with_valid_dependency_fields_does_not_raise_error( extras, ): - async with create_source(OutlookDataSource, **extras) as source: + async with create_outlook_source(**extras) as source: await source.validate_config() @@ -467,7 +506,7 @@ async def test_ping_for_server(mock_connection): None, ) - async with create_source(OutlookDataSource) as source: + async with create_outlook_source() as source: source.client.is_cloud = False await source.ping() @@ -483,7 +522,7 @@ async def test_ping_for_server_for_failed_connection(mock_connection): None, ) - async with create_source(OutlookDataSource) as source: + async with create_outlook_source() as source: source.client.is_cloud = False with pytest.raises(UsersFetchFailed): await source.ping() @@ -491,7 +530,7 @@ async def test_ping_for_server_for_failed_connection(mock_connection): @pytest.mark.asyncio async def test_ping_for_cloud(): - async with create_source(OutlookDataSource) as source: + async with create_outlook_source() as source: with mock.patch( "aiohttp.ClientSession.post", return_value=get_json_mock( @@ -520,7 +559,7 @@ async def test_ping_for_cloud_for_failed_connection( mock_time_to_sleep_between_retries, raised_exception, side_effect_exception ): mock_time_to_sleep_between_retries.return_value = 0 - async with create_source(OutlookDataSource) as source: + async with create_outlook_source() as source: with mock.patch( "aiohttp.ClientSession.post", side_effect=side_effect_exception, @@ -535,7 +574,7 @@ async def test_ping_for_cloud_for_failed_connection( @pytest.mark.asyncio async def test_get_users_for_cloud(): - async with create_source(OutlookDataSource) as source: + async with create_outlook_source() as source: users = [] with mock.patch( "aiohttp.ClientSession.post", @@ -565,7 +604,7 @@ async def test_get_users_for_cloud(): ], ) async def test_get_content(attachment, expected_content): - async with create_source(OutlookDataSource) as source: + async with create_outlook_source() as source: response = await source.get_content( attachment=attachment, timezone=TIMEZONE, @@ -574,6 +613,24 @@ async def test_get_content(attachment, expected_content): assert response == expected_content +@pytest.mark.asyncio +async def test_get_content_with_extraction_service(): + with patch( + "connectors.content_extraction.ContentExtraction.extract_text", + return_value=str(RESPONSE_CONTENT), + ), patch( + "connectors.content_extraction.ContentExtraction.get_extraction_config", + return_value={"host": "http://localhost:8090"}, + ): + async with create_outlook_source(use_text_extraction_service=True) as source: + response = await source.get_content( + attachment=MOCK_ATTACHMENT, + timezone=TIMEZONE, + doit=True, + ) + assert response == EXPECTED_CONTENT_EXTRACTED + + @pytest.mark.asyncio @pytest.mark.parametrize( "is_cloud, user_response", @@ -584,7 +641,7 @@ async def test_get_content(attachment, expected_content): ) @patch("connectors.sources.outlook.Account", return_value="account") async def test_get_user_accounts_for_cloud(account, is_cloud, user_response): - async with create_source(OutlookDataSource) as source: + async with create_outlook_source() as source: source.client.is_cloud = is_cloud source.client._get_user_instance.get_users = AsyncIterator([user_response]) @@ -594,7 +651,7 @@ async def test_get_user_accounts_for_cloud(account, is_cloud, user_response): @pytest.mark.asyncio async def test_get_docs(): - async with create_source(OutlookDataSource) as source: + async with create_outlook_source() as source: source.client._get_user_instance.get_user_accounts = AsyncIterator( [MockAccount()] ) diff --git a/tests/sources/test_sharepoint_server.py b/tests/sources/test_sharepoint_server.py index 7a988e980..199482686 100644 --- a/tests/sources/test_sharepoint_server.py +++ b/tests/sources/test_sharepoint_server.py @@ -25,7 +25,9 @@ @asynccontextmanager -async def create_sps_source(ssl_enabled=False, ssl_ca="", retry_count=3): +async def create_sps_source( + ssl_enabled=False, ssl_ca="", retry_count=3, use_text_extraction_service=False +): async with create_source( SharepointServerDataSource, username="admin", @@ -35,6 +37,7 @@ async def create_sps_source(ssl_enabled=False, ssl_ca="", retry_count=3): ssl_enabled=ssl_enabled, ssl_ca=ssl_ca, retry_count=retry_count, + use_text_extraction_service=use_text_extraction_service, ) as source: yield source @@ -678,7 +681,7 @@ async def test_get_content(): "aiohttp.StreamReader.iter_chunked", return_value=AsyncIterator([bytes(response_content, "utf-8")]), ): - response_content = await source.sharepoint_client.get_content( + response_content = await source.get_content( document=expected_attachment, file_relative_url="abc.com", site_url="/site", @@ -688,6 +691,46 @@ async def test_get_content(): assert response_content == expected_content +@pytest.mark.asyncio +async def test_get_content_with_content_extraction(): + response_content = "This is a dummy sharepoint body response" + with patch( + "connectors.content_extraction.ContentExtraction.extract_text", + return_value=response_content, + ), patch( + "connectors.content_extraction.ContentExtraction.get_extraction_config", + return_value={"host": "http://localhost:8090"}, + ): + async_response = MockObjectResponse() + expected_attachment = { + "id": 1, + "server_relative_url": "/url", + "_timestamp": "2022-06-20T10:37:44Z", + "size": 11, + "type": "sites", + "file_name": "dummy.pdf", + } + expected_content = { + "_id": 1, + "body": response_content, + "_timestamp": "2022-06-20T10:37:44Z", + } + async with create_sps_source(use_text_extraction_service=True) as source: + source.sharepoint_client._api_call = AsyncIterator([async_response]) + with mock.patch( + "aiohttp.StreamReader.iter_chunked", + return_value=AsyncIterator([bytes(response_content, "utf-8")]), + ): + response_content = await source.get_content( + document=expected_attachment, + file_relative_url="abc.com", + site_url="/site", + doit=True, + ) + + assert response_content == expected_content + + class ContentResponse: content = b"This is a dummy sharepoint body response" @@ -705,7 +748,7 @@ async def test_get_content_when_size_is_bigger(): "file_name": "dummy.pdf", } async with create_sps_source() as source: - response_content = await source.sharepoint_client.get_content( + response_content = await source.get_content( document=document, file_relative_url="abc.com", site_url="/site", doit=True ) @@ -723,7 +766,7 @@ async def test_get_content_when_doit_is_none(): "file_name": "dummy.pdf", } async with create_sps_source() as source: - response_content = await source.sharepoint_client.get_content( + response_content = await source.get_content( document=document, file_relative_url="abc.com", site_url="/site" ) @@ -1009,7 +1052,7 @@ async def test_get_site_pages_content(): "_timestamp": "2022-06-20T10:37:44Z", } async with create_sps_source() as source: - response_content = await source.sharepoint_client.get_site_pages_content( + response_content = await source.get_site_pages_content( document=EXPECTED_ATTACHMENT, list_response=RESPONSE_DATA, doit=True, @@ -1029,7 +1072,7 @@ async def coroutine_generator(item): async def test_get_site_pages_content_when_doit_is_none(): document = {"title": "Home.aspx", "type": "File", "size": 1000000} async with create_sps_source() as source: - response_content = await source.sharepoint_client.get_site_pages_content( + response_content = await source.get_site_pages_content( document=document, list_response={}, doit=None, @@ -1042,7 +1085,7 @@ async def test_get_site_pages_content_when_doit_is_none(): async def test_get_site_pages_content_for_wikifiled_none(): async with create_sps_source() as source: EXPECTED_ATTACHMENT = {"title": "Home.aspx", "type": "File", "size": "1000000"} - response_content = await source.sharepoint_client.get_site_pages_content( + response_content = await source.get_site_pages_content( document=EXPECTED_ATTACHMENT, list_response={"WikiField": None}, doit=True, diff --git a/tests/sources/test_zoom.py b/tests/sources/test_zoom.py index bf9e80873..81ffcbc69 100644 --- a/tests/sources/test_zoom.py +++ b/tests/sources/test_zoom.py @@ -291,6 +291,11 @@ "_timestamp": "2023-03-09T00:00:00Z", "_attachment": "Q29udGVudA==", } +FILE_EXPECTED_CONTENT_EXTRACTED = { + "_id": "file1", + "_timestamp": "2023-03-09T00:00:00Z", + "body": SAMPLE_CONTENT, +} FILE_EXPECTED_RESPONSE = { "_id": "file1", "_timestamp": "2023-03-09T00:00:00Z", @@ -403,7 +408,9 @@ def get_mock(mock_response): @asynccontextmanager -async def create_zoom_source(fetch_past_meeting_details=False): +async def create_zoom_source( + fetch_past_meeting_details=False, use_text_extraction_service=False +): async with create_source( ZoomDataSource, account_id="123", @@ -411,6 +418,7 @@ async def create_zoom_source(fetch_past_meeting_details=False): client_secret="secret#123", recording_age=4, fetch_past_meeting_details=fetch_past_meeting_details, + use_text_extraction_service=use_text_extraction_service, ) as source: yield source @@ -755,12 +763,37 @@ async def test_get_content(attachment, doit, expected_content): side_effect=mock_zoom_apis, ): response = await source.get_content( - doc=attachment, + chat_file=attachment, doit=doit, ) assert response == expected_content +@pytest.mark.asyncio +async def test_get_content_with_extraction_service(): + with patch( + "connectors.content_extraction.ContentExtraction.extract_text", + return_value=SAMPLE_CONTENT, + ), patch( + "connectors.content_extraction.ContentExtraction.get_extraction_config", + return_value={"host": "http://localhost:8090"}, + ): + async with create_zoom_source(use_text_extraction_service=True) as source: + with mock.patch( + "aiohttp.ClientSession.post", + return_value=mock_token_response(), + ): + with mock.patch( + "aiohttp.ClientSession.get", + side_effect=mock_zoom_apis, + ): + response = await source.get_content( + chat_file=FILE, + doit=True, + ) + assert response == FILE_EXPECTED_CONTENT_EXTRACTED + + @pytest.mark.asyncio @freeze_time("2023-03-09T00:00:00") async def test_get_docs():