33
33
from murfey .client .watchdir import DirWatcher
34
34
from murfey .client .watchdir_multigrid import MultigridDirWatcher
35
35
from murfey .util import posix_path
36
- from murfey .util .api import url_path_for
37
36
from murfey .util .client import (
38
37
capture_delete ,
39
38
capture_get ,
@@ -153,7 +152,10 @@ def _start_rsyncer_multigrid(
153
152
log .info (f"starting multigrid rsyncer: { source } " )
154
153
destination_overrides = destination_overrides or {}
155
154
machine_data = capture_get (
156
- url = f"{ self ._environment .url .geturl ()} { url_path_for ('session_control.router' , 'machine_info_by_instrument' , instrument_name = instrument_name )} "
155
+ base_url = str (self ._environment .url .geturl ()),
156
+ router_name = "session_control.router" ,
157
+ function_name = "machine_info_by_instrument" ,
158
+ instrument_name = instrument_name ,
157
159
).json ()
158
160
if destination_overrides .get (source ):
159
161
destination = destination_overrides [source ] + f"/{ extra_directory } "
@@ -205,8 +207,13 @@ def _start_rsyncer(
205
207
log .info (f"starting rsyncer: { source } " )
206
208
if transfer :
207
209
# Always make sure the destination directory exists
208
- make_directory_url = f"{ str (self ._url .geturl ())} { url_path_for ('file_io_instrument.router' , 'make_rsyncer_destination' , session_id = self ._environment .murfey_session )} "
209
- capture_post (make_directory_url , json = {"destination" : destination })
210
+ capture_post (
211
+ base_url = str (self ._url .geturl ()),
212
+ router_name = "file_io_instrument.router" ,
213
+ function_name = "make_rsyncer_destination" ,
214
+ session_id = self ._environment .murfey_session ,
215
+ data = {"destination" : destination },
216
+ )
210
217
if self ._environment :
211
218
self ._environment .default_destinations [source ] = destination
212
219
if self ._environment .gain_ref and visit_path :
@@ -271,14 +278,19 @@ def rsync_result(update: RSyncerUpdate):
271
278
),
272
279
secondary = True ,
273
280
)
274
- url = f"{ str (self ._url .geturl ())} { url_path_for ('session_control.router' , 'register_rsyncer' , session_id = self ._environment .murfey_session )} "
275
281
rsyncer_data = {
276
282
"source" : str (source ),
277
283
"destination" : destination ,
278
284
"session_id" : self ._environment .murfey_session ,
279
285
"transferring" : self ._do_transfer ,
280
286
}
281
- capture_post (url = url , json = rsyncer_data )
287
+ capture_post (
288
+ base_url = str (self ._url .geturl ()),
289
+ router_name = "session_control.router" ,
290
+ function_name = "register_rsyncer" ,
291
+ session_id = self ._environment .murfey_session ,
292
+ data = rsyncer_data ,
293
+ )
282
294
283
295
self ._environment .watchers [source ] = DirWatcher (source , settling_time = 30 )
284
296
@@ -341,7 +353,6 @@ def _increment_file_count(
341
353
self , observed_files : List [Path ], source : str , destination : str
342
354
):
343
355
if len (observed_files ):
344
- url = f"{ str (self ._url .geturl ())} { url_path_for ('prometheus.router' , 'increment_rsync_file_count' , visit_name = self ._visit )} "
345
356
num_data_files = len (
346
357
[
347
358
f
@@ -357,15 +368,20 @@ def _increment_file_count(
357
368
"increment_count" : len (observed_files ),
358
369
"increment_data_count" : num_data_files ,
359
370
}
360
- capture_post (url = url , json = data )
371
+ capture_post (
372
+ base_url = str (self ._url .geturl ()),
373
+ router_name = "prometheus.router" ,
374
+ function_name = "increment_rsync_file_count" ,
375
+ visit_name = self ._visit ,
376
+ data = data ,
377
+ )
361
378
362
379
# Prometheus can handle higher traffic so update for every transferred file rather
363
380
# than batching as we do for the Murfey database updates in _increment_transferred_files
364
381
def _increment_transferred_files_prometheus (
365
382
self , update : RSyncerUpdate , source : str , destination : str
366
383
):
367
384
if update .outcome is TransferResult .SUCCESS :
368
- url = f"{ str (self ._url .geturl ())} { url_path_for ('prometheus.router' , 'increment_rsync_transferred_files_prometheus' , visit_name = self ._visit )} "
369
385
data_files = (
370
386
[update ]
371
387
if update .file_path .suffix in self ._data_suffixes
@@ -384,7 +400,13 @@ def _increment_transferred_files_prometheus(
384
400
"increment_data_count" : len (data_files ),
385
401
"data_bytes" : sum (f .file_size for f in data_files ),
386
402
}
387
- capture_post (url = url , json = data )
403
+ capture_post (
404
+ base_url = str (self ._url .geturl ()),
405
+ router_name = "prometheus.router" ,
406
+ function_name = "increment_rsync_transferred_files_prometheus" ,
407
+ visit_name = self ._visit ,
408
+ data = data ,
409
+ )
388
410
389
411
def _increment_transferred_files (
390
412
self , updates : List [RSyncerUpdate ], source : str , destination : str
@@ -394,7 +416,6 @@ def _increment_transferred_files(
394
416
]
395
417
if not checked_updates :
396
418
return
397
- url = f"{ str (self ._url .geturl ())} { url_path_for ('prometheus.router' , 'increment_rsync_transferred_files' , visit_name = self ._visit )} "
398
419
data_files = [
399
420
u
400
421
for u in updates
@@ -412,7 +433,13 @@ def _increment_transferred_files(
412
433
"increment_data_count" : len (data_files ),
413
434
"data_bytes" : sum (f .file_size for f in data_files ),
414
435
}
415
- capture_post (url = url , json = data )
436
+ capture_post (
437
+ base_url = str (self ._url .geturl ()),
438
+ router_name = "prometheus.router" ,
439
+ function_name = "increment_rsync_transferred_files" ,
440
+ visit_name = self ._visit ,
441
+ data = data ,
442
+ )
416
443
417
444
def _set_register_dc (self , response : str ):
418
445
if response == "y" :
@@ -488,8 +515,12 @@ def _start_dc(self, metadata_json, from_form: bool = False):
488
515
log .info ("Registering tomography processing parameters" )
489
516
if context .data_collection_parameters .get ("num_eer_frames" ):
490
517
eer_response = capture_post (
491
- url = f"{ str (self .app ._environment .url .geturl ())} { url_path_for ('file_io_instrument.router' , 'write_eer_fractionation_file' , visit_name = self .app ._environment .visit , session_id = self .app ._environment .murfey_session )} " ,
492
- json = {
518
+ base_url = str (self .app ._environment .url .geturl ()),
519
+ router_name = "file_io_instrument.router" ,
520
+ function_name = "write_eer_fractionation_file" ,
521
+ visit_name = self .app ._environment .visit ,
522
+ session_id = self .app ._environment .murfey_session ,
523
+ data = {
493
524
"num_frames" : context .data_collection_parameters [
494
525
"num_eer_frames"
495
526
],
@@ -501,16 +532,22 @@ def _start_dc(self, metadata_json, from_form: bool = False):
501
532
eer_fractionation_file = eer_response .json ()["eer_fractionation_file" ]
502
533
metadata_json .update ({"eer_fractionation_file" : eer_fractionation_file })
503
534
capture_post (
504
- url = f"{ self .app ._environment .url .geturl ()} { url_path_for ('workflow.tomo_router' , 'register_tomo_proc_params' , session_id = self .app ._environment .murfey_session )} " ,
505
- json = metadata_json ,
535
+ base_url = str (self .app ._environment .url .geturl ()),
536
+ router_name = "workflow.tomo_router" ,
537
+ function_name = "register_tomo_proc_params" ,
538
+ session_id = self .app ._environment .murfey_session ,
539
+ data = metadata_json ,
506
540
)
507
541
capture_post (
508
- f"{ self .app ._environment .url .geturl ()} { url_path_for ('workflow.tomo_router' , 'flush_tomography_processing' , visit_name = self ._visit , session_id = self .app ._environment .murfey_session )} " ,
509
- json = {"rsync_source" : str (source )},
542
+ base_url = str (self .app ._environment .url .geturl ()),
543
+ router_name = "workflow.tomo_router" ,
544
+ function_name = "flush_tomography_processing" ,
545
+ visit_name = self ._visit ,
546
+ session_id = self .app ._environment .murfey_session ,
547
+ data = {"rsync_source" : str (source )},
510
548
)
511
549
log .info ("Tomography processing flushed" )
512
550
elif isinstance (context , SPAModularContext ):
513
- url = f"{ str (self ._url .geturl ())} { url_path_for ('workflow.router' , 'register_dc_group' , visit_name = self ._visit , session_id = self ._environment .murfey_session )} "
514
551
dcg_data = {
515
552
"experiment_type" : "single particle" ,
516
553
"experiment_type_id" : 37 ,
@@ -526,7 +563,14 @@ def _start_dc(self, metadata_json, from_form: bool = False):
526
563
else None
527
564
),
528
565
}
529
- capture_post (url , json = dcg_data )
566
+ capture_post (
567
+ base_url = str (self ._url .geturl ()),
568
+ router_name = "workflow.router" ,
569
+ function_name = "register_dc_group" ,
570
+ visit_name = self ._visit ,
571
+ session_id = self ._environment .murfey_session ,
572
+ data = dcg_data ,
573
+ )
530
574
if from_form :
531
575
data = {
532
576
"voltage" : metadata_json ["voltage" ],
@@ -549,8 +593,12 @@ def _start_dc(self, metadata_json, from_form: bool = False):
549
593
"phase_plate" : metadata_json .get ("phase_plate" , False ),
550
594
}
551
595
capture_post (
552
- f"{ str (self ._url .geturl ())} { url_path_for ('workflow.router' , 'start_dc' , visit_name = self ._visit , session_id = self ._environment .murfey_session )} " ,
553
- json = data ,
596
+ base_url = str (self ._url .geturl ()),
597
+ router_name = "workflow.router" ,
598
+ function_name = "start_dc" ,
599
+ visit_name = self ._visit ,
600
+ session_id = self ._environment .murfey_session ,
601
+ data = data ,
554
602
)
555
603
for recipe in (
556
604
"em-spa-preprocess" ,
@@ -560,17 +608,24 @@ def _start_dc(self, metadata_json, from_form: bool = False):
560
608
"em-spa-refine" ,
561
609
):
562
610
capture_post (
563
- f"{ str (self ._url .geturl ())} { url_path_for ('workflow.router' , 'register_proc' , visit_name = self ._visit , session_id = self ._environment .murfey_session )} " ,
564
- json = {
611
+ base_url = str (self ._url .geturl ()),
612
+ router_name = "workflow.router" ,
613
+ function_name = "register_proc" ,
614
+ visit_name = self ._visit ,
615
+ session_id = self ._environment .murfey_session ,
616
+ data = {
565
617
"tag" : str (source ),
566
618
"source" : str (source ),
567
619
"recipe" : recipe ,
568
620
},
569
621
)
570
622
log .info (f"Posting SPA processing parameters: { metadata_json } " )
571
623
response = capture_post (
572
- f"{ self .app ._environment .url .geturl ()} { url_path_for ('workflow.spa_router' , 'register_spa_proc_params' , session_id = self .app ._environment .murfey_session )} " ,
573
- json = {
624
+ base_url = str (self .app ._environment .url .geturl ()),
625
+ router_name = "workflow.spa_router" ,
626
+ function_name = "register_spa_proc_params" ,
627
+ session_id = self .app ._environment .murfey_session ,
628
+ data = {
574
629
** {
575
630
k : None if v == "None" else v
576
631
for k , v in metadata_json .items ()
@@ -586,8 +641,12 @@ def _start_dc(self, metadata_json, from_form: bool = False):
586
641
if not str (response .status_code ).startswith ("2" ):
587
642
log .warning (f"{ response .reason } " )
588
643
capture_post (
589
- f"{ self .app ._environment .url .geturl ()} { url_path_for ('workflow.spa_router' , 'flush_spa_processing' , visit_name = self .app ._environment .visit , session_id = self .app ._environment .murfey_session )} " ,
590
- json = {"tag" : str (source )},
644
+ base_url = str (self .app ._environment .url .geturl ()),
645
+ router_name = "workflow.spa_router" ,
646
+ function_name = "flush_spa_processing" ,
647
+ visit_name = self .app ._environment .visit ,
648
+ session_id = self .app ._environment .murfey_session ,
649
+ data = {"tag" : str (source )},
591
650
)
592
651
593
652
def _set_request_destination (self , response : str ):
@@ -618,7 +677,9 @@ async def on_button_pressed(self, event: Button.Pressed):
618
677
619
678
async def on_mount (self ) -> None :
620
679
exisiting_sessions = capture_get (
621
- url = f"{ self ._environment .url .geturl ()} { url_path_for ('session_control.router' , 'get_sessions' )} "
680
+ base_url = str (self ._environment .url .geturl ()),
681
+ router_name = "session_control.router" ,
682
+ function_name = "get_sessions" ,
622
683
).json ()
623
684
if self .visits :
624
685
self .install_screen (VisitSelection (self .visits ), "visit-select-screen" )
@@ -645,8 +706,12 @@ async def on_mount(self) -> None:
645
706
else :
646
707
session_name = "Client connection"
647
708
resp = capture_post (
648
- f"{ self ._environment .url .geturl ()} { url_path_for ('session_control.router' , 'link_client_to_session' , instrument_name = self ._environment .instrument_name , client_id = self ._environment .client_id )} " ,
649
- json = {"session_id" : None , "session_name" : session_name },
709
+ base_url = str (self ._environment .url .geturl ()),
710
+ router_name = "session_control.router" ,
711
+ function_name = "link_client_to_session" ,
712
+ instrument_name = self ._environment .instrument_name ,
713
+ client_id = self ._environment .client_id ,
714
+ data = {"session_id" : None , "session_name" : session_name },
650
715
)
651
716
if resp :
652
717
self ._environment .murfey_session = resp .json ()
@@ -664,7 +729,10 @@ async def reset(self):
664
729
sources = "\n " .join (str (k ) for k in self .rsync_processes .keys ())
665
730
prompt = f"Remove files from the following:\n { sources } \n "
666
731
rsync_instances = capture_get (
667
- url = f"{ self ._environment .url .geturl ()} { url_path_for ('session_control.router' , 'get_rsyncers_for_session' , session_id = self ._environment .murfey_session )} "
732
+ base_url = str (self ._environment .url .geturl ()),
733
+ router_name = "session_control.router" ,
734
+ function_name = "get_rsyncers_for_session" ,
735
+ session_id = self ._environment .murfey_session ,
668
736
).json ()
669
737
prompt += f"Copied { sum (r ['files_counted' ] for r in rsync_instances )} / { sum (r ['files_transferred' ] for r in rsync_instances )} "
670
738
self .install_screen (
@@ -688,7 +756,10 @@ async def action_quit(self) -> None:
688
756
689
757
async def action_remove_session (self ) -> None :
690
758
capture_delete (
691
- url = f"{ self ._environment .url .geturl ()} { url_path_for ('session_control.router' , 'remove_session' , session_id = self ._environment .murfey_session )} "
759
+ base_url = str (self ._environment .url .geturl ()),
760
+ router_name = "session_control.router" ,
761
+ function_name = "remove_session" ,
762
+ session_id = self ._environment .murfey_session ,
692
763
)
693
764
if self .rsync_processes :
694
765
for rp in self .rsync_processes .values ():
@@ -702,7 +773,10 @@ async def action_remove_session(self) -> None:
702
773
703
774
def clean_up_quit (self ) -> None :
704
775
capture_delete (
705
- url = f"{ self ._environment .url .geturl ()} { url_path_for ('session_control.router' , 'remove_session' , session_id = self ._environment .murfey_session )} "
776
+ base_url = str (self ._environment .url .geturl ()),
777
+ router_name = "session_control.router" ,
778
+ function_name = "remove_session" ,
779
+ session_id = self ._environment .murfey_session ,
706
780
)
707
781
self .exit ()
708
782
@@ -745,10 +819,16 @@ def _remove_data(self, listener: Callable[..., Awaitable[None] | None], **kwargs
745
819
removal_rp .stop ()
746
820
log .info (f"rsyncer { rp } rerun with removal" )
747
821
capture_post (
748
- url = f"{ self ._environment .url .geturl ()} { url_path_for ('session_control.router' , 'register_processing_success_in_ispyb' , session_id = self ._environment .murfey_session )} "
822
+ base_url = str (self ._environment .url .geturl ()),
823
+ router_name = "session_control.router" ,
824
+ function_name = "register_processing_success_in_ispyb" ,
825
+ session_id = self ._environment .murfey_session ,
749
826
)
750
827
capture_delete (
751
- url = f"{ self ._environment .url .geturl ()} { url_path_for ('session_control.router' , 'remove_session' , session_id = self ._environment .murfey_session )} "
828
+ base_url = str (self ._environment .url .geturl ()),
829
+ router_name = "session_control.router" ,
830
+ function_name = "remove_session" ,
831
+ session_id = self ._environment .murfey_session ,
752
832
)
753
833
self .exit ()
754
834
0 commit comments