diff --git a/Cargo.toml b/Cargo.toml index 5acfec81..18e6b12c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,9 +35,9 @@ chrono = { version = "0.4", features = ["serde"] } url = "2" atomic-option = "0.1" fnv = "1.0.3" -failure = "0.1" native-tls = { version = "0.2", optional = true } clap = { version = "2.27.1", optional = true } +thiserror = "1.0.30" [dev-dependencies] mockstream = "0.0.3" diff --git a/src/bin/loadtest.rs b/src/bin/loadtest.rs index cc82c368..5186bc78 100644 --- a/src/bin/loadtest.rs +++ b/src/bin/loadtest.rs @@ -1,6 +1,5 @@ #[macro_use] extern crate clap; -extern crate failure; extern crate faktory; extern crate rand; extern crate serde_json; @@ -54,7 +53,7 @@ fn main() { let popped = sync::Arc::new(atomic::AtomicUsize::new(0)); let start = time::Instant::now(); - let threads: Vec>> = (0..threads) + let threads: Vec>> = (0..threads) .map(|_| { let pushed = sync::Arc::clone(&pushed); let popped = sync::Arc::clone(&popped); diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 92015904..fb465cd5 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -1,6 +1,6 @@ +use crate::error::Error; use crate::proto::{self, Client, ClientOptions, HeartbeatStatus, Reconnect}; use atomic_option::AtomicOption; -use failure::Error; use fnv::FnvHashMap; use std::error::Error as StdError; use std::io::prelude::*; @@ -397,7 +397,7 @@ where // the resulting OK that failed. in that case, we would get an error response // when re-sending the job response. this should not count as critical. other // errors, however, should! - if e.downcast_ref::<::std::io::Error>().is_some() { + if let Error::IO(_) = e { last_job_result.swap(res, atomic::Ordering::SeqCst); return Err(e); } diff --git a/src/error.rs b/src/error.rs index 9dfa1c1b..6c3d4dd9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,26 +1,107 @@ +//! Enumerates all errors that this crate may return. +//! +//! [`Error`] is the top level error enum. +//! Most consumers should only need to interact with this type. +//! This is also where more generic errors such as I/O errors are placed, +//! whereas the more specific errors ([`Connection`] and [`Protocol`]) are +//! related to logic. +//! +//! [`Connect`] describes errors specific to the connection logic, for example +//! version mismatches or an invalid URL. +//! +//! [`Protocol`] describes lower-level errors relating to communication +//! with the faktory server. Typically, [`Protocol`] errors are the result +//! of the server sending a response this client did not expect. + +use thiserror::Error; + +/// The set of observable errors when interacting with a Faktory server. +#[derive(Debug, Error)] +#[non_exhaustive] +pub enum Error { + /// The connection to the server, or one of its prerequisites, failed. + #[error("connection")] + Connect(#[from] Connect), + + /// Underlying I/O layer errors. + /// + /// These are overwhelmingly network communication errors on the socket connection to the server. + #[error("underlying I/O")] + IO(#[from] std::io::Error), + + /// Application-level errors. + /// + /// These generally indicate a mismatch between what the client expects and what the server expects. + #[error("protocol")] + Protocol(#[from] Protocol), + + /// Faktory payloads are JSON encoded. + /// + /// This error is one that was encountered when attempting to serialize or deserialize communication with the server. + /// These generally indicate a mismatch between what the client expects and what the server provided. + #[error("serialization")] + Serialization(#[source] serde_json::Error), + + /// Indicates an error in the underlying TLS stream. + #[cfg(feature = "tls")] + #[error("underlying tls stream")] + TlsStream(#[source] native_tls::Error), +} + +/// Errors specific to connection logic. +#[derive(Debug, Error)] +#[non_exhaustive] +pub enum Connect { + /// The scheme portion of the connection address provided is invalid. + #[error("unknown scheme: {scheme}")] + BadScheme { + /// The scheme that was provided in the connection address. + scheme: String, + }, + + /// The provided connection address does not contain a hostname. + #[error("no hostname given")] + MissingHostname, + + /// The server requires authentication, but none was provided. + #[error("server requires authentication")] + AuthenticationNeeded, + + /// The server expects a different protocol version than this library supports. + #[error("server version mismatch (theirs: {theirs}, ours: {ours})")] + VersionMismatch { + /// The protocol version this library supports. + ours: usize, + + /// The protocol version the server expects. + theirs: usize, + }, + + /// The connection address provided was not able to be parsed. + #[error("parse URL")] + ParseUrl(#[source] url::ParseError), +} + /// The set of observable application-level errors when interacting with a Faktory server. -#[derive(Debug, Fail)] -#[allow(clippy::manual_non_exhaustive)] -pub enum FaktoryError { +#[derive(Debug, Error)] +#[non_exhaustive] +pub enum Protocol { /// The server reports that an issued request was malformed. - #[fail(display = "request was malformed: {}", desc)] + #[error("request was malformed: {desc}")] Malformed { /// Error reported by server desc: String, }, /// The server responded with an error. - #[fail(display = "an internal server error occurred: {}", msg)] + #[error("an internal server error occurred: {msg}")] Internal { /// The error message given by the server. msg: String, }, /// The server sent a response that did not match what was expected. - #[fail( - display = "expected {}, got unexpected response: {}", - expected, received - )] + #[error("expected {expected}, got unexpected response: {received}")] BadType { /// The expected response type. expected: &'static str, @@ -30,10 +111,7 @@ pub enum FaktoryError { }, /// The server sent a malformed response. - #[fail( - display = "server sent malformed {} response: {} in {:?}", - typed_as, error, bytes - )] + #[error("server sent malformed {typed_as} response: {error} in {bytes:?}")] BadResponse { /// The type of the server response. typed_as: &'static str, @@ -44,33 +122,27 @@ pub enum FaktoryError { /// The relevant bytes sent by the server. bytes: Vec, }, - - // We're going to add more error types in the future - // https://github.com/rust-lang/rust/issues/44109 - #[fail(display = "unreachable")] - #[doc(hidden)] - __Nonexhaustive, } -impl FaktoryError { +impl Protocol { pub(crate) fn new(line: String) -> Self { let mut parts = line.splitn(2, ' '); let code = parts.next(); let error = parts.next(); if error.is_none() { - return FaktoryError::Internal { + return Protocol::Internal { msg: code.unwrap().to_string(), }; } let error = error.unwrap().to_string(); match code { - Some("ERR") => FaktoryError::Internal { msg: error }, - Some("MALFORMED") => FaktoryError::Malformed { desc: error }, - Some(c) => FaktoryError::Internal { + Some("ERR") => Protocol::Internal { msg: error }, + Some("MALFORMED") => Protocol::Malformed { desc: error }, + Some(c) => Protocol::Internal { msg: format!("{} {}", c, error), }, - None => FaktoryError::Internal { + None => Protocol::Internal { msg: "empty error response".to_string(), }, } diff --git a/src/lib.rs b/src/lib.rs index 94cee0e6..922725a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,13 +57,11 @@ #![deny(missing_docs)] #![warn(rust_2018_idioms)] -#[macro_use] -extern crate failure; #[macro_use] extern crate serde_derive; mod consumer; -mod error; +pub mod error; mod producer; mod proto; @@ -73,7 +71,7 @@ mod tls; pub use tls::TlsStream; pub use crate::consumer::{Consumer, ConsumerBuilder}; -pub use crate::error::FaktoryError; +pub use crate::error::Error; pub use crate::producer::Producer; pub use crate::proto::Job; pub use crate::proto::Reconnect; diff --git a/src/producer/mod.rs b/src/producer/mod.rs index db10eb9f..25b413bc 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -1,5 +1,5 @@ +use crate::error::Error; use crate::proto::{self, Client, Info, Job, Push, QueueAction, QueueControl}; -use failure::Error; use std::io::prelude::*; use std::net::TcpStream; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index b1a54418..ea7a4f58 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,6 +1,5 @@ -use crate::FaktoryError; +use crate::error::{self, Error}; use bufstream::BufStream; -use failure::Error; use libc::getpid; use std::io; use std::io::prelude::*; @@ -17,21 +16,6 @@ pub use self::single::{Ack, Fail, Heartbeat, Info, Job, Push, QueueAction, Queue // responses that users can see pub use self::single::Hi; -#[derive(Debug, Fail)] -pub enum ConnectError { - #[fail(display = "unknown scheme: {}", scheme)] - BadScheme { scheme: String }, - #[fail(display = "no hostname given")] - MissingHostname, - #[fail(display = "server requires authentication")] - AuthenticationNeeded, - #[fail( - display = "server version mismatch (theirs: {}, ours: {})", - theirs, ours - )] - VersionMismatch { ours: usize, theirs: usize }, -} - pub(crate) fn get_env_url() -> String { use std::env; let var = env::var("FAKTORY_PROVIDER").unwrap_or_else(|_| "FAKTORY_URL".to_string()); @@ -43,16 +27,16 @@ pub(crate) fn host_from_url(url: &Url) -> String { } pub(crate) fn url_parse(url: &str) -> Result { - let url = Url::parse(url)?; + let url = Url::parse(url).map_err(error::Connect::ParseUrl)?; if url.scheme() != "tcp" { - return Err(ConnectError::BadScheme { + return Err(error::Connect::BadScheme { scheme: url.scheme().to_string(), } .into()); } if url.host_str().is_none() || url.host_str().unwrap().is_empty() { - return Err(ConnectError::MissingHostname.into()); + return Err(error::Connect::MissingHostname.into()); } Ok(url) @@ -154,7 +138,7 @@ impl Client { let hi = single::read_hi(&mut self.stream)?; if hi.version != EXPECTED_PROTOCOL_VERSION { - return Err(ConnectError::VersionMismatch { + return Err(error::Connect::VersionMismatch { ours: EXPECTED_PROTOCOL_VERSION, theirs: hi.version, } @@ -196,9 +180,10 @@ impl Client { if let Some(ref pwd) = self.opts.password { hello.set_password(&hi, pwd); } else { - return Err(ConnectError::AuthenticationNeeded.into()); + return Err(error::Connect::AuthenticationNeeded.into()); } } + single::write_command_and_await_ok(&mut self.stream, &hello) } } @@ -241,7 +226,7 @@ impl Client { { Some("terminate") => Ok(HeartbeatStatus::Terminate), Some("quiet") => Ok(HeartbeatStatus::Quiet), - _ => Err(FaktoryError::BadType { + _ => Err(error::Protocol::BadType { expected: "heartbeat response", received: format!("{}", s), } diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs index ffb50311..d592cd91 100644 --- a/src/proto/single/cmd.rs +++ b/src/proto/single/cmd.rs @@ -1,5 +1,5 @@ -use super::Job; -use failure::Error; +use crate::{error::Error, Job}; + use std::io::prelude::*; pub trait FaktoryCommand { @@ -8,15 +8,14 @@ pub trait FaktoryCommand { /// Write queues as part of a command. They are written with a leading space /// followed by space separated queue names. -fn write_queues(w: &mut dyn Write, queues: &[S]) -> Result<(), serde_json::Error> +fn write_queues(w: &mut dyn Write, queues: &[S]) -> Result<(), Error> where W: Write, S: AsRef, { for q in queues { - w.write_all(b" ").map_err(serde_json::Error::io)?; - w.write_all(q.as_ref().as_bytes()) - .map_err(serde_json::Error::io)?; + w.write_all(b" ")?; + w.write_all(q.as_ref().as_bytes())?; } Ok(()) @@ -28,7 +27,7 @@ pub struct Info; impl FaktoryCommand for Info { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - Ok(w.write_all(b"INFO\r\n").map_err(serde_json::Error::io)?) + Ok(w.write_all(b"INFO\r\n")?) } } @@ -42,9 +41,9 @@ pub struct Ack { impl FaktoryCommand for Ack { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"ACK ").map_err(serde_json::Error::io)?; - serde_json::to_writer(&mut *w, self)?; - Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) + w.write_all(b"ACK ")?; + serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; + Ok(w.write_all(b"\r\n")?) } } @@ -65,9 +64,9 @@ pub struct Heartbeat { impl FaktoryCommand for Heartbeat { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"BEAT ").map_err(serde_json::Error::io)?; - serde_json::to_writer(&mut *w, self)?; - Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) + w.write_all(b"BEAT ")?; + serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; + Ok(w.write_all(b"\r\n")?) } } @@ -92,9 +91,9 @@ pub struct Fail { impl FaktoryCommand for Fail { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"FAIL ").map_err(serde_json::Error::io)?; - serde_json::to_writer(&mut *w, self)?; - Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) + w.write_all(b"FAIL ")?; + serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; + Ok(w.write_all(b"\r\n")?) } } @@ -123,7 +122,7 @@ pub struct End; impl FaktoryCommand for End { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - Ok(w.write_all(b"END\r\n").map_err(serde_json::Error::io)?) + Ok(w.write_all(b"END\r\n")?) } } @@ -142,11 +141,11 @@ where { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { if self.queues.is_empty() { - w.write_all(b"FETCH\r\n").map_err(serde_json::Error::io)?; + w.write_all(b"FETCH\r\n")?; } else { - w.write_all(b"FETCH").map_err(serde_json::Error::io)?; + w.write_all(b"FETCH")?; write_queues::(w, self.queues)?; - w.write_all(b"\r\n").map_err(serde_json::Error::io)?; + w.write_all(b"\r\n")?; } Ok(()) } @@ -212,9 +211,9 @@ impl Hello { impl FaktoryCommand for Hello { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"HELLO ").map_err(serde_json::Error::io)?; - serde_json::to_writer(&mut *w, self)?; - Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) + w.write_all(b"HELLO ")?; + serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?; + Ok(w.write_all(b"\r\n")?) } } @@ -238,9 +237,9 @@ impl From for Push { impl FaktoryCommand for Push { fn issue(&self, w: &mut dyn Write) -> Result<(), Error> { - w.write_all(b"PUSH ").map_err(serde_json::Error::io)?; - serde_json::to_writer(&mut *w, &**self)?; - Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) + w.write_all(b"PUSH ")?; + serde_json::to_writer(&mut *w, &**self).map_err(Error::Serialization)?; + Ok(w.write_all(b"\r\n")?) } } @@ -266,9 +265,9 @@ impl> FaktoryCommand for QueueControl<'_, S> { QueueAction::Resume => b"QUEUE RESUME".as_ref(), }; - w.write_all(command).map_err(serde_json::Error::io)?; + w.write_all(command)?; write_queues::(w, self.queues)?; - Ok(w.write_all(b"\r\n").map_err(serde_json::Error::io)?) + Ok(w.write_all(b"\r\n")?) } } diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index f9847ae2..9eca8d8e 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -1,11 +1,12 @@ use chrono::{DateTime, Utc}; -use failure::Error; use std::collections::HashMap; use std::io::prelude::*; mod cmd; mod resp; +use crate::error::Error; + pub use self::cmd::*; pub use self::resp::*; @@ -71,7 +72,7 @@ pub struct Job { /// /// This field is read-only. #[serde(skip_serializing)] - pub(crate) failure: Option, + failure: Option, /// Extra context to include with the job. /// @@ -155,6 +156,11 @@ impl Job { pub fn args(&self) -> &[serde_json::Value] { &self.args } + + /// Data about this job's most recent failure. + pub fn failure(&self) -> &Option { + &self.failure + } } pub fn write_command(w: &mut W, command: &C) -> Result<(), Error> { diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index b274d9aa..704bcba4 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -1,8 +1,7 @@ -use crate::FaktoryError; -use failure::Error; +use crate::error::{self, Error}; use std::io::prelude::*; -fn bad(expected: &'static str, got: &RawResponse) -> FaktoryError { +fn bad(expected: &'static str, got: &RawResponse) -> error::Protocol { let stringy = match *got { RawResponse::String(ref s) => Some(&**s), RawResponse::Blob(ref b) => { @@ -16,11 +15,11 @@ fn bad(expected: &'static str, got: &RawResponse) -> FaktoryError { }; match stringy { - Some(s) => FaktoryError::BadType { + Some(s) => error::Protocol::BadType { expected, received: s.to_string(), }, - None => FaktoryError::BadType { + None => error::Protocol::BadType { expected, received: format!("{:?}", got), }, @@ -36,7 +35,9 @@ pub fn read_json(r: R) -> Result { - return Ok(serde_json::from_str(s).map(Some)?); + return serde_json::from_str(s) + .map(Some) + .map_err(Error::Serialization); } RawResponse::Blob(ref b) if b == b"OK" => { return Ok(None); @@ -45,7 +46,9 @@ pub fn read_json(r: R) -> Result return Ok(None), _ => {} @@ -70,7 +73,7 @@ pub fn read_hi(r: R) -> Result { let rr = read(r)?; if let RawResponse::String(ref s) = rr { if let Some(s) = s.strip_prefix("HI ") { - return Ok(serde_json::from_str(s)?); + return serde_json::from_str(s).map_err(Error::Serialization); } } @@ -130,7 +133,7 @@ fn read(mut r: R) -> Result { let l = s.len() - 2; s.truncate(l); - Err(FaktoryError::new(s).into()) + Err(error::Protocol::new(s).into()) } b':' => { // Integer @@ -144,7 +147,7 @@ fn read(mut r: R) -> Result { match (&*s).parse::() { Ok(i) => Ok(RawResponse::Number(i)), - Err(_) => Err(FaktoryError::BadResponse { + Err(_) => Err(error::Protocol::BadResponse { typed_as: "integer", error: "invalid integer value", bytes: s.into_bytes(), @@ -158,18 +161,20 @@ fn read(mut r: R) -> Result { let mut bytes = Vec::with_capacity(32); r.read_until(b'\n', &mut bytes)?; let s = std::str::from_utf8(&bytes[0..bytes.len() - 2]).map_err(|_| { - FaktoryError::BadResponse { + error::Protocol::BadResponse { typed_as: "bulk string", error: "server bulk response contains non-utf8 size prefix", bytes: bytes[0..bytes.len() - 2].to_vec(), } })?; - let size = s.parse::().map_err(|_| FaktoryError::BadResponse { - typed_as: "bulk string", - error: "server bulk response size prefix is not an integer", - bytes: s.as_bytes().to_vec(), - })?; + let size = s + .parse::() + .map_err(|_| error::Protocol::BadResponse { + typed_as: "bulk string", + error: "server bulk response size prefix is not an integer", + bytes: s.as_bytes().to_vec(), + })?; if size == -1 { Ok(RawResponse::Null) @@ -191,7 +196,7 @@ fn read(mut r: R) -> Result { // so we'll just give up unimplemented!(); } - c => Err(FaktoryError::BadResponse { + c => Err(error::Protocol::BadResponse { typed_as: "unknown", error: "invalid response type prefix", bytes: vec![c], @@ -223,8 +228,7 @@ impl From> for RawResponse { #[cfg(test)] mod test { use super::{read, RawResponse}; - use crate::FaktoryError; - use failure::Error; + use crate::error::{self, Error}; use serde_json::{self, Map, Value}; use std::io::{self, Cursor}; @@ -247,10 +251,9 @@ mod test { #[test] fn it_errors_on_bad_numbers() { let c = Cursor::new(b":x\r\n"); - let r = read(c).unwrap_err(); - if let &FaktoryError::BadResponse { + if let Error::Protocol(error::Protocol::BadResponse { typed_as, error, .. - } = r.downcast_ref().unwrap() + }) = read(c).unwrap_err() { assert_eq!(typed_as, "integer"); assert_eq!(error, "invalid integer value"); @@ -262,8 +265,7 @@ mod test { #[test] fn it_parses_errors() { let c = Cursor::new(b"-ERR foo\r\n"); - let r = read(c).unwrap_err(); - if let &FaktoryError::Internal { ref msg } = r.downcast_ref().unwrap() { + if let Error::Protocol(error::Protocol::Internal { ref msg }) = read(c).unwrap_err() { assert_eq!(msg, "foo"); } else { unreachable!(); @@ -286,10 +288,9 @@ mod test { #[test] fn it_errors_on_bad_sizes() { let c = Cursor::new(b"$x\r\n\r\n"); - let r = read(c).unwrap_err(); - if let &FaktoryError::BadResponse { + if let Error::Protocol(error::Protocol::BadResponse { typed_as, error, .. - } = r.downcast_ref().unwrap() + }) = read(c).unwrap_err() { assert_eq!(typed_as, "bulk string"); assert_eq!(error, "server bulk response size prefix is not an integer"); @@ -356,25 +357,30 @@ mod test { #[test] fn it_errors_on_bad_json_blob() { let c = Cursor::new(b"$9\r\n{\"hello\"}\r\n"); - let r = read_json(c).unwrap_err(); - let _: &serde_json::Error = r.downcast_ref().unwrap(); + if let Error::Serialization(err) = read_json(c).unwrap_err() { + let _: serde_json::Error = err; + } else { + unreachable!(); + } } #[test] fn it_errors_on_bad_json_string() { let c = Cursor::new(b"+{\"hello\"}\r\n"); - let r = read_json(c).unwrap_err(); - let _: &serde_json::Error = r.downcast_ref().unwrap(); + if let Error::Serialization(err) = read_json(c).unwrap_err() { + let _: serde_json::Error = err; + } else { + unreachable!(); + } } #[test] fn json_error_on_number() { let c = Cursor::new(b":9\r\n"); - let r = read_json(c).unwrap_err(); - if let &FaktoryError::BadType { + if let Error::Protocol(error::Protocol::BadType { expected, ref received, - } = r.downcast_ref().unwrap() + }) = read_json(c).unwrap_err() { assert_eq!(expected, "json"); assert_eq!(received, "Number(9)"); @@ -386,10 +392,9 @@ mod test { #[test] fn it_errors_on_unknown_resp_type() { let c = Cursor::new(b"^\r\n"); - let r = read_json(c).unwrap_err(); - if let &FaktoryError::BadResponse { + if let Error::Protocol(error::Protocol::BadResponse { typed_as, error, .. - } = r.downcast_ref().unwrap() + }) = read_json(c).unwrap_err() { assert_eq!(typed_as, "unknown"); assert_eq!(error, "invalid response type prefix"); diff --git a/src/tls.rs b/src/tls.rs index b5a588ae..f9db6ca4 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -1,5 +1,5 @@ use crate::proto::{self, Reconnect}; -use failure::Error; +use crate::Error; use native_tls::TlsConnector; use native_tls::TlsStream as NativeTlsStream; use std::io; @@ -41,7 +41,10 @@ impl TlsStream { /// /// If `url` is given, but does not specify a port, it defaults to 7419. pub fn connect(url: Option<&str>) -> Result { - TlsStream::with_connector(TlsConnector::builder().build()?, url) + TlsStream::with_connector( + TlsConnector::builder().build().map_err(Error::TlsStream)?, + url, + ) } /// Create a new TLS connection over TCP using a non-default TLS configuration. @@ -70,14 +73,13 @@ where /// Create a new TLS connection on an existing stream with a non-default TLS configuration. pub fn new(stream: S, tls: TlsConnector, hostname: &str) -> io::Result { let stream = tls - .clone() .connect(hostname, stream) .map_err(|e| io::Error::new(io::ErrorKind::ConnectionAborted, e))?; Ok(TlsStream { connector: tls, hostname: hostname.to_string(), - stream: stream, + stream, }) } } diff --git a/tests/consumer.rs b/tests/consumer.rs index de10dce7..ee7c4aea 100644 --- a/tests/consumer.rs +++ b/tests/consumer.rs @@ -1,4 +1,3 @@ -extern crate failure; extern crate faktory; extern crate mockstream; extern crate serde_json; diff --git a/tests/mock/mod.rs b/tests/mock/mod.rs index 8abeeae7..86835b21 100644 --- a/tests/mock/mod.rs +++ b/tests/mock/mod.rs @@ -81,12 +81,12 @@ impl Stream { let mut inner = Inner { take_next: 0, - streams: streams, + streams, }; let mine = inner.take_stream(); Stream { - mine: mine, + mine, all: Arc::new(Mutex::new(inner)), } } diff --git a/tests/producer.rs b/tests/producer.rs index 8160101c..1db4c82d 100644 --- a/tests/producer.rs +++ b/tests/producer.rs @@ -1,4 +1,3 @@ -extern crate failure; extern crate faktory; extern crate mockstream; extern crate serde_json; @@ -93,7 +92,7 @@ fn queue_control() { s.ignore(0); s.ok(0); - p.queue_pause(&vec!["test", "test2"]).unwrap(); + p.queue_pause(&["test", "test2"]).unwrap(); s.ok(0); p.queue_resume(&["test3".to_string(), "test4".to_string()]) diff --git a/tests/real.rs b/tests/real.rs index 3d503418..c60c2c04 100644 --- a/tests/real.rs +++ b/tests/real.rs @@ -1,4 +1,3 @@ -extern crate failure; extern crate faktory; extern crate serde_json; extern crate url;