Skip to content

Commit

Permalink
feat(LAB-2828): manage native video imports asynchronously (#1699)
Browse files Browse the repository at this point in the history
  • Loading branch information
BlueGrizzliBear authored Jun 6, 2024
1 parent 1299580 commit 72802b7
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 46 deletions.
39 changes: 36 additions & 3 deletions src/kili/services/asset_import/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import warnings
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
from json import dumps
from json import dumps, loads
from pathlib import Path
from typing import (
TYPE_CHECKING,
Expand All @@ -22,7 +22,7 @@
)
from uuid import uuid4

from tenacity import Retrying
from tenacity import Retrying, stop_after_attempt, stop_never
from tenacity.retry import retry_if_exception_type
from tenacity.wait import wait_exponential

Expand Down Expand Up @@ -130,6 +130,8 @@ def verify_batch_imported(self, assets: List):
" number of assets to be processed by the server."
)
for attempt in Retrying(
# For asynchronous imports, cap the number of attempts so the total wait is bounded (~5 minutes).
stop=stop_after_attempt(40) if self.is_asynchronous else stop_never,
retry=retry_if_exception_type(BatchImportError),
wait=wait_exponential(multiplier=1, min=1, max=8),
before_sleep=RetryLongWaitWarner(logger_func=logger_func, warn_message=log_message),
Expand Down Expand Up @@ -207,9 +209,40 @@ def build_url_from_parts(*parts) -> str:
"""Builds an url from the parts."""
return "/".join(parts)

@staticmethod
def are_native_videos(assets) -> bool:
    """Tell whether a batch of video assets must be imported as native videos.

    For each asset, ``processingParameters.shouldUseNativeVideo`` is read from
    its ``json_metadata``. A missing flag counts as ``True``.

    Args:
        assets: iterable of dict-like assets. ``json_metadata`` may be a JSON
            string, an already decoded mapping, ``None``, or absent.

    Returns:
        ``True`` when every asset is flagged as native video (this includes an
        empty batch), ``False`` when every asset must be split into frames.

    Raises:
        ImportValidationError: if the batch mixes native videos and videos to
            split into frames.
    """
    native_flags = []
    for asset in assets:
        json_metadata = asset.get("json_metadata")
        # Only decode serialized metadata; a mapping is used as-is so that
        # callers passing decoded metadata do not crash inside ``loads``.
        if isinstance(json_metadata, (str, bytes, bytearray)):
            json_metadata = loads(json_metadata)
        # ``or {}`` also covers an explicit None / JSON null at either level.
        processing_parameters = (json_metadata or {}).get("processingParameters") or {}
        native_flags.append(processing_parameters.get("shouldUseNativeVideo", True))
    if all(native_flags):
        return True
    if not any(native_flags):
        return False
    raise ImportValidationError(
        """
Cannot upload videos to split into frames
and video to keep as native in the same time.
Please separate the assets into 2 calls
"""
    )

def _async_import_to_kili(self, assets: List[KiliResolverAsset]):
"""Import assets with asynchronous resolver."""
upload_type = "GEO_SATELLITE" if self.input_type == "IMAGE" else "VIDEO"
if self.input_type == "IMAGE":
upload_type = "GEO_SATELLITE"
elif self.input_type in ("VIDEO", "VIDEO_LEGACY"):
upload_type = "NATIVE_VIDEO" if self.are_native_videos(assets) else "FRAME_VIDEO"
else:
raise NotImplementedError(
f"Import of {self.input_type} assets is not supported in async calls"
)

content = (
{
"multiLayerContentArray": [asset["multi_layer_content"] for asset in assets],
Expand Down
28 changes: 2 additions & 26 deletions src/kili/services/asset_import/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,44 +162,20 @@ def get_data_type(self, assets):
return VideoDataType.HOSTED_FILE
return VideoDataType.LOCAL_FILE

@staticmethod
def should_cut_into_frames(assets) -> bool:
    """Decide whether a batch of video assets has to be split into frames.

    Reads ``processingParameters.shouldUseNativeVideo`` from each asset's
    ``json_metadata`` mapping; a missing flag counts as ``True``.

    Returns:
        ``False`` when every asset keeps its native video (this includes an
        empty batch), ``True`` when no asset does.

    Raises:
        ImportValidationError: if the batch mixes both kinds of assets.
    """
    # Collect every flag up front so malformed metadata fails before deciding.
    native_flags = [
        asset.get("json_metadata", {})
        .get("processingParameters", {})
        .get("shouldUseNativeVideo", True)
        for asset in assets
    ]
    if all(native_flags):
        return False
    if not any(native_flags):
        return True
    raise ImportValidationError(
        """
Cannot upload videos to split into frames
and video to keep as native in the same time.
Please separate the assets into 2 calls
"""
    )

def import_assets(self, assets: List[AssetLike]):
"""Import video assets into Kili."""
self._check_upload_is_allowed(assets)
data_type = self.get_data_type(assets)
assets = self.filter_duplicate_external_ids(assets)
if data_type == VideoDataType.LOCAL_FILE:
assets = self.filter_local_assets(assets, self.raise_error)
as_frames = self.should_cut_into_frames(assets)
batch_params = BatchParams(is_hosted=False, is_asynchronous=as_frames)
batch_params = BatchParams(is_hosted=False, is_asynchronous=True)
batch_importer = VideoContentBatchImporter(
self.kili, self.project_params, batch_params, self.pbar
)
batch_size = IMPORT_BATCH_SIZE
elif data_type == VideoDataType.HOSTED_FILE:
as_frames = self.should_cut_into_frames(assets)
batch_params = BatchParams(is_hosted=True, is_asynchronous=as_frames)
batch_params = BatchParams(is_hosted=True, is_asynchronous=True)
batch_importer = VideoContentBatchImporter(
self.kili, self.project_params, batch_params, self.pbar
)
Expand Down
29 changes: 12 additions & 17 deletions tests/unit/services/asset_import/test_import_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ def test_upload_from_one_local_video_file_to_native(self, *_):
}
}
)
expected_parameters = self.get_expected_sync_call(
expected_parameters = self.get_expected_async_call(
["https://signed_url?id=id"],
["local video file to native"],
["unique_id"],
[False],
[""],
[expected_json_metadata],
"NATIVE_VIDEO",
)
self.kili.graphql_client.execute.assert_called_with(*expected_parameters)

Expand All @@ -61,13 +60,12 @@ def test_upload_from_one_hosted_video_file_to_native(self, *_):
}
}
)
expected_parameters = self.get_expected_sync_call(
expected_parameters = self.get_expected_async_call(
["https://hosted-data"],
["hosted file"],
["unique_id"],
[False],
[""],
[expected_json_metadata],
"NATIVE_VIDEO",
)
self.kili.graphql_client.execute.assert_called_with(*expected_parameters)

Expand All @@ -89,13 +87,12 @@ def test_upload_from_one_hosted_video_authorized_while_local_forbidden(self, *_)
}
}
)
expected_parameters = self.get_expected_sync_call(
expected_parameters = self.get_expected_async_call(
["https://hosted-data"],
["hosted file"],
["unique_id"],
[False],
[""],
[expected_json_metadata],
"NATIVE_VIDEO",
)
self.kili.graphql_client.execute.assert_called_with(*expected_parameters)

Expand Down Expand Up @@ -136,7 +133,7 @@ def test_upload_one_local_video_to_frames(self, *_):
["local video to frames"],
["unique_id"],
[expected_json_metadata],
"VIDEO",
"FRAME_VIDEO",
)
self.kili.graphql_client.execute.assert_called_with(*expected_parameters)

Expand Down Expand Up @@ -170,7 +167,7 @@ def test_upload_one_hosted_video_to_frames(self, *_):
["changing fps"],
["unique_id"],
[expected_json_metadata],
"VIDEO",
"FRAME_VIDEO",
)
self.kili.graphql_client.execute.assert_called_with(*expected_parameters)

Expand Down Expand Up @@ -296,13 +293,12 @@ def test_import_one_video_with_metadata(self, *_):
},
}
)
expected_parameters = self.get_expected_sync_call(
expected_parameters = self.get_expected_async_call(
["https://hosted-data"],
["with metadata"],
["unique_id"],
[False],
[""],
[expected_json_metadata],
"NATIVE_VIDEO",
)
self.kili.graphql_client.execute.assert_called_with(*expected_parameters)

Expand Down Expand Up @@ -332,12 +328,11 @@ def test_upload_from_one_hosted_video_file_to_video_legacy_project(self, *_):
}
}
)
expected_parameters = self.get_expected_sync_call(
expected_parameters = self.get_expected_async_call(
["https://hosted-data"],
["hosted file"],
["unique_id"],
[False],
[""],
[expected_json_metadata],
"NATIVE_VIDEO",
)
self.kili.graphql_client.execute.assert_called_with(*expected_parameters)

0 comments on commit 72802b7

Please sign in to comment.