1
1
//! HTTP Client
2
2
3
3
use std:: {
4
+ borrow:: Cow ,
4
5
collections:: VecDeque ,
5
6
fmt:: Debug ,
6
7
future:: Future ,
@@ -11,7 +12,8 @@ use std::{
11
12
time:: { Duration , Instant } ,
12
13
} ;
13
14
14
- use http:: Uri ;
15
+ use bson:: doc;
16
+ use http:: { header:: InvalidHeaderValue , HeaderValue , Method as HttpMethod , Uri , Version as HttpVersion } ;
15
17
use hyper:: {
16
18
body:: { self , Body } ,
17
19
client:: conn:: { http1, http2} ,
@@ -47,6 +49,9 @@ pub enum HttpClientError {
47
49
/// Errors from http
48
50
#[ error( "{0}" ) ]
49
51
Http ( #[ from] http:: Error ) ,
52
+ /// Errors from http header
53
+ #[ error( "{0}" ) ]
54
+ InvalidHeaderValue ( #[ from] InvalidHeaderValue ) ,
50
55
}
51
56
52
57
#[ derive( Clone , Debug ) ]
@@ -137,20 +142,32 @@ where
137
142
pub async fn send_request (
138
143
& self ,
139
144
context : Arc < ServiceContext > ,
140
- req : Request < B > ,
145
+ mut req : Request < B > ,
141
146
balancer : Option < & PingBalancer > ,
142
147
) -> Result < Response < body:: Incoming > , HttpClientError > {
143
148
let host = match host_addr ( req. uri ( ) ) {
144
149
Some ( h) => h,
145
150
None => panic ! ( "URI missing host: {}" , req. uri( ) ) ,
146
151
} ;
147
152
153
+ // Set Host header if it was missing in the Request
154
+ {
155
+ let headers = req. headers_mut ( ) ;
156
+ if !headers. contains_key ( "Host" ) {
157
+ let host_value = match host {
158
+ Address :: DomainNameAddress ( ref domain, _) => HeaderValue :: from_str ( domain) ?,
159
+ Address :: SocketAddress ( ref saddr) => HeaderValue :: from_str ( saddr. ip ( ) . to_string ( ) . as_str ( ) ) ?,
160
+ } ;
161
+ headers. insert ( "Host" , host_value) ;
162
+ }
163
+ }
164
+
148
165
// 1. Check if there is an available client
149
166
//
150
167
// FIXME: If the cached connection is closed unexpectedly, this request will fail immediately.
151
168
if let Some ( c) = self . get_cached_connection ( & host) . await {
152
169
trace ! ( "HTTP client for host: {} taken from cache" , host) ;
153
- return self . send_request_conn ( host, c, req) . await
170
+ return self . send_request_conn ( host, c, req) . await ;
154
171
}
155
172
156
173
// 2. If no. Make a new connection
@@ -159,13 +176,12 @@ where
159
176
None => & Scheme :: HTTP ,
160
177
} ;
161
178
162
- let domain = req
163
- . uri ( )
164
- . host ( )
165
- . unwrap ( )
166
- . trim_start_matches ( '[' )
167
- . trim_start_matches ( ']' ) ;
168
- let c = match HttpConnection :: connect ( context. clone ( ) , scheme, host. clone ( ) , domain, balancer) . await {
179
+ let domain = match host {
180
+ Address :: DomainNameAddress ( ref domain, _) => Cow :: Borrowed ( domain. as_str ( ) ) ,
181
+ Address :: SocketAddress ( ref saddr) => Cow :: Owned ( saddr. ip ( ) . to_string ( ) ) ,
182
+ } ;
183
+
184
+ let c = match HttpConnection :: connect ( context. clone ( ) , scheme, host. clone ( ) , & domain, balancer) . await {
169
185
Ok ( c) => c,
170
186
Err ( err) => {
171
187
error ! ( "failed to connect to host: {}, error: {}" , host, err) ;
@@ -196,19 +212,8 @@ where
196
212
& self ,
197
213
host : Address ,
198
214
mut c : HttpConnection < B > ,
199
- mut req : Request < B > ,
215
+ req : Request < B > ,
200
216
) -> Result < Response < body:: Incoming > , HttpClientError > {
201
- // Remove Scheme, Host part from URI
202
- if req. uri ( ) . scheme ( ) . is_some ( ) || req. uri ( ) . authority ( ) . is_some ( ) {
203
- let mut builder = Uri :: builder ( ) ;
204
- if let Some ( path_and_query) = req. uri ( ) . path_and_query ( ) {
205
- builder = builder. path_and_query ( path_and_query. as_str ( ) ) ;
206
- } else {
207
- builder = builder. path_and_query ( "/" ) ;
208
- }
209
- * ( req. uri_mut ( ) ) = builder. build ( ) ?;
210
- }
211
-
212
217
trace ! ( "HTTP making request to host: {}, request: {:?}" , host, req) ;
213
218
let response = c. send_request ( req) . await ?;
214
219
trace ! ( "HTTP received response from host: {}, response: {:?}" , host, response) ;
@@ -351,10 +356,45 @@ where
351
356
}
352
357
353
358
#[ inline]
354
- pub async fn send_request ( & mut self , req : Request < B > ) -> hyper :: Result < Response < body:: Incoming > > {
359
+ pub async fn send_request ( & mut self , mut req : Request < B > ) -> Result < Response < body:: Incoming > , HttpClientError > {
355
360
match self {
356
- HttpConnection :: Http1 ( r) => r. send_request ( req) . await ,
357
- HttpConnection :: Http2 ( r) => r. send_request ( req) . await ,
361
+ HttpConnection :: Http1 ( r) => {
362
+ if !matches ! (
363
+ req. version( ) ,
364
+ HttpVersion :: HTTP_09 | HttpVersion :: HTTP_10 | HttpVersion :: HTTP_11
365
+ ) {
366
+ trace ! (
367
+ "HTTP client changed Request.version to HTTP/1.1 from {:?}" ,
368
+ req. version( )
369
+ ) ;
370
+
371
+ * req. version_mut ( ) = HttpVersion :: HTTP_11 ;
372
+ }
373
+
374
+ // Remove Scheme, Host part from URI
375
+ if req. method ( ) != HttpMethod :: CONNECT
376
+ && ( req. uri ( ) . scheme ( ) . is_some ( ) || req. uri ( ) . authority ( ) . is_some ( ) )
377
+ {
378
+ let mut builder = Uri :: builder ( ) ;
379
+ if let Some ( path_and_query) = req. uri ( ) . path_and_query ( ) {
380
+ builder = builder. path_and_query ( path_and_query. as_str ( ) ) ;
381
+ } else {
382
+ builder = builder. path_and_query ( "/" ) ;
383
+ }
384
+ * ( req. uri_mut ( ) ) = builder. build ( ) ?;
385
+ }
386
+
387
+ r. send_request ( req) . await . map_err ( Into :: into)
388
+ }
389
+ HttpConnection :: Http2 ( r) => {
390
+ if !matches ! ( req. version( ) , HttpVersion :: HTTP_2 ) {
391
+ trace ! ( "HTTP client changed Request.version to HTTP/2 from {:?}" , req. version( ) ) ;
392
+
393
+ * req. version_mut ( ) = HttpVersion :: HTTP_2 ;
394
+ }
395
+
396
+ r. send_request ( req) . await . map_err ( Into :: into)
397
+ }
358
398
}
359
399
}
360
400
0 commit comments