Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ chrono = { version = "0.4", features = ["serde"] }
url = "2"
atomic-option = "0.1"
fnv = "1.0.3"
failure = "0.1"
native-tls = { version = "0.2", optional = true }
clap = { version = "2.27.1", optional = true }
thiserror = "1.0.30"

[dev-dependencies]
mockstream = "0.0.3"
Expand Down
3 changes: 1 addition & 2 deletions src/bin/loadtest.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#[macro_use]
extern crate clap;
extern crate failure;
extern crate faktory;
extern crate rand;
extern crate serde_json;
Expand Down Expand Up @@ -54,7 +53,7 @@ fn main() {
let popped = sync::Arc::new(atomic::AtomicUsize::new(0));

let start = time::Instant::now();
let threads: Vec<thread::JoinHandle<Result<_, failure::Error>>> = (0..threads)
let threads: Vec<thread::JoinHandle<Result<_, Error>>> = (0..threads)
.map(|_| {
let pushed = sync::Arc::clone(&pushed);
let popped = sync::Arc::clone(&popped);
Expand Down
4 changes: 2 additions & 2 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::Error;
use crate::proto::{self, Client, ClientOptions, HeartbeatStatus, Reconnect};
use atomic_option::AtomicOption;
use failure::Error;
use fnv::FnvHashMap;
use std::error::Error as StdError;
use std::io::prelude::*;
Expand Down Expand Up @@ -397,7 +397,7 @@ where
// the resulting OK that failed. in that case, we would get an error response
// when re-sending the job response. this should not count as critical. other
// errors, however, should!
if e.downcast_ref::<::std::io::Error>().is_some() {
if let Error::GenericIO(_) = e {
last_job_result.swap(res, atomic::Ordering::SeqCst);
return Err(e);
}
Expand Down
76 changes: 57 additions & 19 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,65 @@
use thiserror::Error;

use crate::proto::ConnectError;

/// The set of observable errors when interacting with a Faktory server.
#[derive(Debug, Error)]
#[allow(clippy::manual_non_exhaustive)]
pub enum Error {
/// The connection to the server, or one of its prerequisites, failed.
#[error("connection error: {0}")]
Connect(#[from] ConnectError),

/// Underlying io layer errors.
/// These are overwhelmingly network communication errors on the socket connection to the server.
#[error("underlying i/o: {0}")]
GenericIO(#[from] std::io::Error),

/// Application-level errors.
/// These generally indicate a mismatch between what the client expects and what the server expects.
#[error("protocol: {0}")]
Protocol(#[from] ProtocolError),

/// Faktory payloads are JSON encoded.
/// This error is one that was encountered when attempting to deserialize a response from the server.
/// These generally indicate a mismatch between what the client expects and what the server provided.
#[error("deserialize payload: {0}")]
DeserializePayload(#[from] serde_json::Error),

/// Indicates an error in the underlying TLS stream.
#[cfg(feature = "tls")]
#[error("underlying tls stream: {0}")]
TlsStream(#[from] native_tls::Error),

// We're going to add more error types in the future
// https://github.com/rust-lang/rust/issues/44109
//
// This forces users to write pattern matches with a catch-all `_` arm.
#[error("unreachable")]
#[doc(hidden)]
__Nonexhaustive,
}

/// The set of observable application-level errors when interacting with a Faktory server.
#[derive(Debug, Fail)]
#[derive(Debug, Error)]
#[allow(clippy::manual_non_exhaustive)]
pub enum FaktoryError {
pub enum ProtocolError {
/// The server reports that an issued request was malformed.
#[fail(display = "request was malformed: {}", desc)]
#[error("request was malformed: {desc}")]
Malformed {
/// Error reported by server
desc: String,
},

/// The server responded with an error.
#[fail(display = "an internal server error occurred: {}", msg)]
#[error("an internal server error occurred: {msg}")]
Internal {
/// The error message given by the server.
msg: String,
},

/// The server sent a response that did not match what was expected.
#[fail(
display = "expected {}, got unexpected response: {}",
expected, received
)]
#[error("expected {expected}, got unexpected response: {received}")]
BadType {
/// The expected response type.
expected: &'static str,
Expand All @@ -30,10 +69,7 @@ pub enum FaktoryError {
},

/// The server sent a malformed response.
#[fail(
display = "server sent malformed {} response: {} in {:?}",
typed_as, error, bytes
)]
#[error("server sent malformed {typed_as} response: {error} in {bytes:?}")]
BadResponse {
/// The type of the server response.
typed_as: &'static str,
Expand All @@ -47,30 +83,32 @@ pub enum FaktoryError {

// We're going to add more error types in the future
// https://github.com/rust-lang/rust/issues/44109
#[fail(display = "unreachable")]
//
// This forces users to write pattern matches with a catch-all `_` arm.
#[error("unreachable")]
#[doc(hidden)]
__Nonexhaustive,
}

impl FaktoryError {
impl ProtocolError {
pub(crate) fn new(line: String) -> Self {
let mut parts = line.splitn(2, ' ');
let code = parts.next();
let error = parts.next();
if error.is_none() {
return FaktoryError::Internal {
return ProtocolError::Internal {
msg: code.unwrap().to_string(),
};
}
let error = error.unwrap().to_string();

match code {
Some("ERR") => FaktoryError::Internal { msg: error },
Some("MALFORMED") => FaktoryError::Malformed { desc: error },
Some(c) => FaktoryError::Internal {
Some("ERR") => ProtocolError::Internal { msg: error },
Some("MALFORMED") => ProtocolError::Malformed { desc: error },
Some(c) => ProtocolError::Internal {
msg: format!("{} {}", c, error),
},
None => FaktoryError::Internal {
None => ProtocolError::Internal {
msg: "empty error response".to_string(),
},
}
Expand Down
4 changes: 1 addition & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@
#![deny(missing_docs)]
#![warn(rust_2018_idioms)]

#[macro_use]
extern crate failure;
#[macro_use]
extern crate serde_derive;

Expand All @@ -73,7 +71,7 @@ mod tls;
pub use tls::TlsStream;

pub use crate::consumer::{Consumer, ConsumerBuilder};
pub use crate::error::FaktoryError;
pub use crate::error::{Error, ProtocolError};
pub use crate::producer::Producer;
pub use crate::proto::Job;
pub use crate::proto::Reconnect;
22 changes: 12 additions & 10 deletions src/producer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::error::Error;
use crate::proto::{self, Client, Info, Job, Push, QueueAction, QueueControl};
use failure::Error;
use std::io::prelude::*;
use std::net::TcpStream;

/// Default Error type, aliased in module for convenience.
type Result<T> = std::result::Result<T, Error>;

/// `Producer` is used to enqueue new jobs that will in turn be processed by Faktory workers.
///
/// # Connecting to Faktory
Expand Down Expand Up @@ -81,7 +84,7 @@ impl Producer<TcpStream> {
/// ```
///
/// If `url` is given, but does not specify a port, it defaults to 7419.
pub fn connect(url: Option<&str>) -> Result<Self, Error> {
pub fn connect(url: Option<&str>) -> Result<Self> {
let url = match url {
Some(url) => proto::url_parse(url),
None => proto::url_parse(&proto::get_env_url()),
Expand All @@ -93,38 +96,37 @@ impl Producer<TcpStream> {

impl<S: Read + Write> Producer<S> {
/// Connect to a Faktory server with a non-standard stream.
pub fn connect_with(stream: S, pwd: Option<String>) -> Result<Producer<S>, Error> {
Ok(Producer {
c: Client::new_producer(stream, pwd)?,
})
pub fn connect_with(stream: S, pwd: Option<String>) -> Result<Producer<S>> {
let c = Client::new_producer(stream, pwd)?;
Ok(Producer { c })
}

/// Enqueue the given job on the Faktory server.
///
/// Returns `Ok` if the job was successfully queued by the Faktory server.
pub fn enqueue(&mut self, job: Job) -> Result<(), Error> {
pub fn enqueue(&mut self, job: Job) -> Result<()> {
self.c.issue(&Push::from(job))?.await_ok()
}

/// Retrieve information about the running server.
///
/// The returned value is the result of running the `INFO` command on the server.
pub fn info(&mut self) -> Result<serde_json::Value, Error> {
pub fn info(&mut self) -> Result<serde_json::Value> {
self.c
.issue(&Info)?
.read_json()
.map(|v| v.expect("info command cannot give empty response"))
}

/// Pause the given queues.
pub fn queue_pause<T: AsRef<str>>(&mut self, queues: &[T]) -> Result<(), Error> {
pub fn queue_pause<T: AsRef<str>>(&mut self, queues: &[T]) -> Result<()> {
self.c
.issue(&QueueControl::new(QueueAction::Pause, queues))?
.await_ok()
}

/// Resume the given queues.
pub fn queue_resume<T: AsRef<str>>(&mut self, queues: &[T]) -> Result<(), Error> {
pub fn queue_resume<T: AsRef<str>>(&mut self, queues: &[T]) -> Result<()> {
self.c
.issue(&QueueControl::new(QueueAction::Resume, queues))?
.await_ok()
Expand Down
Loading