Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 0e3f6eb

Browse files
committedFeb 17, 2025··
feat(rust): added the possibility to overwrite http headers in inlets
1 parent 9944b7d commit 0e3f6eb

File tree

21 files changed

+717
-194
lines changed

21 files changed

+717
-194
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
use crate::http::state::{ClientRequestWriter, RequestState};
2+
use crate::nodes::models::services::{DeleteServiceRequest, StartServiceRequest};
3+
use crate::nodes::registry::HttpHeaderInterceptorInfo;
4+
use crate::nodes::{NodeManager, NodeManagerWorker};
5+
use crate::DefaultAddress;
6+
use httparse::Request;
7+
use minicbor::{CborLen, Decode, Encode};
8+
use ockam_abac::{Action, PolicyAccessControl, Resource, ResourceType};
9+
use ockam_core::api::Response;
10+
use ockam_core::errcode::{Kind, Origin};
11+
use ockam_core::{async_trait, Address, AllowAll, IncomingAccessControl, OutgoingAccessControl};
12+
use ockam_node::Context;
13+
use ockam_transport_tcp::{
14+
read_portal_payload_length, Direction, PortalInletInterceptor, PortalInterceptor,
15+
PortalInterceptorFactory,
16+
};
17+
use std::io::Write;
18+
use std::sync::{Arc, Mutex as SyncMutex};
19+
20+
struct HttpHeadersInterceptorFactory {
21+
headers: Arc<Vec<(String, String)>>,
22+
}
23+
24+
impl HttpHeadersInterceptorFactory {
25+
pub async fn create(
26+
context: &Context,
27+
listener_address: Address,
28+
headers: Vec<(String, String)>,
29+
policy_access_control: Option<PolicyAccessControl>,
30+
) -> ockam_core::Result<()> {
31+
let flow_control_id = context
32+
.flow_controls()
33+
.get_flow_control_with_spawner(&Address::from_string(
34+
DefaultAddress::SECURE_CHANNEL_LISTENER,
35+
))
36+
.ok_or_else(|| {
37+
ockam_core::Error::new(
38+
Origin::Channel,
39+
Kind::NotFound,
40+
"Secure channel listener not found",
41+
)
42+
})?;
43+
44+
context
45+
.flow_controls()
46+
.add_consumer(&listener_address, &flow_control_id);
47+
48+
let incoming_access_control: Arc<dyn IncomingAccessControl>;
49+
let outgoing_access_control: Arc<dyn OutgoingAccessControl>;
50+
if let Some(policy_access_control) = policy_access_control {
51+
incoming_access_control = Arc::new(policy_access_control.create_incoming());
52+
outgoing_access_control = Arc::new(policy_access_control.create_outgoing(context)?);
53+
} else {
54+
incoming_access_control = Arc::new(AllowAll);
55+
outgoing_access_control = Arc::new(AllowAll);
56+
}
57+
58+
PortalInletInterceptor::create(
59+
context,
60+
listener_address,
61+
Arc::new(HttpHeadersInterceptorFactory {
62+
headers: Arc::new(headers),
63+
}),
64+
incoming_access_control,
65+
outgoing_access_control,
66+
read_portal_payload_length(),
67+
)
68+
}
69+
}
70+
71+
impl PortalInterceptorFactory for HttpHeadersInterceptorFactory {
72+
fn create(&self) -> Arc<dyn PortalInterceptor> {
73+
Arc::new(HttpHeadersInterceptor {
74+
headers: self.headers.clone(),
75+
state: SyncMutex::new(RequestState::ParsingHeader(None)),
76+
})
77+
}
78+
}
79+
80+
struct HttpHeadersInterceptor {
81+
headers: Arc<Vec<(String, String)>>,
82+
state: SyncMutex<RequestState>,
83+
}
84+
85+
#[async_trait]
86+
impl PortalInterceptor for HttpHeadersInterceptor {
87+
async fn intercept(
88+
&self,
89+
_context: &mut Context,
90+
direction: Direction,
91+
buffer: &[u8],
92+
) -> ockam_core::Result<Option<Vec<u8>>> {
93+
match direction {
94+
Direction::FromOutletToInlet => Ok(Some(buffer.to_vec())),
95+
Direction::FromInletToOutlet => {
96+
let mut guard = self.state.lock().unwrap();
97+
Ok(Some(guard.process_http_buffer(buffer, self)?))
98+
}
99+
}
100+
}
101+
}
102+
103+
impl ClientRequestWriter for &HttpHeadersInterceptor {
104+
fn write_headers(&self, request: &Request, buffer: &mut Vec<u8>) -> ockam_core::Result<()> {
105+
write!(
106+
buffer,
107+
"{} {} HTTP/1.{}\r\n",
108+
request.method.unwrap(),
109+
request.path.unwrap(),
110+
request.version.unwrap()
111+
)
112+
.unwrap();
113+
114+
for (name, value) in self.headers.iter() {
115+
write!(buffer, "{}: {}\r\n", name, value).unwrap();
116+
}
117+
118+
for h in &*request.headers {
119+
if !self
120+
.headers
121+
.iter()
122+
.any(|(name, _)| name.eq_ignore_ascii_case(h.name))
123+
{
124+
write!(buffer, "{}: ", h.name).unwrap();
125+
buffer.extend_from_slice(h.value);
126+
buffer.extend_from_slice(b"\r\n");
127+
}
128+
}
129+
130+
buffer.extend_from_slice(b"\r\n");
131+
Ok(())
132+
}
133+
}
134+
135+
/// Request body to create a new HTTP rewrite headers interceptor
136+
#[derive(Clone, Debug, Encode, Decode, CborLen)]
137+
#[rustfmt::skip]
138+
#[cbor(map)]
139+
pub struct HttpHeadersInterceptorRequest {
140+
#[n(0)] pub headers: Vec<(String, String)>,
141+
}
142+
143+
impl NodeManagerWorker {
144+
pub async fn start_http_header_service(
145+
&self,
146+
context: &Context,
147+
request: StartServiceRequest<HttpHeadersInterceptorRequest>,
148+
) -> ockam_core::Result<Response<()>, Response<ockam_core::api::Error>> {
149+
let result = self
150+
.node_manager
151+
.start_http_header_service(
152+
context,
153+
Address::from_string(request.address()),
154+
request.request().headers.clone(),
155+
)
156+
.await;
157+
158+
match result {
159+
Ok(_) => Ok(Response::ok().body(())),
160+
Err(e) => Err(Response::internal_error_no_request(&e.to_string())),
161+
}
162+
}
163+
164+
pub async fn delete_http_overwrite_header_service(
165+
&self,
166+
context: &Context,
167+
request: DeleteServiceRequest,
168+
) -> ockam_core::Result<Response<()>, Response<ockam_core::api::Error>> {
169+
let result = self
170+
.node_manager
171+
.delete_http_overwrite_header_service(context, &Address::from_string(request.address()))
172+
.await;
173+
174+
match result {
175+
Ok(_) => Ok(Response::ok().body(())),
176+
Err(e) => Err(Response::internal_error_no_request(&e.to_string())),
177+
}
178+
}
179+
}
180+
181+
impl NodeManager {
182+
pub async fn start_http_header_service(
183+
&self,
184+
context: &Context,
185+
listener_address: Address,
186+
headers: Vec<(String, String)>,
187+
) -> ockam_core::Result<()> {
188+
let policy_access_control = if let Some(project_authority) = self.project_authority() {
189+
Some(
190+
self.policy_access_control(
191+
Some(project_authority),
192+
Resource::new(listener_address.to_string(), ResourceType::TcpInlet),
193+
Action::HandleMessage,
194+
None,
195+
)
196+
.await?,
197+
)
198+
} else {
199+
None
200+
};
201+
202+
HttpHeadersInterceptorFactory::create(
203+
context,
204+
listener_address.clone(),
205+
headers,
206+
policy_access_control,
207+
)
208+
.await?;
209+
210+
self.registry
211+
.http_headers_interceptors
212+
.insert(listener_address, HttpHeaderInterceptorInfo {});
213+
214+
Ok(())
215+
}
216+
217+
pub async fn delete_http_overwrite_header_service(
218+
&self,
219+
context: &Context,
220+
listener_address: &Address,
221+
) -> ockam_core::Result<()> {
222+
context.stop_address(listener_address)?;
223+
224+
self.registry
225+
.http_headers_interceptors
226+
.remove(listener_address);
227+
228+
Ok(())
229+
}
230+
}
231+
232+
#[cfg(test)]
233+
mod test {
234+
use super::*;
235+
use crate::nodes::service::{NodeManagerCredentialRetrieverOptions, NodeManagerTrustOptions};
236+
use crate::test_utils::start_manager_for_tests;
237+
use ockam_core::NeutralMessage;
238+
use ockam_transport_tcp::PortalMessage;
239+
240+
#[ockam::test]
241+
async fn main(context: &mut Context) -> ockam::Result<()> {
242+
let handler = start_manager_for_tests(
243+
context,
244+
None,
245+
Some(NodeManagerTrustOptions::new(
246+
NodeManagerCredentialRetrieverOptions::None,
247+
NodeManagerCredentialRetrieverOptions::None,
248+
None,
249+
NodeManagerCredentialRetrieverOptions::None,
250+
)),
251+
)
252+
.await?;
253+
254+
HttpHeadersInterceptorFactory::create(
255+
context,
256+
"http_interceptor".into(),
257+
vec![("Host".to_string(), "ockam.io".to_string())],
258+
None,
259+
)
260+
.await?;
261+
262+
let connection = handler
263+
.node_manager
264+
.make_connection(
265+
context,
266+
&format!(
267+
"/service/http_interceptor/service/{}",
268+
context.primary_address().address()
269+
)
270+
.parse()?,
271+
handler.node_manager.identifier(),
272+
None,
273+
None,
274+
)
275+
.await?;
276+
277+
let route = connection.route()?;
278+
279+
context
280+
.send(route.clone(), PortalMessage::Ping.to_neutral_message()?)
281+
.await?;
282+
283+
let _ = context.receive::<NeutralMessage>().await?;
284+
285+
context
286+
.send(
287+
route.clone(),
288+
PortalMessage::Payload(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n", None)
289+
.to_neutral_message()?,
290+
)
291+
.await?;
292+
293+
let message = context.receive::<NeutralMessage>().await?;
294+
let message = PortalMessage::decode(message.payload())?;
295+
296+
if let PortalMessage::Payload(payload, _) = message {
297+
let message = String::from_utf8(payload.to_vec()).unwrap();
298+
assert_eq!(message, "GET / HTTP/1.1\r\nHost: ockam.io\r\n\r\n");
299+
} else {
300+
panic!("Decoded message is not a Payload");
301+
}
302+
303+
Ok(())
304+
}
305+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub(crate) mod interceptor;
2+
pub mod state;

0 commit comments

Comments
 (0)
Please sign in to comment.