1
1
use crate :: binding:: http:: { to_event, Headers } ;
2
2
use crate :: Event ;
3
+ use actix_web:: dev:: Payload ;
3
4
use actix_web:: web:: BytesMut ;
4
5
use actix_web:: { web, HttpRequest } ;
5
6
use async_trait:: async_trait;
6
- use futures:: future:: LocalBoxFuture ;
7
- use futures:: { FutureExt , StreamExt } ;
7
+ use futures:: { future:: LocalBoxFuture , FutureExt , StreamExt } ;
8
8
use http:: header:: { AsHeaderName , HeaderName , HeaderValue } ;
9
9
10
10
/// Implement Headers for the actix HeaderMap
11
- impl < ' a > Headers < ' a > for actix_web :: http :: HeaderMap {
11
+ impl < ' a > Headers < ' a > for actix_http :: header :: HeaderMap {
12
12
type Iterator = Box < dyn Iterator < Item = ( & ' a HeaderName , & ' a HeaderValue ) > + ' a > ;
13
13
fn get < K : AsHeaderName > ( & self , key : K ) -> Option < & HeaderValue > {
14
14
self . get ( key. as_str ( ) )
@@ -32,14 +32,18 @@ pub async fn request_to_event(
32
32
33
33
/// So that an actix-web handler may take an Event parameter
34
34
impl actix_web:: FromRequest for Event {
35
- type Config = ( ) ;
36
35
type Error = actix_web:: Error ;
37
36
type Future = LocalBoxFuture < ' static , std:: result:: Result < Self , Self :: Error > > ;
38
37
39
- fn from_request ( r : & HttpRequest , p : & mut actix_web:: dev:: Payload ) -> Self :: Future {
40
- let payload = web:: Payload ( p. take ( ) ) ;
38
+ fn from_request ( r : & HttpRequest , p : & mut Payload ) -> Self :: Future {
41
39
let request = r. to_owned ( ) ;
42
- async move { request_to_event ( & request, payload) . await } . boxed_local ( )
40
+ bytes:: Bytes :: from_request ( & request, p)
41
+ . map ( move |bytes| match bytes {
42
+ Ok ( b) => to_event ( request. headers ( ) , b. to_vec ( ) )
43
+ . map_err ( actix_web:: error:: ErrorBadRequest ) ,
44
+ Err ( e) => Err ( e) ,
45
+ } )
46
+ . boxed_local ( )
43
47
}
44
48
}
45
49
@@ -74,46 +78,52 @@ mod private {
74
78
#[ cfg( test) ]
75
79
mod tests {
76
80
use super :: * ;
77
- use actix_web:: test;
81
+ use actix_web:: { test, FromRequest } ;
78
82
79
83
use crate :: test:: fixtures;
80
84
use serde_json:: json;
85
+
86
+ async fn to_event ( req : & HttpRequest , mut payload : Payload ) -> Event {
87
+ web:: Payload :: from_request ( & req, & mut payload)
88
+ . then ( |p| req. to_event ( p. unwrap ( ) ) )
89
+ . await
90
+ . unwrap ( )
91
+ }
92
+
81
93
#[ actix_rt:: test]
82
94
async fn test_request ( ) {
83
95
let expected = fixtures:: v10:: minimal_string_extension ( ) ;
84
96
85
97
let ( req, payload) = test:: TestRequest :: post ( )
86
- . header ( "ce-specversion" , "1.0" )
87
- . header ( "ce-id" , "0001" )
88
- . header ( "ce-type" , "test_event.test_application" )
89
- . header ( "ce-source" , "http://localhost/" )
90
- . header ( "ce-someint" , "10" )
98
+ . insert_header ( ( "ce-specversion" , "1.0" ) )
99
+ . insert_header ( ( "ce-id" , "0001" ) )
100
+ . insert_header ( ( "ce-type" , "test_event.test_application" ) )
101
+ . insert_header ( ( "ce-source" , "http://localhost/" ) )
102
+ . insert_header ( ( "ce-someint" , "10" ) )
91
103
. to_http_parts ( ) ;
92
104
93
- let resp = req. to_event ( web:: Payload ( payload) ) . await . unwrap ( ) ;
94
- assert_eq ! ( expected, resp) ;
105
+ assert_eq ! ( expected, to_event( & req, payload) . await ) ;
95
106
}
96
107
97
108
#[ actix_rt:: test]
98
109
async fn test_request_with_full_data ( ) {
99
110
let expected = fixtures:: v10:: full_binary_json_data_string_extension ( ) ;
100
111
101
112
let ( req, payload) = test:: TestRequest :: post ( )
102
- . header ( "ce-specversion" , "1.0" )
103
- . header ( "ce-id" , "0001" )
104
- . header ( "ce-type" , "test_event.test_application" )
105
- . header ( "ce-subject" , "cloudevents-sdk" )
106
- . header ( "ce-source" , "http://localhost/" )
107
- . header ( "ce-time" , fixtures:: time ( ) . to_rfc3339 ( ) )
108
- . header ( "ce-string_ex" , "val" )
109
- . header ( "ce-int_ex" , "10" )
110
- . header ( "ce-bool_ex" , "true" )
111
- . header ( "content-type" , "application/json" )
113
+ . insert_header ( ( "ce-specversion" , "1.0" ) )
114
+ . insert_header ( ( "ce-id" , "0001" ) )
115
+ . insert_header ( ( "ce-type" , "test_event.test_application" ) )
116
+ . insert_header ( ( "ce-subject" , "cloudevents-sdk" ) )
117
+ . insert_header ( ( "ce-source" , "http://localhost/" ) )
118
+ . insert_header ( ( "ce-time" , fixtures:: time ( ) . to_rfc3339 ( ) ) )
119
+ . insert_header ( ( "ce-string_ex" , "val" ) )
120
+ . insert_header ( ( "ce-int_ex" , "10" ) )
121
+ . insert_header ( ( "ce-bool_ex" , "true" ) )
122
+ . insert_header ( ( "content-type" , "application/json" ) )
112
123
. set_json ( & fixtures:: json_data ( ) )
113
124
. to_http_parts ( ) ;
114
125
115
- let resp = req. to_event ( web:: Payload ( payload) ) . await . unwrap ( ) ;
116
- assert_eq ! ( expected, resp) ;
126
+ assert_eq ! ( expected, to_event( & req, payload) . await ) ;
117
127
}
118
128
119
129
#[ actix_rt:: test]
@@ -136,11 +146,10 @@ mod tests {
136
146
let expected = fixtures:: v10:: full_json_data_string_extension ( ) ;
137
147
138
148
let ( req, payload) = test:: TestRequest :: post ( )
139
- . header ( "content-type" , "application/cloudevents+json" )
149
+ . insert_header ( ( "content-type" , "application/cloudevents+json" ) )
140
150
. set_payload ( bytes)
141
151
. to_http_parts ( ) ;
142
152
143
- let resp = req. to_event ( web:: Payload ( payload) ) . await . unwrap ( ) ;
144
- assert_eq ! ( expected, resp) ;
153
+ assert_eq ! ( expected, to_event( & req, payload) . await ) ;
145
154
}
146
155
}
0 commit comments