Skip to content

Commit 2b2f9ea

Browse files
committed
feat(rust): added the possibility to overwrite http headers in inlets
1 parent 9944b7d commit 2b2f9ea

File tree

26 files changed

+813
-202
lines changed

26 files changed

+813
-202
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
use crate::http::state::{ClientRequestWriter, RequestState};
2+
use crate::nodes::models::services::{DeleteServiceRequest, StartServiceRequest};
3+
use crate::nodes::registry::HttpHeaderInterceptorInfo;
4+
use crate::nodes::{NodeManager, NodeManagerWorker};
5+
use crate::DefaultAddress;
6+
use httparse::Request;
7+
use minicbor::{CborLen, Decode, Encode};
8+
use ockam_abac::{Action, PolicyAccessControl, Resource, ResourceType};
9+
use ockam_core::api::Response;
10+
use ockam_core::errcode::{Kind, Origin};
11+
use ockam_core::{async_trait, Address, AllowAll, IncomingAccessControl, OutgoingAccessControl};
12+
use ockam_node::Context;
13+
use ockam_transport_tcp::{
14+
read_portal_payload_length, Direction, PortalInletInterceptor, PortalInterceptor,
15+
PortalInterceptorFactory,
16+
};
17+
use std::io::Write;
18+
use std::sync::{Arc, Mutex as SyncMutex};
19+
20+
/// An HTTP headers interceptor that rewrites the headers of the HTTP
21+
/// request with the statically provided headers.
22+
struct StaticHttpHeadersInterceptor {
23+
headers: Arc<Vec<(String, String)>>,
24+
}
25+
26+
impl StaticHttpHeadersInterceptor {
27+
/// Starts a listener that will intercept data in a portal on the inlet side.
28+
/// The listener will rewrite the headers of the HTTP request with the provided headers.
29+
pub async fn start_listener(
30+
context: &Context,
31+
listener_address: Address,
32+
headers: Vec<(String, String)>,
33+
policy_access_control: Option<PolicyAccessControl>,
34+
) -> ockam_core::Result<()> {
35+
let flow_control_id = context
36+
.flow_controls()
37+
.get_flow_control_with_spawner(&Address::from_string(
38+
DefaultAddress::SECURE_CHANNEL_LISTENER,
39+
))
40+
.ok_or_else(|| {
41+
ockam_core::Error::new(
42+
Origin::Channel,
43+
Kind::NotFound,
44+
"Secure channel listener not found",
45+
)
46+
})?;
47+
48+
context
49+
.flow_controls()
50+
.add_consumer(&listener_address, &flow_control_id);
51+
52+
let incoming_access_control: Arc<dyn IncomingAccessControl>;
53+
let outgoing_access_control: Arc<dyn OutgoingAccessControl>;
54+
if let Some(policy_access_control) = policy_access_control {
55+
incoming_access_control = Arc::new(policy_access_control.create_incoming());
56+
outgoing_access_control = Arc::new(policy_access_control.create_outgoing(context)?);
57+
} else {
58+
incoming_access_control = Arc::new(AllowAll);
59+
outgoing_access_control = Arc::new(AllowAll);
60+
}
61+
62+
PortalInletInterceptor::start_listener(
63+
context,
64+
listener_address,
65+
Arc::new(StaticHttpHeadersInterceptor {
66+
headers: Arc::new(headers),
67+
}),
68+
incoming_access_control,
69+
outgoing_access_control,
70+
read_portal_payload_length(),
71+
)
72+
}
73+
}
74+
75+
impl PortalInterceptorFactory for StaticHttpHeadersInterceptor {
76+
fn create(&self) -> Arc<dyn PortalInterceptor> {
77+
Arc::new(HttpHeadersInterceptor {
78+
headers: self.headers.clone(),
79+
state: SyncMutex::new(RequestState::ParsingHeader(None)),
80+
})
81+
}
82+
}
83+
84+
struct HttpHeadersInterceptor {
85+
headers: Arc<Vec<(String, String)>>,
86+
state: SyncMutex<RequestState>,
87+
}
88+
89+
#[async_trait]
90+
impl PortalInterceptor for HttpHeadersInterceptor {
91+
async fn intercept(
92+
&self,
93+
_context: &mut Context,
94+
direction: Direction,
95+
buffer: &[u8],
96+
) -> ockam_core::Result<Option<Vec<u8>>> {
97+
match direction {
98+
Direction::FromOutletToInlet => Ok(Some(buffer.to_vec())),
99+
Direction::FromInletToOutlet => {
100+
let mut guard = self.state.lock().unwrap();
101+
Ok(Some(guard.process_http_buffer(buffer, self)?))
102+
}
103+
}
104+
}
105+
}
106+
107+
impl ClientRequestWriter for &HttpHeadersInterceptor {
108+
fn write_headers(&self, request: &Request, buffer: &mut Vec<u8>) -> ockam_core::Result<()> {
109+
write!(
110+
buffer,
111+
"{} {} HTTP/1.{}\r\n",
112+
request.method.unwrap(),
113+
request.path.unwrap(),
114+
request.version.unwrap()
115+
)
116+
.unwrap();
117+
118+
for (name, value) in self.headers.iter() {
119+
write!(buffer, "{}: {}\r\n", name, value).unwrap();
120+
}
121+
122+
for h in &*request.headers {
123+
if !self
124+
.headers
125+
.iter()
126+
.any(|(name, _)| name.eq_ignore_ascii_case(h.name))
127+
{
128+
write!(buffer, "{}: ", h.name).unwrap();
129+
buffer.extend_from_slice(h.value);
130+
buffer.extend_from_slice(b"\r\n");
131+
}
132+
}
133+
134+
buffer.extend_from_slice(b"\r\n");
135+
Ok(())
136+
}
137+
}
138+
139+
/// Request body to create a new HTTP rewrite headers interceptor
140+
#[derive(Clone, Debug, Encode, Decode, CborLen)]
141+
#[rustfmt::skip]
142+
#[cbor(map)]
143+
pub struct HttpHeadersInterceptorRequest {
144+
#[n(0)] pub headers: Vec<(String, String)>,
145+
}
146+
147+
impl NodeManagerWorker {
148+
pub async fn start_http_header_service(
149+
&self,
150+
context: &Context,
151+
request: StartServiceRequest<HttpHeadersInterceptorRequest>,
152+
) -> ockam_core::Result<Response<()>, Response<ockam_core::api::Error>> {
153+
let result = self
154+
.node_manager
155+
.start_http_header_service(
156+
context,
157+
Address::from_string(request.address()),
158+
request.request().headers.clone(),
159+
)
160+
.await;
161+
162+
match result {
163+
Ok(_) => Ok(Response::ok().body(())),
164+
Err(e) => Err(Response::internal_error_no_request(&e.to_string())),
165+
}
166+
}
167+
168+
pub async fn delete_http_overwrite_header_service(
169+
&self,
170+
context: &Context,
171+
request: DeleteServiceRequest,
172+
) -> ockam_core::Result<Response<()>, Response<ockam_core::api::Error>> {
173+
let result = self
174+
.node_manager
175+
.delete_http_overwrite_header_service(context, &Address::from_string(request.address()))
176+
.await;
177+
178+
match result {
179+
Ok(_) => Ok(Response::ok().body(())),
180+
Err(e) => Err(Response::internal_error_no_request(&e.to_string())),
181+
}
182+
}
183+
}
184+
185+
impl NodeManager {
186+
pub async fn start_http_header_service(
187+
&self,
188+
context: &Context,
189+
listener_address: Address,
190+
headers: Vec<(String, String)>,
191+
) -> ockam_core::Result<()> {
192+
let policy_access_control = if let Some(project_authority) = self.project_authority() {
193+
Some(
194+
self.policy_access_control(
195+
Some(project_authority),
196+
Resource::new(listener_address.to_string(), ResourceType::TcpInlet),
197+
Action::HandleMessage,
198+
None,
199+
)
200+
.await?,
201+
)
202+
} else {
203+
None
204+
};
205+
206+
StaticHttpHeadersInterceptor::start_listener(
207+
context,
208+
listener_address.clone(),
209+
headers,
210+
policy_access_control,
211+
)
212+
.await?;
213+
214+
self.registry
215+
.http_headers_interceptors
216+
.insert(listener_address, HttpHeaderInterceptorInfo {});
217+
218+
Ok(())
219+
}
220+
221+
pub async fn delete_http_overwrite_header_service(
222+
&self,
223+
context: &Context,
224+
listener_address: &Address,
225+
) -> ockam_core::Result<()> {
226+
context.stop_address(listener_address)?;
227+
228+
self.registry
229+
.http_headers_interceptors
230+
.remove(listener_address);
231+
232+
Ok(())
233+
}
234+
}
235+
236+
#[cfg(test)]
237+
mod test {
238+
use super::*;
239+
use crate::nodes::service::{NodeManagerCredentialRetrieverOptions, NodeManagerTrustOptions};
240+
use crate::test_utils::start_manager_for_tests;
241+
use ockam_core::NeutralMessage;
242+
use ockam_transport_tcp::PortalMessage;
243+
244+
#[ockam::test]
245+
async fn main(context: &mut Context) -> ockam::Result<()> {
246+
let handler = start_manager_for_tests(
247+
context,
248+
None,
249+
Some(NodeManagerTrustOptions::new(
250+
NodeManagerCredentialRetrieverOptions::None,
251+
NodeManagerCredentialRetrieverOptions::None,
252+
None,
253+
NodeManagerCredentialRetrieverOptions::None,
254+
)),
255+
)
256+
.await?;
257+
258+
StaticHttpHeadersInterceptor::start_listener(
259+
context,
260+
"http_interceptor".into(),
261+
vec![("Host".to_string(), "ockam.io".to_string())],
262+
None,
263+
)
264+
.await?;
265+
266+
let connection = handler
267+
.node_manager
268+
.make_connection(
269+
context,
270+
&format!(
271+
"/service/http_interceptor/service/{}",
272+
context.primary_address().address()
273+
)
274+
.parse()?,
275+
handler.node_manager.identifier(),
276+
None,
277+
None,
278+
)
279+
.await?;
280+
281+
let route = connection.route()?;
282+
283+
context
284+
.send(route.clone(), PortalMessage::Ping.to_neutral_message()?)
285+
.await?;
286+
287+
let _ = context.receive::<NeutralMessage>().await?;
288+
289+
context
290+
.send(
291+
route.clone(),
292+
PortalMessage::Payload(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n", None)
293+
.to_neutral_message()?,
294+
)
295+
.await?;
296+
297+
let message = context.receive::<NeutralMessage>().await?;
298+
let message = PortalMessage::decode(message.payload())?;
299+
300+
if let PortalMessage::Payload(payload, _) = message {
301+
let message = String::from_utf8(payload.to_vec()).unwrap();
302+
assert_eq!(message, "GET / HTTP/1.1\r\nHost: ockam.io\r\n\r\n");
303+
} else {
304+
panic!("Decoded message is not a Payload");
305+
}
306+
307+
Ok(())
308+
}
309+
}

Diff for: implementations/rust/ockam/ockam_api/src/http/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub(crate) mod interceptor;
2+
pub mod state;

0 commit comments

Comments
 (0)