Expand extraction service to more connectors #4 #1698

Merged · 15 commits · Sep 29, 2023
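The diffs below all apply one pattern: each connector deletes its private temp-file plumbing (NamedTemporaryFile, convert_to_b64, per-source FILE_SIZE_LIMIT and CHUNK_SIZE constants) and instead passes a download_func async generator to a shared download_and_extract_file helper, alongside a new use_text_extraction_service toggle. A minimal sketch of what that helper plausibly does, inferred from the call sites in this diff; the real implementation lives in the shared framework code, which this page does not show:

# Sketch only: names (create_temp_file, extraction_service, the
# "use_text_extraction_service" key) are inferred from the call sites
# below, not quoted from the framework source.
import asyncio
import base64


async def download_and_extract_file(source, document, filename, file_extension, download_func):
    # Drain the connector-supplied async generator into a suffixed temp file.
    async with source.create_temp_file(file_extension) as async_buffer:
        async for chunk in download_func():
            if chunk:
                await async_buffer.write(chunk)
        temp_filename = str(async_buffer.name)

    if source.configuration["use_text_extraction_service"]:
        # Remote path: a deployed extraction service returns plain text.
        document["body"] = await source.extraction_service.extract_text(temp_filename, filename)
    else:
        # Local path: base64-encode the raw bytes so an ingest attachment
        # pipeline can decode and extract them in Elasticsearch.
        with open(temp_filename, "rb") as f:
            data = f.read()
        document["_attachment"] = await asyncio.to_thread(
            lambda: base64.b64encode(data).decode()
        )
    return document

The same helper also centralizes the size and extension checks that used to be inline (see can_file_be_downloaded in the github.py hunk below), which is why every per-connector FILE_SIZE_LIMIT constant disappears.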
1 change: 0 additions & 1 deletion connectors/sources/azure_blob_storage.py
@@ -16,7 +16,6 @@
     "size": "size",
     "container": "container",
 }
-DEFAULT_FILE_SIZE_LIMIT = 10485760
 DEFAULT_RETRY_COUNT = 3
 MAX_CONCURRENT_DOWNLOADS = (
     100  # Max concurrent download supported by Azure Blob Storage
2 changes: 0 additions & 2 deletions connectors/sources/confluence.py
@@ -30,7 +30,6 @@
     ssl_context,
 )

-FILE_SIZE_LIMIT = 10485760
 RETRY_INTERVAL = 2
 SPACE = "space"
 BLOGPOST = "blogpost"
@@ -55,7 +54,6 @@
 }
 PING_URL = "rest/api/space?limit=1"
 MAX_CONCURRENT_DOWNLOADS = 50  # Max concurrent download supported by confluence
-CHUNK_SIZE = 1024
 MAX_CONCURRENCY = 50
 QUEUE_SIZE = 1024
 QUEUE_MEM_SIZE = 25 * 1024 * 1024  # Size in Megabytes
2 changes: 0 additions & 2 deletions connectors/sources/dropbox.py
@@ -34,8 +34,6 @@
 RETRY_COUNT = 3
 DEFAULT_RETRY_AFTER = 300  # seconds
 RETRY_INTERVAL = 2
-CHUNK_SIZE = 1024
-FILE_SIZE_LIMIT = 10485760  # ~10 Megabytes
 MAX_CONCURRENT_DOWNLOADS = 100
 LIMIT = 300  # Limit for fetching shared files per call
 PAPER = "paper"
74 changes: 29 additions & 45 deletions connectors/sources/github.py
@@ -4,16 +4,12 @@
 # you may not use this file except in compliance with the Elastic License 2.0.
 #
 """GitHub source module responsible to fetch documents from GitHub Cloud and Server."""
-import asyncio
 import time
 from enum import Enum
 from functools import cached_property, partial

-import aiofiles
 import aiohttp
 import fastjsonschema
-from aiofiles.os import remove
-from aiofiles.tempfile import NamedTemporaryFile
 from aiohttp.client_exceptions import ClientResponseError
 from gidgethub.aiohttp import GitHubAPI

@@ -26,7 +22,6 @@
 from connectors.utils import (
     CancellableSleeps,
     RetryStrategy,
-    convert_to_b64,
     decode_base64_value,
     retryable,
     ssl_context,
@@ -41,7 +36,6 @@

 RETRIES = 3
 RETRY_INTERVAL = 2
-FILE_SIZE_LIMIT = 10485760  # ~ 10 Megabytes
 FORBIDDEN = 403
 NODE_SIZE = 100
 REVIEWS_COUNT = 45
@@ -920,6 +914,14 @@ def get_default_configuration(cls):
                 "ui_restrictions": ["advanced"],
                 "value": RETRIES,
             },
+            "use_text_extraction_service": {
+                "display": "toggle",
+                "label": "Use text extraction service",
+                "order": 8,
+                "tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.",
+                "type": "bool",
+                "value": False,
+            },
         }

     async def get_invalid_repos(self):
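Every connector in this PR gains this same toggle. As the tooltip notes, it is only useful when a text extraction service is deployed next to the connector service and the ingest pipeline's own extraction is disabled; otherwise content would be extracted twice. A minimal sketch of the remote call the toggle enables, assuming an HTTP endpoint that accepts raw file bytes and returns extracted text (the path and response field here are illustrative, not the service's documented API):

# Illustrative only: endpoint path and JSON field name are assumptions.
import aiohttp


async def extract_text(host, filepath, filename):
    async with aiohttp.ClientSession() as session:
        with open(filepath, "rb") as f:
            async with session.put(
                f"{host}/extract_text/?filename={filename}", data=f
            ) as response:
                payload = await response.json()
                return payload.get("extracted_text", "")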
@@ -1320,29 +1322,6 @@ async def _fetch_files(self, repo_name, default_branch):
                 f"Something went wrong while fetching the files of {repo_name}. Exception: {exception}"
             )

-    async def _get_document_with_content(self, url, attachment_name, document):
-        file_data = await self.github_client.get_github_item(resource=url)
-        temp_filename = ""
-        async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer:
-            await async_buffer.write(
-                decode_base64_value(content=file_data["content"])  # pyright: ignore
-            )
-            temp_filename = str(async_buffer.name)
-
-        self._logger.debug(f"Calling convert_to_b64 for file : {attachment_name}")
-        await asyncio.to_thread(convert_to_b64, source=temp_filename)
-        async with aiofiles.open(file=temp_filename, mode="r") as async_buffer:
-            # base64 on macOS will add a EOL, so we strip() here
-            document["_attachment"] = (await async_buffer.read()).strip()
-
-        try:
-            await remove(temp_filename)
-        except Exception as exception:
-            self._logger.warning(
-                f"Could not remove file from: {temp_filename}. Error: {exception}"
-            )
-        return document
-
     async def get_content(self, attachment, timestamp=None, doit=False):
         """Extracts the content for Apache TIKA supported file types.

@@ -1354,31 +1333,36 @@ async def get_content(self, attachment, timestamp=None, doit=False):
         Returns:
             dictionary: Content document with _id, _timestamp and attachment content
         """
-        attachment_size = int(attachment["size"])
-        if not (doit and attachment_size > 0):
+        file_size = int(attachment["size"])
+        if not (doit and file_size > 0):
             return

-        attachment_name = attachment["name"]
-
-        if attachment_size > FILE_SIZE_LIMIT:
-            self._logger.warning(
-                f"File size {attachment_size} of file {attachment_name} is larger than {FILE_SIZE_LIMIT} bytes. Discarding file content"
-            )
+        filename = attachment["name"]
+        file_extension = self.get_file_extension(filename)
+        if not self.can_file_be_downloaded(file_extension, filename, file_size):
             return

-        self._logger.debug(f"Downloading {attachment_name}")
-
         document = {
-            "_id": f"{attachment['repo_name']}/{attachment['name']}",
+            "_id": f"{attachment['repo_name']}/{filename}",
             "_timestamp": attachment["_timestamp"],
         }

-        return await self._get_document_with_content(
-            url=attachment["url"],
-            attachment_name=attachment_name,
-            document=document,
+        return await self.download_and_extract_file(
+            document,
+            filename,
+            file_extension,
+            partial(
+                self.download_func,
+                attachment["url"],
+            ),
         )

+    async def download_func(self, url):
+        file_data = await self.github_client.get_github_item(resource=url)
+        if file_data:
+            yield decode_base64_value(content=file_data["content"])
+        else:
+            yield
+
     def _filter_rule_query(self, repo, query, query_type):
         """
         Filters a query based on the query type.
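Note the shape of the new download_func: GitHub's contents API returns the whole file as base64 inside a single JSON payload, so the generator yields exactly one decoded blob, or a bare yield (None) when the fetch returns nothing. Any consumer therefore has to skip falsy chunks; a sketch of that consuming loop, with names assumed rather than taken from the framework:

# Sketch: draining a connector download_func into an open temp-file buffer.
# A bare `yield` produces None, so falsy chunks are skipped.
async def write_chunks(download_func, async_buffer):
    async for chunk in download_func():
        if chunk:
            await async_buffer.write(chunk)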
1 change: 0 additions & 1 deletion connectors/sources/google.py
@@ -22,7 +22,6 @@

 RETRIES = 3
 RETRY_INTERVAL = 2
-FILE_SIZE_LIMIT = 10485760  # ~ 10 Megabytes
 DEFAULT_TIMEOUT = 1 * 60  # 1 min
 DEFAULT_PAGE_SIZE = 100

1 change: 0 additions & 1 deletion connectors/sources/google_cloud_storage.py
@@ -44,7 +44,6 @@
 }
 DEFAULT_RETRY_COUNT = 3
 DEFAULT_WAIT_MULTIPLIER = 2
-DEFAULT_FILE_SIZE_LIMIT = 10485760
 STORAGE_EMULATOR_HOST = os.environ.get("STORAGE_EMULATOR_HOST")
 RUNNING_FTEST = (
     "RUNNING_FTEST" in os.environ
12 changes: 9 additions & 3 deletions connectors/sources/google_drive.py
@@ -769,7 +769,7 @@ async def resolve_paths(self):

         return folders

-    async def _download_content(self, file, download_func):
+    async def _download_content(self, file, file_extension, download_func):
         """Downloads the file from Google Drive and returns the encoded file content.

         Args:
@@ -781,7 +781,6 @@ async def _download_content(self, file, download_func):
         """

         file_name = file["name"]
-        file_extension = self.get_file_extension(file_name)
         attachment, body, file_size = None, None, 0

         async with self.create_temp_file(file_extension) as async_buffer:
@@ -813,14 +812,20 @@ async def get_google_workspace_content(self, file, timestamp=None):
             dict: Content document with id, timestamp & text
         """

-        file_name, file_id, file_mime_type = file["name"], file["id"], file["mime_type"]
+        file_name, file_id, file_mime_type, file_extension = (
+            file["name"],
+            file["id"],
+            file["mime_type"],
+            f".{file['file_extension']}",
+        )

         document = {
             "_id": file_id,
             "_timestamp": file["_timestamp"],
         }
         attachment, body, file_size = await self._download_content(
             file=file,
+            file_extension=file_extension,
             download_func=partial(
                 self.google_drive_client.api_call,
                 resource="files",
@@ -876,6 +881,7 @@ async def get_generic_file_content(self, file, timestamp=None):
         }
         attachment, body, _ = await self._download_content(
             file=file,
+            file_extension=file_extension,
             download_func=partial(
                 self.google_drive_client.api_call,
                 resource="files",
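For Google Drive the extension now comes from file metadata rather than from parsing the display name: Workspace files carry a file_extension field without a leading dot, so get_google_workspace_content builds ".{ext}" itself before handing it to _download_content, which uses it as the temp-file suffix. A sketch of why the suffix matters, assuming create_temp_file wraps aiofiles' NamedTemporaryFile (an assumption; the helper's definition is not in this diff):

# Assumed helper: a suffix-preserving temp file (e.g. ".pdf") lets
# downstream extraction identify the file type from the filename.
from aiofiles.tempfile import NamedTemporaryFile


def create_temp_file(file_extension):
    return NamedTemporaryFile(mode="wb", suffix=file_extension, delete=False)

Usage would be async with create_temp_file(".pdf") as async_buffer: ..., matching the call in _download_content above.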
2 changes: 0 additions & 2 deletions connectors/sources/jira.py
@@ -38,10 +38,8 @@
 RETRIES = 3
 RETRY_INTERVAL = 2
 DEFAULT_RETRY_SECONDS = 30
-FILE_SIZE_LIMIT = 10485760

 FETCH_SIZE = 100
-CHUNK_SIZE = 1024
 QUEUE_MEM_SIZE = 5 * 1024 * 1024  # Size in Megabytes
 MAX_CONCURRENCY = 5
 MAX_CONCURRENT_DOWNLOADS = 100  # Max concurrent download supported by jira
34 changes: 0 additions & 34 deletions connectors/sources/onedrive.py
@@ -5,17 +5,13 @@
 #
 """OneDrive source module responsible to fetch documents from OneDrive.
 """
-import asyncio
 import os
 from datetime import datetime, timedelta
 from functools import cached_property, partial
 from urllib import parse

-import aiofiles
 import aiohttp
 import fastjsonschema
-from aiofiles.os import remove
-from aiofiles.tempfile import NamedTemporaryFile
 from aiohttp.client_exceptions import ClientResponseError, ServerConnectionError
 from wcmatch import glob
@@ -34,16 +30,13 @@
     CacheWithTimeout,
     CancellableSleeps,
     RetryStrategy,
-    convert_to_b64,
     iso_utc,
     retryable,
 )

 RETRIES = 3
 RETRY_INTERVAL = 2
 DEFAULT_RETRY_SECONDS = 30
-CHUNK_SIZE = 1024
-FILE_SIZE_LIMIT = 10485760  # ~10 Megabytes
 FETCH_SIZE = 999
 DEFAULT_PARALLEL_CONNECTION_COUNT = 15
 REQUEST_TIMEOUT = 300
@@ -486,33 +479,6 @@ async def ping(self):
             self._logger.exception("Error while connecting to OneDrive")
             raise

-    async def _get_document_with_content(self, attachment_name, document, url):
-        temp_filename = ""
-
-        async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer:
-            async for response in self.client.get(url=url):
-                async for data in response.content.iter_chunked(n=CHUNK_SIZE):
-                    await async_buffer.write(data)
-            temp_filename = str(async_buffer.name)
-
-        self._logger.debug(
-            f"Download completed for file: {attachment_name}. Calling convert_to_b64"
-        )
-        await asyncio.to_thread(
-            convert_to_b64,
-            source=temp_filename,
-        )
-        async with aiofiles.open(file=temp_filename, mode="r") as target_file:
-            # base64 on macOS will add a EOL, so we strip() here
-            document["_attachment"] = (await target_file.read()).strip()
-        try:
-            await remove(temp_filename)
-        except Exception as exception:
-            self._logger.warning(
-                f"Could not remove file: {temp_filename}. Error: {exception}"
-            )
-        return document
-
     async def get_content(self, file, download_url, timestamp=None, doit=False):
         """Extracts the content for allowed file types.
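The remainder of the OneDrive diff is truncated here, but by analogy with the github.py change above, get_content presumably swaps _get_document_with_content for a streaming download_func handed to the shared helper. A sketch of what that generator plausibly looks like, reusing the client calls from the deleted method (assumed, not quoted from this PR):

# Assumed replacement downloader: stream response chunks to the caller
# instead of writing and converting a temp file inside the connector.
async def download_func(self, download_url):
    async for response in self.client.get(url=download_url):
        async for chunk in response.content.iter_chunked(64 * 1024):
            yield chunk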