16
16
from inference .core .utils .image_utils import load_image_bgr
17
17
from inference_cli .lib .utils import dump_jsonl
18
18
from inference_cli .lib .workflows .common import deduct_images , dump_objects_to_json
19
- from inference_cli .lib .workflows .entities import OutputFileType
19
+ from inference_cli .lib .workflows .entities import OutputFileType , VideoProcessingDetails
20
20
21
21
22
22
def process_video_with_workflow (
@@ -31,10 +31,11 @@ def process_video_with_workflow(
31
31
max_fps : Optional [float ] = None ,
32
32
save_image_outputs_as_video : bool = True ,
33
33
api_key : Optional [str ] = None ,
34
- ) -> None :
34
+ ) -> VideoProcessingDetails :
35
35
structured_sink = WorkflowsStructuredDataSink (
36
36
output_directory = output_directory ,
37
37
output_file_type = output_file_type ,
38
+ numbers_of_streams = 1 ,
38
39
)
39
40
progress_sink = ProgressSink .init (input_video_path = input_video_path )
40
41
sinks = [structured_sink .on_prediction , progress_sink .on_prediction ]
@@ -61,9 +62,14 @@ def process_video_with_workflow(
61
62
pipeline .start (use_main_thread = True )
62
63
pipeline .join ()
63
64
progress_sink .stop ()
64
- structured_sink .flush ()
65
+ structured_results_file = structured_sink .flush ()[0 ]
66
+ video_outputs = None
65
67
if video_sink is not None :
66
- video_sink .release ()
68
+ video_outputs = video_sink .release ()
69
+ return VideoProcessingDetails (
70
+ structured_results_file = structured_results_file ,
71
+ video_outputs = video_outputs ,
72
+ )
67
73
68
74
69
75
class WorkflowsStructuredDataSink :
@@ -72,10 +78,12 @@ def __init__(
72
78
self ,
73
79
output_directory : str ,
74
80
output_file_type : OutputFileType ,
81
+ numbers_of_streams : int = 1 ,
75
82
):
76
83
self ._output_directory = output_directory
77
84
self ._structured_results_buffer = defaultdict (list )
78
85
self ._output_file_type = output_file_type
86
+ self ._numbers_of_streams = numbers_of_streams
79
87
80
88
def on_prediction (
81
89
self ,
@@ -94,11 +102,17 @@ def on_prediction(
94
102
}
95
103
self ._structured_results_buffer [stream_idx ].append (prediction )
96
104
97
- def flush (self ) -> None :
105
+ def flush (self ) -> List [Optional [str ]]:
106
+ stream_idx2file_path = {}
98
107
for stream_idx , buffer in self ._structured_results_buffer .items ():
99
- self ._flush_stream_buffer (stream_idx = stream_idx )
100
-
101
- def _flush_stream_buffer (self , stream_idx : int ) -> None :
108
+ file_path = self ._flush_stream_buffer (stream_idx = stream_idx )
109
+ stream_idx2file_path [stream_idx ] = file_path
110
+ return [
111
+ stream_idx2file_path .get (stream_idx )
112
+ for stream_idx in range (self ._numbers_of_streams )
113
+ ]
114
+
115
+ def _flush_stream_buffer (self , stream_idx : int ) -> Optional [str ]:
102
116
content = self ._structured_results_buffer [stream_idx ]
103
117
if len (content ) == 0 :
104
118
return None
@@ -114,6 +128,7 @@ def _flush_stream_buffer(self, stream_idx: int) -> None:
114
128
else :
115
129
dump_jsonl (path = file_path , content = content )
116
130
self ._structured_results_buffer [stream_idx ] = []
131
+ return file_path
117
132
118
133
def __del__ (self ):
119
134
self .flush ()
@@ -182,11 +197,14 @@ def on_prediction(
182
197
image = load_image_bgr (value )
183
198
stream_sinks [key ].write_frame (frame = image )
184
199
185
- def release (self ) -> None :
186
- for stream_sinks in self ._video_sinks .values ():
187
- for sink in stream_sinks .values ():
200
+ def release (self ) -> Optional [Dict [str , str ]]:
201
+ stream_idx2keys_videos : Dict [int , Dict [str , str ]] = defaultdict (dict )
202
+ for stream_idx , stream_sinks in self ._video_sinks .items ():
203
+ for key , sink in stream_sinks .items ():
188
204
sink .release ()
205
+ stream_idx2keys_videos [stream_idx ][key ] = sink .target_path
189
206
self ._video_sinks = defaultdict (dict )
207
+ return stream_idx2keys_videos .get (0 )
190
208
191
209
def __del__ (self ):
192
210
self .release ()
0 commit comments