Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ chrono = { version = "0.4", features = ["serde"] }
url = "2"
atomic-option = "0.1"
fnv = "1.0.3"
failure = "0.1"
native-tls = { version = "0.2", optional = true }
clap = { version = "2.27.1", optional = true }
thiserror = "1.0.30"

[dev-dependencies]
mockstream = "0.0.3"
Expand Down
3 changes: 1 addition & 2 deletions src/bin/loadtest.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#[macro_use]
extern crate clap;
extern crate failure;
extern crate faktory;
extern crate rand;
extern crate serde_json;
Expand Down Expand Up @@ -54,7 +53,7 @@ fn main() {
let popped = sync::Arc::new(atomic::AtomicUsize::new(0));

let start = time::Instant::now();
let threads: Vec<thread::JoinHandle<Result<_, failure::Error>>> = (0..threads)
let threads: Vec<thread::JoinHandle<Result<_, Error>>> = (0..threads)
.map(|_| {
let pushed = sync::Arc::clone(&pushed);
let popped = sync::Arc::clone(&popped);
Expand Down
4 changes: 2 additions & 2 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::Error;
use crate::proto::{self, Client, ClientOptions, HeartbeatStatus, Reconnect};
use atomic_option::AtomicOption;
use failure::Error;
use fnv::FnvHashMap;
use std::error::Error as StdError;
use std::io::prelude::*;
Expand Down Expand Up @@ -397,7 +397,7 @@ where
// the resulting OK that failed. in that case, we would get an error response
// when re-sending the job response. this should not count as critical. other
// errors, however, should!
if e.downcast_ref::<::std::io::Error>().is_some() {
if let Error::IO(_) = e {
last_job_result.swap(res, atomic::Ordering::SeqCst);
return Err(e);
}
Expand Down
126 changes: 101 additions & 25 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,107 @@
//! Enumerates all errors that this crate may return.
//!
//! [`Error`] is the top level error enum.
//! Most consumers should only need to interact with this type.
//! This is also where more generic errors such as I/O errors are placed,
//! whereas the more specific errors ([`Connection`] and [`Protocol`]) are
//! related to logic.
//!
//! [`Connect`] describes errors specific to the connection logic, for example
//! version mismatches or an invalid URL.
//!
//! [`Protocol`] describes lower-level errors relating to communication
//! with the faktory server. Typically, [`Protocol`] errors are the result
//! of the server sending a response this client did not expect.

use thiserror::Error;

/// The set of observable errors when interacting with a Faktory server.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
/// The connection to the server, or one of its prerequisites, failed.
#[error("connection")]
Connect(#[from] Connect),

/// Underlying I/O layer errors.
///
/// These are overwhelmingly network communication errors on the socket connection to the server.
#[error("underlying I/O")]
IO(#[from] std::io::Error),

/// Application-level errors.
///
/// These generally indicate a mismatch between what the client expects and what the server expects.
#[error("protocol")]
Protocol(#[from] Protocol),

/// Faktory payloads are JSON encoded.
///
/// This error is one that was encountered when attempting to serialize or deserialize communication with the server.
/// These generally indicate a mismatch between what the client expects and what the server provided.
#[error("serialization")]
Serialization(#[source] serde_json::Error),

/// Indicates an error in the underlying TLS stream.
#[cfg(feature = "tls")]
#[error("underlying tls stream")]
TlsStream(#[source] native_tls::Error),
}

/// Errors specific to connection logic.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Connect {
/// The scheme portion of the connection address provided is invalid.
#[error("unknown scheme: {scheme}")]
BadScheme {
/// The scheme that was provided in the connection address.
scheme: String,
},

/// The provided connection address does not contain a hostname.
#[error("no hostname given")]
MissingHostname,

/// The server requires authentication, but none was provided.
#[error("server requires authentication")]
AuthenticationNeeded,

/// The server expects a different protocol version than this library supports.
#[error("server version mismatch (theirs: {theirs}, ours: {ours})")]
VersionMismatch {
/// The protocol version this library supports.
ours: usize,

/// The protocol version the server expects.
theirs: usize,
},

/// The connection address provided was not able to be parsed.
#[error("parse URL")]
ParseUrl(#[source] url::ParseError),
}

/// The set of observable application-level errors when interacting with a Faktory server.
#[derive(Debug, Fail)]
#[allow(clippy::manual_non_exhaustive)]
pub enum FaktoryError {
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Protocol {
/// The server reports that an issued request was malformed.
#[fail(display = "request was malformed: {}", desc)]
#[error("request was malformed: {desc}")]
Malformed {
/// Error reported by server
desc: String,
},

/// The server responded with an error.
#[fail(display = "an internal server error occurred: {}", msg)]
#[error("an internal server error occurred: {msg}")]
Internal {
/// The error message given by the server.
msg: String,
},

/// The server sent a response that did not match what was expected.
#[fail(
display = "expected {}, got unexpected response: {}",
expected, received
)]
#[error("expected {expected}, got unexpected response: {received}")]
BadType {
/// The expected response type.
expected: &'static str,
Expand All @@ -30,10 +111,7 @@ pub enum FaktoryError {
},

/// The server sent a malformed response.
#[fail(
display = "server sent malformed {} response: {} in {:?}",
typed_as, error, bytes
)]
#[error("server sent malformed {typed_as} response: {error} in {bytes:?}")]
BadResponse {
/// The type of the server response.
typed_as: &'static str,
Expand All @@ -44,35 +122,33 @@ pub enum FaktoryError {
/// The relevant bytes sent by the server.
bytes: Vec<u8>,
},

// We're going to add more error types in the future
// https://github.com/rust-lang/rust/issues/44109
#[fail(display = "unreachable")]
#[doc(hidden)]
__Nonexhaustive,
}

impl FaktoryError {
impl Protocol {
pub(crate) fn new(line: String) -> Self {
let mut parts = line.splitn(2, ' ');
let code = parts.next();
let error = parts.next();
if error.is_none() {
return FaktoryError::Internal {
return Protocol::Internal {
msg: code.unwrap().to_string(),
};
}
let error = error.unwrap().to_string();

match code {
Some("ERR") => FaktoryError::Internal { msg: error },
Some("MALFORMED") => FaktoryError::Malformed { desc: error },
Some(c) => FaktoryError::Internal {
Some("ERR") => Protocol::Internal { msg: error },
Some("MALFORMED") => Protocol::Malformed { desc: error },
Some(c) => Protocol::Internal {
msg: format!("{} {}", c, error),
},
None => FaktoryError::Internal {
None => Protocol::Internal {
msg: "empty error response".to_string(),
},
}
}
}

pub(crate) fn wrap_serde_io(err: std::io::Error) -> Error {
Error::Serialization(serde_json::Error::io(err))
}
6 changes: 2 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,11 @@
#![deny(missing_docs)]
#![warn(rust_2018_idioms)]

#[macro_use]
extern crate failure;
#[macro_use]
extern crate serde_derive;

mod consumer;
mod error;
pub mod error;
mod producer;
mod proto;

Expand All @@ -73,7 +71,7 @@ mod tls;
pub use tls::TlsStream;

pub use crate::consumer::{Consumer, ConsumerBuilder};
pub use crate::error::FaktoryError;
pub use crate::error::Error;
pub use crate::producer::Producer;
pub use crate::proto::Job;
pub use crate::proto::Reconnect;
2 changes: 1 addition & 1 deletion src/producer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::Error;
use crate::proto::{self, Client, Info, Job, Push, QueueAction, QueueControl};
use failure::Error;
use std::io::prelude::*;
use std::net::TcpStream;

Expand Down
31 changes: 8 additions & 23 deletions src/proto/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::FaktoryError;
use crate::error::{self, Error};
use bufstream::BufStream;
use failure::Error;
use libc::getpid;
use std::io;
use std::io::prelude::*;
Expand All @@ -17,21 +16,6 @@ pub use self::single::{Ack, Fail, Heartbeat, Info, Job, Push, QueueAction, Queue
// responses that users can see
pub use self::single::Hi;

#[derive(Debug, Fail)]
pub enum ConnectError {
#[fail(display = "unknown scheme: {}", scheme)]
BadScheme { scheme: String },
#[fail(display = "no hostname given")]
MissingHostname,
#[fail(display = "server requires authentication")]
AuthenticationNeeded,
#[fail(
display = "server version mismatch (theirs: {}, ours: {})",
theirs, ours
)]
VersionMismatch { ours: usize, theirs: usize },
}

pub(crate) fn get_env_url() -> String {
use std::env;
let var = env::var("FAKTORY_PROVIDER").unwrap_or_else(|_| "FAKTORY_URL".to_string());
Expand All @@ -43,16 +27,16 @@ pub(crate) fn host_from_url(url: &Url) -> String {
}

pub(crate) fn url_parse(url: &str) -> Result<Url, Error> {
let url = Url::parse(url)?;
let url = Url::parse(url).map_err(error::Connect::ParseUrl)?;
if url.scheme() != "tcp" {
return Err(ConnectError::BadScheme {
return Err(error::Connect::BadScheme {
scheme: url.scheme().to_string(),
}
.into());
}

if url.host_str().is_none() || url.host_str().unwrap().is_empty() {
return Err(ConnectError::MissingHostname.into());
return Err(error::Connect::MissingHostname.into());
}

Ok(url)
Expand Down Expand Up @@ -154,7 +138,7 @@ impl<S: Read + Write> Client<S> {
let hi = single::read_hi(&mut self.stream)?;

if hi.version != EXPECTED_PROTOCOL_VERSION {
return Err(ConnectError::VersionMismatch {
return Err(error::Connect::VersionMismatch {
ours: EXPECTED_PROTOCOL_VERSION,
theirs: hi.version,
}
Expand Down Expand Up @@ -196,9 +180,10 @@ impl<S: Read + Write> Client<S> {
if let Some(ref pwd) = self.opts.password {
hello.set_password(&hi, pwd);
} else {
return Err(ConnectError::AuthenticationNeeded.into());
return Err(error::Connect::AuthenticationNeeded.into());
}
}

single::write_command_and_await_ok(&mut self.stream, &hello)
}
}
Expand Down Expand Up @@ -241,7 +226,7 @@ impl<S: Read + Write> Client<S> {
{
Some("terminate") => Ok(HeartbeatStatus::Terminate),
Some("quiet") => Ok(HeartbeatStatus::Quiet),
_ => Err(FaktoryError::BadType {
_ => Err(error::Protocol::BadType {
expected: "heartbeat response",
received: format!("{}", s),
}
Expand Down
Loading