@@ -135,38 +135,41 @@ where
135135 E : From < irpc:: Error > ,
136136 E : From < irpc:: channel:: RecvError > ,
137137 {
138- Gen :: new ( move |co| async move {
139- let mut rx = match self . await {
140- Ok ( rx) => rx,
141- Err ( e) => {
142- co. yield_ ( Err ( E :: from ( e) ) ) . await ;
143- return ;
144- }
145- } ;
146- loop {
147- match rx. recv ( ) . await {
148- Ok ( Some ( item) ) => match item. into_result_opt ( ) {
149- Some ( Ok ( i) ) => co. yield_ ( Ok ( i) ) . await ,
150- Some ( Err ( e) ) => {
151- co. yield_ ( Err ( E :: from ( e) ) ) . await ;
152- break ;
153- }
154- None => break ,
155- } ,
156- Ok ( None ) => {
157- co. yield_ ( Err ( E :: from ( irpc:: channel:: RecvError :: Io ( io:: Error :: new (
158- io:: ErrorKind :: UnexpectedEof ,
159- "unexpected end of stream" ,
160- ) ) ) ) )
161- . await ;
162- break ;
163- }
164- Err ( e) => {
165- co. yield_ ( Err ( E :: from ( e) ) ) . await ;
166- break ;
167- }
168- }
138+ enum State < S , T > {
139+ Init ( S ) ,
140+ Receiving ( mpsc:: Receiver < T > ) ,
141+ Done ,
142+ }
143+ fn eof ( ) -> RecvError {
144+ io:: Error :: new ( io:: ErrorKind :: UnexpectedEof , "unexpected end of stream" ) . into ( )
145+ }
146+ async fn process_recv < S , T , E > (
147+ mut rx : mpsc:: Receiver < T > ,
148+ ) -> Option < ( std:: result:: Result < T :: Item , E > , State < S , T > ) >
149+ where
150+ T : IrpcStreamItem ,
151+ E : From < T :: Error > ,
152+ E : From < irpc:: Error > ,
153+ E : From < RecvError > ,
154+ {
155+ match rx. recv ( ) . await {
156+ Ok ( Some ( item) ) => match item. into_result_opt ( ) ? {
157+ Ok ( i) => Some ( ( Ok ( i) , State :: Receiving ( rx) ) ) ,
158+ Err ( e) => Some ( ( Err ( E :: from ( e) ) , State :: Done ) ) ,
159+ } ,
160+ Ok ( None ) => Some ( ( Err ( E :: from ( eof ( ) ) ) , State :: Done ) ) ,
161+ Err ( e) => Some ( ( Err ( E :: from ( e) ) , State :: Done ) ) ,
162+ }
163+ }
164+ Box :: pin ( stream:: unfold ( State :: Init ( self ) , |state| async move {
165+ match state {
166+ State :: Init ( fut) => match fut. await {
167+ Ok ( rx) => process_recv ( rx) . await ,
168+ Err ( e) => Some ( ( Err ( E :: from ( e) ) , State :: Done ) ) ,
169+ } ,
170+ State :: Receiving ( rx) => process_recv ( rx) . await ,
171+ State :: Done => None ,
169172 }
170- } )
173+ } ) )
171174 }
172175}
0 commit comments