11use crate :: http:: {
2- GetRequestError , GetResponse , HttpClient , HttpError , HttpRequestBuilder , HttpResponse ,
2+ FileId , GetRequestError , GetResponse , HttpClient , HttpError , HttpHeaders , HttpRequestBuilder ,
3+ HttpResponse ,
34} ;
45use bytes:: Bytes ;
56use fast_pull:: { ProgressEntry , RandPuller , SeqPuller } ;
@@ -17,13 +18,20 @@ pub struct HttpPuller<Client: HttpClient> {
1718 pub ( crate ) client : Client ,
1819 url : Url ,
1920 resp : Arc < SpinMutex < Option < GetResponse < Client > > > > ,
21+ file_id : FileId ,
2022}
2123impl < Client : HttpClient > HttpPuller < Client > {
22- pub fn new ( url : Url , client : Client , resp : Option < GetResponse < Client > > ) -> Self {
24+ pub fn new (
25+ url : Url ,
26+ client : Client ,
27+ resp : Option < GetResponse < Client > > ,
28+ file_id : FileId ,
29+ ) -> Self {
2330 Self {
2431 client,
2532 url,
2633 resp : Arc :: new ( SpinMutex :: new ( resp) ) ,
34+ file_id,
2735 }
2836 }
2937}
@@ -54,6 +62,7 @@ impl<Client: HttpClient + 'static> RandPuller for HttpPuller<Client> {
5462 } else {
5563 ResponseState :: None
5664 } ,
65+ file_id : self . file_id . clone ( ) ,
5766 }
5867 }
5968}
@@ -63,6 +72,7 @@ struct RandRequestStream<Client: HttpClient + 'static> {
6372 start : u64 ,
6473 end : u64 ,
6574 state : ResponseState < Client > ,
75+ file_id : FileId ,
6676}
6777impl < Client : HttpClient > Stream for RandRequestStream < Client > {
6878 type Item = Result < Bytes , HttpError < Client > > ;
@@ -73,8 +83,16 @@ impl<Client: HttpClient> Stream for RandRequestStream<Client> {
7383 return match resp. try_poll_unpin ( cx) {
7484 Poll :: Ready ( resp) => match resp {
7585 Ok ( resp) => {
76- self . state = ResponseState :: Ready ( resp) ;
77- self . poll_next ( cx)
86+ let etag = resp. headers ( ) . get ( "etag" ) . ok ( ) ;
87+ let last_modified = resp. headers ( ) . get ( "last-modified" ) . ok ( ) ;
88+ let new_file_id = FileId :: new ( etag, last_modified) ;
89+ if new_file_id != self . file_id {
90+ self . state = ResponseState :: None ;
91+ Poll :: Ready ( Some ( Err ( HttpError :: MismatchedBody ( new_file_id) ) ) )
92+ } else {
93+ self . state = ResponseState :: Ready ( resp) ;
94+ self . poll_next ( cx)
95+ }
7896 }
7997 Err ( e) => {
8098 self . state = ResponseState :: None ;
@@ -118,21 +136,21 @@ impl<Client: HttpClient> Stream for RandRequestStream<Client> {
118136impl < Client : HttpClient + ' static > SeqPuller for HttpPuller < Client > {
119137 type Error = HttpError < Client > ;
120138 fn pull ( & mut self ) -> impl TryStream < Ok = Bytes , Error = Self :: Error > + Send + Unpin {
121- match self . resp . lock ( ) . take ( ) {
122- Some ( resp) => SeqRequestStream {
123- state : ResponseState :: Ready ( resp) ,
124- } ,
125- None => {
126- let req = self . client . get ( self . url . clone ( ) , None ) . send ( ) ;
127- SeqRequestStream {
128- state : ResponseState :: Pending ( Box :: pin ( req) ) ,
139+ SeqRequestStream {
140+ state : match self . resp . lock ( ) . take ( ) {
141+ Some ( resp) => ResponseState :: Ready ( resp) ,
142+ None => {
143+ let req = self . client . get ( self . url . clone ( ) , None ) . send ( ) ;
144+ ResponseState :: Pending ( Box :: pin ( req) )
129145 }
130- }
146+ } ,
147+ file_id : self . file_id . clone ( ) ,
131148 }
132149 }
133150}
134151struct SeqRequestStream < Client : HttpClient + ' static > {
135152 state : ResponseState < Client > ,
153+ file_id : FileId ,
136154}
137155impl < Client : HttpClient > Stream for SeqRequestStream < Client > {
138156 type Item = Result < Bytes , HttpError < Client > > ;
@@ -143,8 +161,16 @@ impl<Client: HttpClient> Stream for SeqRequestStream<Client> {
143161 return match resp. try_poll_unpin ( cx) {
144162 Poll :: Ready ( resp) => match resp {
145163 Ok ( resp) => {
146- self . state = ResponseState :: Ready ( resp) ;
147- self . poll_next ( cx)
164+ let etag = resp. headers ( ) . get ( "etag" ) . ok ( ) ;
165+ let last_modified = resp. headers ( ) . get ( "last-modified" ) . ok ( ) ;
166+ let new_file_id = FileId :: new ( etag, last_modified) ;
167+ if new_file_id != self . file_id {
168+ self . state = ResponseState :: None ;
169+ Poll :: Ready ( Some ( Err ( HttpError :: MismatchedBody ( new_file_id) ) ) )
170+ } else {
171+ self . state = ResponseState :: Ready ( resp) ;
172+ self . poll_next ( cx)
173+ }
148174 }
149175 Err ( e) => {
150176 self . state = ResponseState :: None ;
0 commit comments