diff --git a/pyproject.toml b/pyproject.toml index 262cb7c8..8504aabe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ license = "Apache-2.0" dependencies = [ "requests ~= 2.31", "boto3 >= 1.34.75", - "deadline >= 0.54.0,< 0.55", + "deadline >= 0.55,< 0.56", # Pinned to patch version due to Host Config Script runner usage of private OpenJD Sessions API. "openjd-sessions == 0.10.7", "openjd-model >= 0.8.1, < 0.10", diff --git a/src/deadline_worker_agent/aws/deadline/__init__.py b/src/deadline_worker_agent/aws/deadline/__init__.py index 6380ba37..072bb61e 100644 --- a/src/deadline_worker_agent/aws/deadline/__init__.py +++ b/src/deadline_worker_agent/aws/deadline/__init__.py @@ -841,6 +841,15 @@ def _record_attachment_download_filesystem_event(queue_id: str, file_system: str ) +def record_vfs_mount_telemetry_event(successfully_mounted: bool) -> None: + """Calls the telemetry client to record whether a VFS mount succeeded.""" + details: Dict[str, Any] = {"successfully_mounted": successfully_mounted} + _get_deadline_telemetry_client().record_event( + event_type="com.amazon.rum.deadline.job_attachments.vfs_mount", + event_details=details, + ) + + def record_sync_inputs_telemetry_event(queue_id: str, summary: SummaryStatistics) -> None: """Calls the telemetry client to record an event capturing the sync-inputs summary.""" details: Dict[str, Any] = asdict(summary) diff --git a/src/deadline_worker_agent/sessions/actions/run_attachment_download.py b/src/deadline_worker_agent/sessions/actions/run_attachment_download.py index bfc34ccd..074a2f4d 100644 --- a/src/deadline_worker_agent/sessions/actions/run_attachment_download.py +++ b/src/deadline_worker_agent/sessions/actions/run_attachment_download.py @@ -29,7 +29,10 @@ WindowsPermissionEnum, ) from deadline.job_attachments._utils import _get_unique_dest_dir_name -from deadline_worker_agent.aws.deadline import _record_attachment_download_filesystem_event +from deadline_worker_agent.aws.deadline import ( + _record_attachment_download_filesystem_event, + record_vfs_mount_telemetry_event, +) from openjd.sessions import ( LOG as OPENJD_LOG, @@ -425,6 +428,7 @@ def _start_vfs( fs_permission_settings=fs_permission_settings, merged_manifests_by_root=merged_manifests_by_root, os_env_vars=dict(session._env), # type: ignore + on_mount_complete=record_vfs_mount_telemetry_event, ) else: return False diff --git a/src/deadline_worker_agent/sessions/actions/scripts/attachment_upload.py b/src/deadline_worker_agent/sessions/actions/scripts/attachment_upload.py index 6983d665..60e8e355 100644 --- a/src/deadline_worker_agent/sessions/actions/scripts/attachment_upload.py +++ b/src/deadline_worker_agent/sessions/actions/scripts/attachment_upload.py @@ -49,6 +49,10 @@ _queue_id = os.environ.get("DEADLINE_QUEUE_ID", "queue-unknown") # Just for telemetry +# Configuration constants for the S3AssetUploader used during output uploads. +S3_MAX_POOL_CONNECTIONS = 50 +SMALL_FILE_THRESHOLD_MULTIPLIER = 20 + F = TypeVar("F", bound=Callable[..., Any]) @@ -288,7 +292,10 @@ def upload_output_assets( s3_settings = JobAttachmentS3Settings.from_s3_root_uri(s3_uri) # Initialize the S3 asset uploader - asset_uploader: S3AssetUploader = S3AssetUploader() + asset_uploader: S3AssetUploader = S3AssetUploader( + s3_max_pool_connections=S3_MAX_POOL_CONNECTIONS, + small_file_threshold_multiplier=SMALL_FILE_THRESHOLD_MULTIPLIER, + ) output_manifest_info_list = [] all_upload_summaries = [] diff --git a/test/unit/sessions/actions/scripts/test_attachment_upload.py b/test/unit/sessions/actions/scripts/test_attachment_upload.py index c0a59caa..7dad2b5c 100644 --- a/test/unit/sessions/actions/scripts/test_attachment_upload.py +++ b/test/unit/sessions/actions/scripts/test_attachment_upload.py @@ -646,6 +646,7 @@ def test_failure_telemetry_decorator_on_upload_output_assets( }, ) # Mock the environment variables @patch.object(attachment_upload_mod, "record_attachment_upload_latencies_telemetry_event") + @patch.object(attachment_upload_mod, "upload_output_assets") @patch.object(attachment_upload_mod, "snapshot") @patch.object(attachment_upload_mod, "merge") @patch.object(attachment_upload_mod, "parse_worker_manifest_properties") @@ -654,6 +655,7 @@ def test_latencies_telemetry( mock_parse: Mock, mock_merge: Mock, mock_snapshot: Mock, + mock_upload: Mock, mock_latencies_telemetry: Mock, root_path_to_output_manifest: Optional[Mock], ): diff --git a/test/unit/sessions/actions/test_run_attachment_download.py b/test/unit/sessions/actions/test_run_attachment_download.py index 012eeee3..9504b459 100644 --- a/test/unit/sessions/actions/test_run_attachment_download.py +++ b/test/unit/sessions/actions/test_run_attachment_download.py @@ -496,6 +496,7 @@ def test_start_vfs_calling_asset_sync( fs_permission_settings=ANY, merged_manifests_by_root=merged_manifests_by_root, os_env_vars=session._env, + on_mount_complete=ANY, ) def test_start_vfs_windows_platform(