From b9d0ae91a9058251c1ef9a74396bb28c614739ec Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Mon, 18 Apr 2022 18:29:35 -0700 Subject: [PATCH 01/20] Transition to proper errors; minor cleanup --- Cargo.toml | 2 +- src/bin/loadtest.rs | 3 +- src/consumer/mod.rs | 7 +++-- src/error.rs | 63 ++++++++++++++++++++++++++++------------ src/lib.rs | 4 +-- src/producer/mod.rs | 37 +++++++++++++---------- src/proto/mod.rs | 53 ++++++++++++++++++--------------- src/proto/single/cmd.rs | 28 +++++++++--------- src/proto/single/mod.rs | 5 ++-- src/proto/single/resp.rs | 63 ++++++++++++++++++++-------------------- tests/consumer.rs | 1 - tests/mock/mod.rs | 4 +-- tests/producer.rs | 3 +- tests/real.rs | 1 - 14 files changed, 155 insertions(+), 119 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5acfec81..18e6b12c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,9 +35,9 @@ chrono = { version = "0.4", features = ["serde"] } url = "2" atomic-option = "0.1" fnv = "1.0.3" -failure = "0.1" native-tls = { version = "0.2", optional = true } clap = { version = "2.27.1", optional = true } +thiserror = "1.0.30" [dev-dependencies] mockstream = "0.0.3" diff --git a/src/bin/loadtest.rs b/src/bin/loadtest.rs index cc82c368..5186bc78 100644 --- a/src/bin/loadtest.rs +++ b/src/bin/loadtest.rs @@ -1,6 +1,5 @@ #[macro_use] extern crate clap; -extern crate failure; extern crate faktory; extern crate rand; extern crate serde_json; @@ -54,7 +53,7 @@ fn main() { let popped = sync::Arc::new(atomic::AtomicUsize::new(0)); let start = time::Instant::now(); - let threads: Vec>> = (0..threads) + let threads: Vec>> = (0..threads) .map(|_| { let pushed = sync::Arc::clone(&pushed); let popped = sync::Arc::clone(&popped); diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 92015904..30138fe8 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -1,6 +1,6 @@ +use crate::error::Error; use crate::proto::{self, Client, ClientOptions, HeartbeatStatus, Reconnect}; use atomic_option::AtomicOption; -use failure::Error; use fnv::FnvHashMap; use std::error::Error as StdError; use std::io::prelude::*; @@ -251,7 +251,8 @@ impl Consumer { impl Consumer { fn reconnect(&mut self) -> Result<(), Error> { - self.c.reconnect() + self.c.reconnect()?; + Ok(()) } } @@ -397,7 +398,7 @@ where // the resulting OK that failed. in that case, we would get an error response // when re-sending the job response. this should not count as critical. other // errors, however, should! - if e.downcast_ref::<::std::io::Error>().is_some() { + if let Error::GenericIO(_) = e { last_job_result.swap(res, atomic::Ordering::SeqCst); return Err(e); } diff --git a/src/error.rs b/src/error.rs index 9dfa1c1b..9820e989 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,26 +1,52 @@ +use thiserror::Error; + +use crate::proto::ConnectError; + +/// The set of observable errors when interacting with a Faktory server. +#[derive(Debug, Error)] +#[allow(clippy::manual_non_exhaustive)] +pub enum Error { + /// The connection to the server, or one of its prerequisites, failed. + #[error("connection error: {0}")] + Connect(#[from] ConnectError), + + /// Underlying io layer errors. + /// These are overwhelmingly network communication errors on the socket connection to the server. + #[error("underlying i/o: {0}")] + GenericIO(#[from] std::io::Error), + + /// Application-level errors. + /// These generally indicate a mismatch between what the client expects and what the server expects. + #[error("protocol: {0}")] + Protocol(#[from] Protocol), + + /// Faktory payloads are JSON encoded. + /// This error is one that was encountered when attempting to deserialize a response from the server. + /// These generally indicate a mismatch between what the client expects and what the server provided. + #[error("deserialize payload: {0}")] + DeserializePayload(#[from] serde_json::Error), +} + /// The set of observable application-level errors when interacting with a Faktory server. -#[derive(Debug, Fail)] +#[derive(Debug, Error)] #[allow(clippy::manual_non_exhaustive)] -pub enum FaktoryError { +pub enum Protocol { /// The server reports that an issued request was malformed. - #[fail(display = "request was malformed: {}", desc)] + #[error("request was malformed: {desc}")] Malformed { /// Error reported by server desc: String, }, /// The server responded with an error. - #[fail(display = "an internal server error occurred: {}", msg)] + #[error("an internal server error occurred: {msg}")] Internal { /// The error message given by the server. msg: String, }, /// The server sent a response that did not match what was expected. - #[fail( - display = "expected {}, got unexpected response: {}", - expected, received - )] + #[error("expected {expected}, got unexpected response: {received}")] BadType { /// The expected response type. expected: &'static str, @@ -30,10 +56,7 @@ pub enum FaktoryError { }, /// The server sent a malformed response. - #[fail( - display = "server sent malformed {} response: {} in {:?}", - typed_as, error, bytes - )] + #[error("server sent malformed {typed_as} response: {error} in {bytes:?}")] BadResponse { /// The type of the server response. typed_as: &'static str, @@ -47,30 +70,32 @@ pub enum FaktoryError { // We're going to add more error types in the future // https://github.com/rust-lang/rust/issues/44109 - #[fail(display = "unreachable")] + // + // This forces users to write pattern matches with a catch-all `_` arm. + #[error("unreachable")] #[doc(hidden)] __Nonexhaustive, } -impl FaktoryError { +impl Protocol { pub(crate) fn new(line: String) -> Self { let mut parts = line.splitn(2, ' '); let code = parts.next(); let error = parts.next(); if error.is_none() { - return FaktoryError::Internal { + return Protocol::Internal { msg: code.unwrap().to_string(), }; } let error = error.unwrap().to_string(); match code { - Some("ERR") => FaktoryError::Internal { msg: error }, - Some("MALFORMED") => FaktoryError::Malformed { desc: error }, - Some(c) => FaktoryError::Internal { + Some("ERR") => Protocol::Internal { msg: error }, + Some("MALFORMED") => Protocol::Malformed { desc: error }, + Some(c) => Protocol::Internal { msg: format!("{} {}", c, error), }, - None => FaktoryError::Internal { + None => Protocol::Internal { msg: "empty error response".to_string(), }, } diff --git a/src/lib.rs b/src/lib.rs index 94cee0e6..31f4bc68 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,8 +57,6 @@ #![deny(missing_docs)] #![warn(rust_2018_idioms)] -#[macro_use] -extern crate failure; #[macro_use] extern crate serde_derive; @@ -73,7 +71,7 @@ mod tls; pub use tls::TlsStream; pub use crate::consumer::{Consumer, ConsumerBuilder}; -pub use crate::error::FaktoryError; +pub use crate::error::{Error, Protocol}; pub use crate::producer::Producer; pub use crate::proto::Job; pub use crate::proto::Reconnect; diff --git a/src/producer/mod.rs b/src/producer/mod.rs index db10eb9f..d5ca646c 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -1,8 +1,11 @@ +use crate::error::Error; use crate::proto::{self, Client, Info, Job, Push, QueueAction, QueueControl}; -use failure::Error; use std::io::prelude::*; use std::net::TcpStream; +/// Default Error type, aliased in module for convenience. +type Result = std::result::Result; + /// `Producer` is used to enqueue new jobs that will in turn be processed by Faktory workers. /// /// # Connecting to Faktory @@ -81,7 +84,7 @@ impl Producer { /// ``` /// /// If `url` is given, but does not specify a port, it defaults to 7419. - pub fn connect(url: Option<&str>) -> Result { + pub fn connect(url: Option<&str>) -> Result { let url = match url { Some(url) => proto::url_parse(url), None => proto::url_parse(&proto::get_env_url()), @@ -93,41 +96,45 @@ impl Producer { impl Producer { /// Connect to a Faktory server with a non-standard stream. - pub fn connect_with(stream: S, pwd: Option) -> Result, Error> { - Ok(Producer { - c: Client::new_producer(stream, pwd)?, - }) + pub fn connect_with(stream: S, pwd: Option) -> Result> { + let c = Client::new_producer(stream, pwd)?; + Ok(Producer { c }) } /// Enqueue the given job on the Faktory server. /// /// Returns `Ok` if the job was successfully queued by the Faktory server. - pub fn enqueue(&mut self, job: Job) -> Result<(), Error> { - self.c.issue(&Push::from(job))?.await_ok() + pub fn enqueue(&mut self, job: Job) -> Result<()> { + self.c.issue(&Push::from(job))?.await_ok()?; + Ok(()) } /// Retrieve information about the running server. /// /// The returned value is the result of running the `INFO` command on the server. - pub fn info(&mut self) -> Result { - self.c + pub fn info(&mut self) -> Result { + let v = self + .c .issue(&Info)? .read_json() - .map(|v| v.expect("info command cannot give empty response")) + .map(|v| v.expect("info command cannot give empty response"))?; + Ok(v) } /// Pause the given queues. - pub fn queue_pause>(&mut self, queues: &[T]) -> Result<(), Error> { + pub fn queue_pause>(&mut self, queues: &[T]) -> Result<()> { self.c .issue(&QueueControl::new(QueueAction::Pause, queues))? - .await_ok() + .await_ok()?; + Ok(()) } /// Resume the given queues. - pub fn queue_resume>(&mut self, queues: &[T]) -> Result<(), Error> { + pub fn queue_resume>(&mut self, queues: &[T]) -> Result<()> { self.c .issue(&QueueControl::new(QueueAction::Resume, queues))? - .await_ok() + .await_ok()?; + Ok(()) } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index b1a54418..1892669a 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,10 +1,11 @@ -use crate::FaktoryError; +use crate::error::Error; +use crate::Protocol; use bufstream::BufStream; -use failure::Error; use libc::getpid; use std::io; use std::io::prelude::*; use std::net::TcpStream; +use thiserror::Error; use url::Url; pub(crate) const EXPECTED_PROTOCOL_VERSION: usize = 2; @@ -17,21 +18,23 @@ pub use self::single::{Ack, Fail, Heartbeat, Info, Job, Push, QueueAction, Queue // responses that users can see pub use self::single::Hi; -#[derive(Debug, Fail)] +#[derive(Debug, Error)] pub enum ConnectError { - #[fail(display = "unknown scheme: {}", scheme)] + #[error("unknown scheme: {scheme}")] BadScheme { scheme: String }, - #[fail(display = "no hostname given")] + #[error("no hostname given")] MissingHostname, - #[fail(display = "server requires authentication")] + #[error("server requires authentication")] AuthenticationNeeded, - #[fail( - display = "server version mismatch (theirs: {}, ours: {})", - theirs, ours - )] + #[error("server version mismatch (theirs: {theirs}, ours: {ours})")] VersionMismatch { ours: usize, theirs: usize }, + #[error("parse URL: {0}")] + ParseUrl(#[source] url::ParseError), } +/// Default Error type, aliased in module for convenience. +type Result = std::result::Result; + pub(crate) fn get_env_url() -> String { use std::env; let var = env::var("FAKTORY_PROVIDER").unwrap_or_else(|_| "FAKTORY_URL".to_string()); @@ -42,8 +45,8 @@ pub(crate) fn host_from_url(url: &Url) -> String { format!("{}:{}", url.host_str().unwrap(), url.port().unwrap_or(7419)) } -pub(crate) fn url_parse(url: &str) -> Result { - let url = Url::parse(url)?; +pub(crate) fn url_parse(url: &str) -> Result { + let url = Url::parse(url).map_err(ConnectError::ParseUrl)?; if url.scheme() != "tcp" { return Err(ConnectError::BadScheme { scheme: url.scheme().to_string(), @@ -117,12 +120,12 @@ impl Client where S: Read + Write + Reconnect, { - pub(crate) fn connect_again(&self) -> Result { + pub(crate) fn connect_again(&self) -> Result { let s = self.stream.get_ref().reconnect()?; Client::new(s, self.opts.clone()) } - pub fn reconnect(&mut self) -> Result<(), Error> { + pub fn reconnect(&mut self) -> Result<()> { let s = self.stream.get_ref().reconnect()?; self.stream = BufStream::new(s); self.init() @@ -130,7 +133,7 @@ where } impl Client { - pub(crate) fn new(stream: S, opts: ClientOptions) -> Result, Error> { + pub(crate) fn new(stream: S, opts: ClientOptions) -> Result> { let mut c = Client { stream: BufStream::new(stream), opts, @@ -139,7 +142,7 @@ impl Client { Ok(c) } - pub(crate) fn new_producer(stream: S, pwd: Option) -> Result, Error> { + pub(crate) fn new_producer(stream: S, pwd: Option) -> Result> { let opts = ClientOptions { password: pwd, is_producer: true, @@ -150,7 +153,7 @@ impl Client { } impl Client { - fn init(&mut self) -> Result<(), Error> { + fn init(&mut self) -> Result<()> { let hi = single::read_hi(&mut self.stream)?; if hi.version != EXPECTED_PROTOCOL_VERSION { @@ -199,7 +202,9 @@ impl Client { return Err(ConnectError::AuthenticationNeeded.into()); } } - single::write_command_and_await_ok(&mut self.stream, &hello) + + single::write_command_and_await_ok(&mut self.stream, &hello)?; + Ok(()) } } @@ -221,12 +226,12 @@ impl Client { pub(crate) fn issue( &mut self, c: &FC, - ) -> Result, Error> { + ) -> Result> { single::write_command(&mut self.stream, c)?; Ok(ReadToken(self)) } - pub(crate) fn heartbeat(&mut self) -> Result { + pub(crate) fn heartbeat(&mut self) -> Result { single::write_command( &mut self.stream, &Heartbeat::new(&**self.opts.wid.as_ref().unwrap()), @@ -241,7 +246,7 @@ impl Client { { Some("terminate") => Ok(HeartbeatStatus::Terminate), Some("quiet") => Ok(HeartbeatStatus::Quiet), - _ => Err(FaktoryError::BadType { + _ => Err(Protocol::BadType { expected: "heartbeat response", received: format!("{}", s), } @@ -250,7 +255,7 @@ impl Client { } } - pub(crate) fn fetch(&mut self, queues: &[Q]) -> Result, Error> + pub(crate) fn fetch(&mut self, queues: &[Q]) -> Result> where Q: AsRef, { @@ -259,11 +264,11 @@ impl Client { } impl<'a, S: Read + Write> ReadToken<'a, S> { - pub(crate) fn await_ok(self) -> Result<(), Error> { + pub(crate) fn await_ok(self) -> Result<()> { single::read_ok(&mut self.0.stream) } - pub(crate) fn read_json(self) -> Result, Error> + pub(crate) fn read_json(self) -> Result> where T: serde::de::DeserializeOwned, { diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index ffb50311..22296f45 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -1,14 +1,16 @@ -use super::Job; -use failure::Error; +use super::{Error, Job}; use std::io::prelude::*; +/// Default Error type, aliased in module for convenience. +type Result = std::result::Result; + pub trait FaktoryCommand { - fn issue(&self, w: &mut dyn Write) -> Result<(), Error>; + fn issue(&self, w: &mut dyn Write) -> Result<()>; } /// Write queues as part of a command. They are written with a leading space /// followed by space separated queue names. -fn write_queues(w: &mut dyn Write, queues: &[S]) -> Result<(), serde_json::Error> +fn write_queues(w: &mut dyn Write, queues: &[S]) -> std::result::Result<(), serde_json::Error> where W: Write, S: AsRef, @@ -27,7 +29,7 @@ where pub struct Info; impl FaktoryCommand for Info { - fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { + fn issue(&self, w: &mut dyn Write) -> Result<()> { Ok(w.write_all(b"INFO\r\n").map_err(serde_json::Error::io)?) } } @@ -41,7 +43,7 @@ pub struct Ack { } impl FaktoryCommand for Ack { - fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { + fn issue(&self, w: &mut dyn Write) -> Result<()> { w.write_all(b"ACK ").map_err(serde_json::Error::io)?; serde_json::to_writer(&mut *w, self)?; Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) @@ -64,7 +66,7 @@ pub struct Heartbeat { } impl FaktoryCommand for Heartbeat { - fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { + fn issue(&self, w: &mut dyn Write) -> Result<()> { w.write_all(b"BEAT ").map_err(serde_json::Error::io)?; serde_json::to_writer(&mut *w, self)?; Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) @@ -91,7 +93,7 @@ pub struct Fail { } impl FaktoryCommand for Fail { - fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { + fn issue(&self, w: &mut dyn Write) -> Result<()> { w.write_all(b"FAIL ").map_err(serde_json::Error::io)?; serde_json::to_writer(&mut *w, self)?; Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) @@ -122,7 +124,7 @@ impl Fail { pub struct End; impl FaktoryCommand for End { - fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { + fn issue(&self, w: &mut dyn Write) -> Result<()> { Ok(w.write_all(b"END\r\n").map_err(serde_json::Error::io)?) } } @@ -140,7 +142,7 @@ impl<'a, S> FaktoryCommand for Fetch<'a, S> where S: AsRef, { - fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { + fn issue(&self, w: &mut dyn Write) -> Result<()> { if self.queues.is_empty() { w.write_all(b"FETCH\r\n").map_err(serde_json::Error::io)?; } else { @@ -211,7 +213,7 @@ impl Hello { } impl FaktoryCommand for Hello { - fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { + fn issue(&self, w: &mut dyn Write) -> Result<()> { w.write_all(b"HELLO ").map_err(serde_json::Error::io)?; serde_json::to_writer(&mut *w, self)?; Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) @@ -237,7 +239,7 @@ impl From for Push { } impl FaktoryCommand for Push { - fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { + fn issue(&self, w: &mut dyn Write) -> Result<()> { w.write_all(b"PUSH ").map_err(serde_json::Error::io)?; serde_json::to_writer(&mut *w, &**self)?; Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) @@ -260,7 +262,7 @@ where } impl> FaktoryCommand for QueueControl<'_, S> { - fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { + fn issue(&self, w: &mut dyn Write) -> Result<()> { let command = match self.action { QueueAction::Pause => b"QUEUE PAUSE".as_ref(), QueueAction::Resume => b"QUEUE RESUME".as_ref(), diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index f9847ae2..c4ecbcfd 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -1,11 +1,12 @@ use chrono::{DateTime, Utc}; -use failure::Error; use std::collections::HashMap; use std::io::prelude::*; mod cmd; mod resp; +use crate::error::Error; + pub use self::cmd::*; pub use self::resp::*; @@ -71,7 +72,7 @@ pub struct Job { /// /// This field is read-only. #[serde(skip_serializing)] - pub(crate) failure: Option, + pub failure: Option, /// Extra context to include with the job. /// diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index b274d9aa..cc29b329 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -1,8 +1,9 @@ -use crate::FaktoryError; -use failure::Error; +use crate::Protocol; use std::io::prelude::*; -fn bad(expected: &'static str, got: &RawResponse) -> FaktoryError { +use super::Error; + +fn bad(expected: &'static str, got: &RawResponse) -> Protocol { let stringy = match *got { RawResponse::String(ref s) => Some(&**s), RawResponse::Blob(ref b) => { @@ -16,11 +17,11 @@ fn bad(expected: &'static str, got: &RawResponse) -> FaktoryError { }; match stringy { - Some(s) => FaktoryError::BadType { + Some(s) => Protocol::BadType { expected, received: s.to_string(), }, - None => FaktoryError::BadType { + None => Protocol::BadType { expected, received: format!("{:?}", got), }, @@ -130,7 +131,7 @@ fn read(mut r: R) -> Result { let l = s.len() - 2; s.truncate(l); - Err(FaktoryError::new(s).into()) + Err(Protocol::new(s).into()) } b':' => { // Integer @@ -144,7 +145,7 @@ fn read(mut r: R) -> Result { match (&*s).parse::() { Ok(i) => Ok(RawResponse::Number(i)), - Err(_) => Err(FaktoryError::BadResponse { + Err(_) => Err(Protocol::BadResponse { typed_as: "integer", error: "invalid integer value", bytes: s.into_bytes(), @@ -158,14 +159,14 @@ fn read(mut r: R) -> Result { let mut bytes = Vec::with_capacity(32); r.read_until(b'\n', &mut bytes)?; let s = std::str::from_utf8(&bytes[0..bytes.len() - 2]).map_err(|_| { - FaktoryError::BadResponse { + Protocol::BadResponse { typed_as: "bulk string", error: "server bulk response contains non-utf8 size prefix", bytes: bytes[0..bytes.len() - 2].to_vec(), } })?; - let size = s.parse::().map_err(|_| FaktoryError::BadResponse { + let size = s.parse::().map_err(|_| Protocol::BadResponse { typed_as: "bulk string", error: "server bulk response size prefix is not an integer", bytes: s.as_bytes().to_vec(), @@ -191,7 +192,7 @@ fn read(mut r: R) -> Result { // so we'll just give up unimplemented!(); } - c => Err(FaktoryError::BadResponse { + c => Err(Protocol::BadResponse { typed_as: "unknown", error: "invalid response type prefix", bytes: vec![c], @@ -222,9 +223,8 @@ impl From> for RawResponse { #[cfg(test)] mod test { - use super::{read, RawResponse}; - use crate::FaktoryError; - use failure::Error; + use super::{read, Error, RawResponse}; + use crate::Protocol; use serde_json::{self, Map, Value}; use std::io::{self, Cursor}; @@ -247,10 +247,9 @@ mod test { #[test] fn it_errors_on_bad_numbers() { let c = Cursor::new(b":x\r\n"); - let r = read(c).unwrap_err(); - if let &FaktoryError::BadResponse { + if let Error::Protocol(Protocol::BadResponse { typed_as, error, .. - } = r.downcast_ref().unwrap() + }) = read(c).unwrap_err() { assert_eq!(typed_as, "integer"); assert_eq!(error, "invalid integer value"); @@ -262,8 +261,7 @@ mod test { #[test] fn it_parses_errors() { let c = Cursor::new(b"-ERR foo\r\n"); - let r = read(c).unwrap_err(); - if let &FaktoryError::Internal { ref msg } = r.downcast_ref().unwrap() { + if let Error::Protocol(Protocol::Internal { ref msg }) = read(c).unwrap_err() { assert_eq!(msg, "foo"); } else { unreachable!(); @@ -286,10 +284,9 @@ mod test { #[test] fn it_errors_on_bad_sizes() { let c = Cursor::new(b"$x\r\n\r\n"); - let r = read(c).unwrap_err(); - if let &FaktoryError::BadResponse { + if let Error::Protocol(Protocol::BadResponse { typed_as, error, .. - } = r.downcast_ref().unwrap() + }) = read(c).unwrap_err() { assert_eq!(typed_as, "bulk string"); assert_eq!(error, "server bulk response size prefix is not an integer"); @@ -356,25 +353,30 @@ mod test { #[test] fn it_errors_on_bad_json_blob() { let c = Cursor::new(b"$9\r\n{\"hello\"}\r\n"); - let r = read_json(c).unwrap_err(); - let _: &serde_json::Error = r.downcast_ref().unwrap(); + if let Error::DeserializePayload(err) = read_json(c).unwrap_err() { + let _: serde_json::Error = err; + } else { + unreachable!(); + } } #[test] fn it_errors_on_bad_json_string() { let c = Cursor::new(b"+{\"hello\"}\r\n"); - let r = read_json(c).unwrap_err(); - let _: &serde_json::Error = r.downcast_ref().unwrap(); + if let Error::DeserializePayload(err) = read_json(c).unwrap_err() { + let _: serde_json::Error = err; + } else { + unreachable!(); + } } #[test] fn json_error_on_number() { let c = Cursor::new(b":9\r\n"); - let r = read_json(c).unwrap_err(); - if let &FaktoryError::BadType { + if let Error::Protocol(Protocol::BadType { expected, ref received, - } = r.downcast_ref().unwrap() + }) = read_json(c).unwrap_err() { assert_eq!(expected, "json"); assert_eq!(received, "Number(9)"); @@ -386,10 +388,9 @@ mod test { #[test] fn it_errors_on_unknown_resp_type() { let c = Cursor::new(b"^\r\n"); - let r = read_json(c).unwrap_err(); - if let &FaktoryError::BadResponse { + if let Error::Protocol(Protocol::BadResponse { typed_as, error, .. - } = r.downcast_ref().unwrap() + }) = read_json(c).unwrap_err() { assert_eq!(typed_as, "unknown"); assert_eq!(error, "invalid response type prefix"); diff --git a/tests/consumer.rs b/tests/consumer.rs index de10dce7..ee7c4aea 100644 --- a/tests/consumer.rs +++ b/tests/consumer.rs @@ -1,4 +1,3 @@ -extern crate failure; extern crate faktory; extern crate mockstream; extern crate serde_json; diff --git a/tests/mock/mod.rs b/tests/mock/mod.rs index 8abeeae7..86835b21 100644 --- a/tests/mock/mod.rs +++ b/tests/mock/mod.rs @@ -81,12 +81,12 @@ impl Stream { let mut inner = Inner { take_next: 0, - streams: streams, + streams, }; let mine = inner.take_stream(); Stream { - mine: mine, + mine, all: Arc::new(Mutex::new(inner)), } } diff --git a/tests/producer.rs b/tests/producer.rs index 8160101c..1db4c82d 100644 --- a/tests/producer.rs +++ b/tests/producer.rs @@ -1,4 +1,3 @@ -extern crate failure; extern crate faktory; extern crate mockstream; extern crate serde_json; @@ -93,7 +92,7 @@ fn queue_control() { s.ignore(0); s.ok(0); - p.queue_pause(&vec!["test", "test2"]).unwrap(); + p.queue_pause(&["test", "test2"]).unwrap(); s.ok(0); p.queue_resume(&["test3".to_string(), "test4".to_string()]) diff --git a/tests/real.rs b/tests/real.rs index 3d503418..c60c2c04 100644 --- a/tests/real.rs +++ b/tests/real.rs @@ -1,4 +1,3 @@ -extern crate failure; extern crate faktory; extern crate serde_json; extern crate url; From 782d7348dde0dcaf57e72328407f79d3d64674dc Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Mon, 18 Apr 2022 18:41:39 -0700 Subject: [PATCH 02/20] Minor cleanup --- src/consumer/mod.rs | 3 +-- src/producer/mod.rs | 15 +++++---------- src/proto/mod.rs | 3 +-- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 30138fe8..93aa087a 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -251,8 +251,7 @@ impl Consumer { impl Consumer { fn reconnect(&mut self) -> Result<(), Error> { - self.c.reconnect()?; - Ok(()) + self.c.reconnect() } } diff --git a/src/producer/mod.rs b/src/producer/mod.rs index d5ca646c..2ebc5074 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -105,36 +105,31 @@ impl Producer { /// /// Returns `Ok` if the job was successfully queued by the Faktory server. pub fn enqueue(&mut self, job: Job) -> Result<()> { - self.c.issue(&Push::from(job))?.await_ok()?; - Ok(()) + self.c.issue(&Push::from(job))?.await_ok() } /// Retrieve information about the running server. /// /// The returned value is the result of running the `INFO` command on the server. pub fn info(&mut self) -> Result { - let v = self - .c + self.c .issue(&Info)? .read_json() - .map(|v| v.expect("info command cannot give empty response"))?; - Ok(v) + .map(|v| v.expect("info command cannot give empty response")) } /// Pause the given queues. pub fn queue_pause>(&mut self, queues: &[T]) -> Result<()> { self.c .issue(&QueueControl::new(QueueAction::Pause, queues))? - .await_ok()?; - Ok(()) + .await_ok() } /// Resume the given queues. pub fn queue_resume>(&mut self, queues: &[T]) -> Result<()> { self.c .issue(&QueueControl::new(QueueAction::Resume, queues))? - .await_ok()?; - Ok(()) + .await_ok() } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 1892669a..2ac245d5 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -203,8 +203,7 @@ impl Client { } } - single::write_command_and_await_ok(&mut self.stream, &hello)?; - Ok(()) + single::write_command_and_await_ok(&mut self.stream, &hello) } } From 610002a5f249cd5f061b9b40b65a07c505991ac2 Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Mon, 18 Apr 2022 18:51:40 -0700 Subject: [PATCH 03/20] Fix `native-tls` oversight. --- src/error.rs | 5 +++++ src/tls.rs | 5 ++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/error.rs b/src/error.rs index 9820e989..ac5f30eb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,6 +25,11 @@ pub enum Error { /// These generally indicate a mismatch between what the client expects and what the server provided. #[error("deserialize payload: {0}")] DeserializePayload(#[from] serde_json::Error), + + /// Indicates an error in the underlying TLS stream. + #[cfg(feature = "tls")] + #[error("underlying tls stream: {0}")] + TlsStream(#[from] native_tls::Error), } /// The set of observable application-level errors when interacting with a Faktory server. diff --git a/src/tls.rs b/src/tls.rs index b5a588ae..0408d4fd 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -1,5 +1,5 @@ use crate::proto::{self, Reconnect}; -use failure::Error; +use crate::Error; use native_tls::TlsConnector; use native_tls::TlsStream as NativeTlsStream; use std::io; @@ -70,14 +70,13 @@ where /// Create a new TLS connection on an existing stream with a non-default TLS configuration. pub fn new(stream: S, tls: TlsConnector, hostname: &str) -> io::Result { let stream = tls - .clone() .connect(hostname, stream) .map_err(|e| io::Error::new(io::ErrorKind::ConnectionAborted, e))?; Ok(TlsStream { connector: tls, hostname: hostname.to_string(), - stream: stream, + stream, }) } } From f8d8a921ec4985930c257b096f28491430fc1ccc Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Mon, 18 Apr 2022 18:53:55 -0700 Subject: [PATCH 04/20] Oh, and make faktory::Error nonexhaustive --- src/error.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/error.rs b/src/error.rs index ac5f30eb..3230853b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -30,6 +30,14 @@ pub enum Error { #[cfg(feature = "tls")] #[error("underlying tls stream: {0}")] TlsStream(#[from] native_tls::Error), + + // We're going to add more error types in the future + // https://github.com/rust-lang/rust/issues/44109 + // + // This forces users to write pattern matches with a catch-all `_` arm. + #[error("unreachable")] + #[doc(hidden)] + __Nonexhaustive, } /// The set of observable application-level errors when interacting with a Faktory server. From 4673dbec9a9a4fb0c3d76d81ddd5a8bfba195fa0 Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Mon, 18 Apr 2022 18:56:24 -0700 Subject: [PATCH 05/20] `Protocol` -> `ProtocolError` This makes imports for clients using this library more clear. --- src/error.rs | 16 ++++++++-------- src/lib.rs | 2 +- src/proto/mod.rs | 4 ++-- src/proto/single/resp.rs | 30 +++++++++++++++--------------- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/error.rs b/src/error.rs index 3230853b..da7c0add 100644 --- a/src/error.rs +++ b/src/error.rs @@ -18,7 +18,7 @@ pub enum Error { /// Application-level errors. /// These generally indicate a mismatch between what the client expects and what the server expects. #[error("protocol: {0}")] - Protocol(#[from] Protocol), + Protocol(#[from] ProtocolError), /// Faktory payloads are JSON encoded. /// This error is one that was encountered when attempting to deserialize a response from the server. @@ -43,7 +43,7 @@ pub enum Error { /// The set of observable application-level errors when interacting with a Faktory server. #[derive(Debug, Error)] #[allow(clippy::manual_non_exhaustive)] -pub enum Protocol { +pub enum ProtocolError { /// The server reports that an issued request was malformed. #[error("request was malformed: {desc}")] Malformed { @@ -90,25 +90,25 @@ pub enum Protocol { __Nonexhaustive, } -impl Protocol { +impl ProtocolError { pub(crate) fn new(line: String) -> Self { let mut parts = line.splitn(2, ' '); let code = parts.next(); let error = parts.next(); if error.is_none() { - return Protocol::Internal { + return ProtocolError::Internal { msg: code.unwrap().to_string(), }; } let error = error.unwrap().to_string(); match code { - Some("ERR") => Protocol::Internal { msg: error }, - Some("MALFORMED") => Protocol::Malformed { desc: error }, - Some(c) => Protocol::Internal { + Some("ERR") => ProtocolError::Internal { msg: error }, + Some("MALFORMED") => ProtocolError::Malformed { desc: error }, + Some(c) => ProtocolError::Internal { msg: format!("{} {}", c, error), }, - None => Protocol::Internal { + None => ProtocolError::Internal { msg: "empty error response".to_string(), }, } diff --git a/src/lib.rs b/src/lib.rs index 31f4bc68..85ade1b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,7 +71,7 @@ mod tls; pub use tls::TlsStream; pub use crate::consumer::{Consumer, ConsumerBuilder}; -pub use crate::error::{Error, Protocol}; +pub use crate::error::{Error, ProtocolError}; pub use crate::producer::Producer; pub use crate::proto::Job; pub use crate::proto::Reconnect; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 2ac245d5..a54dd838 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,5 +1,5 @@ use crate::error::Error; -use crate::Protocol; +use crate::ProtocolError; use bufstream::BufStream; use libc::getpid; use std::io; @@ -245,7 +245,7 @@ impl Client { { Some("terminate") => Ok(HeartbeatStatus::Terminate), Some("quiet") => Ok(HeartbeatStatus::Quiet), - _ => Err(Protocol::BadType { + _ => Err(ProtocolError::BadType { expected: "heartbeat response", received: format!("{}", s), } diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index cc29b329..2e746f3d 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -1,9 +1,9 @@ -use crate::Protocol; +use crate::ProtocolError; use std::io::prelude::*; use super::Error; -fn bad(expected: &'static str, got: &RawResponse) -> Protocol { +fn bad(expected: &'static str, got: &RawResponse) -> ProtocolError { let stringy = match *got { RawResponse::String(ref s) => Some(&**s), RawResponse::Blob(ref b) => { @@ -17,11 +17,11 @@ fn bad(expected: &'static str, got: &RawResponse) -> Protocol { }; match stringy { - Some(s) => Protocol::BadType { + Some(s) => ProtocolError::BadType { expected, received: s.to_string(), }, - None => Protocol::BadType { + None => ProtocolError::BadType { expected, received: format!("{:?}", got), }, @@ -131,7 +131,7 @@ fn read(mut r: R) -> Result { let l = s.len() - 2; s.truncate(l); - Err(Protocol::new(s).into()) + Err(ProtocolError::new(s).into()) } b':' => { // Integer @@ -145,7 +145,7 @@ fn read(mut r: R) -> Result { match (&*s).parse::() { Ok(i) => Ok(RawResponse::Number(i)), - Err(_) => Err(Protocol::BadResponse { + Err(_) => Err(ProtocolError::BadResponse { typed_as: "integer", error: "invalid integer value", bytes: s.into_bytes(), @@ -159,14 +159,14 @@ fn read(mut r: R) -> Result { let mut bytes = Vec::with_capacity(32); r.read_until(b'\n', &mut bytes)?; let s = std::str::from_utf8(&bytes[0..bytes.len() - 2]).map_err(|_| { - Protocol::BadResponse { + ProtocolError::BadResponse { typed_as: "bulk string", error: "server bulk response contains non-utf8 size prefix", bytes: bytes[0..bytes.len() - 2].to_vec(), } })?; - let size = s.parse::().map_err(|_| Protocol::BadResponse { + let size = s.parse::().map_err(|_| ProtocolError::BadResponse { typed_as: "bulk string", error: "server bulk response size prefix is not an integer", bytes: s.as_bytes().to_vec(), @@ -192,7 +192,7 @@ fn read(mut r: R) -> Result { // so we'll just give up unimplemented!(); } - c => Err(Protocol::BadResponse { + c => Err(ProtocolError::BadResponse { typed_as: "unknown", error: "invalid response type prefix", bytes: vec![c], @@ -224,7 +224,7 @@ impl From> for RawResponse { #[cfg(test)] mod test { use super::{read, Error, RawResponse}; - use crate::Protocol; + use crate::ProtocolError; use serde_json::{self, Map, Value}; use std::io::{self, Cursor}; @@ -247,7 +247,7 @@ mod test { #[test] fn it_errors_on_bad_numbers() { let c = Cursor::new(b":x\r\n"); - if let Error::Protocol(Protocol::BadResponse { + if let Error::Protocol(ProtocolError::BadResponse { typed_as, error, .. }) = read(c).unwrap_err() { @@ -261,7 +261,7 @@ mod test { #[test] fn it_parses_errors() { let c = Cursor::new(b"-ERR foo\r\n"); - if let Error::Protocol(Protocol::Internal { ref msg }) = read(c).unwrap_err() { + if let Error::Protocol(ProtocolError::Internal { ref msg }) = read(c).unwrap_err() { assert_eq!(msg, "foo"); } else { unreachable!(); @@ -284,7 +284,7 @@ mod test { #[test] fn it_errors_on_bad_sizes() { let c = Cursor::new(b"$x\r\n\r\n"); - if let Error::Protocol(Protocol::BadResponse { + if let Error::Protocol(ProtocolError::BadResponse { typed_as, error, .. }) = read(c).unwrap_err() { @@ -373,7 +373,7 @@ mod test { #[test] fn json_error_on_number() { let c = Cursor::new(b":9\r\n"); - if let Error::Protocol(Protocol::BadType { + if let Error::Protocol(ProtocolError::BadType { expected, ref received, }) = read_json(c).unwrap_err() @@ -388,7 +388,7 @@ mod test { #[test] fn it_errors_on_unknown_resp_type() { let c = Cursor::new(b"^\r\n"); - if let Error::Protocol(Protocol::BadResponse { + if let Error::Protocol(ProtocolError::BadResponse { typed_as, error, .. }) = read_json(c).unwrap_err() { From 0752135d4ef14d1d402d6cb22e8f2ee8e1f17c24 Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Sun, 24 Apr 2022 11:24:14 -0700 Subject: [PATCH 06/20] PR comments --- src/consumer/mod.rs | 2 +- src/error.rs | 103 ++++++++++++++++++++++++++------------- src/lib.rs | 4 +- src/producer/mod.rs | 20 ++++---- src/proto/mod.rs | 55 +++++++-------------- src/proto/single/cmd.rs | 25 +++++----- src/proto/single/mod.rs | 2 +- src/proto/single/resp.rs | 33 ++++++------- 8 files changed, 126 insertions(+), 118 deletions(-) diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 93aa087a..fb465cd5 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -397,7 +397,7 @@ where // the resulting OK that failed. in that case, we would get an error response // when re-sending the job response. this should not count as critical. other // errors, however, should! - if let Error::GenericIO(_) = e { + if let Error::IO(_) = e { last_job_result.swap(res, atomic::Ordering::SeqCst); return Err(e); } diff --git a/src/error.rs b/src/error.rs index da7c0add..727251b5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,49 +1,90 @@ -use thiserror::Error; +//! Enumerates all errors that this crate may return. +//! +//! [Error] is the top level error enum. +//! Most consumers should only need to interact with this type. +//! This is also where more generic errors such as IO errors are placed, +//! whereas the more specific errors ([Connection] and [Protocol]) are +//! related to logic. +//! +//! [Connect] describes errors specific to the connection logic, for example +//! version mismatches or an invalid URL. +//! +//! [Protocol] describes lower-level errors relating to communication +//! with the faktory server. Typically, [Protocol] errors are the result +//! of the server sending a response this client did not expect. -use crate::proto::ConnectError; +use thiserror::Error; /// The set of observable errors when interacting with a Faktory server. #[derive(Debug, Error)] -#[allow(clippy::manual_non_exhaustive)] +#[non_exhaustive] pub enum Error { /// The connection to the server, or one of its prerequisites, failed. - #[error("connection error: {0}")] - Connect(#[from] ConnectError), + #[error("connection")] + Connect(#[from] Connect), - /// Underlying io layer errors. + /// Underlying IO layer errors. + /// /// These are overwhelmingly network communication errors on the socket connection to the server. - #[error("underlying i/o: {0}")] - GenericIO(#[from] std::io::Error), + #[error("underlying IO")] + IO(#[from] std::io::Error), /// Application-level errors. + /// /// These generally indicate a mismatch between what the client expects and what the server expects. - #[error("protocol: {0}")] - Protocol(#[from] ProtocolError), + #[error("protocol")] + Protocol(#[from] Protocol), /// Faktory payloads are JSON encoded. + /// /// This error is one that was encountered when attempting to deserialize a response from the server. /// These generally indicate a mismatch between what the client expects and what the server provided. - #[error("deserialize payload: {0}")] + #[error("deserialize payload")] DeserializePayload(#[from] serde_json::Error), /// Indicates an error in the underlying TLS stream. #[cfg(feature = "tls")] - #[error("underlying tls stream: {0}")] + #[error("underlying tls stream")] TlsStream(#[from] native_tls::Error), +} + +/// Errors specific to connection logic. +#[derive(Debug, Error)] +pub enum Connect { + /// The scheme portion of the connection address provided is invalid. + #[error("unknown scheme: {scheme}")] + BadScheme { + /// The scheme that was provided in the connection address. + scheme: String, + }, - // We're going to add more error types in the future - // https://github.com/rust-lang/rust/issues/44109 - // - // This forces users to write pattern matches with a catch-all `_` arm. - #[error("unreachable")] - #[doc(hidden)] - __Nonexhaustive, + /// The provided connection address does not contain a hostname. + #[error("no hostname given")] + MissingHostname, + + /// The server requires authentication, but none was provided. + #[error("server requires authentication")] + AuthenticationNeeded, + + /// The server expects a different protocol version than this library supports. + #[error("server version mismatch (theirs: {theirs}, ours: {ours})")] + VersionMismatch { + /// The protocol version this library supports. + ours: usize, + + /// The protocol version the server expects. + theirs: usize, + }, + + /// The connection address provided was not able to be parsed. + #[error("parse URL")] + ParseUrl(#[source] url::ParseError), } /// The set of observable application-level errors when interacting with a Faktory server. #[derive(Debug, Error)] -#[allow(clippy::manual_non_exhaustive)] -pub enum ProtocolError { +#[non_exhaustive] +pub enum Protocol { /// The server reports that an issued request was malformed. #[error("request was malformed: {desc}")] Malformed { @@ -80,35 +121,27 @@ pub enum ProtocolError { /// The relevant bytes sent by the server. bytes: Vec, }, - - // We're going to add more error types in the future - // https://github.com/rust-lang/rust/issues/44109 - // - // This forces users to write pattern matches with a catch-all `_` arm. - #[error("unreachable")] - #[doc(hidden)] - __Nonexhaustive, } -impl ProtocolError { +impl Protocol { pub(crate) fn new(line: String) -> Self { let mut parts = line.splitn(2, ' '); let code = parts.next(); let error = parts.next(); if error.is_none() { - return ProtocolError::Internal { + return Protocol::Internal { msg: code.unwrap().to_string(), }; } let error = error.unwrap().to_string(); match code { - Some("ERR") => ProtocolError::Internal { msg: error }, - Some("MALFORMED") => ProtocolError::Malformed { desc: error }, - Some(c) => ProtocolError::Internal { + Some("ERR") => Protocol::Internal { msg: error }, + Some("MALFORMED") => Protocol::Malformed { desc: error }, + Some(c) => Protocol::Internal { msg: format!("{} {}", c, error), }, - None => ProtocolError::Internal { + None => Protocol::Internal { msg: "empty error response".to_string(), }, } diff --git a/src/lib.rs b/src/lib.rs index 85ade1b8..922725a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,7 +61,7 @@ extern crate serde_derive; mod consumer; -mod error; +pub mod error; mod producer; mod proto; @@ -71,7 +71,7 @@ mod tls; pub use tls::TlsStream; pub use crate::consumer::{Consumer, ConsumerBuilder}; -pub use crate::error::{Error, ProtocolError}; +pub use crate::error::Error; pub use crate::producer::Producer; pub use crate::proto::Job; pub use crate::proto::Reconnect; diff --git a/src/producer/mod.rs b/src/producer/mod.rs index 2ebc5074..25b413bc 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -3,9 +3,6 @@ use crate::proto::{self, Client, Info, Job, Push, QueueAction, QueueControl}; use std::io::prelude::*; use std::net::TcpStream; -/// Default Error type, aliased in module for convenience. -type Result = std::result::Result; - /// `Producer` is used to enqueue new jobs that will in turn be processed by Faktory workers. /// /// # Connecting to Faktory @@ -84,7 +81,7 @@ impl Producer { /// ``` /// /// If `url` is given, but does not specify a port, it defaults to 7419. - pub fn connect(url: Option<&str>) -> Result { + pub fn connect(url: Option<&str>) -> Result { let url = match url { Some(url) => proto::url_parse(url), None => proto::url_parse(&proto::get_env_url()), @@ -96,22 +93,23 @@ impl Producer { impl Producer { /// Connect to a Faktory server with a non-standard stream. - pub fn connect_with(stream: S, pwd: Option) -> Result> { - let c = Client::new_producer(stream, pwd)?; - Ok(Producer { c }) + pub fn connect_with(stream: S, pwd: Option) -> Result, Error> { + Ok(Producer { + c: Client::new_producer(stream, pwd)?, + }) } /// Enqueue the given job on the Faktory server. /// /// Returns `Ok` if the job was successfully queued by the Faktory server. - pub fn enqueue(&mut self, job: Job) -> Result<()> { + pub fn enqueue(&mut self, job: Job) -> Result<(), Error> { self.c.issue(&Push::from(job))?.await_ok() } /// Retrieve information about the running server. /// /// The returned value is the result of running the `INFO` command on the server. - pub fn info(&mut self) -> Result { + pub fn info(&mut self) -> Result { self.c .issue(&Info)? .read_json() @@ -119,14 +117,14 @@ impl Producer { } /// Pause the given queues. - pub fn queue_pause>(&mut self, queues: &[T]) -> Result<()> { + pub fn queue_pause>(&mut self, queues: &[T]) -> Result<(), Error> { self.c .issue(&QueueControl::new(QueueAction::Pause, queues))? .await_ok() } /// Resume the given queues. - pub fn queue_resume>(&mut self, queues: &[T]) -> Result<()> { + pub fn queue_resume>(&mut self, queues: &[T]) -> Result<(), Error> { self.c .issue(&QueueControl::new(QueueAction::Resume, queues))? .await_ok() diff --git a/src/proto/mod.rs b/src/proto/mod.rs index a54dd838..df6cec5b 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,11 +1,9 @@ -use crate::error::Error; -use crate::ProtocolError; +use crate::error::{Connect, Error, Protocol}; use bufstream::BufStream; use libc::getpid; use std::io; use std::io::prelude::*; use std::net::TcpStream; -use thiserror::Error; use url::Url; pub(crate) const EXPECTED_PROTOCOL_VERSION: usize = 2; @@ -18,23 +16,6 @@ pub use self::single::{Ack, Fail, Heartbeat, Info, Job, Push, QueueAction, Queue // responses that users can see pub use self::single::Hi; -#[derive(Debug, Error)] -pub enum ConnectError { - #[error("unknown scheme: {scheme}")] - BadScheme { scheme: String }, - #[error("no hostname given")] - MissingHostname, - #[error("server requires authentication")] - AuthenticationNeeded, - #[error("server version mismatch (theirs: {theirs}, ours: {ours})")] - VersionMismatch { ours: usize, theirs: usize }, - #[error("parse URL: {0}")] - ParseUrl(#[source] url::ParseError), -} - -/// Default Error type, aliased in module for convenience. -type Result = std::result::Result; - pub(crate) fn get_env_url() -> String { use std::env; let var = env::var("FAKTORY_PROVIDER").unwrap_or_else(|_| "FAKTORY_URL".to_string()); @@ -45,17 +26,17 @@ pub(crate) fn host_from_url(url: &Url) -> String { format!("{}:{}", url.host_str().unwrap(), url.port().unwrap_or(7419)) } -pub(crate) fn url_parse(url: &str) -> Result { - let url = Url::parse(url).map_err(ConnectError::ParseUrl)?; +pub(crate) fn url_parse(url: &str) -> Result { + let url = Url::parse(url).map_err(Connect::ParseUrl)?; if url.scheme() != "tcp" { - return Err(ConnectError::BadScheme { + return Err(Connect::BadScheme { scheme: url.scheme().to_string(), } .into()); } if url.host_str().is_none() || url.host_str().unwrap().is_empty() { - return Err(ConnectError::MissingHostname.into()); + return Err(Connect::MissingHostname.into()); } Ok(url) @@ -120,12 +101,12 @@ impl Client where S: Read + Write + Reconnect, { - pub(crate) fn connect_again(&self) -> Result { + pub(crate) fn connect_again(&self) -> Result { let s = self.stream.get_ref().reconnect()?; Client::new(s, self.opts.clone()) } - pub fn reconnect(&mut self) -> Result<()> { + pub fn reconnect(&mut self) -> Result<(), Error> { let s = self.stream.get_ref().reconnect()?; self.stream = BufStream::new(s); self.init() @@ -133,7 +114,7 @@ where } impl Client { - pub(crate) fn new(stream: S, opts: ClientOptions) -> Result> { + pub(crate) fn new(stream: S, opts: ClientOptions) -> Result, Error> { let mut c = Client { stream: BufStream::new(stream), opts, @@ -142,7 +123,7 @@ impl Client { Ok(c) } - pub(crate) fn new_producer(stream: S, pwd: Option) -> Result> { + pub(crate) fn new_producer(stream: S, pwd: Option) -> Result, Error> { let opts = ClientOptions { password: pwd, is_producer: true, @@ -153,11 +134,11 @@ impl Client { } impl Client { - fn init(&mut self) -> Result<()> { + fn init(&mut self) -> Result<(), Error> { let hi = single::read_hi(&mut self.stream)?; if hi.version != EXPECTED_PROTOCOL_VERSION { - return Err(ConnectError::VersionMismatch { + return Err(Connect::VersionMismatch { ours: EXPECTED_PROTOCOL_VERSION, theirs: hi.version, } @@ -199,7 +180,7 @@ impl Client { if let Some(ref pwd) = self.opts.password { hello.set_password(&hi, pwd); } else { - return Err(ConnectError::AuthenticationNeeded.into()); + return Err(Connect::AuthenticationNeeded.into()); } } @@ -225,12 +206,12 @@ impl Client { pub(crate) fn issue( &mut self, c: &FC, - ) -> Result> { + ) -> Result, Error> { single::write_command(&mut self.stream, c)?; Ok(ReadToken(self)) } - pub(crate) fn heartbeat(&mut self) -> Result { + pub(crate) fn heartbeat(&mut self) -> Result { single::write_command( &mut self.stream, &Heartbeat::new(&**self.opts.wid.as_ref().unwrap()), @@ -245,7 +226,7 @@ impl Client { { Some("terminate") => Ok(HeartbeatStatus::Terminate), Some("quiet") => Ok(HeartbeatStatus::Quiet), - _ => Err(ProtocolError::BadType { + _ => Err(Protocol::BadType { expected: "heartbeat response", received: format!("{}", s), } @@ -254,7 +235,7 @@ impl Client { } } - pub(crate) fn fetch(&mut self, queues: &[Q]) -> Result> + pub(crate) fn fetch(&mut self, queues: &[Q]) -> Result, Error> where Q: AsRef, { @@ -263,11 +244,11 @@ impl Client { } impl<'a, S: Read + Write> ReadToken<'a, S> { - pub(crate) fn await_ok(self) -> Result<()> { + pub(crate) fn await_ok(self) -> Result<(), Error> { single::read_ok(&mut self.0.stream) } - pub(crate) fn read_json(self) -> Result> + pub(crate) fn read_json(self) -> Result, Error> where T: serde::de::DeserializeOwned, { diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index 22296f45..27e69efc 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -1,16 +1,13 @@ use super::{Error, Job}; use std::io::prelude::*; -/// Default Error type, aliased in module for convenience. -type Result = std::result::Result; - pub trait FaktoryCommand { - fn issue(&self, w: &mut dyn Write) -> Result<()>; + fn issue(&self, w: &mut dyn Write) -> Result<(), Error>; } /// Write queues as part of a command. They are written with a leading space /// followed by space separated queue names. -fn write_queues(w: &mut dyn Write, queues: &[S]) -> std::result::Result<(), serde_json::Error> +fn write_queues(w: &mut dyn Write, queues: &[S]) -> Result<(), serde_json::Error> where W: Write, S: AsRef, @@ -29,7 +26,7 @@ where pub struct Info; impl FaktoryCommand for Info { - fn issue(&self, w: &mut dyn Write) -> Result<()> { + fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { Ok(w.write_all(b"INFO\r\n").map_err(serde_json::Error::io)?) } } @@ -43,7 +40,7 @@ pub struct Ack { } impl FaktoryCommand for Ack { - fn issue(&self, w: &mut dyn Write) -> Result<()> { + fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { w.write_all(b"ACK ").map_err(serde_json::Error::io)?; serde_json::to_writer(&mut *w, self)?; Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) @@ -66,7 +63,7 @@ pub struct Heartbeat { } impl FaktoryCommand for Heartbeat { - fn issue(&self, w: &mut dyn Write) -> Result<()> { + fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { w.write_all(b"BEAT ").map_err(serde_json::Error::io)?; serde_json::to_writer(&mut *w, self)?; Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) @@ -93,7 +90,7 @@ pub struct Fail { } impl FaktoryCommand for Fail { - fn issue(&self, w: &mut dyn Write) -> Result<()> { + fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { w.write_all(b"FAIL ").map_err(serde_json::Error::io)?; serde_json::to_writer(&mut *w, self)?; Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) @@ -124,7 +121,7 @@ impl Fail { pub struct End; impl FaktoryCommand for End { - fn issue(&self, w: &mut dyn Write) -> Result<()> { + fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { Ok(w.write_all(b"END\r\n").map_err(serde_json::Error::io)?) } } @@ -142,7 +139,7 @@ impl<'a, S> FaktoryCommand for Fetch<'a, S> where S: AsRef, { - fn issue(&self, w: &mut dyn Write) -> Result<()> { + fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { if self.queues.is_empty() { w.write_all(b"FETCH\r\n").map_err(serde_json::Error::io)?; } else { @@ -213,7 +210,7 @@ impl Hello { } impl FaktoryCommand for Hello { - fn issue(&self, w: &mut dyn Write) -> Result<()> { + fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { w.write_all(b"HELLO ").map_err(serde_json::Error::io)?; serde_json::to_writer(&mut *w, self)?; Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) @@ -239,7 +236,7 @@ impl From for Push { } impl FaktoryCommand for Push { - fn issue(&self, w: &mut dyn Write) -> Result<()> { + fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { w.write_all(b"PUSH ").map_err(serde_json::Error::io)?; serde_json::to_writer(&mut *w, &**self)?; Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) @@ -262,7 +259,7 @@ where } impl> FaktoryCommand for QueueControl<'_, S> { - fn issue(&self, w: &mut dyn Write) -> Result<()> { + fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { let command = match self.action { QueueAction::Pause => b"QUEUE PAUSE".as_ref(), QueueAction::Resume => b"QUEUE RESUME".as_ref(), diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index c4ecbcfd..a869d598 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -72,7 +72,7 @@ pub struct Job { /// /// This field is read-only. #[serde(skip_serializing)] - pub failure: Option, + failure: Option, /// Extra context to include with the job. /// diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index 2e746f3d..e626ac16 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -1,9 +1,8 @@ -use crate::ProtocolError; -use std::io::prelude::*; - use super::Error; +use crate::error::Protocol; +use std::io::prelude::*; -fn bad(expected: &'static str, got: &RawResponse) -> ProtocolError { +fn bad(expected: &'static str, got: &RawResponse) -> Protocol { let stringy = match *got { RawResponse::String(ref s) => Some(&**s), RawResponse::Blob(ref b) => { @@ -17,11 +16,11 @@ fn bad(expected: &'static str, got: &RawResponse) -> ProtocolError { }; match stringy { - Some(s) => ProtocolError::BadType { + Some(s) => Protocol::BadType { expected, received: s.to_string(), }, - None => ProtocolError::BadType { + None => Protocol::BadType { expected, received: format!("{:?}", got), }, @@ -131,7 +130,7 @@ fn read(mut r: R) -> Result { let l = s.len() - 2; s.truncate(l); - Err(ProtocolError::new(s).into()) + Err(Protocol::new(s).into()) } b':' => { // Integer @@ -145,7 +144,7 @@ fn read(mut r: R) -> Result { match (&*s).parse::() { Ok(i) => Ok(RawResponse::Number(i)), - Err(_) => Err(ProtocolError::BadResponse { + Err(_) => Err(Protocol::BadResponse { typed_as: "integer", error: "invalid integer value", bytes: s.into_bytes(), @@ -159,14 +158,14 @@ fn read(mut r: R) -> Result { let mut bytes = Vec::with_capacity(32); r.read_until(b'\n', &mut bytes)?; let s = std::str::from_utf8(&bytes[0..bytes.len() - 2]).map_err(|_| { - ProtocolError::BadResponse { + Protocol::BadResponse { typed_as: "bulk string", error: "server bulk response contains non-utf8 size prefix", bytes: bytes[0..bytes.len() - 2].to_vec(), } })?; - let size = s.parse::().map_err(|_| ProtocolError::BadResponse { + let size = s.parse::().map_err(|_| Protocol::BadResponse { typed_as: "bulk string", error: "server bulk response size prefix is not an integer", bytes: s.as_bytes().to_vec(), @@ -192,7 +191,7 @@ fn read(mut r: R) -> Result { // so we'll just give up unimplemented!(); } - c => Err(ProtocolError::BadResponse { + c => Err(Protocol::BadResponse { typed_as: "unknown", error: "invalid response type prefix", bytes: vec![c], @@ -224,7 +223,7 @@ impl From> for RawResponse { #[cfg(test)] mod test { use super::{read, Error, RawResponse}; - use crate::ProtocolError; + use crate::error::Protocol; use serde_json::{self, Map, Value}; use std::io::{self, Cursor}; @@ -247,7 +246,7 @@ mod test { #[test] fn it_errors_on_bad_numbers() { let c = Cursor::new(b":x\r\n"); - if let Error::Protocol(ProtocolError::BadResponse { + if let Error::Protocol(Protocol::BadResponse { typed_as, error, .. }) = read(c).unwrap_err() { @@ -261,7 +260,7 @@ mod test { #[test] fn it_parses_errors() { let c = Cursor::new(b"-ERR foo\r\n"); - if let Error::Protocol(ProtocolError::Internal { ref msg }) = read(c).unwrap_err() { + if let Error::Protocol(Protocol::Internal { ref msg }) = read(c).unwrap_err() { assert_eq!(msg, "foo"); } else { unreachable!(); @@ -284,7 +283,7 @@ mod test { #[test] fn it_errors_on_bad_sizes() { let c = Cursor::new(b"$x\r\n\r\n"); - if let Error::Protocol(ProtocolError::BadResponse { + if let Error::Protocol(Protocol::BadResponse { typed_as, error, .. }) = read(c).unwrap_err() { @@ -373,7 +372,7 @@ mod test { #[test] fn json_error_on_number() { let c = Cursor::new(b":9\r\n"); - if let Error::Protocol(ProtocolError::BadType { + if let Error::Protocol(Protocol::BadType { expected, ref received, }) = read_json(c).unwrap_err() @@ -388,7 +387,7 @@ mod test { #[test] fn it_errors_on_unknown_resp_type() { let c = Cursor::new(b"^\r\n"); - if let Error::Protocol(ProtocolError::BadResponse { + if let Error::Protocol(Protocol::BadResponse { typed_as, error, .. }) = read_json(c).unwrap_err() { From a8501c94a4eaf6afa764f762eb5474991af7fe75 Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Sun, 24 Apr 2022 11:25:53 -0700 Subject: [PATCH 07/20] Add `#[non_exhaustive]` to `Connect` --- src/error.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/error.rs b/src/error.rs index 727251b5..d7a2a9bb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -50,6 +50,7 @@ pub enum Error { /// Errors specific to connection logic. #[derive(Debug, Error)] +#[non_exhaustive] pub enum Connect { /// The scheme portion of the connection address provided is invalid. #[error("unknown scheme: {scheme}")] From 34fb9426262674a7ff0001f9d4e8907bbac0d243 Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Sun, 24 Apr 2022 11:29:31 -0700 Subject: [PATCH 08/20] Revert change to `Job.failure` visibility --- src/proto/single/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index a869d598..4e76ccfa 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -72,7 +72,7 @@ pub struct Job { /// /// This field is read-only. #[serde(skip_serializing)] - failure: Option, + pub(crate) failure: Option, /// Extra context to include with the job. /// From 12d11b1568a61457551ccc6e2bfd6e0f3023c6cf Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Sun, 24 Apr 2022 11:31:47 -0700 Subject: [PATCH 09/20] Minor import cleanup --- src/proto/single/resp.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index e626ac16..74fbc065 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -1,5 +1,4 @@ -use super::Error; -use crate::error::Protocol; +use crate::error::{Error, Protocol}; use std::io::prelude::*; fn bad(expected: &'static str, got: &RawResponse) -> Protocol { @@ -222,8 +221,8 @@ impl From> for RawResponse { #[cfg(test)] mod test { - use super::{read, Error, RawResponse}; - use crate::error::Protocol; + use super::{read, RawResponse}; + use crate::error::{Error, Protocol}; use serde_json::{self, Map, Value}; use std::io::{self, Cursor}; From 65082b9ad86cabb20834873aee966ee6ecb84c14 Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Mon, 25 Apr 2022 09:58:00 -0700 Subject: [PATCH 10/20] Make `Job.failure` getter so clippy is happy --- src/proto/single/mod.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 4e76ccfa..9eca8d8e 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -72,7 +72,7 @@ pub struct Job { /// /// This field is read-only. #[serde(skip_serializing)] - pub(crate) failure: Option, + failure: Option, /// Extra context to include with the job. /// @@ -156,6 +156,11 @@ impl Job { pub fn args(&self) -> &[serde_json::Value] { &self.args } + + /// Data about this job's most recent failure. + pub fn failure(&self) -> &Option { + &self.failure + } } pub fn write_command(w: &mut W, command: &C) -> Result<(), Error> { From fcc671f50821c078035d694dea6450de7148c4d4 Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Sun, 1 May 2022 07:38:41 -0700 Subject: [PATCH 11/20] `use crate::error::{self, Error}` --- src/proto/mod.rs | 14 +++++------ src/proto/single/resp.rs | 52 +++++++++++++++++++++------------------- 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/src/proto/mod.rs b/src/proto/mod.rs index df6cec5b..ea7a4f58 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,4 +1,4 @@ -use crate::error::{Connect, Error, Protocol}; +use crate::error::{self, Error}; use bufstream::BufStream; use libc::getpid; use std::io; @@ -27,16 +27,16 @@ pub(crate) fn host_from_url(url: &Url) -> String { } pub(crate) fn url_parse(url: &str) -> Result { - let url = Url::parse(url).map_err(Connect::ParseUrl)?; + let url = Url::parse(url).map_err(error::Connect::ParseUrl)?; if url.scheme() != "tcp" { - return Err(Connect::BadScheme { + return Err(error::Connect::BadScheme { scheme: url.scheme().to_string(), } .into()); } if url.host_str().is_none() || url.host_str().unwrap().is_empty() { - return Err(Connect::MissingHostname.into()); + return Err(error::Connect::MissingHostname.into()); } Ok(url) @@ -138,7 +138,7 @@ impl Client { let hi = single::read_hi(&mut self.stream)?; if hi.version != EXPECTED_PROTOCOL_VERSION { - return Err(Connect::VersionMismatch { + return Err(error::Connect::VersionMismatch { ours: EXPECTED_PROTOCOL_VERSION, theirs: hi.version, } @@ -180,7 +180,7 @@ impl Client { if let Some(ref pwd) = self.opts.password { hello.set_password(&hi, pwd); } else { - return Err(Connect::AuthenticationNeeded.into()); + return Err(error::Connect::AuthenticationNeeded.into()); } } @@ -226,7 +226,7 @@ impl Client { { Some("terminate") => Ok(HeartbeatStatus::Terminate), Some("quiet") => Ok(HeartbeatStatus::Quiet), - _ => Err(Protocol::BadType { + _ => Err(error::Protocol::BadType { expected: "heartbeat response", received: format!("{}", s), } diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index 74fbc065..c1a2bb89 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -1,7 +1,7 @@ -use crate::error::{Error, Protocol}; +use crate::error::{self, Error}; use std::io::prelude::*; -fn bad(expected: &'static str, got: &RawResponse) -> Protocol { +fn bad(expected: &'static str, got: &RawResponse) -> error::Protocol { let stringy = match *got { RawResponse::String(ref s) => Some(&**s), RawResponse::Blob(ref b) => { @@ -15,11 +15,11 @@ fn bad(expected: &'static str, got: &RawResponse) -> Protocol { }; match stringy { - Some(s) => Protocol::BadType { + Some(s) => error::Protocol::BadType { expected, received: s.to_string(), }, - None => Protocol::BadType { + None => error::Protocol::BadType { expected, received: format!("{:?}", got), }, @@ -91,7 +91,7 @@ pub fn read_ok(r: R) -> Result<(), Error> { // ---------------------------------------------- // -// below is the implementation of the Redis RESP protocol +// below is the implementation of the Redis RESP error::protocol // // ---------------------------------------------- @@ -109,7 +109,7 @@ fn read(mut r: R) -> Result { match cmdbuf[0] { b'+' => { // Simple String - // https://redis.io/topics/protocol#resp-simple-strings + // https://redis.io/topics/error::protocol#resp-simple-strings let mut s = String::new(); r.read_line(&mut s)?; @@ -121,7 +121,7 @@ fn read(mut r: R) -> Result { } b'-' => { // Error - // https://redis.io/topics/protocol#resp-errors + // https://redis.io/topics/error::protocol#resp-errors let mut s = String::new(); r.read_line(&mut s)?; @@ -129,11 +129,11 @@ fn read(mut r: R) -> Result { let l = s.len() - 2; s.truncate(l); - Err(Protocol::new(s).into()) + Err(error::Protocol::new(s).into()) } b':' => { // Integer - // https://redis.io/topics/protocol#resp-integers + // https://redis.io/topics/error::protocol#resp-integers let mut s = String::with_capacity(32); r.read_line(&mut s)?; @@ -143,7 +143,7 @@ fn read(mut r: R) -> Result { match (&*s).parse::() { Ok(i) => Ok(RawResponse::Number(i)), - Err(_) => Err(Protocol::BadResponse { + Err(_) => Err(error::Protocol::BadResponse { typed_as: "integer", error: "invalid integer value", bytes: s.into_bytes(), @@ -153,22 +153,24 @@ fn read(mut r: R) -> Result { } b'$' => { // Bulk String - // https://redis.io/topics/protocol#resp-bulk-strings + // https://redis.io/topics/error::protocol#resp-bulk-strings let mut bytes = Vec::with_capacity(32); r.read_until(b'\n', &mut bytes)?; let s = std::str::from_utf8(&bytes[0..bytes.len() - 2]).map_err(|_| { - Protocol::BadResponse { + error::Protocol::BadResponse { typed_as: "bulk string", error: "server bulk response contains non-utf8 size prefix", bytes: bytes[0..bytes.len() - 2].to_vec(), } })?; - let size = s.parse::().map_err(|_| Protocol::BadResponse { - typed_as: "bulk string", - error: "server bulk response size prefix is not an integer", - bytes: s.as_bytes().to_vec(), - })?; + let size = s + .parse::() + .map_err(|_| error::Protocol::BadResponse { + typed_as: "bulk string", + error: "server bulk response size prefix is not an integer", + bytes: s.as_bytes().to_vec(), + })?; if size == -1 { Ok(RawResponse::Null) @@ -182,7 +184,7 @@ fn read(mut r: R) -> Result { } b'*' => { // Arrays - // https://redis.io/topics/protocol#resp-arrays + // https://redis.io/topics/error::protocol#resp-arrays // // not used in faktory. // *and* you can't really skip them unless you parse them. @@ -190,7 +192,7 @@ fn read(mut r: R) -> Result { // so we'll just give up unimplemented!(); } - c => Err(Protocol::BadResponse { + c => Err(error::Protocol::BadResponse { typed_as: "unknown", error: "invalid response type prefix", bytes: vec![c], @@ -222,7 +224,7 @@ impl From> for RawResponse { #[cfg(test)] mod test { use super::{read, RawResponse}; - use crate::error::{Error, Protocol}; + use crate::error::{self, Error}; use serde_json::{self, Map, Value}; use std::io::{self, Cursor}; @@ -245,7 +247,7 @@ mod test { #[test] fn it_errors_on_bad_numbers() { let c = Cursor::new(b":x\r\n"); - if let Error::Protocol(Protocol::BadResponse { + if let Error::Protocol(error::Protocol::BadResponse { typed_as, error, .. }) = read(c).unwrap_err() { @@ -259,7 +261,7 @@ mod test { #[test] fn it_parses_errors() { let c = Cursor::new(b"-ERR foo\r\n"); - if let Error::Protocol(Protocol::Internal { ref msg }) = read(c).unwrap_err() { + if let Error::Protocol(error::Protocol::Internal { ref msg }) = read(c).unwrap_err() { assert_eq!(msg, "foo"); } else { unreachable!(); @@ -282,7 +284,7 @@ mod test { #[test] fn it_errors_on_bad_sizes() { let c = Cursor::new(b"$x\r\n\r\n"); - if let Error::Protocol(Protocol::BadResponse { + if let Error::Protocol(error::Protocol::BadResponse { typed_as, error, .. }) = read(c).unwrap_err() { @@ -371,7 +373,7 @@ mod test { #[test] fn json_error_on_number() { let c = Cursor::new(b":9\r\n"); - if let Error::Protocol(Protocol::BadType { + if let Error::Protocol(error::Protocol::BadType { expected, ref received, }) = read_json(c).unwrap_err() @@ -386,7 +388,7 @@ mod test { #[test] fn it_errors_on_unknown_resp_type() { let c = Cursor::new(b"^\r\n"); - if let Error::Protocol(Protocol::BadResponse { + if let Error::Protocol(error::Protocol::BadResponse { typed_as, error, .. }) = read_json(c).unwrap_err() { From 8e2e0a706f04fee16dbae6cb9082cb21e1a79c03 Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Sun, 1 May 2022 07:40:44 -0700 Subject: [PATCH 12/20] Code format comment --- src/error.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/error.rs b/src/error.rs index d7a2a9bb..85cbeca3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,16 +1,16 @@ //! Enumerates all errors that this crate may return. //! -//! [Error] is the top level error enum. +//! [`Error`] is the top level error enum. //! Most consumers should only need to interact with this type. //! This is also where more generic errors such as IO errors are placed, -//! whereas the more specific errors ([Connection] and [Protocol]) are +//! whereas the more specific errors ([`Connection`] and [`Protocol`]) are //! related to logic. //! -//! [Connect] describes errors specific to the connection logic, for example +//! [`Connect`] describes errors specific to the connection logic, for example //! version mismatches or an invalid URL. //! -//! [Protocol] describes lower-level errors relating to communication -//! with the faktory server. Typically, [Protocol] errors are the result +//! [`Protocol`] describes lower-level errors relating to communication +//! with the faktory server. Typically, [`Protocol`] errors are the result //! of the server sending a response this client did not expect. use thiserror::Error; From d3711c7d92ecf6e8f547ceaa97f3dd01cb51a404 Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Sun, 1 May 2022 07:41:10 -0700 Subject: [PATCH 13/20] `I/O` --- src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index 85cbeca3..51cbcb83 100644 --- a/src/error.rs +++ b/src/error.rs @@ -2,7 +2,7 @@ //! //! [`Error`] is the top level error enum. //! Most consumers should only need to interact with this type. -//! This is also where more generic errors such as IO errors are placed, +//! This is also where more generic errors such as I/O errors are placed, //! whereas the more specific errors ([`Connection`] and [`Protocol`]) are //! related to logic. //! From b563ed7c5ebdaf14a7bec794ca45f0b50793a2b6 Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Sun, 1 May 2022 07:54:58 -0700 Subject: [PATCH 14/20] `DeserializePayload`: `#[from]` -> `#[source]` --- src/error.rs | 6 ++++- src/proto/single/cmd.rs | 50 +++++++++++++++++++++------------------- src/proto/single/resp.rs | 10 +++++--- 3 files changed, 38 insertions(+), 28 deletions(-) diff --git a/src/error.rs b/src/error.rs index 51cbcb83..72317720 100644 --- a/src/error.rs +++ b/src/error.rs @@ -40,7 +40,7 @@ pub enum Error { /// This error is one that was encountered when attempting to deserialize a response from the server. /// These generally indicate a mismatch between what the client expects and what the server provided. #[error("deserialize payload")] - DeserializePayload(#[from] serde_json::Error), + DeserializePayload(#[source] serde_json::Error), /// Indicates an error in the underlying TLS stream. #[cfg(feature = "tls")] @@ -148,3 +148,7 @@ impl Protocol { } } } + +pub(crate) fn wrap_serde_io(err: std::io::Error) -> Error { + Error::DeserializePayload(serde_json::Error::io(err)) +} diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index 27e69efc..6fddab95 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -1,3 +1,5 @@ +use crate::error; + use super::{Error, Job}; use std::io::prelude::*; @@ -27,7 +29,7 @@ pub struct Info; impl FaktoryCommand for Info { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - Ok(w.write_all(b"INFO\r\n").map_err(serde_json::Error::io)?) + w.write_all(b"INFO\r\n").map_err(error::wrap_serde_io) } } @@ -41,9 +43,9 @@ pub struct Ack { impl FaktoryCommand for Ack { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"ACK ").map_err(serde_json::Error::io)?; - serde_json::to_writer(&mut *w, self)?; - Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) + w.write_all(b"ACK ").map_err(error::wrap_serde_io)?; + serde_json::to_writer(&mut *w, self).map_err(Error::DeserializePayload)?; + w.write_all(b"\r\n").map_err(error::wrap_serde_io) } } @@ -64,9 +66,9 @@ pub struct Heartbeat { impl FaktoryCommand for Heartbeat { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"BEAT ").map_err(serde_json::Error::io)?; - serde_json::to_writer(&mut *w, self)?; - Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) + w.write_all(b"BEAT ").map_err(error::wrap_serde_io)?; + serde_json::to_writer(&mut *w, self).map_err(Error::DeserializePayload)?; + w.write_all(b"\r\n").map_err(error::wrap_serde_io) } } @@ -91,9 +93,9 @@ pub struct Fail { impl FaktoryCommand for Fail { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"FAIL ").map_err(serde_json::Error::io)?; - serde_json::to_writer(&mut *w, self)?; - Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) + w.write_all(b"FAIL ").map_err(error::wrap_serde_io)?; + serde_json::to_writer(&mut *w, self).map_err(Error::DeserializePayload)?; + w.write_all(b"\r\n").map_err(error::wrap_serde_io) } } @@ -122,7 +124,7 @@ pub struct End; impl FaktoryCommand for End { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - Ok(w.write_all(b"END\r\n").map_err(serde_json::Error::io)?) + w.write_all(b"END\r\n").map_err(error::wrap_serde_io) } } @@ -141,11 +143,11 @@ where { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { if self.queues.is_empty() { - w.write_all(b"FETCH\r\n").map_err(serde_json::Error::io)?; + w.write_all(b"FETCH\r\n").map_err(error::wrap_serde_io)?; } else { - w.write_all(b"FETCH").map_err(serde_json::Error::io)?; - write_queues::(w, self.queues)?; - w.write_all(b"\r\n").map_err(serde_json::Error::io)?; + w.write_all(b"FETCH").map_err(error::wrap_serde_io)?; + write_queues::(w, self.queues).map_err(Error::DeserializePayload)?; + w.write_all(b"\r\n").map_err(error::wrap_serde_io)?; } Ok(()) } @@ -211,9 +213,9 @@ impl Hello { impl FaktoryCommand for Hello { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"HELLO ").map_err(serde_json::Error::io)?; - serde_json::to_writer(&mut *w, self)?; - Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) + w.write_all(b"HELLO ").map_err(error::wrap_serde_io)?; + serde_json::to_writer(&mut *w, self).map_err(Error::DeserializePayload)?; + w.write_all(b"\r\n").map_err(error::wrap_serde_io) } } @@ -237,9 +239,9 @@ impl From for Push { impl FaktoryCommand for Push { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"PUSH ").map_err(serde_json::Error::io)?; - serde_json::to_writer(&mut *w, &**self)?; - Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) + w.write_all(b"PUSH ").map_err(error::wrap_serde_io)?; + serde_json::to_writer(&mut *w, &**self).map_err(Error::DeserializePayload)?; + w.write_all(b"\r\n").map_err(error::wrap_serde_io) } } @@ -265,9 +267,9 @@ impl> FaktoryCommand for QueueControl<'_, S> { QueueAction::Resume => b"QUEUE RESUME".as_ref(), }; - w.write_all(command).map_err(serde_json::Error::io)?; - write_queues::(w, self.queues)?; - Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) + w.write_all(command).map_err(error::wrap_serde_io)?; + write_queues::(w, self.queues).map_err(Error::DeserializePayload)?; + w.write_all(b"\r\n").map_err(error::wrap_serde_io) } } diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index c1a2bb89..d958bf57 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -35,7 +35,9 @@ pub fn read_json(r: R) -> Result { - return Ok(serde_json::from_str(s).map(Some)?); + return serde_json::from_str(s) + .map(Some) + .map_err(Error::DeserializePayload); } RawResponse::Blob(ref b) if b == b"OK" => { return Ok(None); @@ -44,7 +46,9 @@ pub fn read_json(r: R) -> Result return Ok(None), _ => {} @@ -69,7 +73,7 @@ pub fn read_hi(r: R) -> Result { let rr = read(r)?; if let RawResponse::String(ref s) = rr { if let Some(s) = s.strip_prefix("HI ") { - return Ok(serde_json::from_str(s)?); + return serde_json::from_str(s).map_err(Error::DeserializePayload); } } From 1a9ff9d4d2f3aa92d818f68e03df327b34f9093c Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Sun, 1 May 2022 07:56:20 -0700 Subject: [PATCH 15/20] `DeserializePayload` -> `Serialization` --- src/error.rs | 8 ++++---- src/proto/single/cmd.rs | 14 +++++++------- src/proto/single/resp.rs | 10 +++++----- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/error.rs b/src/error.rs index 72317720..1c1254c6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -37,10 +37,10 @@ pub enum Error { /// Faktory payloads are JSON encoded. /// - /// This error is one that was encountered when attempting to deserialize a response from the server. + /// This error is one that was encountered when attempting to serialize or deserialize communication with the server. /// These generally indicate a mismatch between what the client expects and what the server provided. - #[error("deserialize payload")] - DeserializePayload(#[source] serde_json::Error), + #[error("serialization")] + Serialization(#[source] serde_json::Error), /// Indicates an error in the underlying TLS stream. #[cfg(feature = "tls")] @@ -150,5 +150,5 @@ impl Protocol { } pub(crate) fn wrap_serde_io(err: std::io::Error) -> Error { - Error::DeserializePayload(serde_json::Error::io(err)) + Error::Serialization(serde_json::Error::io(err)) } diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index 6fddab95..2879de0e 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -44,7 +44,7 @@ pub struct Ack { impl FaktoryCommand for Ack { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { w.write_all(b"ACK ").map_err(error::wrap_serde_io)?; - serde_json::to_writer(&mut *w, self).map_err(Error::DeserializePayload)?; + serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; w.write_all(b"\r\n").map_err(error::wrap_serde_io) } } @@ -67,7 +67,7 @@ pub struct Heartbeat { impl FaktoryCommand for Heartbeat { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { w.write_all(b"BEAT ").map_err(error::wrap_serde_io)?; - serde_json::to_writer(&mut *w, self).map_err(Error::DeserializePayload)?; + serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; w.write_all(b"\r\n").map_err(error::wrap_serde_io) } } @@ -94,7 +94,7 @@ pub struct Fail { impl FaktoryCommand for Fail { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { w.write_all(b"FAIL ").map_err(error::wrap_serde_io)?; - serde_json::to_writer(&mut *w, self).map_err(Error::DeserializePayload)?; + serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; w.write_all(b"\r\n").map_err(error::wrap_serde_io) } } @@ -146,7 +146,7 @@ where w.write_all(b"FETCH\r\n").map_err(error::wrap_serde_io)?; } else { w.write_all(b"FETCH").map_err(error::wrap_serde_io)?; - write_queues::(w, self.queues).map_err(Error::DeserializePayload)?; + write_queues::(w, self.queues).map_err(Error::Serialization)?; w.write_all(b"\r\n").map_err(error::wrap_serde_io)?; } Ok(()) @@ -214,7 +214,7 @@ impl Hello { impl FaktoryCommand for Hello { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { w.write_all(b"HELLO ").map_err(error::wrap_serde_io)?; - serde_json::to_writer(&mut *w, self).map_err(Error::DeserializePayload)?; + serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; w.write_all(b"\r\n").map_err(error::wrap_serde_io) } } @@ -240,7 +240,7 @@ impl From for Push { impl FaktoryCommand for Push { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { w.write_all(b"PUSH ").map_err(error::wrap_serde_io)?; - serde_json::to_writer(&mut *w, &**self).map_err(Error::DeserializePayload)?; + serde_json::to_writer(&mut *w, &**self).map_err(Error::Serialization)?; w.write_all(b"\r\n").map_err(error::wrap_serde_io) } } @@ -268,7 +268,7 @@ impl> FaktoryCommand for QueueControl<'_, S> { }; w.write_all(command).map_err(error::wrap_serde_io)?; - write_queues::(w, self.queues).map_err(Error::DeserializePayload)?; + write_queues::(w, self.queues).map_err(Error::Serialization)?; w.write_all(b"\r\n").map_err(error::wrap_serde_io) } } diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index d958bf57..c5516933 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -37,7 +37,7 @@ pub fn read_json(r: R) -> Result { return serde_json::from_str(s) .map(Some) - .map_err(Error::DeserializePayload); + .map_err(Error::Serialization); } RawResponse::Blob(ref b) if b == b"OK" => { return Ok(None); @@ -48,7 +48,7 @@ pub fn read_json(r: R) -> Result return Ok(None), _ => {} @@ -73,7 +73,7 @@ pub fn read_hi(r: R) -> Result { let rr = read(r)?; if let RawResponse::String(ref s) = rr { if let Some(s) = s.strip_prefix("HI ") { - return serde_json::from_str(s).map_err(Error::DeserializePayload); + return serde_json::from_str(s).map_err(Error::Serialization); } } @@ -357,7 +357,7 @@ mod test { #[test] fn it_errors_on_bad_json_blob() { let c = Cursor::new(b"$9\r\n{\"hello\"}\r\n"); - if let Error::DeserializePayload(err) = read_json(c).unwrap_err() { + if let Error::Serialization(err) = read_json(c).unwrap_err() { let _: serde_json::Error = err; } else { unreachable!(); @@ -367,7 +367,7 @@ mod test { #[test] fn it_errors_on_bad_json_string() { let c = Cursor::new(b"+{\"hello\"}\r\n"); - if let Error::DeserializePayload(err) = read_json(c).unwrap_err() { + if let Error::Serialization(err) = read_json(c).unwrap_err() { let _: serde_json::Error = err; } else { unreachable!(); From 541292ef2594c4d4bdf1843242dd60fd1841916c Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Sun, 1 May 2022 07:58:16 -0700 Subject: [PATCH 16/20] `TlsStram`: `#[from]` -> `#[source]` --- src/error.rs | 2 +- src/tls.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/error.rs b/src/error.rs index 1c1254c6..93841646 100644 --- a/src/error.rs +++ b/src/error.rs @@ -45,7 +45,7 @@ pub enum Error { /// Indicates an error in the underlying TLS stream. #[cfg(feature = "tls")] #[error("underlying tls stream")] - TlsStream(#[from] native_tls::Error), + TlsStream(#[source] native_tls::Error), } /// Errors specific to connection logic. diff --git a/src/tls.rs b/src/tls.rs index 0408d4fd..f9db6ca4 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -41,7 +41,10 @@ impl TlsStream { /// /// If `url` is given, but does not specify a port, it defaults to 7419. pub fn connect(url: Option<&str>) -> Result { - TlsStream::with_connector(TlsConnector::builder().build()?, url) + TlsStream::with_connector( + TlsConnector::builder().build().map_err(Error::TlsStream)?, + url, + ) } /// Create a new TLS connection over TCP using a non-default TLS configuration. From c6f2bbe7d0347ba53fb9ff1818490e22d601c6f7 Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Sun, 1 May 2022 08:02:28 -0700 Subject: [PATCH 17/20] Missed `IO` -> `I/O` --- src/error.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/error.rs b/src/error.rs index 93841646..e98d73e2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,10 +23,10 @@ pub enum Error { #[error("connection")] Connect(#[from] Connect), - /// Underlying IO layer errors. + /// Underlying I/O layer errors. /// /// These are overwhelmingly network communication errors on the socket connection to the server. - #[error("underlying IO")] + #[error("underlying I/O")] IO(#[from] std::io::Error), /// Application-level errors. From f3869a2c8882536382a08494f1ea6a09434c356f Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Sun, 1 May 2022 08:11:39 -0700 Subject: [PATCH 18/20] Self-review cleanup --- src/proto/single/cmd.rs | 6 ++++-- src/proto/single/resp.rs | 10 +++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index 2879de0e..a4ecf11d 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -1,6 +1,8 @@ -use crate::error; +use crate::{ + error::{self, Error}, + Job, +}; -use super::{Error, Job}; use std::io::prelude::*; pub trait FaktoryCommand { diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index c5516933..a35081fd 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -95,7 +95,7 @@ pub fn read_ok(r: R) -> Result<(), Error> { // ---------------------------------------------- // -// below is the implementation of the Redis RESP error::protocol +// below is the implementation of the Redis RESP protocol // // ---------------------------------------------- @@ -113,7 +113,7 @@ fn read(mut r: R) -> Result { match cmdbuf[0] { b'+' => { // Simple String - // https://redis.io/topics/error::protocol#resp-simple-strings + // https://redis.io/topics/protocol#resp-simple-strings let mut s = String::new(); r.read_line(&mut s)?; @@ -125,7 +125,7 @@ fn read(mut r: R) -> Result { } b'-' => { // Error - // https://redis.io/topics/error::protocol#resp-errors + // https://redis.io/topics/protocol#resp-errors let mut s = String::new(); r.read_line(&mut s)?; @@ -137,7 +137,7 @@ fn read(mut r: R) -> Result { } b':' => { // Integer - // https://redis.io/topics/error::protocol#resp-integers + // https://redis.io/topics/protocol#resp-integers let mut s = String::with_capacity(32); r.read_line(&mut s)?; @@ -188,7 +188,7 @@ fn read(mut r: R) -> Result { } b'*' => { // Arrays - // https://redis.io/topics/error::protocol#resp-arrays + // https://redis.io/topics/protocol#resp-arrays // // not used in faktory. // *and* you can't really skip them unless you parse them. From feb9dfdc55490814e0e222465124152190130e0f Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Tue, 10 May 2022 14:22:24 -0700 Subject: [PATCH 19/20] =?UTF-8?q?Fix=20overzealous=20find=20and=20replace?= =?UTF-8?q?=20=F0=9F=98=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/proto/single/resp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index a35081fd..704bcba4 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -157,7 +157,7 @@ fn read(mut r: R) -> Result { } b'$' => { // Bulk String - // https://redis.io/topics/error::protocol#resp-bulk-strings + // https://redis.io/topics/protocol#resp-bulk-strings let mut bytes = Vec::with_capacity(32); r.read_until(b'\n', &mut bytes)?; let s = std::str::from_utf8(&bytes[0..bytes.len() - 2]).map_err(|_| { From 9cbbced853d53cff9cad6277a6722cb5b9c396ed Mon Sep 17 00:00:00 2001 From: Kit Martin Date: Tue, 10 May 2022 14:29:05 -0700 Subject: [PATCH 20/20] PR comments --- src/error.rs | 4 ---- src/proto/single/cmd.rs | 50 +++++++++++++++++++---------------------- 2 files changed, 23 insertions(+), 31 deletions(-) diff --git a/src/error.rs b/src/error.rs index e98d73e2..6c3d4dd9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -148,7 +148,3 @@ impl Protocol { } } } - -pub(crate) fn wrap_serde_io(err: std::io::Error) -> Error { - Error::Serialization(serde_json::Error::io(err)) -} diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index a4ecf11d..d592cd91 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -1,7 +1,4 @@ -use crate::{ - error::{self, Error}, - Job, -}; +use crate::{error::Error, Job}; use std::io::prelude::*; @@ -11,15 +8,14 @@ pub trait FaktoryCommand { /// Write queues as part of a command. They are written with a leading space /// followed by space separated queue names. -fn write_queues(w: &mut dyn Write, queues: &[S]) -> Result<(), serde_json::Error> +fn write_queues(w: &mut dyn Write, queues: &[S]) -> Result<(), Error> where W: Write, S: AsRef, { for q in queues { - w.write_all(b" ").map_err(serde_json::Error::io)?; - w.write_all(q.as_ref().as_bytes()) - .map_err(serde_json::Error::io)?; + w.write_all(b" ")?; + w.write_all(q.as_ref().as_bytes())?; } Ok(()) @@ -31,7 +27,7 @@ pub struct Info; impl FaktoryCommand for Info { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"INFO\r\n").map_err(error::wrap_serde_io) + Ok(w.write_all(b"INFO\r\n")?) } } @@ -45,9 +41,9 @@ pub struct Ack { impl FaktoryCommand for Ack { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"ACK ").map_err(error::wrap_serde_io)?; + w.write_all(b"ACK ")?; serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; - w.write_all(b"\r\n").map_err(error::wrap_serde_io) + Ok(w.write_all(b"\r\n")?) } } @@ -68,9 +64,9 @@ pub struct Heartbeat { impl FaktoryCommand for Heartbeat { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"BEAT ").map_err(error::wrap_serde_io)?; + w.write_all(b"BEAT ")?; serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; - w.write_all(b"\r\n").map_err(error::wrap_serde_io) + Ok(w.write_all(b"\r\n")?) } } @@ -95,9 +91,9 @@ pub struct Fail { impl FaktoryCommand for Fail { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"FAIL ").map_err(error::wrap_serde_io)?; + w.write_all(b"FAIL ")?; serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; - w.write_all(b"\r\n").map_err(error::wrap_serde_io) + Ok(w.write_all(b"\r\n")?) } } @@ -126,7 +122,7 @@ pub struct End; impl FaktoryCommand for End { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"END\r\n").map_err(error::wrap_serde_io) + Ok(w.write_all(b"END\r\n")?) } } @@ -145,11 +141,11 @@ where { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { if self.queues.is_empty() { - w.write_all(b"FETCH\r\n").map_err(error::wrap_serde_io)?; + w.write_all(b"FETCH\r\n")?; } else { - w.write_all(b"FETCH").map_err(error::wrap_serde_io)?; - write_queues::(w, self.queues).map_err(Error::Serialization)?; - w.write_all(b"\r\n").map_err(error::wrap_serde_io)?; + w.write_all(b"FETCH")?; + write_queues::(w, self.queues)?; + w.write_all(b"\r\n")?; } Ok(()) } @@ -215,9 +211,9 @@ impl Hello { impl FaktoryCommand for Hello { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"HELLO ").map_err(error::wrap_serde_io)?; + w.write_all(b"HELLO ")?; serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; - w.write_all(b"\r\n").map_err(error::wrap_serde_io) + Ok(w.write_all(b"\r\n")?) } } @@ -241,9 +237,9 @@ impl From for Push { impl FaktoryCommand for Push { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"PUSH ").map_err(error::wrap_serde_io)?; + w.write_all(b"PUSH ")?; serde_json::to_writer(&mut *w, &**self).map_err(Error::Serialization)?; - w.write_all(b"\r\n").map_err(error::wrap_serde_io) + Ok(w.write_all(b"\r\n")?) } } @@ -269,9 +265,9 @@ impl> FaktoryCommand for QueueControl<'_, S> { QueueAction::Resume => b"QUEUE RESUME".as_ref(), }; - w.write_all(command).map_err(error::wrap_serde_io)?; - write_queues::(w, self.queues).map_err(Error::Serialization)?; - w.write_all(b"\r\n").map_err(error::wrap_serde_io) + w.write_all(command)?; + write_queues::(w, self.queues)?; + Ok(w.write_all(b"\r\n")?) } }