Skip to content

Commit

Permalink
Release client-0.3.1, protocol-0.2.1, server-0.3.1, standard-0.2.1, t…
Browse files Browse the repository at this point in the history
…ests-0.2.1 (#157)
  • Loading branch information
petehayes102 authored Dec 24, 2023
1 parent 2351372 commit 3e08f3d
Show file tree
Hide file tree
Showing 30 changed files with 222 additions and 137 deletions.
99 changes: 19 additions & 80 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,9 @@
- Added connection reestablishment for failed connections
- Features for Selium Cloud
- Bump dependency versions

## v0.3.1

- Add closure so client can gracefully handle replier errors
- Add CA certificate
- Bug fix for Selium Cloud
5 changes: 3 additions & 2 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[package]
name = "selium"
version = "0.3.0"
version = "0.3.1"
description = """
An extremely developer friendly, composable messaging platform with zero build
time configuration.
"""
include = ["src/**/*", "proxy.debug.der", "proxy.prod.der"]
include = ["src/**/*", "ca.debug.der", "ca.prod.der"]
edition.workspace = true
authors.workspace = true
license.workspace = true
Expand Down Expand Up @@ -40,6 +40,7 @@ chrono = ["dep:chrono"]
std-compression = ["selium-std/compression"]
std-codec = ["selium-std/codec"]
std = ["std-compression", "std-codec"]
__notopiccheck = ["selium-protocol/__notopiccheck"]

[[example]]
name = "publish"
Expand Down
Binary file added client/ca.debug.der
Binary file not shown.
Binary file added client/ca.prod.der
Binary file not shown.
12 changes: 11 additions & 1 deletion client/examples/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,17 @@ async fn main() -> Result<()> {
.open()
.await?;

replier.listen().await?;
// To handle errors gracefully without terminating the listener, create an error
// handler that returns 'true'.
replier
.listen(|e| {
eprintln!("{e:?}");
true
})
.await?;

// To terminate on errors, create an error handler that returns 'false'
// replier.listen(|_| false).await?;

Ok(())
}
Expand Down
1 change: 0 additions & 1 deletion client/proxy.debug.der

This file was deleted.

1 change: 0 additions & 1 deletion client/proxy.prod.der

This file was deleted.

4 changes: 2 additions & 2 deletions client/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ pub const KEEP_ALIVE_DEFAULT: u64 = 5_000;
pub const RETENTION_POLICY_DEFAULT: u64 = 0;

#[cfg(debug_assertions)]
pub(crate) const CLOUD_CA: &[u8; 469] = include_bytes!("../proxy.debug.der");
pub(crate) const CLOUD_CA: &[u8; 390] = include_bytes!("../ca.debug.der");
#[cfg(debug_assertions)]
pub(crate) const SELIUM_CLOUD_REMOTE_URL: &str = "127.0.0.1:7002";

#[cfg(not(debug_assertions))]
pub(crate) const CLOUD_CA: &[u8; 470] = include_bytes!("../proxy.prod.der");
pub(crate) const CLOUD_CA: &[u8; 391] = include_bytes!("../ca.prod.der");
#[cfg(not(debug_assertions))]
pub(crate) const SELIUM_CLOUD_REMOTE_URL: &str = "selium.io:7001";
36 changes: 24 additions & 12 deletions client/src/streams/request_reply/replier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use selium_protocol::{BiStream, Frame, MessagePayload, ReplierPayload, TopicName
use selium_std::errors::{CodecError, Result, SeliumError};
use selium_std::traits::codec::{MessageDecoder, MessageEncoder};
use selium_std::traits::compression::{Compress, Decompress};
use std::fmt::Debug;
use std::{marker::PhantomData, pin::Pin, sync::Arc};
use tokio::sync::MutexGuard;

Expand Down Expand Up @@ -113,6 +114,7 @@ impl<D, E, Err, F, Fut, ReqItem, ResItem> Open
where
D: MessageDecoder<ReqItem> + Send + Unpin,
E: MessageEncoder<ResItem> + Send + Unpin,
Err: Debug,
F: FnMut(ReqItem) -> Fut + Send + Unpin,
Fut: Future<Output = std::result::Result<ResItem, Err>>,
ReqItem: Unpin + Send,
Expand Down Expand Up @@ -164,6 +166,7 @@ impl<D, E, Err, F, Fut, ReqItem, ResItem> Replier<E, D, F, ReqItem, ResItem>
where
D: MessageDecoder<ReqItem> + Send + Unpin,
E: MessageEncoder<ResItem> + Send + Unpin,
Err: Debug,
F: FnMut(ReqItem) -> Fut + Send + Unpin,
Fut: Future<Output = std::result::Result<ResItem, Err>>,
ReqItem: Unpin + Send,
Expand Down Expand Up @@ -244,7 +247,7 @@ where
let decoded = self.decode_message(req_payload.message)?;
let response = (self.handler)(decoded)
.await
.map_err(|_| SeliumError::RequestHandlerFailure)?;
.map_err(|e| SeliumError::RequestHandlerFailure(format!("{e:?}")))?;
let encoded = self.encode_message(response)?;

let res_payload = MessagePayload {
Expand All @@ -260,22 +263,31 @@ where

/// Prepares a [Replier] stream to begin processing incoming messages.
/// This method will block the current task until the stream has been exhausted.
pub async fn listen(mut self) -> Result<()> {
pub async fn listen<H>(mut self, mut error_handler: H) -> Result<()>
where
H: FnMut(&SeliumError) -> bool,
{
while let Some(Ok(request)) = self.stream.next().await {
match request {
Frame::Message(req) => self.handle_request(req).await?,
Frame::Error(bytes) => match String::from_utf8(bytes.to_vec()) {
Ok(s) => return Err(SeliumError::OpenStream(s)),
Err(_) => return Err(SeliumError::OpenStream("Invalid UTF-8 error".into())),
},
_ => {
return Err(SeliumError::OpenStream(
"Invalid frame returned from server".into(),
))
if let Err(e) = self.handle_frame(request).await {
if !error_handler(&e) {
return Err(e);
}
}
}

Ok(())
}

async fn handle_frame(&mut self, request: Frame) -> Result<()> {
match request {
Frame::Message(req) => Ok(self.handle_request(req).await?),
Frame::Error(bytes) => match String::from_utf8(bytes.to_vec()) {
Ok(s) => Err(SeliumError::OpenStream(s)),
Err(_) => Err(SeliumError::OpenStream("Invalid UTF-8 error".into())),
},
_ => Err(SeliumError::OpenStream(
"Invalid frame returned from server".into(),
)),
}
}
}
4 changes: 4 additions & 0 deletions protocol/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@
- Added graceful shutdown protocol
- Features for Selium Cloud
- Bump dependency versions

## v0.2.1

- Bug fix for Selium Cloud
5 changes: 4 additions & 1 deletion protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "selium-protocol"
version = "0.2.0"
version = "0.2.1"
description = """
selium-protocol is a dependent crate of Selium. Do not use directly.
"""
Expand All @@ -12,6 +12,9 @@ homepage.workspace = true
repository.workspace = true
readme.workspace = true

[features]
__notopiccheck = []

[dependencies]
anyhow = "1.0"
bincode = "1.3"
Expand Down
Loading

0 comments on commit 3e08f3d

Please sign in to comment.