7
7
import ipywidgets as ipw
8
8
from progressivis .table .dshape import dataframe_dshape
9
9
from progressivis .vis import DataShape
10
- from progressivis .core import Sink
10
+ from progressivis .core import Sink , Scheduler
11
11
from progressivis .core import Module
12
12
from progressivis .table .table_facade import TableFacade
13
13
from progressivis .core .utils import normalize_columns
@@ -69,6 +69,47 @@ class Header:
69
69
modules_out : Sidecar
70
70
71
71
72
+ class Proxy :
73
+ def __init__ (self , carrier : "NodeCarrier" ) -> None :
74
+ self .__carrier = carrier
75
+
76
+ @property
77
+ def input_module (self ) -> ModuleOrFacade | None :
78
+ if self .__carrier is PARAMS ["constructor" ]:
79
+ return None
80
+ return self .__carrier ._input_module
81
+
82
+ @property
83
+ def input_slot (self ) -> str | None :
84
+ if self .__carrier is PARAMS ["constructor" ]:
85
+ return None
86
+ return self .__carrier ._input_slot
87
+
88
+ @property
89
+ def input_dtypes (self ) -> dict [str , str ] | None :
90
+ if self .__carrier is PARAMS ["constructor" ]:
91
+ return None
92
+ return self .__carrier ._dtypes
93
+
94
+ @property
95
+ def scheduler (self ) -> Scheduler :
96
+ if self .__carrier is PARAMS ["constructor" ]:
97
+ assert PARAMS ["constructor" ] is not None
98
+ return cast (Scheduler , PARAMS ["constructor" ].scheduler )
99
+ assert self .input_module is not None
100
+ return self .input_module .scheduler ()
101
+
102
+ def resume (self , output_module : ModuleOrFacade ,
103
+ output_slot : str = "result" ,
104
+ output_dtypes : dict [str , str ] | None = None ,
105
+ freeze : bool = False ) -> "NodeCarrier" :
106
+ self .__carrier ._output_module = output_module
107
+ self .__carrier ._output_slot = output_slot
108
+ self .__carrier ._output_dtypes = output_dtypes
109
+ self .__carrier .make_chaining_box ()
110
+ return self .__carrier
111
+
112
+
72
113
def get_header () -> Header :
73
114
"""
74
115
NB: call this function ONLY from the first cell of the notebook!!
@@ -357,11 +398,11 @@ def make_button(
357
398
358
399
359
400
stage_register : Dict [str , AnyType ] = {}
360
- parent_widget : Optional ["NodeVBox " ] = None
401
+ parent_widget : Optional ["NodeCarrier " ] = None
361
402
parent_dtypes : Optional [Dict [str , str ]] = None
362
403
key_by_id : Dict [int , Tuple [str , int ]] = {}
363
- widget_by_id : Dict [int , "NodeVBox " ] = {}
364
- widget_by_key : Dict [Tuple [str , int ], "NodeVBox " ] = {}
404
+ widget_by_id : Dict [int , "NodeCarrier " ] = {}
405
+ widget_by_key : Dict [Tuple [str , int ], "NodeCarrier " ] = {}
365
406
widget_numbers : Dict [str , int ] = defaultdict (int )
366
407
recording_state : bool = False
367
408
@@ -412,13 +453,16 @@ def create_loader_widget(
412
453
ctx = dict (parent = obj , dtypes = dtypes , input_module = obj ._output_module , dag = dag )
413
454
from .csv_loader import CsvLoaderW
414
455
from .parquet_loader import ParquetLoaderW
456
+ from .custom_loader import CustomLoaderW
415
457
416
- loader : Union [ CsvLoaderW , ParquetLoaderW ]
458
+ loader : CsvLoaderW | ParquetLoaderW | CustomLoaderW
417
459
if ftype == "csv" :
418
460
loader = CsvLoaderW ()
419
- else :
420
- assert ftype == "parquet"
461
+ elif ftype == "parquet" :
421
462
loader = ParquetLoaderW ()
463
+ else :
464
+ assert ftype == "custom"
465
+ loader = CustomLoaderW ()
422
466
if frozen is not None :
423
467
loader .frozen_kw = frozen
424
468
stage = NodeCarrier (ctx , loader )
@@ -435,11 +479,11 @@ def create_loader_widget(
435
479
return stage
436
480
437
481
438
- def get_widget_by_id (key : int ) -> "NodeVBox " :
482
+ def get_widget_by_id (key : int ) -> "NodeCarrier " :
439
483
return widget_by_id [key ]
440
484
441
485
442
- def get_widget_by_key (key : str , num : int ) -> "NodeVBox " :
486
+ def get_widget_by_key (key : str , num : int ) -> "NodeCarrier " :
443
487
return widget_by_key [(key , num )]
444
488
445
489
@@ -453,7 +497,7 @@ def set_recording_state(val: bool) -> None:
453
497
454
498
455
499
def _make_btn_start_loader (
456
- obj : "NodeVBox " , ftype : str , alias : WidgetType , frozen : AnyType = None
500
+ obj : "NodeCarrier " , ftype : str , alias : WidgetType , frozen : AnyType = None
457
501
) -> Callable [..., None ]:
458
502
def _cbk (btn : ipw .Button ) -> None :
459
503
global parent_widget
@@ -468,7 +512,7 @@ def _cbk(btn: ipw.Button) -> None:
468
512
469
513
470
514
def replay_start_loader (
471
- obj : "NodeVBox " , ftype : str , alias : str , frozen : AnyType | None = None
515
+ obj : "NodeCarrier " , ftype : str , alias : str , frozen : AnyType | None = None
472
516
) -> None :
473
517
global parent_widget
474
518
parent_widget = obj
@@ -477,7 +521,7 @@ def replay_start_loader(
477
521
478
522
479
523
def replay_new_stage (
480
- obj : "NodeVBox " , title : str , frozen : AnyType | None = None
524
+ obj : "NodeCarrier " , title : str , frozen : AnyType | None = None
481
525
) -> None :
482
526
class _FakeSel :
483
527
value : str
@@ -675,6 +719,58 @@ def get_previous(obj: "ChainingWidget") -> "ChainingWidget":
675
719
676
720
new_stage_cell_0 = "Constructor.widget('{key}'){end}"
677
721
new_stage_cell = "Constructor.widget('{key}', {num}){end}"
722
+ new_stage_cell_code = ("proxy = Constructor.proxy('{key}', {num})\n "
723
+ "# proxy object provides the following attributes:\n "
724
+ "# input_module: Module | TableFacade \n "
725
+ "# input_slot: str \n "
726
+ "# input_dtypes: dict[str, str] | None\n "
727
+ "# scheduler: Scheduler\n "
728
+ "# Warning: keep the code above unchanged\n "
729
+ "# Put your own code here\n "
730
+ "...\n "
731
+ "...\n "
732
+ "...\n "
733
+ "# fill in the following variables:\n "
734
+ "output_module: 'Module | TableFacade' = ...\n "
735
+ "output_slot: str = 'result'\n "
736
+ "output_dtypes: dict[str, str] | None = None\n "
737
+ "freeze: bool = False\n "
738
+ "# Warning: keep the code below unchanged\n "
739
+ "proxy.resume(output_module, output_slot, output_dtypes, freeze)"
740
+ "{end}"
741
+ )
742
+
743
+ new_loader_cell_code = ("proxy = Constructor.proxy('{key}', {num})\n "
744
+ "scheduler = proxy.scheduler\n "
745
+ "# Warning: keep the code above unchanged\n "
746
+ "# Put your own imports here\n "
747
+ "... \n "
748
+ "with scheduler:\n "
749
+ " # Put your own code here\n "
750
+ " ...\n "
751
+ " ...\n "
752
+ " # fill in the following variables:\n "
753
+ " output_module: 'Module | TableFacade' = ...\n "
754
+ " output_slot: str = 'result'\n "
755
+ " output_dtypes: dict[str, str] | None = None\n "
756
+ " freeze: bool = False\n "
757
+ " # Warning: keep the code below unchanged\n "
758
+ " display(proxy.resume(output_module, output_slot,"
759
+ " output_dtypes, freeze))"
760
+ "{end}"
761
+ )
762
+
763
+
764
+ def get_stage_cell (key : str , num : int , end : str ) -> tuple [str , bool , bool ]:
765
+ if key == "Python" :
766
+ return new_stage_cell_code .format (key = key , num = num , end = end ), True , False
767
+ return new_stage_cell .format (key = key , num = num , end = end ), False , True
768
+
769
+
770
+ def get_loader_cell (key : str , ftype : str , num : int , end : str ) -> tuple [str , bool , bool ]:
771
+ if ftype == "custom" :
772
+ return new_loader_cell_code .format (key = key , num = num , end = end ), True , False
773
+ return new_stage_cell .format (key = key , num = num , end = end ), False , True
678
774
679
775
680
776
def add_new_stage (parent : "ChainingWidget" , title : str , frozen : AnyType = None ) -> None :
@@ -686,8 +782,8 @@ def add_new_stage(parent: "ChainingWidget", title: str, frozen: AnyType = None)
686
782
if frozen is not None :
687
783
end = ".run()"
688
784
md = "## " + title + (f"[{ n } ]" if n else "" )
689
- code = new_stage_cell . format (key = title , num = n , end = end )
690
- labcommand ("progressivis:create_stage_cells" , tag = tag , md = md , code = code )
785
+ code , rw , run = get_stage_cell (key = title , num = n , end = end )
786
+ labcommand ("progressivis:create_stage_cells" , tag = tag , md = md , code = code , rw = rw , run = run )
691
787
add_to_record (dict (title = title , parent = parent_key ))
692
788
693
789
@@ -703,14 +799,10 @@ def add_new_loader(
703
799
end = ".run();"
704
800
if alias :
705
801
md = f"## { alias } "
706
- code = new_stage_cell_0 .format (key = alias , end = end )
707
802
else :
708
803
md = "## " + title + (f"[{ n } ]" if n else "" )
709
- if n :
710
- code = new_stage_cell .format (key = title , num = n , end = end )
711
- else :
712
- code = new_stage_cell_0 .format (key = title , end = end )
713
- labcommand ("progressivis:create_stage_cells" , tag = tag , md = md , code = code )
804
+ code , rw , run = get_loader_cell (key = alias or title , ftype = ftype , num = n , end = end )
805
+ labcommand ("progressivis:create_stage_cells" , tag = tag , md = md , code = code , rw = rw , run = run )
714
806
add_to_record (dict (ftype = ftype , alias = alias ))
715
807
716
808
0 commit comments