Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust): enable requests to be messages #8783

Merged
merged 2 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/actions/cache_nix/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ runs:
- name: Restore and Save Cache
id: nix-restore-and-save
if: ${{ github.event_name == 'schedule' }}
uses: actions/cache@704facf57e6136b1bc63b828d79edcd491f0ee84
uses: actions/cache@v4
with:
path: |
/tmp/nix-cache
Expand All @@ -26,7 +26,7 @@ runs:
- name: Restore Cache
id: nix-restore
if: ${{ github.event_name != 'schedule' }}
uses: actions/cache/restore@704facf57e6136b1bc63b828d79edcd491f0ee84
uses: actions/cache/restore@v4
with:
path: |
/tmp/nix-cache
Expand Down
4 changes: 2 additions & 2 deletions .github/actions/nix_installer_and_cache/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ runs:
- name: Restore and Save Cache
id: nix-restore-and-save
if: ${{ github.event_name == 'schedule' }}
uses: actions/cache@704facf57e6136b1bc63b828d79edcd491f0ee84
uses: actions/cache@v4
with:
path: |
/tmp/nix-cache
Expand All @@ -28,7 +28,7 @@ runs:
- name: Restore Cache
id: nix-restore
if: ${{ github.event_name != 'schedule' }}
uses: actions/cache/restore@704facf57e6136b1bc63b828d79edcd491f0ee84
uses: actions/cache/restore@v4
with:
path: |
/tmp/nix-cache
Expand Down
31 changes: 26 additions & 5 deletions examples/rust/file_transfer/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,39 @@
use ockam::Message;
use ockam::{deserialize, serialize, Decodable, Encodable, Encoded, Message};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Message)]
pub struct FileDescription {
pub name: String,
pub size: usize,
}
impl Message for FileDescription {}

#[derive(Serialize, Deserialize)]
impl Encodable for FileDescription {
fn encode(self) -> ockam::Result<Encoded> {
serialize(self)
}
}

impl Decodable for FileDescription {
fn decode(v: &[u8]) -> ockam::Result<Self> {
deserialize(v)
}
}

#[derive(Serialize, Deserialize, Message)]
pub enum FileData {
Description(FileDescription),
Data(Vec<u8>),
Quit,
}

impl Message for FileData {}
impl Encodable for FileData {
fn encode(self) -> ockam::Result<Encoded> {
serialize(self)
}
}

impl Decodable for FileData {
fn decode(v: &[u8]) -> ockam::Result<Self> {
deserialize(v)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn main(ctx: Context) -> Result<()> {
// Send a message to the "echoer" worker on a different node, over a tcp transport.
// Wait to receive a reply and print it.
let r = route![connection_to_responder, "echoer"];
let reply = node.send_and_receive::<String>(r, "Hello Ockam!".to_string()).await?;
let reply: String = node.send_and_receive(r, "Hello Ockam!".to_string()).await?;

println!("App Received: {}", reply); // should print "Hello Ockam!"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn main(ctx: Context) -> Result<()> {
// Send a message to the "echoer" worker, on a different node, over two tcp hops.
// Wait to receive a reply and print it.
let r = route![connection_to_middle_node, "forward_to_responder", "echoer"];
let reply = node.send_and_receive::<String>(r, "Hello Ockam!".to_string()).await?;
let reply: String = node.send_and_receive(r, "Hello Ockam!".to_string()).await?;
println!("App Received: {}", reply); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main(ctx: Context) -> Result<()> {
// Send a message to the "echoer" worker on a different node, over a udp transport.
// Wait to receive a reply and print it.
let r = route![bind, (UDP, "localhost:4000"), "echoer"];
let reply = node.send_and_receive::<String>(r, "Hello Ockam!".to_string()).await?;
let reply: String = node.send_and_receive(r, "Hello Ockam!".to_string()).await?;

println!("App Received: {}", reply); // should print "Hello Ockam!"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ async fn main(ctx: Context) -> Result<()> {

// Send a message to the echoer worker via the channel.
// Wait to receive a reply and print it.
let reply = node
.send_and_receive::<String>(route![channel, "echoer"], "Hello Ockam!".to_string())
let reply: String = node
.send_and_receive(route![channel, "echoer"], "Hello Ockam!".to_string())
.await?;
println!("App Received: {}", reply); // should print "Hello Ockam!"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ async fn main(ctx: Context) -> Result<()> {

// Send a message to the echoer worker via the channel.
// Wait to receive a reply and print it.
let reply = node
.send_and_receive::<String>(route![channel, "echoer"], "Hello Ockam!".to_string())
let reply: String = node
.send_and_receive(route![channel, "echoer"], "Hello Ockam!".to_string())
.await?;
println!("App Received: {}", reply); // should print "Hello Ockam!"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ async fn main(ctx: Context) -> Result<()> {

// Send a message to the worker at address "echoer".
// Wait to receive a reply and print it.
let reply = node
.send_and_receive::<String>(
let reply: String = node
.send_and_receive(
route![channel, DefaultAddress::ECHO_SERVICE],
"Hello Ockam!".to_string(),
)
Expand Down
5 changes: 3 additions & 2 deletions implementations/rust/ockam/ockam/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ pub use ockam_core::processor;
/// may be changed in the future to a [`Worker`](crate::Worker)-specific macro.
pub use ockam_core::worker;
pub use ockam_core::{
allow, deny, errcode, route, Address, Any, Encoded, Error, LocalMessage, Mailbox, Mailboxes,
Message, Processor, ProtocolId, Result, Route, Routed, TransportMessage, TryClone, Worker,
allow, deny, deserialize, errcode, route, serialize, Address, Any, Decodable, Encodable,
Encoded, Error, LocalMessage, Mailbox, Mailboxes, Message, Processor, ProtocolId, Result,
Route, Routed, TransportMessage, TryClone, Worker,
};
pub use ockam_identity as identity;
// ---
Expand Down
14 changes: 13 additions & 1 deletion implementations/rust/ockam/ockam/src/remote/info.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::Message;
use ockam_core::compat::string::String;
use ockam_core::flow_control::FlowControlId;
use ockam_core::{Address, Route};
use ockam_core::{deserialize, serialize, Address, Decodable, Encodable, Encoded, Route};
use serde::{Deserialize, Serialize};

/// Information about a remotely forwarded worker.
Expand All @@ -13,6 +13,18 @@ pub struct RemoteRelayInfo {
flow_control_id: Option<FlowControlId>,
}

impl Encodable for RemoteRelayInfo {
fn encode(self) -> ockam_core::Result<Encoded> {
serialize(self)
}
}

impl Decodable for RemoteRelayInfo {
fn decode(v: &[u8]) -> ockam_core::Result<Self> {
deserialize(v)
}
}

impl RemoteRelayInfo {
/// Constructor
pub fn new(
Expand Down
24 changes: 23 additions & 1 deletion implementations/rust/ockam/ockam/tests/message/test.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
use ockam::Message;
use ockam::*;
use serde::{Deserialize, Serialize};

#[derive(Message, Deserialize, Serialize)]
pub struct Tmp {
a: String,
}

impl Encodable for Tmp {
fn encode(self) -> Result<Encoded> {
serialize(self)
}
}
impl Decodable for Tmp {
fn decode(e: &[u8]) -> Result<Tmp> {
deserialize(e)
}
}

#[derive(Message, Deserialize, Serialize)]
pub struct Tmp1 {
a: Vec<u8>,
b: Vec<Tmp>,
}

impl Encodable for Tmp1 {
fn encode(self) -> Result<Encoded> {
serialize(self)
}
}
impl Decodable for Tmp1 {
fn decode(e: &[u8]) -> Result<Tmp1> {
deserialize(e)
}
}

fn assert_impl<T: Message>() {}
fn main() {
assert_impl::<String>();
Expand Down
12 changes: 6 additions & 6 deletions implementations/rust/ockam/ockam/tests/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ async fn test1(ctx: &mut Context) -> Result<()> {

let remote_info = RemoteRelay::create(ctx, route![], RemoteRelayOptions::new()).await?;

let resp = ctx
.send_and_receive::<String>(
let resp: String = ctx
.send_and_receive(
route![remote_info.remote_address(), "echoer"],
"Hello".to_string(),
)
Expand Down Expand Up @@ -62,8 +62,8 @@ async fn test2(ctx: &mut Context) -> Result<()> {
.connect(cloud_listener.socket_string(), TcpConnectionOptions::new())
.await?;

let resp = ctx
.send_and_receive::<String>(
let resp: String = ctx
.send_and_receive(
route![cloud_connection, remote_info.remote_address(), "echoer"],
"Hello".to_string(),
)
Expand Down Expand Up @@ -238,8 +238,8 @@ async fn test4(ctx: &mut Context) -> Result<()> {
)
.await?;

let resp = ctx
.send_and_receive::<String>(route![tunnel_channel, "echoer"], "Hello".to_string())
let resp: String = ctx
.send_and_receive(route![tunnel_channel, "echoer"], "Hello".to_string())
.await?;

assert_eq!(resp, "Hello");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use core::time::Duration;
use minicbor::Decoder;
use tracing::trace;

use crate::authenticator::credential_issuer::CredentialIssuer;
use crate::authenticator::direct::AccountAuthorityInfo;
use crate::authenticator::AuthorityMembersRepository;
use ockam::identity::{Credentials, Identifier, IdentitiesAttributes};
use ockam_core::api::{Method, RequestHeader, Response};
use ockam_core::api::{Method, Request, Response};
use ockam_core::compat::boxed::Box;
use ockam_core::compat::sync::Arc;
use ockam_core::compat::vec::Vec;
Expand Down Expand Up @@ -49,41 +48,46 @@ impl CredentialIssuerWorker {
#[ockam_core::worker]
impl Worker for CredentialIssuerWorker {
type Context = Context;
type Message = Vec<u8>;
type Message = Request<Vec<u8>>;

async fn handle_message(&mut self, c: &mut Context, m: Routed<Self::Message>) -> Result<()> {
let secure_channel_info = match SecureChannelLocalInfo::find_info(m.local_message()) {
Ok(secure_channel_info) => secure_channel_info,
Err(_e) => {
let resp = Response::bad_request_no_request("secure channel required").to_vec()?;
let resp =
Response::bad_request_no_request("secure channel required").encode_body()?;
c.send(m.return_route().clone(), resp).await?;
return Ok(());
}
};

let from = Identifier::from(secure_channel_info.their_identifier());
let return_route = m.return_route().clone();
let body = m.into_body()?;
let mut dec = Decoder::new(&body);
let req: RequestHeader = dec.decode()?;
let request = m.into_body()?;
let header = request.header();
trace! {
target: "credential_issuer",
from = %from,
id = %req.id(),
method = ?req.method(),
path = %req.path(),
body = %req.has_body(),
id = %header.id(),
method = ?header.method(),
path = %header.path(),
body = %header.has_body(),
"request"
}
let res = match (req.method(), req.path()) {
let res = match (header.method(), header.path()) {
(Some(Method::Post), "/") | (Some(Method::Post), "/credential") => {
match self.credential_issuer.issue_credential(&from).await {
Ok(Some(crd)) => Response::ok().with_headers(&req).body(crd).to_vec()?,
Ok(None) => Response::forbidden(&req, "unauthorized member").to_vec()?,
Err(error) => Response::internal_error(&req, &error.to_string()).to_vec()?,
Ok(Some(crd)) => Response::ok()
.with_headers(header)
.body(crd)
.encode_body()?,
Ok(None) => Response::forbidden(header, "unauthorized member").encode_body()?,
Err(error) => {
Response::internal_error(header, &error.to_string()).encode_body()?
}
}
}
_ => Response::unknown_path(&req).to_vec()?,
_ => Response::unknown_path(header).encode_body()?,
};

c.send(return_route, res).await
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use miette::IntoDiagnostic;
use std::collections::{BTreeMap, HashMap};

use ockam::identity::models::IdentifierList;
use ockam::identity::AttributesEntry;
use ockam::identity::Identifier;
use ockam_core::api::Request;
use ockam_core::async_trait;
use ockam_node::Context;

use crate::authenticator::direct::types::AddMember;
use crate::authenticator::direct::types::{AddMember, MemberList};
use crate::nodes::service::default_address::DefaultAddress;
use crate::orchestrator::{AuthorityNodeClient, HasSecureClient};

Expand Down Expand Up @@ -90,24 +91,28 @@ impl Members for AuthorityNodeClient {

async fn list_member_ids(&self, ctx: &Context) -> miette::Result<Vec<Identifier>> {
let req = Request::get("/member_ids");
self.get_secure_client()
let identifiers: IdentifierList = self
.get_secure_client()
.ask(ctx, DefaultAddress::DIRECT_AUTHENTICATOR, req)
.await
.into_diagnostic()?
.success()
.into_diagnostic()
.into_diagnostic()?;
Ok(identifiers.0)
}

async fn list_members(
&self,
ctx: &Context,
) -> miette::Result<HashMap<Identifier, AttributesEntry>> {
let req = Request::get("/");
self.get_secure_client()
let member_list: MemberList = self
.get_secure_client()
.ask(ctx, DefaultAddress::DIRECT_AUTHENTICATOR, req)
.await
.into_diagnostic()?
.success()
.into_diagnostic()
.into_diagnostic()?;
Ok(member_list.0)
}
}
Loading
Loading