@@ -9,6 +9,7 @@ use std::{
99
1010use color_eyre:: { Result , eyre:: eyre} ;
1111use eyre:: Context ;
12+ use itertools:: Itertools ;
1213use reqwest:: Url ;
1314
1415use super :: api_client:: client:: { EdfSpec , ScriptSpec } ;
@@ -19,6 +20,7 @@ use crate::{
1920 client:: { CscsApi , JobStartOptions } ,
2021 types:: { FileStat , FileSystemType , Job , JobDetail , PathEntry , PathType , S3Upload , System , UserInfo } ,
2122 } ,
23+ cli:: upload_chunk,
2224 oauth2:: {
2325 CLIENT_ID_SECRET_NAME , CLIENT_SECRET_SECRET_NAME , client_credentials_login, finish_cscs_device_login,
2426 start_cscs_device_login,
@@ -201,14 +203,74 @@ async fn setup_ssh(
201203 }
202204}
203205
206+ async fn inject_coman_squash (
207+ api_client : & CscsApi ,
208+ base_path : & Path ,
209+ current_system : & str ,
210+ options : & JobStartOptions ,
211+ ) -> Result < Option < PathBuf > > {
212+ if options. no_coman {
213+ return Ok ( None ) ;
214+ }
215+ let config = Config :: new ( ) . unwrap ( ) ;
216+ let local_squash_path = match config. values . coman_squash_path . clone ( ) {
217+ Some ( path) => path,
218+ None => todo ! ( ) ,
219+ } ;
220+ let target = base_path. join ( "coman.sqsh" ) ;
221+ let file_meta = std:: fs:: metadata ( local_squash_path. clone ( ) ) ?;
222+
223+ #[ cfg( target_family = "unix" ) ]
224+ let size = file_meta. size ( ) as usize ;
225+
226+ #[ cfg( target_family = "windows" ) ]
227+ let size = file_meta. file_size ( ) as usize ;
228+
229+ //upload squash file
230+ let transfer_data = api_client
231+ . transfer_upload ( current_system, config. values . cscs . account , target. clone ( ) , size as i64 )
232+ . await ?;
233+ let mut etags: Vec < String > = Vec :: new ( ) ;
234+ let client = reqwest:: Client :: new ( ) ;
235+ let num_parts = transfer_data. 1 . num_parts ;
236+ for ( chunk_id, transfer_url) in transfer_data. 1 . parts_upload_urls . into_iter ( ) . enumerate ( ) {
237+ println ! (
238+ "Uploading part {}/{} ({}Mb)" ,
239+ chunk_id + 1 ,
240+ num_parts,
241+ transfer_data. 1 . part_size / 1024 / 1024
242+ ) ;
243+ let etag = upload_chunk (
244+ local_squash_path. clone ( ) ,
245+ ( chunk_id as u64 ) * transfer_data. 1 . part_size ,
246+ transfer_data. 1 . part_size ,
247+ transfer_url,
248+ )
249+ . await ?;
250+ etags. push ( etag) ;
251+ }
252+
253+ let body = etags
254+ . into_iter ( )
255+ . enumerate ( )
256+ . map ( |( i, etag) | ( i + 1 , etag) )
257+ . map ( |( i, etag) | format ! ( "<Part><PartNumber>{}</PartNumber><ETag>{}</ETag></Part>" , i, etag) )
258+ . join ( "" ) ;
259+ let body = format ! ( "<CompleteMultipartUpload>{}</CompleteMultipartUpload>" , body) ;
260+ let req = client. post ( transfer_data. 1 . complete_upload_url ) . body ( body) . build ( ) ?;
261+ let resp = client. execute ( req) . await ?;
262+ resp. error_for_status ( ) ?;
263+ Ok ( Some ( target) )
264+ }
265+
204266async fn handle_edf (
205267 api_client : & CscsApi ,
206268 base_path : & Path ,
207269 current_system : & str ,
208270 envvars : & HashMap < String , String > ,
209271 workdir : & str ,
210272 options : & JobStartOptions ,
211- ) -> Result < PathBuf > {
273+ ) -> Result < ( PathBuf , Option < PathBuf > ) > {
212274 let config = Config :: new ( ) . unwrap ( ) ;
213275 let environment_path = base_path. join ( "environment.toml" ) ;
214276 match options. edf_spec . clone ( ) {
@@ -248,21 +310,23 @@ async fn handle_edf(
248310 }
249311
250312 let ssh_path = setup_ssh ( api_client, base_path, current_system, options) . await ?;
313+ let coman_squash = inject_coman_squash ( api_client, base_path, current_system, options) . await ?;
251314
252315 let mut context = tera:: Context :: new ( ) ;
253316 context. insert ( "edf_image" , & docker_image. to_edf ( ) ) ;
254317 context. insert ( "container_workdir" , & workdir) ;
255318 context. insert ( "env" , & envvars) ;
256319 context. insert ( "mount" , & mount) ;
257320 context. insert ( "ssh_public_key" , & ssh_path) ;
321+ context. insert ( "coman_squash" , & coman_squash) ;
258322
259323 let environment_file = tera. render ( "environment.toml" , & context) ?;
260324 api_client. mkdir ( current_system, base_path. to_path_buf ( ) ) . await ?;
261325 api_client. chmod ( current_system, base_path. to_path_buf ( ) , "700" ) . await ?;
262326 api_client
263327 . upload ( current_system, environment_path. clone ( ) , environment_file. into_bytes ( ) )
264328 . await ?;
265- Ok ( environment_path)
329+ Ok ( ( environment_path, coman_squash ) )
266330 }
267331 EdfSpec :: Local ( local_path) => {
268332 let environment_file = std:: fs:: read_to_string ( local_path. clone ( ) ) ?;
@@ -271,17 +335,20 @@ async fn handle_edf(
271335 api_client
272336 . upload ( current_system, environment_path. clone ( ) , environment_file. into_bytes ( ) )
273337 . await ?;
274- Ok ( environment_path)
338+ Ok ( ( environment_path, None ) )
275339 }
276- EdfSpec :: Remote ( path) => Ok ( path) ,
340+ EdfSpec :: Remote ( path) => Ok ( ( path, None ) ) ,
277341 }
278342}
343+
344+ #[ allow( clippy:: too_many_arguments) ]
279345async fn handle_script (
280346 api_client : & CscsApi ,
281347 job_name : & str ,
282348 base_path : & Path ,
283349 current_system : & str ,
284350 environment_path : & Path ,
351+ coman_squash : Option < PathBuf > ,
285352 workdir : & str ,
286353 options : & JobStartOptions ,
287354) -> Result < PathBuf > {
@@ -300,6 +367,9 @@ async fn handle_script(
300367 ) ;
301368 context. insert ( "environment_file" , & environment_path. to_path_buf ( ) ) ;
302369 context. insert ( "container_workdir" , & workdir) ;
370+ if let Some ( path) = coman_squash {
371+ context. insert ( "coman_squash" , & path) ;
372+ }
303373 let script = tera. render ( "script.sh" , & context) ?;
304374 api_client
305375 . upload ( current_system, script_path. clone ( ) , script. into_bytes ( ) )
@@ -360,7 +430,7 @@ pub async fn cscs_job_start(
360430 let mut envvars = config. values . cscs . env . clone ( ) ;
361431 envvars. extend ( options. env . clone ( ) ) ;
362432
363- let environment_path = handle_edf (
433+ let ( environment_path, coman_squash ) = handle_edf (
364434 & api_client,
365435 & base_path,
366436 current_system,
@@ -376,6 +446,7 @@ pub async fn cscs_job_start(
376446 & base_path,
377447 current_system,
378448 & environment_path,
449+ coman_squash,
379450 & container_workdir,
380451 & options,
381452 )
0 commit comments