4
4
# you may not use this file except in compliance with the Elastic License 2.0.
5
5
#
6
6
"""Salesforce source module responsible to fetch documents from Salesforce."""
7
- import asyncio
8
7
import os
9
8
from functools import cached_property , partial
10
9
from itertools import groupby
11
10
12
- import aiofiles
13
11
import aiohttp
14
- from aiofiles .os import remove
15
- from aiofiles .tempfile import NamedTemporaryFile
16
12
from aiohttp .client_exceptions import ClientResponseError
17
13
18
14
from connectors .logger import logger
19
15
from connectors .source import BaseDataSource
20
16
from connectors .utils import (
21
17
TIKA_SUPPORTED_FILETYPES ,
22
18
CancellableSleeps ,
23
- convert_to_b64 ,
24
19
retryable ,
25
20
)
26
21
@@ -293,15 +288,6 @@ async def get_case_feeds(self, case_ids):
293
288
294
289
return all_case_feeds
295
290
296
- # async def download_content_documents(self, content_documents):
297
- # for content_document in content_documents:
298
- # content_version = content_document.get("LatestPublishedVersion", {}) or {}
299
- # download_url = content_version.get("VersionDataUrl")
300
- # if download_url:
301
- # content_document["_attachment"] = await self._download(download_url)
302
- #
303
- # yield content_document
304
-
305
291
async def queryable_sobjects (self ):
306
292
"""Cached async property"""
307
293
if self ._queryable_sobjects is not None :
@@ -425,34 +411,6 @@ async def _execute_non_paginated_query(self, soql_query):
425
411
)
426
412
return response .get ("records" )
427
413
428
- async def _download (self , download_url ):
429
- attachment = None
430
- source_file_name = ""
431
-
432
- try :
433
- async with NamedTemporaryFile (mode = "wb" , delete = False ) as async_buffer :
434
- resp = await self ._get (download_url )
435
- async for data in resp .content .iter_chunked (CHUNK_SIZE ):
436
- await async_buffer .write (data )
437
- source_file_name = async_buffer .name
438
-
439
- await asyncio .to_thread (
440
- convert_to_b64 ,
441
- source = source_file_name ,
442
- )
443
-
444
- async with aiofiles .open (file = source_file_name , mode = "r" ) as target_file :
445
- attachment = (await target_file .read ()).strip ()
446
- except Exception as e :
447
- self ._logger .error (
448
- f"Exception encountered when processing file: { source_file_name } . Exception: { e } "
449
- )
450
- finally :
451
- if source_file_name :
452
- await remove (str (source_file_name ))
453
-
454
- return attachment
455
-
456
414
async def _auth_headers (self ):
457
415
token = await self .api_token .token ()
458
416
return {"authorization" : f"Bearer { token } " }
@@ -1432,9 +1390,13 @@ async def get_docs(self, filtering=None):
1432
1390
# Note: this could possibly be done on the fly if memory becomes an issue
1433
1391
content_docs = self ._combine_duplicate_content_docs (content_docs )
1434
1392
for content_doc in content_docs :
1435
- download_url = (content_doc .get ("LatestPublishedVersion" , {}) or {}).get ("VersionDataUrl" )
1393
+ download_url = (content_doc .get ("LatestPublishedVersion" , {}) or {}).get (
1394
+ "VersionDataUrl"
1395
+ )
1436
1396
if not download_url :
1437
- self ._logger .debug (f"No download URL found for { content_doc .get ('title' )} , skipping." )
1397
+ self ._logger .debug (
1398
+ f"No download URL found for { content_doc .get ('title' )} , skipping."
1399
+ )
1438
1400
continue
1439
1401
1440
1402
doc = self .doc_mapper .map_content_document (content_doc )
@@ -1460,7 +1422,7 @@ async def get_content(self, doc, download_url):
1460
1422
download_url ,
1461
1423
),
1462
1424
),
1463
- return_doc_if_failed = True , # we still ingest on download failure for Salesforce
1425
+ return_doc_if_failed = True , # we still ingest on download failure for Salesforce
1464
1426
)
1465
1427
1466
1428
def _parse_content_documents (self , record ):
0 commit comments