Skip to content

Commit 32f236a

Browse files
Expand extraction service to more connectors #3 (#1694)
1 parent a7402a3 commit 32f236a

21 files changed

+619
-317
lines changed

connectors/source.py

+48 -17
Original file line number | Diff line number | Diff line change
@@ -693,6 +693,11 @@ def get_file_extension(self, filename):
693693
return get_file_extension(filename)
694694

695695
def can_file_be_downloaded(self, file_extension, filename, file_size):
696+
return self.is_valid_file_type(
697+
file_extension, filename
698+
) and self.is_file_size_within_limit(file_size, filename)
699+
700+
def is_valid_file_type(self, file_extension, filename):
696701
if file_extension == "":
697702
self._logger.debug(
698703
f"Files without extension are not supported, skipping {filename}."
@@ -705,6 +710,9 @@ def can_file_be_downloaded(self, file_extension, filename, file_size):
705710
)
706711
return False
707712

713+
return True
714+
715+
def is_file_size_within_limit(self, file_size, filename):
708716
if file_size > FILE_SIZE_LIMIT and not self.configuration.get(
709717
"use_text_extraction_service"
710718
):
@@ -716,26 +724,49 @@ def can_file_be_downloaded(self, file_extension, filename, file_size):
716724
return True
717725

718726
async def download_and_extract_file(
719-
self, doc, source_filename, file_extension, download_func
727+
self,
728+
doc,
729+
source_filename,
730+
file_extension,
731+
download_func,
732+
return_doc_if_failed=False,
720733
):
721-
# 1 create tempfile
722-
async with self.create_temp_file(file_extension) as async_buffer:
723-
temp_filename = async_buffer.name
724-
725-
# 2 download to tempfile
726-
await self.download_to_temp_file(
727-
temp_filename,
728-
source_filename,
729-
async_buffer,
730-
download_func,
731-
)
734+
"""
735+
Performs all the steps required for handling binary content:
736+
1. Make temp file
737+
2. Download content to temp file
738+
3. Extract using local service or convert to b64
732739
733-
# 3 extract or convert content
734-
doc = await self.handle_file_content_extraction(
735-
doc, source_filename, temp_filename
736-
)
740+
Will return the doc with either `_attachment` or `body` added.
741+
Returns `None` if any step fails.
737742
738-
return doc
743+
If the optional arg `return_doc_if_failed` is `True`,
744+
will return the original doc upon failure
745+
"""
746+
try:
747+
async with self.create_temp_file(file_extension) as async_buffer:
748+
temp_filename = async_buffer.name
749+
750+
await self.download_to_temp_file(
751+
temp_filename,
752+
source_filename,
753+
async_buffer,
754+
download_func,
755+
)
756+
757+
doc = await self.handle_file_content_extraction(
758+
doc, source_filename, temp_filename
759+
)
760+
return doc
761+
except Exception as e:
762+
self._logger.warning(
763+
f"File download and extraction or conversion for file {source_filename} failed: {e}",
764+
exc_info=True,
765+
)
766+
if return_doc_if_failed:
767+
return doc
768+
else:
769+
return
739770

740771
@asynccontextmanager
741772
async def create_temp_file(self, file_extension):

connectors/sources/azure_blob_storage.py

+1 -3
Original file line number | Diff line number | Diff line change
@@ -178,15 +178,13 @@ async def get_content(self, blob, timestamp=None, doit=None):
178178
return
179179

180180
document = {"_id": blob["id"], "_timestamp": blob["_timestamp"]}
181-
document = await self.download_and_extract_file(
181+
return await self.download_and_extract_file(
182182
document,
183183
filename,
184184
file_extension,
185185
partial(self.blob_download_func, filename, blob["container"]),
186186
)
187187

188-
return document
189-
190188
async def blob_download_func(self, blob_name, container_name):
191189
async with BlobClient.from_connection_string(
192190
conn_str=self.connection_string,

connectors/sources/confluence.py

+1 -3
Original file line number | Diff line number | Diff line change
@@ -653,7 +653,7 @@ async def download_attachment(self, url, attachment, timestamp=None, doit=False)
653653
return
654654

655655
document = {"_id": attachment["_id"], "_timestamp": attachment["_timestamp"]}
656-
document = await self.download_and_extract_file(
656+
return await self.download_and_extract_file(
657657
document,
658658
filename,
659659
file_extension,
@@ -666,8 +666,6 @@ async def download_attachment(self, url, attachment, timestamp=None, doit=False)
666666
),
667667
)
668668

669-
return document
670-
671669
async def _attachment_coro(self, document, access_control):
672670
"""Coroutine to add attachments to Queue and download content
673671

connectors/sources/dropbox.py

+1 -3
Original file line number | Diff line number | Diff line change
@@ -671,7 +671,7 @@ async def get_content(
671671
"_id": attachment["id"],
672672
"_timestamp": attachment["server_modified"],
673673
}
674-
document = await self.download_and_extract_file(
674+
return await self.download_and_extract_file(
675675
document,
676676
filename,
677677
file_extension,
@@ -681,8 +681,6 @@ async def get_content(
681681
),
682682
)
683683

684-
return document
685-
686684
def download_func(self, is_shared, attachment, filename):
687685
if is_shared:
688686
return partial(

connectors/sources/google_cloud_storage.py

+24 -29
Original file line number | Diff line number | Diff line change
@@ -10,9 +10,6 @@
1010
import urllib.parse
1111
from functools import cached_property, partial
1212

13-
import aiofiles
14-
from aiofiles.os import remove
15-
from aiofiles.tempfile import NamedTemporaryFile
1613
from aiogoogle import Aiogoogle
1714
from aiogoogle.auth.creds import ServiceAccountCreds
1815

@@ -22,7 +19,7 @@
2219
load_service_account_json,
2320
validate_service_account_json,
2421
)
25-
from connectors.utils import TIKA_SUPPORTED_FILETYPES, convert_to_b64, get_pem_format
22+
from connectors.utils import get_pem_format
2623

2724
CLOUD_STORAGE_READ_ONLY_SCOPE = "https://www.googleapis.com/auth/devstorage.read_only"
2825
CLOUD_STORAGE_BASE_URL = "https://console.cloud.google.com/storage/browser/_details/"
@@ -229,6 +226,14 @@ def get_default_configuration(cls):
229226
"type": "int",
230227
"ui_restrictions": ["advanced"],
231228
},
229+
"use_text_extraction_service": {
230+
"display": "toggle",
231+
"label": "Use text extraction service",
232+
"order": 3,
233+
"tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.",
234+
"type": "bool",
235+
"value": False,
236+
},
232237
}
233238

234239
async def validate_config(self):
@@ -370,50 +375,40 @@ async def get_content(self, blob, timestamp=None, doit=None):
370375
Returns:
371376
dictionary: Content document with id, timestamp & text
372377
"""
373-
blob_size = int(blob["size"])
374-
if not (doit and blob_size):
378+
file_size = int(blob["size"])
379+
if not (doit and file_size):
375380
return
376381

377-
blob_name = blob["name"]
378-
if (os.path.splitext(blob_name)[-1]).lower() not in TIKA_SUPPORTED_FILETYPES:
379-
self._logger.debug(f"{blob_name} can't be extracted")
382+
filename = blob["name"]
383+
file_extension = self.get_file_extension(filename)
384+
if not self.can_file_be_downloaded(file_extension, filename, file_size):
380385
return
381386

382-
if blob_size > DEFAULT_FILE_SIZE_LIMIT:
383-
self._logger.warning(
384-
f"File size {blob_size} of file {blob_name} is larger than {DEFAULT_FILE_SIZE_LIMIT} bytes. Discarding the file content"
385-
)
386-
return
387-
self._logger.debug(f"Downloading {blob_name}")
388387
document = {
389388
"_id": blob["id"],
390389
"_timestamp": blob["_timestamp"],
391390
}
392-
source_file_name = ""
393-
async with NamedTemporaryFile(mode="wb", delete=False) as async_buffer:
391+
392+
# gcs has a unique download method so we can't utilize
393+
# the generic download_and_extract_file func
394+
async with self.create_temp_file(file_extension) as async_buffer:
394395
await anext(
395396
self._google_storage_client.api_call(
396397
resource="objects",
397398
method="get",
398399
bucket=blob["bucket_name"],
399-
object=blob_name,
400+
object=filename,
400401
alt="media",
401402
userProject=self._google_storage_client.user_project_id,
402403
pipe_to=async_buffer,
403404
)
404405
)
405-
source_file_name = async_buffer.name
406+
await async_buffer.close()
407+
408+
document = await self.handle_file_content_extraction(
409+
document, filename, async_buffer.name
410+
)
406411

407-
self._logger.debug(f"Calling convert_to_b64 for file : {blob_name}")
408-
await asyncio.to_thread(
409-
convert_to_b64,
410-
source=source_file_name,
411-
)
412-
async with aiofiles.open(file=source_file_name, mode="r") as target_file:
413-
# base64 on macOS will add a EOL, so we strip() here
414-
document["_attachment"] = (await target_file.read()).strip()
415-
await remove(str(source_file_name))
416-
self._logger.debug(f"Downloaded {blob_name} for {blob_size} bytes ")
417412
return document
418413

419414
async def get_docs(self, filtering=None):

0 commit comments

Comments
 (0)