diff --git a/encord/http/utils.py b/encord/http/utils.py index 750fe9d29..e591d4d36 100644 --- a/encord/http/utils.py +++ b/encord/http/utils.py @@ -9,13 +9,15 @@ --- """ +from __future__ import annotations + import logging import mimetypes import os.path from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from pathlib import Path -from typing import Dict, Iterable, List, Optional, Tuple, Type, Union +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Type, Union from tqdm import tqdm @@ -34,6 +36,9 @@ ) from encord.orm.storage import StorageItemType, UploadSignedUrl +if TYPE_CHECKING: + from requests.sessions import _Data + PROGRESS_BAR_FILE_FACTOR = 100 CACHE_DURATION_IN_SECONDS = 24 * 60 * 60 # 1 day UPLOAD_TO_SIGNED_URL_LIST_MAX_WORKERS = 4 @@ -139,14 +144,15 @@ def upload_to_signed_url_list_for_single_file( backoff_factor (float): Backoff factor for retry delays. """ try: - _upload_single_file( - file_path, - title, - signed_url, - _get_content_type(upload_item_type, file_path), - max_retries=max_retries, - backoff_factor=backoff_factor, - ) + with open(file_path, "rb") as f: + _upload_single_file( + f, + title, + signed_url, + _get_content_type(upload_item_type, file_path), + max_retries=max_retries, + backoff_factor=backoff_factor, + ) except CloudUploadError as e: failures.append( UploadToSignedUrlFailure( @@ -266,7 +272,7 @@ def upload_to_signed_url_list( def _upload_single_file( - file_path: Union[str, Path], + data: _Data, title: str, signed_url: str, content_type: Optional[str], @@ -278,20 +284,19 @@ def _upload_single_file( with create_new_session( max_retries=max_retries, backoff_factor=backoff_factor, connect_retries=max_retries ) as session: - with open(file_path, "rb") as f: - res_upload = session.put( - signed_url, data=f, headers={"Content-Type": content_type, "Cache-Control": f"max-age={cache_max_age}"} + res_upload = session.put( + signed_url, data=data, headers={"Content-Type": 
content_type, "Cache-Control": f"max-age={cache_max_age}"} + ) + + if res_upload.status_code != 200: + status_code = res_upload.status_code + headers = res_upload.headers + res_text = res_upload.text + error_string = str( + f"Error uploading file '{title}' to signed url: " + f"'{signed_url}'.\n" + f"Response data:\n\tstatus code: '{status_code}'\n\theaders: '{headers}'\n\tcontent: '{res_text}'", ) - if res_upload.status_code != 200: - status_code = res_upload.status_code - headers = res_upload.headers - res_text = res_upload.text - error_string = str( - f"Error uploading file '{title}' to signed url: " - f"'{signed_url}'.\n" - f"Response data:\n\tstatus code: '{status_code}'\n\theaders: '{headers}'\n\tcontent: '{res_text}'", - ) - - logger.error(error_string) - raise CloudUploadError(error_string) + logger.error(error_string) + raise CloudUploadError(error_string) diff --git a/encord/storage.py b/encord/storage.py index e070959c9..d67caa256 100644 --- a/encord/storage.py +++ b/encord/storage.py @@ -15,6 +15,7 @@ import os import time from datetime import datetime +from io import BufferedReader from math import ceil from pathlib import Path from typing import Any, Collection, Dict, Iterable, List, Literal, Optional, Sequence, TextIO, Union @@ -705,7 +706,8 @@ def upload_audio( def upload_text( self, - file_path: Union[Path, str], + file_path: Optional[Union[Path, str]] = None, + text_contents: Optional[Union[str, bytes, BufferedReader]] = None, title: Optional[str] = None, client_metadata: Optional[Dict[str, Any]] = None, text_metadata: Optional[orm_storage.CustomerProvidedTextMetadata] = None, @@ -715,6 +717,7 @@ def upload_text( Args: file_path: File path of the text file. For example: '/home/user/data/report.txt' + text_contents: Text contents to be uploaded. Can pass a str, bytes, or file-like object directly. title: The item title. If unspecified, the file name is used as the title. client_metadata: Optional custom metadata to be associated with the text file.
Should be a dictionary that is JSON-serializable. text_metadata: Optional media metadata for a text file. The Encord platform uses the specified values instead of scanning the files. @@ -734,17 +737,23 @@ def upload_text( - file_size: int - Size of the text file in bytes. - mime_type: str - MIME type of the text file (for example: `application/json` or `text/plain`). """ + if file_path and text_contents: + raise EncordException("We only support passing either the file_path or the text_contents. Can't pass both") + if not file_path and not text_contents: + raise EncordException("Either file_path or text_contents must be provided") upload_url_info = self._get_upload_signed_urls( item_type=StorageItemType.PLAIN_TEXT, count=1, frames_subfolder_name=None ) if len(upload_url_info) != 1: raise EncordException("Can't access upload location") - title = self._guess_title(title, file_path) + title = self._guess_title(title, file_path, text_contents) + data = Path(file_path) if file_path else text_contents + assert data is not None # Checked above self._upload_local_file( - file_path, + data, title, StorageItemType.PLAIN_TEXT, upload_url_info[0].signed_url, @@ -1374,15 +1383,29 @@ def _get_upload_signed_urls( return urls.results - def _guess_title(self, title: Optional[str], file_path: Union[Path, str]) -> str: + def _guess_title( + self, + title: Optional[str], + file_path: Optional[Union[Path, str]], + text_contents: Optional[Union[str, bytes, BufferedReader]] = None, + ) -> str: if title: return title + if not file_path and not text_contents: + raise ValueError("Require at least one of file_path, text_contents") + if file_path and text_contents: + raise ValueError("Require at most one of file_path, text_contents") + if file_path: + if isinstance(file_path, str): + file_path = Path(file_path) + return file_path.name + else: + assert text_contents + if isinstance(text_contents, BufferedReader): + return str(text_contents.peek(10)) # Ensure we **peek** as BufferedReader is 
stateful + return str(text_contents[:10]) - if isinstance(file_path, str): - file_path = Path(file_path) - return file_path.name - - def _get_content_type(self, file_path: Union[Path, str], item_type: StorageItemType) -> str: + def _get_content_type(self, file_path: Path, item_type: StorageItemType) -> str: if item_type == StorageItemType.IMAGE: return mimetypes.guess_type(str(file_path))[0] or "application/octet-stream" elif item_type == StorageItemType.VIDEO or item_type == StorageItemType.AUDIO: @@ -1410,14 +1433,12 @@ def _get_content_type(self, file_path: Union[Path, str], item_type: StorageItemT def _upload_local_file( self, - file_path: Union[Path, str], + data: Union[Path, str, bytes, BufferedReader], title: str, item_type: StorageItemType, signed_url: str, cloud_upload_settings: CloudUploadSettings = CloudUploadSettings(), ) -> None: - content_type = self._get_content_type(file_path, item_type) - max_retries = ( cloud_upload_settings.max_retries if cloud_upload_settings.max_retries is not None @@ -1428,15 +1449,28 @@ def _upload_local_file( if cloud_upload_settings.backoff_factor is not None else DEFAULT_REQUESTS_SETTINGS.backoff_factor ) - - _upload_single_file( - str(file_path), - title, - signed_url, - content_type, - max_retries=max_retries, - backoff_factor=backoff_factor, - ) + if isinstance(data, Path): + file_path = data + content_type = self._get_content_type(file_path, item_type) + with open(file_path, "rb") as file_obj: + _upload_single_file( + file_obj, + title, + signed_url, + content_type, + max_retries=max_retries, + backoff_factor=backoff_factor, + ) + else: + content_type = "text/plain" if item_type == StorageItemType.PLAIN_TEXT else "application/octet-stream" + _upload_single_file( + data, + title, + signed_url, + content_type, + max_retries=max_retries, + backoff_factor=backoff_factor, + ) def _add_data( self,