-
Notifications
You must be signed in to change notification settings - Fork 145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[New connector] Onelake connector #3057
Draft
Delacrobix
wants to merge
15
commits into
elastic:main
Choose a base branch
from
Delacrobix:onelake-connector
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 12 commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
5e10e1e
onelake.py file added to /connector/sources
Delacrobix 117400b
Draft: BaseDataSource class and methods implementation for Onelake co…
Delacrobix a10e0dd
onelake connector declared in connectors/config.py
Delacrobix ae7fbb0
onelake connector dependencies added to requirements/framework.txt
Delacrobix 1c89181
refactor sync functions to async functions in onelake connector code
Delacrobix 314bf38
name and created_at removed from dictionary in get_content function (…
Delacrobix 4cf0c10
(onelake connector) added logs
Delacrobix ee07c13
(onelake connector) added debug statements
Delacrobix e7716ae
downgrade azure-storage-file-datalek to 12.14.0 version
Delacrobix 3f88710
(onelake connector) tests: 'ping', 'format_file' and '_get_file_clien…
Delacrobix 297db3e
(onelake connector) removed _timestamp from dictionary
Delacrobix 8afd676
(onelake connector) 96% test coverage reached
Delacrobix 38c260c
(onelake) fixture folder for onelake connector, added fixture.py, doc…
Delacrobix 54c5bf0
(onelake) tests: added fake credential
Delacrobix abfc07e
(onelake) _get_account_url method replaced by one field of the class
Delacrobix File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,326 @@ | ||
"""OneLake connector to retrieve data from datalakes""" | ||
|
||
from functools import partial | ||
|
||
from azure.identity import ClientSecretCredential | ||
from azure.storage.filedatalake import DataLakeServiceClient | ||
|
||
from connectors.source import BaseDataSource | ||
|
||
ACCOUNT_NAME = "onelake" | ||
|
||
|
||
class OneLakeDataSource(BaseDataSource): | ||
"""OneLake""" | ||
|
||
name = "OneLake" | ||
service_type = "onelake" | ||
incremental_sync_enabled = True | ||
|
||
def __init__(self, configuration): | ||
"""Set up the connection to the azure base client | ||
|
||
Args: | ||
configuration (DataSourceConfiguration): Object of DataSourceConfiguration class. | ||
""" | ||
super().__init__(configuration=configuration) | ||
self.tenant_id = self.configuration["tenant_id"] | ||
self.client_id = self.configuration["client_id"] | ||
self.client_secret = self.configuration["client_secret"] | ||
self.workspace_name = self.configuration["workspace_name"] | ||
self.data_path = self.configuration["data_path"] | ||
|
||
@classmethod | ||
def get_default_configuration(cls): | ||
"""Get the default configuration for OneLake | ||
|
||
Returns: | ||
dictionary: Default configuration | ||
""" | ||
return { | ||
"tenant_id": { | ||
"label": "OneLake tenant id", | ||
"order": 1, | ||
"type": "str", | ||
}, | ||
"client_id": { | ||
"label": "OneLake client id", | ||
"order": 2, | ||
"type": "str", | ||
}, | ||
"client_secret": { | ||
"label": "OneLake client secret", | ||
"order": 3, | ||
"type": "str", | ||
"sensitive": True, | ||
}, | ||
"workspace_name": { | ||
"label": "OneLake workspace name", | ||
"order": 4, | ||
"type": "str", | ||
}, | ||
"data_path": { | ||
"label": "OneLake data path", | ||
"tooltip": "Path in format <DataLake>.Lakehouse/files/<Folder path>", | ||
"order": 5, | ||
"type": "str", | ||
}, | ||
"account_name": { | ||
"tooltip": "In the most cases is 'onelake'", | ||
"default_value": ACCOUNT_NAME, | ||
"label": "Account name", | ||
"order": 6, | ||
"type": "str", | ||
}, | ||
} | ||
|
||
async def ping(self): | ||
"""Verify the connection with OneLake""" | ||
|
||
self._logger.info("Generating file system client...") | ||
|
||
try: | ||
await self._get_directory_paths(self.configuration["data_path"]) | ||
self._logger.info( | ||
f"Connection to OneLake successful to {self.configuration['data_path']}" | ||
) | ||
|
||
except Exception: | ||
self._logger.exception("Error while connecting to OneLake.") | ||
raise | ||
|
||
def _get_account_url(self): | ||
"""Get the account URL for OneLake | ||
|
||
Returns: | ||
str: Account URL | ||
""" | ||
|
||
return f"https://{self.configuration['account_name']}.dfs.fabric.microsoft.com" | ||
|
||
def _get_token_credentials(self): | ||
"""Get the token credentials for OneLake | ||
|
||
Returns: | ||
obj: Token credentials | ||
""" | ||
|
||
tenant_id = self.configuration["tenant_id"] | ||
client_id = self.configuration["client_id"] | ||
client_secret = self.configuration["client_secret"] | ||
|
||
try: | ||
return ClientSecretCredential(tenant_id, client_id, client_secret) | ||
except Exception as e: | ||
self._logger.error(f"Error while getting token credentials: {e}") | ||
raise | ||
|
||
async def _get_service_client(self): | ||
"""Get the service client for OneLake | ||
|
||
Returns: | ||
obj: Service client | ||
""" | ||
|
||
try: | ||
return DataLakeServiceClient( | ||
account_url=self._get_account_url(), | ||
credential=self._get_token_credentials(), | ||
) | ||
except Exception as e: | ||
self._logger.error(f"Error while getting service client: {e}") | ||
raise | ||
|
||
async def _get_file_system_client(self): | ||
"""Get the file system client for OneLake | ||
|
||
Returns: | ||
obj: File system client | ||
""" | ||
try: | ||
service_client = await self._get_service_client() | ||
|
||
return service_client.get_file_system_client( | ||
self.configuration["workspace_name"] | ||
) | ||
except Exception as e: | ||
self._logger.error(f"Error while getting file system client: {e}") | ||
raise | ||
|
||
async def _get_directory_client(self): | ||
"""Get the directory client for OneLake | ||
|
||
Returns: | ||
obj: Directory client | ||
""" | ||
|
||
try: | ||
file_system_client = await self._get_file_system_client() | ||
|
||
return file_system_client.get_directory_client( | ||
self.configuration["data_path"] | ||
) | ||
except Exception as e: | ||
self._logger.error(f"Error while getting directory client: {e}") | ||
raise | ||
|
||
async def _get_file_client(self, file_name): | ||
"""Get file client from OneLake | ||
|
||
Args: | ||
file_name (str): name of the file | ||
|
||
Returns: | ||
obj: File client | ||
""" | ||
|
||
try: | ||
directory_client = await self._get_directory_client() | ||
|
||
return directory_client.get_file_client(file_name) | ||
except Exception as e: | ||
self._logger.error(f"Error while getting file client: {e}") | ||
raise | ||
|
||
async def _get_directory_paths(self, directory_path): | ||
"""List directory paths from data lake | ||
|
||
Args: | ||
directory_path (str): Directory path | ||
|
||
Returns: | ||
list: List of paths | ||
""" | ||
|
||
try: | ||
file_system_client = await self._get_file_system_client() | ||
|
||
return file_system_client.get_paths(path=directory_path) | ||
except Exception as e: | ||
self._logger.error(f"Error while getting directory paths: {e}") | ||
raise | ||
|
||
def format_file(self, file_client): | ||
"""Format file_client to be processed | ||
|
||
Args: | ||
file_client (obj): File object | ||
|
||
Returns: | ||
dict: Formatted file | ||
""" | ||
|
||
try: | ||
file_properties = file_client.get_file_properties() | ||
|
||
return { | ||
"_id": f"{file_client.file_system_name}_{file_properties.name.split('/')[-1]}", | ||
"name": file_properties.name.split("/")[-1], | ||
"created_at": file_properties.creation_time.isoformat(), | ||
"_timestamp": file_properties.last_modified.isoformat(), | ||
"size": file_properties.size, | ||
} | ||
except Exception as e: | ||
self._logger.error( | ||
f"Error while formatting file or getting file properties: {e}" | ||
) | ||
raise | ||
|
||
async def download_file(self, file_client): | ||
"""Download file from OneLake | ||
|
||
Args: | ||
file_client (obj): File client | ||
|
||
Returns: | ||
generator: File stream | ||
""" | ||
|
||
try: | ||
download = file_client.download_file() | ||
stream = download.chunks() | ||
|
||
for chunk in stream: | ||
yield chunk | ||
except Exception as e: | ||
self._logger.error(f"Error while downloading file: {e}") | ||
raise | ||
|
||
async def get_content(self, file_name, doit=None, timestamp=None): | ||
"""Obtains the file content for the specified file in `file_name`. | ||
|
||
Args: | ||
file_name (obj): The file name to process to obtain the content. | ||
timestamp (timestamp, optional): Timestamp of blob last modified. Defaults to None. | ||
doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None. | ||
|
||
Returns: | ||
str: Content of the file or None if not applicable. | ||
""" | ||
|
||
if not doit: | ||
return | ||
|
||
file_client = await self._get_file_client(file_name) | ||
file_properties = file_client.get_file_properties() | ||
file_extension = self.get_file_extension(file_name) | ||
|
||
doc = { | ||
"_id": f"{file_client.file_system_name}_{file_properties.name}", # id in format <file_system_name>_<file_name> | ||
} | ||
|
||
can_be_downloaded = self.can_file_be_downloaded( | ||
file_extension=file_extension, | ||
filename=file_properties.name, | ||
file_size=file_properties.size, | ||
) | ||
|
||
if not can_be_downloaded: | ||
self._logger.warning( | ||
f"File {file_properties.name} cannot be downloaded. Skipping." | ||
) | ||
return doc | ||
|
||
self._logger.debug(f"Downloading file {file_properties.name}...") | ||
extracted_doc = await self.download_and_extract_file( | ||
doc=doc, | ||
source_filename=file_properties.name.split("/")[-1], | ||
file_extension=file_extension, | ||
download_func=partial(self.download_file, file_client), | ||
) | ||
|
||
return extracted_doc if extracted_doc is not None else doc | ||
|
||
async def prepare_files(self, doc_paths): | ||
"""Prepare files for processing | ||
|
||
Args: | ||
doc_paths (list): List of paths extracted from OneLake | ||
|
||
Yields: | ||
tuple: File document and partial function to get content | ||
""" | ||
|
||
for path in doc_paths: | ||
file_name = path.name.split("/")[-1] | ||
field_client = await self._get_file_client(file_name) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking at the code, for each file multiple clients are created. Is there a way to reuse the clients between calls? I can see how this can become a problem with RAM |
||
|
||
yield self.format_file(field_client) | ||
|
||
async def get_docs(self, filtering=None): | ||
"""Get documents from OneLake and index them | ||
|
||
Yields: | ||
tuple: dictionary with meta-data of each file and a partial function to get the file content. | ||
""" | ||
|
||
self._logger.info(f"Fetching files from OneLake datalake {self.data_path}") | ||
directory_paths = await self._get_directory_paths( | ||
self.configuration["data_path"] | ||
) | ||
|
||
self._logger.debug(f"Found {len(directory_paths)} files in {self.data_path}") | ||
async for file in self.prepare_files(directory_paths): | ||
file_dict = file | ||
|
||
yield file_dict, partial(self.get_content, file_dict["name"]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can just become an field of the class that's set during init