@@ -2,10 +2,7 @@ use crate::{
22 AVCodecContext , AVFormatContext , AVFrame , AVPacket , AVPixelFormat , AVRational , AVStreamRef , Error , Result , SwsContext , VideoStreamFormat ,
33} ;
44use std:: {
5- sync:: {
6- atomic:: { AtomicBool , Ordering } ,
7- Arc , Condvar , Mutex ,
8- } ,
5+ sync:: { Arc , Condvar , Mutex } ,
96 thread:: JoinHandle ,
107} ;
118use tracing:: error;
@@ -14,10 +11,8 @@ pub struct Video {
1411 stream_format : VideoStreamFormat ,
1512 video_stream : AVStreamRef ,
1613
17- dropped : Arc < AtomicBool > ,
18- ended : AtomicBool ,
19-
20- mutex : Arc < ( Mutex < Option < Option < & ' static AVFrame > > > , Condvar ) > ,
14+ position : Arc < ( Mutex < i64 > , Condvar ) > ,
15+ frame : Arc < Mutex < ( AVFrame , i64 ) > > ,
2116 decode_thread : Option < JoinHandle < ( ) > > ,
2217}
2318
@@ -28,66 +23,106 @@ impl Video {
2823 format_ctx. find_stream_info ( ) ?;
2924
3025 let video_stream = format_ctx. streams ( ) . into_iter ( ) . find ( |it| it. is_video ( ) ) . ok_or ( Error :: NoVideoStream ) ?;
26+ let time_base = video_stream. time_base ( ) . to_f64 ( ) ;
3127
3228 let decoder = video_stream. find_decoder ( ) ?;
3329 let mut codec_ctx = AVCodecContext :: new ( decoder, video_stream. codec_params ( ) , Some ( pix_fmt) ) ?;
3430
31+ let stream_format = codec_ctx. video_stream_format ( ) ;
3532 let out_format = VideoStreamFormat {
3633 pix_fmt,
37- ..codec_ctx . video_stream_format ( )
34+ ..stream_format . clone ( )
3835 } ;
3936
40- let mutex = Arc :: new ( ( Mutex :: new ( None ) , Condvar :: new ( ) ) ) ;
41-
42- let stream_format = codec_ctx. video_stream_format ( ) ;
43-
4437 let mut sws = SwsContext :: new ( stream_format. clone ( ) , out_format. clone ( ) ) ?;
4538 let mut in_frame = AVFrame :: new ( ) ?;
4639 let mut out_frame = AVFrame :: new ( ) ?;
4740 out_frame. set_video_format ( & out_format) ;
4841 out_frame. get_buffer ( ) ?;
4942
50- let dropped = Arc :: new ( AtomicBool :: default ( ) ) ;
43+ let mut buf_frame = AVFrame :: new ( ) ?;
44+ buf_frame. set_video_format ( & stream_format) ;
45+ buf_frame. get_buffer ( ) ?;
46+
47+ let position = Arc :: new ( ( Mutex :: new ( 0 ) , Condvar :: new ( ) ) ) ;
48+ let frame = Arc :: new ( Mutex :: new ( ( buf_frame, -1 ) ) ) ;
49+
50+ let video_index = video_stream. index ( ) ;
5151
5252 let decode_thread = std:: thread:: spawn ( {
53- let mut packet = AVPacket :: new ( ) ?;
54- let video_index = video_stream. index ( ) ;
55- let mutex = Arc :: clone ( & mutex) ;
56- let dropped = Arc :: clone ( & dropped) ;
53+ let position = position. clone ( ) ;
54+ let frame = frame. clone ( ) ;
55+
5756 move || {
58- let mut decode_main = {
59- let mutex = Arc :: clone ( & mutex) ;
60- move || -> Result < ( ) > {
61- while !dropped. load ( Ordering :: Relaxed ) && format_ctx. read_frame ( & mut packet) ? {
62- if packet. stream_index ( ) != video_index {
57+ let mut run = || -> Result < ( ) > {
58+ let mut packet = AVPacket :: new ( ) ?;
59+ let mut current_ts = -1 ;
60+
61+ let mut catching_up_to: Option < i64 > = None ;
62+
63+ loop {
64+ let ts = {
65+ let mut guard = position. 0 . lock ( ) . unwrap ( ) ;
66+ loop {
67+ let ts = * guard;
68+ if ts == i64:: MAX {
69+ return Ok ( ( ) ) ;
70+ }
71+
72+ let diff_sec = ( current_ts - ts) as f64 * time_base;
73+ if current_ts != -1 && diff_sec > 0.0 && diff_sec < 0.5 {
74+ guard = position. 1 . wait ( guard) . unwrap ( ) ;
75+ } else {
76+ break ts;
77+ }
78+ }
79+ } ;
80+
81+ if ( current_ts - ts) as f64 * time_base > 0.5 {
82+ let is_already_catching_up = catching_up_to. is_some_and ( |target| ( ( ts - target) as f64 * time_base) . abs ( ) < 0.5 ) ;
83+
84+ if !is_already_catching_up {
85+ format_ctx. seek_frame ( video_index, ts, crate :: ffi:: AVSEEK_FLAG_BACKWARD ) ?;
86+ codec_ctx. flush_buffers ( ) ;
87+
88+ catching_up_to = Some ( ts) ;
89+ current_ts = -1 ;
6390 continue ;
6491 }
65- codec_ctx. send_packet ( & packet) ?;
92+ }
93+
94+ if !format_ctx. read_frame ( & mut packet) ? {
95+ frame. lock ( ) . unwrap ( ) . 1 = -1 ;
96+ continue ;
97+ }
98+
99+ if packet. stream_index ( ) != video_index {
100+ continue ;
101+ }
102+
103+ codec_ctx. send_packet ( & packet) ?;
104+ let mut sent = false ;
105+ while codec_ctx. receive_frame ( & mut in_frame) ? {
106+ current_ts = in_frame. pts ( ) ;
107+
108+ if catching_up_to. is_some_and ( |target| current_ts >= target) {
109+ catching_up_to = None ;
110+ }
66111
67- while codec_ctx . receive_frame ( & mut in_frame ) ? {
112+ if !sent && current_ts >= ts {
68113 sws. scale ( & in_frame, & mut out_frame) ;
69- let mut frame = mutex. 0 . lock ( ) . unwrap ( ) ;
70- * frame = Some ( Some ( unsafe { std:: mem:: transmute :: < & AVFrame , & ' static AVFrame > ( & out_frame) } ) ) ;
71- mutex. 1 . notify_one ( ) ;
72- while frame. is_some ( ) {
73- if dropped. load ( Ordering :: Relaxed ) {
74- return Ok ( ( ) ) ;
75- }
76- frame = mutex. 1 . wait ( frame) . unwrap ( ) ;
77- }
114+ let mut guard = frame. lock ( ) . unwrap ( ) ;
115+ std:: mem:: swap ( & mut guard. 0 , & mut out_frame) ;
116+ guard. 1 = current_ts;
117+ sent = true ;
78118 }
79119 }
80- let mut frame = mutex. 0 . lock ( ) . unwrap ( ) ;
81- * frame = Some ( None ) ;
82- mutex. 1 . notify_one ( ) ;
83- Ok ( ( ) )
84120 }
85121 } ;
86- if let Err ( err) = decode_main ( ) {
87- error ! ( "decode failed: {err:?}" ) ;
88- let mut frame = mutex. 0 . lock ( ) . unwrap ( ) ;
89- * frame = Some ( None ) ;
90- mutex. 1 . notify_one ( ) ;
122+
123+ if let Err ( e) = run ( ) {
124+ error ! ( "decode error: {e:?}" ) ;
125+ frame. lock ( ) . unwrap ( ) . 1 = -1 ;
91126 }
92127 }
93128 } ) ;
@@ -96,10 +131,8 @@ impl Video {
96131 stream_format,
97132 video_stream,
98133
99- dropped,
100- ended : AtomicBool :: default ( ) ,
101-
102- mutex,
134+ frame,
135+ position,
103136 decode_thread : Some ( decode_thread) ,
104137 } )
105138 }
@@ -112,32 +145,34 @@ impl Video {
112145 self . video_stream . frame_rate ( )
113146 }
114147
115- pub fn with_frame < R > ( & self , f : impl FnOnce ( & AVFrame ) -> R ) -> Option < R > {
116- let mut frame = self . mutex . 0 . lock ( ) . unwrap ( ) ;
117- loop {
118- let Some ( data) = * frame else {
119- frame = self . mutex . 1 . wait ( frame) . unwrap ( ) ;
120- continue ;
121- } ;
122- let Some ( data) = data else {
123- self . ended . store ( true , Ordering :: SeqCst ) ;
124- return None ;
125- } ;
126- let res = f ( data) ;
127- * frame = None ;
128- self . mutex . 1 . notify_one ( ) ;
129- break Some ( res) ;
130- }
148+ pub fn time_base ( & self ) -> AVRational {
149+ self . video_stream . time_base ( )
150+ }
151+
152+ pub fn duration ( & self ) -> f64 {
153+ self . video_stream . duration ( ) as f64 * self . time_base ( ) . to_f64 ( )
154+ }
155+
156+ pub fn elapsed_to_timestamp ( & self , elapsed : f32 ) -> i64 {
157+ let time_base = self . time_base ( ) ;
158+ ( elapsed as f64 * time_base. den as f64 / time_base. num as f64 ) . round ( ) as i64
159+ }
160+
161+ pub fn seek ( & self , timestamp : i64 ) {
162+ let mut guard = self . position . 0 . lock ( ) . unwrap ( ) ;
163+ * guard = timestamp;
164+ self . position . 1 . notify_one ( ) ;
165+ }
166+
167+ pub fn with_frame < R > ( & self , mut f : impl FnMut ( & AVFrame , i64 ) -> R ) -> R {
168+ let guard = self . frame . lock ( ) . unwrap ( ) ;
169+ f ( & guard. 0 , guard. 1 )
131170 }
132171}
133172
134173impl Drop for Video {
135174 fn drop ( & mut self ) {
136- self . dropped . store ( true , Ordering :: Relaxed ) ;
137- {
138- let _guard = self . mutex . 0 . lock ( ) . unwrap ( ) ;
139- self . mutex . 1 . notify_one ( ) ;
140- }
175+ self . seek ( i64:: MAX ) ;
141176 if let Some ( handle) = self . decode_thread . take ( ) {
142177 handle. join ( ) . unwrap ( ) ;
143178 }
0 commit comments