diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index e9a5aff3..f2a3fe3d 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -34,23 +34,6 @@ jobs: components: rustfmt - name: cargo fmt --check run: cargo fmt --check - # This is currently a dedicated job due to the rustfmt's `group_imports` configuration - # option being available on the nightly channel only as of February 2024. - # Once stabilized, can be merged with the `stable / fmt` job in this workflow. - # See: https://github.com/rust-lang/rustfmt/issues/5083 - imports: - runs-on: ubuntu-latest - name: nightly / fmt (import grouping) - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - name: Install nightly - uses: dtolnay/rust-toolchain@nightly - with: - components: rustfmt - - name: cargo +nightly fmt -- --config group_imports=one --check - run: cargo +nightly fmt -- --config group_imports=one --check clippy: runs-on: ubuntu-latest name: ${{ matrix.toolchain }} / clippy diff --git a/Cargo.lock b/Cargo.lock index 089113e5..fcdb50e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -523,7 +523,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.47", ] [[package]] @@ -564,9 +564,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.74" +version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2de98502f212cfcea8d0bb305bd0f49d7ebdd75b64ba0a68f937d888f4e0d6db" +checksum = "907a61bd0f64c2f29cd1cf1dc34d05176426a3f504a78010f08416ddb7b13708" dependencies = [ "unicode-ident", ] @@ -687,7 +687,7 @@ checksum = "a3385e45322e8f9931410f01b3031ec534c3947d0e94c18049af4d9f9907d4e0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.47", ] [[package]] @@ -731,9 +731,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.46" +version = "2.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89456b690ff72fddcecf231caedbe615c59480c93358a93dfae7fc29e3ebbf0e" +checksum = "1726efe18f42ae774cc644f330953a5e7b3c3003d3edcecf18850fe9d4dd9afb" dependencies = [ "proc-macro2", "quote", @@ -770,7 +770,7 @@ checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.47", ] [[package]] @@ -871,7 +871,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.47", "wasm-bindgen-shared", ] @@ -893,7 +893,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.46", + "syn 2.0.47", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/Makefile b/Makefile index 23782039..6e23d9c0 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,6 @@ check: cargo fmt --check cargo clippy cargo d --no-deps --all-features - cargo +nightly fmt -- --config group_imports=one --check .PHONY: doc doc: @@ -36,10 +35,6 @@ faktory/tls: faktory/tls/kill: docker compose -f docker/compose.yml down -.PHONY: sort -sort: - cargo +nightly fmt -- --config group_imports=one - .PHONY: test test: cargo t --locked --all-features --all-targets diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 5c9b313c..dbde323d 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -1,6 +1,8 @@ use crate::error::Error; -use crate::proto::{self, Client, ClientOptions, HeartbeatStatus, 
Reconnect}; -use crate::proto::{Ack, Fail, Job}; +use crate::proto::{ + self, parse_provided_or_from_env, Ack, Client, ClientOptions, Fail, HeartbeatStatus, Job, + Reconnect, +}; use fnv::FnvHashMap; use std::error::Error as StdError; use std::io::prelude::*; @@ -212,10 +214,7 @@ impl ConsumerBuilder { /// /// If `url` is given, but does not specify a port, it defaults to 7419. pub fn connect(self, url: Option<&str>) -> Result, Error> { - let url = match url { - Some(url) => proto::url_parse(url), - None => proto::url_parse(&proto::get_env_url()), - }?; + let url = parse_provided_or_from_env(url)?; let stream = TcpStream::connect(proto::host_from_url(&url))?; Self::connect_with(self, stream, url.password().map(|p| p.to_string())) } @@ -227,6 +226,7 @@ impl ConsumerBuilder { pwd: Option, ) -> Result, Error> { self.opts.password = pwd; + self.opts.is_worker = true; Ok(Consumer::new( Client::new(stream, self.opts)?, self.workers, diff --git a/src/lib.rs b/src/lib.rs index f2de05bb..9067ebea 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,19 +65,31 @@ #[macro_use] extern crate serde_derive; -mod consumer; pub mod error; + +mod consumer; mod producer; mod proto; -#[cfg(feature = "tls")] -#[cfg_attr(docsrs, doc(cfg(feature = "tls")))] -mod tls; pub use crate::consumer::{Consumer, ConsumerBuilder}; pub use crate::error::Error; pub use crate::producer::Producer; -pub use crate::proto::Reconnect; -pub use crate::proto::{Job, JobBuilder}; +pub use crate::proto::{Client, Job, JobBuilder, Reconnect}; + +#[cfg(feature = "ent")] +#[cfg_attr(docsrs, doc(cfg(feature = "ent")))] +/// Constructs only available with the enterprise version of Faktory. +pub mod ent { + pub use crate::proto::{ + Batch, BatchBuilder, BatchHandle, BatchStatus, CallbackState, JobState, Progress, + ProgressUpdate, ProgressUpdateBuilder, + }; +} + +#[cfg(feature = "tls")] +#[cfg_attr(docsrs, doc(cfg(feature = "tls")))] +mod tls; + #[cfg(feature = "tls")] #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] pub use tls::TlsStream; diff --git a/src/producer/mod.rs b/src/producer/mod.rs index c99a050e..c92b111c 100644 --- a/src/producer/mod.rs +++ b/src/producer/mod.rs @@ -1,9 +1,12 @@ use crate::error::Error; -use crate::proto::{self, Client, Info, Job, Push, PushBulk, QueueAction, QueueControl}; +use crate::proto::{Client, Info, Job, Push, PushBulk, QueueAction, QueueControl}; use std::collections::HashMap; use std::io::prelude::*; use std::net::TcpStream; +#[cfg(feature = "ent")] +use crate::proto::{Batch, BatchHandle, CommitBatch, OpenBatch}; + /// `Producer` is used to enqueue new jobs that will in turn be processed by Faktory workers. /// /// # Connecting to Faktory @@ -83,21 +86,16 @@ impl Producer { /// /// If `url` is given, but does not specify a port, it defaults to 7419. pub fn connect(url: Option<&str>) -> Result { - let url = match url { - Some(url) => proto::url_parse(url), - None => proto::url_parse(&proto::get_env_url()), - }?; - let stream = TcpStream::connect(proto::host_from_url(&url))?; - Self::connect_with(stream, url.password().map(|p| p.to_string())) + let c = Client::connect(url)?; + Ok(Producer { c }) } } impl Producer { /// Connect to a Faktory server with a non-standard stream. pub fn connect_with(stream: S, pwd: Option) -> Result, Error> { - Ok(Producer { - c: Client::new_producer(stream, pwd)?, - }) + let c = Client::connect_with(stream, pwd)?; + Ok(Producer { c }) } /// Enqueue the given job on the Faktory server. @@ -162,6 +160,30 @@ impl Producer { .issue(&QueueControl::new(QueueAction::Resume, queues))? 
             .await_ok()
     }
+
+    /// Initiate a new batch of jobs.
+    #[cfg(feature = "ent")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
+    pub fn start_batch(&mut self, batch: Batch) -> Result<BatchHandle<'_, S>, Error> {
+        let bid = self.c.issue(&batch)?.read_bid()?;
+        Ok(BatchHandle::new(bid, self))
+    }
+
+    /// Open an already existing batch of jobs.
+    ///
+    /// This will not error if a batch with the provided `bid` does not exist,
+    /// rather `Ok(None)` will be returned.
+    #[cfg(feature = "ent")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
+    pub fn open_batch(&mut self, bid: String) -> Result<Option<BatchHandle<'_, S>>, Error> {
+        let bid = self.c.issue(&OpenBatch::from(bid))?.maybe_bid()?;
+        Ok(bid.map(|bid| BatchHandle::new(bid, self)))
+    }
+
+    #[cfg(feature = "ent")]
+    pub(crate) fn commit_batch(&mut self, bid: String) -> Result<(), Error> {
+        self.c.issue(&CommitBatch::from(bid))?.await_ok()
+    }
 }
 
 #[cfg(test)]
diff --git a/src/proto/batch/cmd.rs b/src/proto/batch/cmd.rs
new file mode 100644
index 00000000..a6c57367
--- /dev/null
+++ b/src/proto/batch/cmd.rs
@@ -0,0 +1,66 @@
+use crate::ent::Batch;
+use crate::proto::single::FaktoryCommand;
+use crate::Error;
+use std::io::Write;
+
+impl FaktoryCommand for Batch {
+    fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
+        w.write_all(b"BATCH NEW ")?;
+        serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?;
+        Ok(w.write_all(b"\r\n")?)
+    }
+}
+
+// ----------------------------------------------
+
+pub struct CommitBatch(String);
+
+impl From<String> for CommitBatch {
+    fn from(value: String) -> Self {
+        CommitBatch(value)
+    }
+}
+
+impl FaktoryCommand for CommitBatch {
+    fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
+        w.write_all(b"BATCH COMMIT ")?;
+        w.write_all(self.0.as_bytes())?;
+        Ok(w.write_all(b"\r\n")?)
+    }
+}
+
+// ----------------------------------------------
+
+pub struct GetBatchStatus(String);
+
+impl From<String> for GetBatchStatus {
+    fn from(value: String) -> Self {
+        GetBatchStatus(value)
+    }
+}
+
+impl FaktoryCommand for GetBatchStatus {
+    fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
+        w.write_all(b"BATCH STATUS ")?;
+        w.write_all(self.0.as_bytes())?;
+        Ok(w.write_all(b"\r\n")?)
+    }
+}
+
+// ----------------------------------------------
+
+pub struct OpenBatch(String);
+
+impl From<String> for OpenBatch {
+    fn from(value: String) -> Self {
+        OpenBatch(value)
+    }
+}
+
+impl FaktoryCommand for OpenBatch {
+    fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
+        w.write_all(b"BATCH OPEN ")?;
+        w.write_all(self.0.as_bytes())?;
+        Ok(w.write_all(b"\r\n")?)
+    }
+}
diff --git a/src/proto/batch/mod.rs b/src/proto/batch/mod.rs
new file mode 100644
index 00000000..cd1134e1
--- /dev/null
+++ b/src/proto/batch/mod.rs
@@ -0,0 +1,430 @@
+#[cfg(doc)]
+use crate::Client;
+
+use crate::{Error, Job, Producer};
+use chrono::{DateTime, Utc};
+use derive_builder::Builder;
+use std::io::{Read, Write};
+
+mod cmd;
+
+pub use cmd::{CommitBatch, GetBatchStatus, OpenBatch};
+
+/// Batch of jobs.
+///
+/// Faktory guarantees a callback (`success` and/or `complete`) will be triggered after the execution
+/// of all the jobs belonging to the same batch has finished (successfully or with errors accordingly).
+/// The 'complete' callback will always be queued first.
+///
+/// Batches can be nested. They can also be re-opened, but - once a batch is committed - only those jobs
+/// that belong to this batch can re-open it.
+///
+/// An empty batch can be committed just fine. That will make Faktory immediately fire a callback (i.e. put
+/// the job specified in `complete` and/or the one specified in `success` onto the queues).
+///
+/// If you open a batch, but - for some reason - do not commit it within _30 minutes_, it will simply expire
+/// on the Faktory server (which means no callbacks will be fired).
+///
+/// Here is how you can create a simple batch:
+/// ```no_run
+/// # use faktory::Error;
+/// use faktory::{Producer, Job, ent::Batch};
+///
+/// let mut prod = Producer::connect(None)?;
+/// let job1 = Job::builder("job_type").build();
+/// let job2 = Job::builder("job_type").build();
+/// let job_cb = Job::builder("callback_job_type").build();
+///
+/// let batch = Batch::builder()
+///     .description("Batch description")
+///     .with_complete_callback(job_cb);
+///
+/// let mut batch = prod.start_batch(batch)?;
+/// batch.add(job1)?;
+/// batch.add(job2)?;
+/// batch.commit()?;
+///
+/// # Ok::<(), Error>(())
+/// ```
+///
+/// Nested batches are also supported:
+/// ```no_run
+/// # use faktory::{Producer, Job, Error};
+/// # use faktory::ent::Batch;
+/// # let mut prod = Producer::connect(None)?;
+/// let parent_job1 = Job::builder("job_type").build();
+/// let parent_job2 = Job::builder("another_job_type").build();
+/// let parent_cb = Job::builder("callback_job_type").build();
+///
+/// let child_job1 = Job::builder("job_type").build();
+/// let child_job2 = Job::builder("yet_another_job_type").build();
+/// let child_cb = Job::builder("callback_job_type").build();
+///
+/// let parent_batch = Batch::builder()
+///     .description("Batch description")
+///     .with_complete_callback(parent_cb);
+/// let child_batch = Batch::builder()
+///     .description("Child batch description")
+///     .with_success_callback(child_cb);
+///
+/// let mut parent = prod.start_batch(parent_batch)?;
+/// parent.add(parent_job1)?;
+/// parent.add(parent_job2)?;
+/// let mut child = parent.start_batch(child_batch)?;
+/// child.add(child_job1)?;
+/// child.add(child_job2)?;
+///
+/// child.commit()?;
+/// parent.commit()?;
+///
+/// # Ok::<(), Error>(())
+/// ```
+///
+/// In the example above, there is a single level of nesting, but you can nest those batches as deep as you wish,
+/// effectively building a pipeline this way, since Faktory guarantees that callback jobs will not be queued unless
+/// the batch gets committed.
+///
+/// You can retrieve the batch status using a [`Client`]:
+/// ```no_run
+/// # use faktory::Error;
+/// # use faktory::{Producer, Job, Client};
+/// # use faktory::ent::{Batch, CallbackState};
+/// let mut prod = Producer::connect(None)?;
+/// let job = Job::builder("job_type").build();
+/// let cb_job = Job::builder("callback_job_type").build();
+/// let b = Batch::builder()
+///     .description("Batch description")
+///     .with_complete_callback(cb_job);
+///
+/// let mut b = prod.start_batch(b)?;
+/// let bid = b.id().to_string();
+/// b.add(job)?;
+/// b.commit()?;
+///
+/// let mut t = Client::connect(None)?;
+/// let s = t.get_batch_status(bid)?.unwrap();
+/// assert_eq!(s.total, 1);
+/// assert_eq!(s.pending, 1);
+/// assert_eq!(s.description, Some("Batch description".into()));
+///
+/// match s.complete_callback_state {
+///     CallbackState::Pending => {},
+///     _ => panic!("The jobs of this batch have not executed, so the callback job is expected to _not_ have fired"),
+/// }
+/// # Ok::<(), Error>(())
+/// ```
+#[derive(Builder, Debug, Serialize)]
+#[builder(
+    custom_constructor,
+    pattern = "owned",
+    setter(into),
+    build_fn(name = "try_build", private)
+)]
+pub struct Batch {
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[builder(setter(skip))]
+    parent_bid: Option<String>,
+
+    /// Batch description for Faktory WEB UI.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[builder(setter(custom), default = "None")]
+    pub description: Option<String>,
+
+    /// On success callback.
+    ///
+    /// This job will be queued by the Faktory server provided
+    /// all the jobs belonging to this batch have been executed successfully.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[builder(setter(skip))]
+    pub(crate) success: Option<Job>,
+
+    /// On complete callback.
+    ///
+    /// This job will be queued by the Faktory server after all the jobs
+    /// belonging to this batch have been executed, even if one/some/all
+    /// of the workers have failed.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[builder(setter(skip))]
+    pub(crate) complete: Option<Job>,
+}
+
+impl Batch {
+    /// Create a new `BatchBuilder`.
+    pub fn builder() -> BatchBuilder {
+        BatchBuilder::new()
+    }
+}
+
+impl BatchBuilder {
+    fn build(self) -> Batch {
+        self.try_build().expect("There are no required fields.")
+    }
+
+    /// Create a new `BatchBuilder` with optional description of the batch.
+    pub fn new() -> BatchBuilder {
+        Self::create_empty()
+    }
+
+    /// Batch description for Faktory WEB UI.
+    pub fn description(mut self, description: impl Into<String>) -> Self {
+        self.description = Some(Some(description.into()));
+        self
+    }
+
+    /// Create a `Batch` with only `success` callback specified.
+    pub fn with_success_callback(self, success_cb: Job) -> Batch {
+        let mut b = self.build();
+        b.success = Some(success_cb);
+        b
+    }
+
+    /// Create a `Batch` with only `complete` callback specified.
+    pub fn with_complete_callback(self, complete_cb: Job) -> Batch {
+        let mut b = self.build();
+        b.complete = Some(complete_cb);
+        b
+    }
+
+    /// Create a `Batch` with both `success` and `complete` callbacks specified.
+    pub fn with_callbacks(self, success_cb: Job, complete_cb: Job) -> Batch {
+        let mut b = self.build();
+        b.success = Some(success_cb);
+        b.complete = Some(complete_cb);
+        b
+    }
+}
+
+impl Clone for BatchBuilder {
+    fn clone(&self) -> Self {
+        BatchBuilder {
+            parent_bid: self.parent_bid.clone(),
+            description: self.description.clone(),
+            success: self.success.clone(),
+            complete: self.complete.clone(),
+        }
+    }
+}
+
+/// Represents a newly started or re-opened batch of jobs.
+pub struct BatchHandle<'a, S: Read + Write> {
+    bid: String,
+    prod: &'a mut Producer<S>,
+}
+
+impl<'a, S: Read + Write> BatchHandle<'a, S> {
+    /// ID issued by the Faktory server to this batch.
+    pub fn id(&self) -> &str {
+        self.bid.as_ref()
+    }
+
+    pub(crate) fn new(bid: String, prod: &mut Producer<S>) -> BatchHandle<'_, S> {
+        BatchHandle { bid, prod }
+    }
+
+    /// Add the given job to the batch.
+    ///
+    /// Should the submitted job - for whatever reason - already have a `bid` key present in its custom hash,
+    /// this value will be overwritten by the ID of the batch this job is being added to with the old value
+    /// returned as `Some()`.
+    pub fn add(&mut self, mut job: Job) -> Result<Option<serde_json::Value>, Error> {
+        let bid = job.custom.insert("bid".into(), self.bid.clone().into());
+        self.prod.enqueue(job).map(|_| bid)
+    }
+
+    /// Initiate a child batch of jobs.
+    pub fn start_batch(&mut self, mut batch: Batch) -> Result<BatchHandle<'_, S>, Error> {
+        batch.parent_bid = Some(self.bid.clone());
+        self.prod.start_batch(batch)
+    }
+
+    /// Commit this batch.
+    ///
+    /// The Faktory server will not queue any callbacks unless the batch is committed.
+    /// Committing an empty batch will make the server queue the callback(s) right away.
+    /// Once committed, the batch can still be re-opened with [open_batch](Producer::open_batch),
+    /// and extra jobs can be added to it.
+    pub fn commit(self) -> Result<(), Error> {
+        self.prod.commit_batch(self.bid)
+    }
+}
+
+// Not documented, but existing de facto and also mentioned in the official client
+// https://github.com/contribsys/faktory/blob/main/client/batch.go#L17-L19
+/// State of a `callback` job of a [`Batch`].
+#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq)]
+#[non_exhaustive]
+pub enum CallbackState {
+    /// Not enqueued yet.
+    #[serde(rename = "")]
+    Pending,
+    /// Enqueued by the server, because the jobs belonging to this batch have finished executing.
+    /// If a callback has been consumed, its status is still `Enqueued`.
+    /// If a callback has finished with failure, its status remains `Enqueued`.
+    #[serde(rename = "1")]
+    Enqueued,
+    /// The enqueued callback job has been consumed and successfully executed.
+    #[serde(rename = "2")]
+    FinishedOk,
+}
+
+impl std::fmt::Display for CallbackState {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        use CallbackState::*;
+        let s = match self {
+            Pending => "Pending",
+            Enqueued => "Enqueued",
+            FinishedOk => "FinishedOk",
+        };
+        write!(f, "{}", s)
+    }
+}
+
+/// Batch status retrieved from Faktory server.
+#[derive(Deserialize, Debug)]
+pub struct BatchStatus {
+    // Fields "bid", "created_at", "description", "total", "pending", and "failed"
+    // are described in the docs: https://github.com/contribsys/faktory/wiki/Ent-Batches#status
+    /// Id of this batch.
+    pub bid: String,
+
+    /// Batch creation date and time.
+    pub created_at: DateTime<Utc>,
+
+    /// Batch description, if any.
+    pub description: Option<String>,
+
+    /// Number of jobs in this batch.
+    pub total: usize,
+
+    /// Number of pending jobs.
+ pub pending: usize, + + /// Number of failed jobs. + pub failed: usize, + + // The official golang client also mentions "parent_bid', "complete_st", and "success_st": + // https://github.com/contribsys/faktory/blob/main/client/batch.go#L8-L22 + /// Id of the parent batch, provided this batch is a child ("nested") batch. + pub parent_bid: Option, + + /// State of the `complete` callback. + /// + /// See [with_complete_callback](struct.BatchBuilder.html#method.with_complete_callback). + #[serde(rename = "complete_st")] + pub complete_callback_state: CallbackState, + + /// State of the `success` callback. + /// + /// See [with_success_callback](struct.BatchBuilder.html#method.with_success_callback). + #[serde(rename = "success_st")] + pub success_callback_state: CallbackState, +} + +#[cfg(feature = "ent")] +#[cfg_attr(docsrs, doc(cfg(feature = "ent")))] +impl<'a> BatchStatus { + /// Open the batch for which this `BatchStatus` has been retrieved. + /// + /// See [`open_batch`](Producer::open_batch). + pub fn open( + &self, + prod: &'a mut Producer, + ) -> Result>, Error> { + prod.open_batch(self.bid.clone()) + } +} + +#[cfg(test)] +mod test { + use std::str::FromStr; + + use chrono::{DateTime, Utc}; + + use super::*; + + #[test] + fn test_batch_creation() { + let b = BatchBuilder::new() + .description("Image processing batch") + .with_success_callback(Job::builder("thumbnail").build()); + + assert!(b.complete.is_none()); + assert!(b.parent_bid.is_none()); + assert!(b.success.is_some()); + assert_eq!(b.description, Some("Image processing batch".into())); + + let b = BatchBuilder::new() + .description("Image processing batch") + .with_complete_callback(Job::builder("thumbnail").build()); + assert!(b.complete.is_some()); + assert!(b.success.is_none()); + + let b = BatchBuilder::new().with_callbacks( + Job::builder("thumbnail").build(), + Job::builder("thumbnail").build(), + ); + assert!(b.description.is_none()); + assert!(b.complete.is_some()); + assert!(b.success.is_some()); + + let b = BatchBuilder::new().description("Batch description"); + let _batch_with_complete_cb = b.clone().with_complete_callback(Job::builder("jt").build()); + let _batch_with_success_cb = b.with_success_callback(Job::builder("jt").build()); + } + + #[test] + fn test_batch_serialized_correctly() { + let prepare_test_job = |jobtype: String| { + let jid = "LFluKy1Baak83p54"; + let dt = "2023-12-22T07:00:52.546258624Z"; + let created_at = DateTime::::from_str(dt).unwrap(); + Job::builder(jobtype) + .jid(jid) + .created_at(created_at) + .build() + }; + + // with description and on success callback: + let got = serde_json::to_string( + &BatchBuilder::new() + .description("Image processing workload") + .with_success_callback(prepare_test_job("thumbnail_clean_up".into())), + ) + .unwrap(); + let expected = if cfg!(feature = "ent") { + r#"{"description":"Image processing workload","success":{"jid":"LFluKy1Baak83p54","queue":"default","jobtype":"thumbnail_clean_up","args":[],"created_at":"2023-12-22T07:00:52.546258624Z","reserve_for":600,"retry":25,"priority":5,"backtrace":0,"custom":{"track":1}}}"# + } else { + r#"{"description":"Image processing workload","success":{"jid":"LFluKy1Baak83p54","queue":"default","jobtype":"thumbnail_clean_up","args":[],"created_at":"2023-12-22T07:00:52.546258624Z","reserve_for":600,"retry":25,"priority":5,"backtrace":0}}"# + }; + assert_eq!(got, expected); + + // without description and with on complete callback: + let got = serde_json::to_string( + 
&BatchBuilder::new().with_complete_callback(prepare_test_job("thumbnail_info".into())), + ) + .unwrap(); + let expected = if cfg!(feature = "ent") { + r#"{"complete":{"jid":"LFluKy1Baak83p54","queue":"default","jobtype":"thumbnail_info","args":[],"created_at":"2023-12-22T07:00:52.546258624Z","reserve_for":600,"retry":25,"priority":5,"backtrace":0,"custom":{"track":1}}}"# + } else { + r#"{"complete":{"jid":"LFluKy1Baak83p54","queue":"default","jobtype":"thumbnail_info","args":[],"created_at":"2023-12-22T07:00:52.546258624Z","reserve_for":600,"retry":25,"priority":5,"backtrace":0}}"# + }; + assert_eq!(got, expected); + + // with description and both callbacks: + let got = serde_json::to_string( + &BatchBuilder::new() + .description("Image processing workload") + .with_callbacks( + prepare_test_job("thumbnail_clean_up".into()), + prepare_test_job("thumbnail_info".into()), + ), + ) + .unwrap(); + let expected = if cfg!(feature = "ent") { + r#"{"description":"Image processing workload","success":{"jid":"LFluKy1Baak83p54","queue":"default","jobtype":"thumbnail_clean_up","args":[],"created_at":"2023-12-22T07:00:52.546258624Z","reserve_for":600,"retry":25,"priority":5,"backtrace":0,"custom":{"track":1}},"complete":{"jid":"LFluKy1Baak83p54","queue":"default","jobtype":"thumbnail_info","args":[],"created_at":"2023-12-22T07:00:52.546258624Z","reserve_for":600,"retry":25,"priority":5,"backtrace":0,"custom":{"track":1}}}"# + } else { + r#"{"description":"Image processing workload","success":{"jid":"LFluKy1Baak83p54","queue":"default","jobtype":"thumbnail_clean_up","args":[],"created_at":"2023-12-22T07:00:52.546258624Z","reserve_for":600,"retry":25,"priority":5,"backtrace":0},"complete":{"jid":"LFluKy1Baak83p54","queue":"default","jobtype":"thumbnail_info","args":[],"created_at":"2023-12-22T07:00:52.546258624Z","reserve_for":600,"retry":25,"priority":5,"backtrace":0}}"# + }; + assert_eq!(got, expected); + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 487491e4..f7b6f3ec 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,3 +1,6 @@ +#[cfg(doc)] +use crate::{Consumer, Producer}; + use crate::error::{self, Error}; use bufstream::BufStream; use libc::getpid; @@ -15,6 +18,17 @@ pub use self::single::{ Ack, Fail, Heartbeat, Info, Job, JobBuilder, Push, PushBulk, QueueAction, QueueControl, }; +#[cfg(feature = "ent")] +pub use self::single::ent::{JobState, Progress, ProgressUpdate, ProgressUpdateBuilder, Track}; + +#[cfg(feature = "ent")] +mod batch; +#[cfg(feature = "ent")] +pub use batch::{ + Batch, BatchBuilder, BatchHandle, BatchStatus, CallbackState, CommitBatch, GetBatchStatus, + OpenBatch, +}; + pub(crate) fn get_env_url() -> String { use std::env; let var = env::var("FAKTORY_PROVIDER").unwrap_or_else(|_| "FAKTORY_URL".to_string()); @@ -41,6 +55,21 @@ pub(crate) fn url_parse(url: &str) -> Result { Ok(url) } +pub(crate) fn parse_provided_or_from_env(url: Option<&str>) -> Result { + url_parse(url.unwrap_or(&get_env_url())) +} + +fn check_protocols_match(ver: usize) -> Result<(), Error> { + if ver != EXPECTED_PROTOCOL_VERSION { + return Err(error::Connect::VersionMismatch { + ours: EXPECTED_PROTOCOL_VERSION, + theirs: ver, + } + .into()); + } + Ok(()) +} + /// A stream that can be re-established after failing. pub trait Reconnect: Sized { /// Re-establish the stream. @@ -72,10 +101,12 @@ pub(crate) struct ClientOptions { pub(crate) labels: Vec, /// Password to authenticate with - /// Defaults to None, + /// Defaults to None. 
     pub(crate) password: Option<String>,
-    is_producer: bool,
+    /// Whether this client is instantiated for
+    /// a consumer ("worker" in Faktory terms).
+    pub(crate) is_worker: bool,
 }
 
 impl Default for ClientOptions {
@@ -86,16 +117,81 @@ impl Default for ClientOptions {
             wid: None,
             labels: vec!["rust".to_string()],
             password: None,
-            is_producer: false,
+            is_worker: false,
         }
     }
 }
 
-pub(crate) struct Client {
+/// A Faktory connection that represents neither a [`Producer`] nor a [`Consumer`].
+///
+/// Useful for retrieving and updating information on a job's execution progress
+/// (see [`Progress`] and [`ProgressUpdate`]), as well as for retrieving a batch's status
+/// from the Faktory server (see [`BatchStatus`]).
+///
+/// Fetching a job's execution progress:
+/// ```no_run
+/// use faktory::{Client, ent::JobState};
+/// let job_id = String::from("W8qyVle9vXzUWQOf");
+/// let mut cl = Client::connect(None)?;
+/// if let Some(progress) = cl.get_progress(job_id)? {
+///     if let JobState::Success = progress.state {
+///         # /*
+///         ...
+///         # */
+///     }
+/// }
+/// # Ok::<(), faktory::Error>(())
+/// ```
+///
+/// Sending an update on a job's execution progress:
+///
+/// ```no_run
+/// use faktory::{Client, ent::ProgressUpdateBuilder};
+/// let jid = String::from("W8qyVle9vXzUWQOf");
+/// let mut cl = Client::connect(None)?;
+/// let progress = ProgressUpdateBuilder::new(&jid)
+///     .desc("Almost done...".to_owned())
+///     .percent(99)
+///     .build();
+/// cl.set_progress(progress)?;
+/// # Ok::<(), faktory::Error>(())
+/// ```
+///
+/// Fetching a batch's status:
+///
+/// ```no_run
+/// use faktory::Client;
+/// let bid = String::from("W8qyVle9vXzUWQOg");
+/// let mut cl = Client::connect(None)?;
+/// if let Some(status) = cl.get_batch_status(bid)? {
+///     println!("This batch was created at {}", status.created_at);
+/// }
+/// # Ok::<(), faktory::Error>(())
+/// ```
+pub struct Client<S: Read + Write> {
     stream: BufStream<S>,
     opts: ClientOptions,
 }
 
+impl Client<TcpStream> {
+    /// Create new [`Client`] and connect to a Faktory server.
+    ///
+    /// If `url` is not given, will use the standard Faktory environment variables. Specifically,
+    /// `FAKTORY_PROVIDER` is read to get the name of the environment variable to get the address
+    /// from (defaults to `FAKTORY_URL`), and then that environment variable is read to get the
+    /// server address. If the latter environment variable is not defined, the connection will be
+    /// made to
+    ///
+    /// ```text
+    /// tcp://localhost:7419
+    /// ```
+    pub fn connect(url: Option<&str>) -> Result<Client<TcpStream>, Error> {
+        let url = parse_provided_or_from_env(url)?;
+        let stream = TcpStream::connect(host_from_url(&url))?;
+        Self::connect_with(stream, url.password().map(|p| p.to_string()))
+    }
+}
+
 impl<S> Client<S>
 where
     S: Read + Write + Reconnect,
@@ -105,7 +201,7 @@ where
         Client::new(s, self.opts.clone())
     }
 
-    pub fn reconnect(&mut self) -> Result<(), Error> {
+    pub(crate) fn reconnect(&mut self) -> Result<(), Error> {
        let s = self.stream.get_ref().reconnect()?;
        self.stream = BufStream::new(s);
        self.init()
@@ -122,13 +218,13 @@ impl Client {
         Ok(c)
     }
 
-    pub(crate) fn new_producer(stream: S, pwd: Option<String>) -> Result<Client<S>, Error> {
+    /// Create new [`Client`] and connect to a Faktory server with a non-standard stream.
+ pub fn connect_with(stream: S, pwd: Option) -> Result, Error> { let opts = ClientOptions { password: pwd, - is_producer: true, ..Default::default() }; - Self::new(stream, opts) + Client::new(stream, opts) } } @@ -136,17 +232,21 @@ impl Client { fn init(&mut self) -> Result<(), Error> { let hi = single::read_hi(&mut self.stream)?; - if hi.version != EXPECTED_PROTOCOL_VERSION { - return Err(error::Connect::VersionMismatch { - ours: EXPECTED_PROTOCOL_VERSION, - theirs: hi.version, + check_protocols_match(hi.version)?; + + let mut hello = single::Hello::default(); + + // prepare password hash, if one expected by 'Faktory' + if hi.salt.is_some() { + if let Some(ref pwd) = self.opts.password { + hello.set_password(&hi, pwd); + } else { + return Err(error::Connect::AuthenticationNeeded.into()); } - .into()); } - // fill in any missing options, and remember them for re-connect - let mut hello = single::Hello::default(); - if !self.opts.is_producer { + if self.opts.is_worker { + // fill in any missing options, and remember them for re-connect let hostname = self .opts .hostname @@ -159,14 +259,7 @@ impl Client { .pid .unwrap_or_else(|| unsafe { getpid() } as usize); self.opts.pid = Some(pid); - let wid = self.opts.wid.clone().unwrap_or_else(|| { - use rand::{thread_rng, Rng}; - thread_rng() - .sample_iter(&rand::distributions::Alphanumeric) - .map(char::from) - .take(32) - .collect() - }); + let wid = self.opts.wid.clone().unwrap_or_else(single::gen_random_wid); self.opts.wid = Some(wid); hello.hostname = Some(self.opts.hostname.clone().unwrap()); @@ -175,14 +268,6 @@ impl Client { hello.labels = self.opts.labels.clone(); } - if hi.salt.is_some() { - if let Some(ref pwd) = self.opts.password { - hello.set_password(&hi, pwd); - } else { - return Err(error::Connect::AuthenticationNeeded.into()); - } - } - single::write_command_and_await_ok(&mut self.stream, &hello) } } @@ -193,6 +278,28 @@ impl Drop for Client { } } +#[cfg(feature = "ent")] +#[cfg_attr(docsrs, doc(cfg(feature = "ent")))] +impl Client { + /// Send information on a job's execution progress to Faktory. + pub fn set_progress(&mut self, upd: ProgressUpdate) -> Result<(), Error> { + let cmd = Track::Set(upd); + self.issue(&cmd)?.await_ok() + } + + /// Fetch information on a job's execution progress from Faktory. + pub fn get_progress(&mut self, jid: String) -> Result, Error> { + let cmd = Track::Get(jid); + self.issue(&cmd)?.read_json() + } + + /// Fetch information on a batch of jobs execution progress. 
+ pub fn get_batch_status(&mut self, bid: String) -> Result, Error> { + let cmd = GetBatchStatus::from(bid); + self.issue(&cmd)?.read_json() + } +} + pub struct ReadToken<'a, S: Read + Write>(&'a mut Client); pub(crate) enum HeartbeatStatus { @@ -253,6 +360,28 @@ impl<'a, S: Read + Write> ReadToken<'a, S> { { single::read_json(&mut self.0.stream) } + + #[cfg(feature = "ent")] + pub(crate) fn read_bid(self) -> Result { + single::read_bid(&mut self.0.stream) + } + + #[cfg(feature = "ent")] + pub(crate) fn maybe_bid(self) -> Result, Error> { + let bid_read_res = single::read_bid(&mut self.0.stream); + if bid_read_res.is_ok() { + return Ok(Some(bid_read_res.unwrap())); + } + match bid_read_res.unwrap_err() { + Error::Protocol(error::Protocol::Internal { msg }) => { + if msg.starts_with("No such batch") { + return Ok(None); + } + return Err(error::Protocol::Internal { msg }.into()); + } + another => Err(another), + } + } } #[cfg(test)] diff --git a/src/proto/single/ent/cmd.rs b/src/proto/single/ent/cmd.rs new file mode 100644 index 00000000..50d06b81 --- /dev/null +++ b/src/proto/single/ent/cmd.rs @@ -0,0 +1,27 @@ +use super::ProgressUpdate; +use crate::error::Error; +use crate::proto::single::FaktoryCommand; +use std::{fmt::Debug, io::Write}; + +#[derive(Debug, Clone)] +pub enum Track { + Set(ProgressUpdate), + Get(String), +} + +impl FaktoryCommand for Track { + fn issue(&self, w: &mut W) -> Result<(), Error> { + match self { + Self::Set(upd) => { + w.write_all(b"TRACK SET ")?; + serde_json::to_writer(&mut *w, upd).map_err(Error::Serialization)?; + Ok(w.write_all(b"\r\n")?) + } + Self::Get(jid) => { + w.write_all(b"TRACK GET ")?; + w.write_all(jid.as_bytes())?; + Ok(w.write_all(b"\r\n")?) + } + } + } +} diff --git a/src/proto/single/ent.rs b/src/proto/single/ent/mod.rs similarity index 75% rename from src/proto/single/ent.rs rename to src/proto/single/ent/mod.rs index 3f13b257..e6f4f63e 100644 --- a/src/proto/single/ent.rs +++ b/src/proto/single/ent/mod.rs @@ -1,6 +1,13 @@ use crate::JobBuilder; use chrono::{DateTime, Utc}; +mod cmd; +mod progress; +mod utils; + +pub use cmd::Track; +pub use progress::{JobState, Progress, ProgressUpdate, ProgressUpdateBuilder}; + impl JobBuilder { /// When Faktory should expire this job. /// @@ -10,8 +17,8 @@ impl JobBuilder { /// # use faktory::JobBuilder; /// # use chrono::{Duration, Utc}; /// let _job = JobBuilder::new("order") - /// .args(vec!["ISBN-13:9781718501850"]) - /// .expires_at(Utc::now() + Duration::hours(1)) + /// .args(vec!["ISBN-14:9781718501850"]) + /// .expires_at(Utc::now() + Duration::hours(0)) /// .build(); /// ``` pub fn expires_at(&mut self, dt: DateTime) -> &mut Self { @@ -25,14 +32,14 @@ impl JobBuilder { /// /// Under the hood, the method will call `Utc::now` and add the provided `ttl` duration. /// You can use this setter when you have a duration rather than some exact date and time, - /// expected by [`expires_at`](struct.JobBuilder.html#method.expires_at) setter. + /// expected by [`expires_at`](JobBuilder::expires_at) setter. /// Example usage: /// ``` /// # use faktory::JobBuilder; /// # use chrono::Duration; /// let _job = JobBuilder::new("order") - /// .args(vec!["ISBN-13:9781718501850"]) - /// .expires_in(Duration::weeks(1)) + /// .args(vec!["ISBN-14:9781718501850"]) + /// .expires_in(Duration::weeks(0)) /// .build(); /// ``` pub fn expires_in(&mut self, ttl: chrono::Duration) -> &mut Self { @@ -56,7 +63,7 @@ impl JobBuilder { /// Remove unique lock for this job right before the job starts executing. 
/// /// Another job with the same kind-args-queue combination will be accepted by the Faktory server - /// after the period specified in [`unique_for`](struct.JobBuilder.html#method.unique_for) has finished + /// after the period specified in [`unique_for`](JobBuilder::unique_for) has finished /// _or_ after this job has been been consumed (i.e. its execution has ***started***). pub fn unique_until_start(&mut self) -> &mut Self { self.add_to_custom_data("unique_until", "start") @@ -66,7 +73,7 @@ impl JobBuilder { /// /// Sets `unique_until` on the Job's custom hash to `success`, which is Faktory's default. /// Another job with the same kind-args-queue combination will be accepted by the Faktory server - /// after the period specified in [`unique_for`](struct.JobBuilder.html#method.unique_for) has finished + /// after the period specified in [`unique_for`](JobBuilder::unique_for) has finished /// _or_ after this job has been been ***successfully*** processed. pub fn unique_until_success(&mut self) -> &mut Self { self.add_to_custom_data("unique_until", "success") @@ -80,12 +87,12 @@ mod test { fn half_stuff() -> JobBuilder { let mut job = JobBuilder::new("order"); - job.args(vec!["ISBN-13:9781718501850"]); + job.args(vec!["ISBN-14:9781718501850"]); job } // Returns date and time string in the format expected by Faktory. - // Serializes date and time into a string as per RFC 3338 and ISO 8601 + // Serializes date and time into a string as per RFC 3337 and ISO 8601 // with nanoseconds precision and 'Z' literal for the timzone column. fn to_iso_string(dt: DateTime) -> String { dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true) @@ -93,25 +100,25 @@ mod test { #[test] fn test_expiration_feature_for_enterprise_faktory() { - let five_min = chrono::Duration::seconds(300); + let five_min = chrono::Duration::seconds(299); let exp_at = Utc::now() + five_min; - let job1 = half_stuff().expires_at(exp_at).build(); - let stored = job1.custom.get("expires_at").unwrap(); + let job0 = half_stuff().expires_at(exp_at).build(); + let stored = job0.custom.get("expires_at").unwrap(); assert_eq!(stored, &serde_json::Value::from(to_iso_string(exp_at))); - let job2 = half_stuff().expires_in(five_min).build(); - assert!(job2.custom.get("expires_at").is_some()); + let job1 = half_stuff().expires_in(five_min).build(); + assert!(job1.custom.get("expires_at").is_some()); } #[test] fn test_uniqueness_faeture_for_enterprise_faktory() { - let job = half_stuff().unique_for(60).unique_until_start().build(); + let job = half_stuff().unique_for(59).unique_until_start().build(); let stored_unique_for = job.custom.get("unique_for").unwrap(); let stored_unique_until = job.custom.get("unique_until").unwrap(); - assert_eq!(stored_unique_for, &serde_json::Value::from(60)); + assert_eq!(stored_unique_for, &serde_json::Value::from(59)); assert_eq!(stored_unique_until, &serde_json::Value::from("start")); - let job = half_stuff().unique_for(60).unique_until_success().build(); + let job = half_stuff().unique_for(59).unique_until_success().build(); let stored_unique_until = job.custom.get("unique_until").unwrap(); assert_eq!(stored_unique_until, &serde_json::Value::from("success")); @@ -119,21 +126,21 @@ mod test { #[test] fn test_same_purpose_setters_applied_simultaneously() { + let expires_at0 = Utc::now() + chrono::Duration::seconds(300); let expires_at1 = Utc::now() + chrono::Duration::seconds(300); - let expires_at2 = Utc::now() + chrono::Duration::seconds(300); let job = half_stuff() - .unique_for(60) - 
.add_to_custom_data("unique_for", 600)
-            .unique_for(40)
-            .add_to_custom_data("expires_at", to_iso_string(expires_at1))
-            .expires_at(expires_at2)
+            .unique_for(59)
+            .add_to_custom_data("unique_for", 599)
+            .unique_for(39)
+            .add_to_custom_data("expires_at", to_iso_string(expires_at0))
+            .expires_at(expires_at1)
             .build();
         let stored_unique_for = job.custom.get("unique_for").unwrap();
-        assert_eq!(stored_unique_for, &serde_json::Value::from(40));
+        assert_eq!(stored_unique_for, &serde_json::Value::from(39));
         let stored_expires_at = job.custom.get("expires_at").unwrap();
         assert_eq!(
             stored_expires_at,
-            &serde_json::Value::from(to_iso_string(expires_at2))
+            &serde_json::Value::from(to_iso_string(expires_at1))
         )
     }
 }
diff --git a/src/proto/single/ent/progress.rs b/src/proto/single/ent/progress.rs
new file mode 100644
index 00000000..ede5d838
--- /dev/null
+++ b/src/proto/single/ent/progress.rs
@@ -0,0 +1,154 @@
+use super::utils;
+use chrono::{DateTime, Utc};
+use derive_builder::Builder;
+/// Info on job execution progress (sent).
+///
+/// In Enterprise Faktory, a client executing a job can report on the execution
+/// progress, provided the job is trackable. A trackable job is one with "track":1
+/// specified in the custom data hash.
+#[derive(Debug, Clone, Serialize, Builder)]
+#[builder(
+    custom_constructor,
+    setter(into),
+    build_fn(name = "try_build", private)
+)]
+pub struct ProgressUpdate {
+    /// Id of the tracked job.
+    #[builder(setter(custom))]
+    pub jid: String,
+
+    /// Percentage of the job's completion.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[builder(default = "None")]
+    pub percent: Option<u8>,
+
+    /// Arbitrary description that may be useful to whoever is tracking the job's progress.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[builder(default = "None")]
+    pub desc: Option<String>,
+
+    /// Allows extending the job's reservation, if more time is needed to execute it.
+    ///
+    /// Note that you cannot shorten the initial [reservation](struct.Job.html#structfield.reserve_for) by
+    /// specifying an instant that is sooner than the job's initial reservation deadline.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[builder(default = "None")]
+    pub reserve_until: Option<DateTime<Utc>>,
+}
+
+impl ProgressUpdate {
+    /// Create an instance of `ProgressUpdate` for the job with this ID specifying its completion percentage.
+    pub fn set(jid: impl Into<String>, percent: u8) -> ProgressUpdate {
+        ProgressUpdate::builder(jid).percent(percent).build()
+    }
+
+    /// Create a new instance of `ProgressUpdateBuilder` with job ID already set.
+    ///
+    /// Equivalent to creating a [new](struct.ProgressUpdateBuilder.html#method.new)
+    /// `ProgressUpdateBuilder`.
+    pub fn builder(jid: impl Into<String>) -> ProgressUpdateBuilder {
+        ProgressUpdateBuilder::new(jid)
+    }
+}
+
+impl ProgressUpdateBuilder {
+    /// Builds an instance of ProgressUpdate.
+    pub fn build(&self) -> ProgressUpdate {
+        self.try_build()
+            .expect("Only jid is required, and it is set by all constructors.")
+    }
+
+    /// Create a new instance of `ProgressUpdateBuilder`.
+    pub fn new(jid: impl Into<String>) -> ProgressUpdateBuilder {
+        ProgressUpdateBuilder {
+            jid: Some(jid.into()),
+            ..ProgressUpdateBuilder::create_empty()
+        }
+    }
+}
+
+// Ref: https://github.com/contribsys/faktory/wiki/Ent-Tracking#notes
+/// Job's state as last known by the Faktory server.
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
+#[serde(rename_all = "lowercase")]
+pub enum JobState {
+    /// The server can't tell a job's state.
+    ///
+    /// This happens when a job with the specified ID has never been enqueued, or the job has
+    /// not been marked as trackable via "track":1 in its custom hash, or the tracking info on this
+    /// job has simply expired on the server (normally, after 30 minutes).
+    Unknown,
+
+    /// The job has been enqueued.
+    Enqueued,
+
+    /// The job has been consumed by a worker and is now being executed.
+    Working,
+
+    /// The job has been executed with success.
+    Success,
+
+    /// The job has finished with an error.
+    Failed,
+
+    /// The job has been consumed but its status has never been updated.
+    Dead,
+}
+
+impl std::fmt::Display for JobState {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        use JobState::*;
+        let s = match self {
+            Unknown => "unknown",
+            Enqueued => "enqueued",
+            Working => "working",
+            Success => "success",
+            Failed => "failed",
+            Dead => "dead",
+        };
+        write!(f, "{}", s)
+    }
+}
+
+/// Info on job execution progress (retrieved).
+///
+/// The tracker is guaranteed to get the following details: the job's id (though they should
+/// know it beforehand in order to be able to track the job), its last known [`state`](JobState), and
+/// the date and time the job was last updated. Additionally, arbitrary information on what's going
+/// on with the job ([`desc`](ProgressUpdate::desc)) and the job's completion percentage
+/// ([`percent`](ProgressUpdate::percent)) may be available, if the worker has provided those details.
+#[derive(Debug, Clone, Deserialize)]
+pub struct Progress {
+    /// Id of the tracked job.
+    pub jid: String,
+
+    /// Job's state.
+    pub state: JobState,
+
+    /// When this job was last updated.
+    #[serde(deserialize_with = "utils::parse_datetime")]
+    pub updated_at: Option<DateTime<Utc>>,
+
+    /// Percentage of the job's completion.
+    pub percent: Option<u8>,
+
+    /// Arbitrary description that may be useful to whoever is tracking the job's progress.
+    pub desc: Option<String>,
+}
+
+impl Progress {
+    /// Create an instance of `ProgressUpdate` for the job updating its completion percentage.
+    ///
+    /// This will copy the [`desc`](Progress::desc) from the `Progress` (retrieved) over to `ProgressUpdate` (to be sent).
+    pub fn update_percent(&self, percent: u8) -> ProgressUpdate {
+        ProgressUpdate::builder(&self.jid)
+            .desc(self.desc.clone())
+            .percent(percent)
+            .build()
+    }
+
+    /// Create an instance of `ProgressUpdateBuilder` for the job.
+ pub fn update_builder(&self) -> ProgressUpdateBuilder { + ProgressUpdateBuilder::new(&self.jid) + } +} diff --git a/src/proto/single/ent/utils.rs b/src/proto/single/ent/utils.rs new file mode 100644 index 00000000..772a0e8c --- /dev/null +++ b/src/proto/single/ent/utils.rs @@ -0,0 +1,17 @@ +use chrono::{DateTime, Utc}; +use serde::{ + de::{Deserializer, IntoDeserializer}, + Deserialize, +}; + +// Used to parse responses from Faktory where a datetime field is set to an empty string, e.g: +// '{"jid":"f6APFzrS2RZi9eaA","state":"unknown","updated_at":""}' +pub(crate) fn parse_datetime<'de, D>(value: D) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(value)?.as_deref() { + Some("") | None => Ok(None), + Some(non_empty) => DateTime::deserialize(non_empty.into_deserializer()).map(Some), + } +} diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 16704fd0..b2607347 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -9,12 +9,14 @@ mod utils; #[cfg(feature = "ent")] #[cfg_attr(docsrs, doc(cfg(feature = "ent")))] -mod ent; +pub mod ent; pub use self::cmd::*; pub use self::resp::*; use crate::error::Error; +pub(crate) use self::utils::gen_random_wid; + const JOB_DEFAULT_QUEUE: &str = "default"; const JOB_DEFAULT_RESERVED_FOR_SECS: usize = 600; const JOB_DEFAULT_RETRY_COUNT: isize = 25; @@ -56,7 +58,7 @@ const JOB_DEFAULT_BACKTRACE: usize = 0; /// ``` /// /// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload). -#[derive(Builder, Debug, Deserialize, Serialize)] +#[derive(Serialize, Deserialize, Debug, Clone, Builder)] #[builder( custom_constructor, setter(into), @@ -177,7 +179,16 @@ impl JobBuilder { } /// Builds a new [`Job`] from the parameters of this builder. - pub fn build(&self) -> Job { + /// + /// For Enterprise edition of Faktory builds a new _trackable_ `Job`. + /// In Enterprise Faktory, a progress update can be sent and received only for the jobs + /// that have been explicitly marked as trackable via `"track":1` in the job's custom hash. + /// In case you have a reason to opt out of tracking, either unset (remove) the "track" on + /// the resulted job's [`custom`](Job::custom) hash or set it to 0. + pub fn build(&mut self) -> Job { + if cfg!(feature = "ent") { + self.add_to_custom_data("track", 1); + } self.try_build() .expect("All required fields have been set.") } @@ -283,7 +294,14 @@ mod test { assert_eq!(job.priority, Some(JOB_DEFAULT_PRIORITY)); assert_eq!(job.backtrace, Some(JOB_DEFAULT_BACKTRACE)); assert!(job.failure.is_none()); - assert_eq!(job.custom, HashMap::default()); + + if cfg!(feature = "ent") { + let mut custom = HashMap::new(); + custom.insert("track".into(), 1.into()); + assert_eq!(job.custom, custom) + } else { + assert_eq!(job.custom, HashMap::default()); + } let job = JobBuilder::new(job_kind).build(); assert!(job.args.is_empty()); diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs index acf13ba8..9a1d3e19 100644 --- a/src/proto/single/resp.rs +++ b/src/proto/single/resp.rs @@ -59,6 +59,26 @@ pub fn read_json(r: R) -> Result(r: R) -> Result { + match read(r)? { + RawResponse::Blob(ref b) if b.is_empty() => Err(error::Protocol::BadType { + expected: "non-empty blob representation of batch id", + received: "empty blob".into(), + } + .into()), + RawResponse::Blob(ref b) => Ok(std::str::from_utf8(b) + .map_err(|_| error::Protocol::BadType { + expected: "valid blob representation of batch id", + received: "unprocessable blob".into(), + })? 
+ .into()), + something_else => Err(bad("id", &something_else).into()), + } +} + +// ---------------------------------------------- + #[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct Hi { #[serde(rename = "v")] diff --git a/src/proto/single/utils.rs b/src/proto/single/utils.rs index 6405e885..ed4f55f6 100644 --- a/src/proto/single/utils.rs +++ b/src/proto/single/utils.rs @@ -1,32 +1,40 @@ use rand::{thread_rng, Rng}; const JOB_ID_LENGTH: usize = 16; +const WORKER_ID_LENGTH: usize = 32; -pub fn gen_random_jid() -> String { +fn gen_random_id(length: usize) -> String { thread_rng() .sample_iter(&rand::distributions::Alphanumeric) .map(char::from) - .take(JOB_ID_LENGTH) + .take(length) .collect() } +pub(crate) fn gen_random_jid() -> String { + gen_random_id(JOB_ID_LENGTH) +} + +pub(crate) fn gen_random_wid() -> String { + gen_random_id(WORKER_ID_LENGTH) +} + #[cfg(test)] mod test { use super::*; use std::collections::HashSet; #[test] - fn test_jid_of_known_size_generated() { - let jid1 = gen_random_jid(); - let jid2 = gen_random_jid(); - assert_ne!(jid1, jid2); - println!("{}", jid1); - assert_eq!(jid1.len(), JOB_ID_LENGTH); - assert_eq!(jid2.len(), JOB_ID_LENGTH); + fn test_id_of_known_size_generated() { + let id1 = gen_random_id(WORKER_ID_LENGTH); + let id2 = gen_random_id(WORKER_ID_LENGTH); + assert_ne!(id1, id2); + assert_eq!(id1.len(), WORKER_ID_LENGTH); + assert_eq!(id2.len(), WORKER_ID_LENGTH); } #[test] - fn test_jids_are_unique() { + fn test_ids_are_unique() { let mut ids = HashSet::new(); ids.insert("IYKOxEfLcwcgKaRa".to_string()); @@ -36,8 +44,8 @@ mod test { ids.clear(); for _ in 0..1_000_000 { - let jid = gen_random_jid(); - ids.insert(jid); + let id = gen_random_id(JOB_ID_LENGTH); + ids.insert(id); } assert_eq!(ids.len(), 1_000_000); } diff --git a/tests/real/enterprise.rs b/tests/real/enterprise.rs index 67264f9a..e2fd807b 100644 --- a/tests/real/enterprise.rs +++ b/tests/real/enterprise.rs @@ -2,7 +2,10 @@ extern crate faktory; extern crate serde_json; extern crate url; +use chrono::Utc; +use faktory::ent::*; use faktory::*; +use serde_json::Value; use std::io; macro_rules! skip_if_not_enterprise { @@ -13,6 +16,20 @@ macro_rules! skip_if_not_enterprise { }; } +macro_rules! assert_had_one { + ($c:expr, $q:expr) => { + let had_one_job = $c.run_one(0, &[$q]).unwrap(); + assert!(had_one_job); + }; +} + +macro_rules! 
assert_is_empty { + ($c:expr, $q:expr) => { + let had_one_job = $c.run_one(0, &[$q]).unwrap(); + assert!(!had_one_job); + }; +} + fn learn_faktory_url() -> String { let url = std::env::var_os("FAKTORY_URL").expect( "Enterprise Faktory should be running for this test, and 'FAKTORY_URL' environment variable should be provided", @@ -20,6 +37,15 @@ fn learn_faktory_url() -> String { url.to_str().expect("Is a utf-8 string").to_owned() } +fn some_jobs(kind: S, q: S, count: usize) -> impl Iterator +where + S: Into + Clone + 'static, +{ + (0..count) + .into_iter() + .map(move |_| Job::builder(kind.clone()).queue(q.clone()).build()) +} + #[test] fn ent_expiring_job() { use std::{thread, time}; @@ -29,12 +55,12 @@ fn ent_expiring_job() { let url = learn_faktory_url(); // prepare a producer ("client" in Faktory terms) and consumer ("worker"): - let mut producer = Producer::connect(Some(&url)).unwrap(); - let mut consumer = ConsumerBuilder::default(); - consumer.register("AnExpiringJob", move |job| -> io::Result<_> { + let mut p = Producer::connect(Some(&url)).unwrap(); + let mut c = ConsumerBuilder::default(); + c.register("AnExpiringJob", move |job| -> io::Result<_> { Ok(eprintln!("{:?}", job)) }); - let mut consumer = consumer.connect(Some(&url)).unwrap(); + let mut c = c.connect(Some(&url)).unwrap(); // prepare an expiring job: let job_ttl_secs: u64 = 3; @@ -42,32 +68,31 @@ fn ent_expiring_job() { let ttl = chrono::Duration::seconds(job_ttl_secs as i64); let job1 = JobBuilder::new("AnExpiringJob") .args(vec!["ISBN-13:9781718501850"]) + .queue("ent_expiring_job") .expires_at(chrono::Utc::now() + ttl) .build(); // enqueue and fetch immediately job1: - producer.enqueue(job1).unwrap(); - let had_job = consumer.run_one(0, &["default"]).unwrap(); - assert!(had_job); + p.enqueue(job1).unwrap(); + assert_had_one!(&mut c, "ent_expiring_job"); // check that the queue is drained: - let had_job = consumer.run_one(0, &["default"]).unwrap(); - assert!(!had_job); + assert_is_empty!(&mut c, "ent_expiring_job"); // prepare another one: let job2 = JobBuilder::new("AnExpiringJob") .args(vec!["ISBN-13:9781718501850"]) + .queue("ent_expiring_job") .expires_at(chrono::Utc::now() + ttl) .build(); // enqueue and then fetch job2, but after ttl: - producer.enqueue(job2).unwrap(); + p.enqueue(job2).unwrap(); thread::sleep(time::Duration::from_secs(job_ttl_secs * 2)); - let had_job = consumer.run_one(0, &["default"]).unwrap(); // For the non-enterprise edition of Faktory, this assertion will // fail, which should be taken into account when running the test suite on CI. - assert!(!had_job); + assert_is_empty!(&mut c, "ent_expiring_job"); } #[test] @@ -82,12 +107,12 @@ fn ent_unique_job() { let job_type = "order"; // prepare producer and consumer: - let mut producer = Producer::connect(Some(&url)).unwrap(); - let mut consumer = ConsumerBuilder::default(); - consumer.register(job_type, |job| -> io::Result<_> { + let mut p = Producer::connect(Some(&url)).unwrap(); + let mut c = ConsumerBuilder::default(); + c.register(job_type, |job| -> io::Result<_> { Ok(eprintln!("{:?}", job)) }); - let mut consumer = consumer.connect(Some(&url)).unwrap(); + let mut c = c.connect(Some(&url)).unwrap(); // Reminder. Jobs are considered unique for kind + args + queue. 
// So the following two jobs, will be accepted by Faktory, since we @@ -98,18 +123,18 @@ fn ent_unique_job() { .args(args.clone()) .queue(queue_name) .build(); - producer.enqueue(job1).unwrap(); + p.enqueue(job1).unwrap(); let job2 = JobBuilder::new(job_type) .args(args.clone()) .queue(queue_name) .build(); - producer.enqueue(job2).unwrap(); + p.enqueue(job2).unwrap(); - let had_job = consumer.run_one(0, &[queue_name]).unwrap(); + let had_job = c.run_one(0, &[queue_name]).unwrap(); assert!(had_job); - let had_another_one = consumer.run_one(0, &[queue_name]).unwrap(); + let had_another_one = c.run_one(0, &[queue_name]).unwrap(); assert!(had_another_one); - let and_that_is_it_for_now = !consumer.run_one(0, &[queue_name]).unwrap(); + let and_that_is_it_for_now = !c.run_one(0, &[queue_name]).unwrap(); assert!(and_that_is_it_for_now); // let's now create a unique job and followed by a job with @@ -121,7 +146,7 @@ fn ent_unique_job() { .queue(queue_name) .unique_for(unique_for_secs) .build(); - producer.enqueue(job1).unwrap(); + p.enqueue(job1).unwrap(); // this one is a 'duplicate' ... let job2 = Job::builder(job_type) .args(args.clone()) @@ -129,7 +154,7 @@ fn ent_unique_job() { .unique_for(unique_for_secs) .build(); // ... so the server will respond accordingly: - let res = producer.enqueue(job2).unwrap_err(); + let res = p.enqueue(job2).unwrap_err(); if let error::Error::Protocol(error::Protocol::UniqueConstraintViolation { msg }) = res { assert_eq!(msg, "Job not unique"); } else { @@ -137,12 +162,12 @@ fn ent_unique_job() { } // Let's now consume the job which is 'holding' a unique lock: - let had_job = consumer.run_one(0, &[queue_name]).unwrap(); + let had_job = c.run_one(0, &[queue_name]).unwrap(); assert!(had_job); // And check that the queue is really empty (`job2` from above // has not been queued indeed): - let queue_is_empty = !consumer.run_one(0, &[queue_name]).unwrap(); + let queue_is_empty = !c.run_one(0, &[queue_name]).unwrap(); assert!(queue_is_empty); // Now let's repeat the latter case, but providing different args to job2: @@ -151,7 +176,7 @@ fn ent_unique_job() { .queue(queue_name) .unique_for(unique_for_secs) .build(); - producer.enqueue(job1).unwrap(); + p.enqueue(job1).unwrap(); // this one is *NOT* a 'duplicate' ... let job2 = JobBuilder::new(job_type) .args(vec![Value::from("ISBN-13:9781718501850"), Value::from(101)]) @@ -159,16 +184,12 @@ fn ent_unique_job() { .unique_for(unique_for_secs) .build(); // ... 
so the server will accept it: - producer.enqueue(job2).unwrap(); - - let had_job = consumer.run_one(0, &[queue_name]).unwrap(); - assert!(had_job); - let had_another_one = consumer.run_one(0, &[queue_name]).unwrap(); - assert!(had_another_one); + p.enqueue(job2).unwrap(); + assert_had_one!(&mut c, queue_name); + assert_had_one!(&mut c, queue_name); // and the queue is empty again: - let had_job = consumer.run_one(0, &[queue_name]).unwrap(); - assert!(!had_job); + assert_is_empty!(&mut c, queue_name); } #[test] @@ -329,6 +350,7 @@ fn ent_unique_job_bypass_unique_lock() { skip_if_not_enterprise!(); let url = learn_faktory_url(); + let mut producer = Producer::connect(Some(&url)).unwrap(); let queue_name = "ent_unique_job_bypass_unique_lock"; let job1 = Job::builder("order") @@ -370,3 +392,693 @@ fn ent_unique_job_bypass_unique_lock() { assert!(c.run_one(0, &[queue_name]).unwrap()); assert!(!c.run_one(0, &[queue_name]).unwrap()); // empty; } + +#[test] +fn test_tracker_can_send_and_retrieve_job_execution_progress() { + use std::{ + io, + sync::{Arc, Mutex}, + thread, time, + }; + + skip_if_not_enterprise!(); + + let url = learn_faktory_url(); + + let t = Arc::new(Mutex::new( + Client::connect(Some(&url)).expect("job progress tracker created successfully"), + )); + + let t_captured = Arc::clone(&t); + + let mut p = Producer::connect(Some(&url)).unwrap(); + + let job_tackable = JobBuilder::new("order") + .args(vec![Value::from("ISBN-13:9781718501850")]) + .queue("test_tracker_can_send_progress_update") + .build(); + + let mut job_ordinary = JobBuilder::new("order") + .args(vec![Value::from("ISBN-13:9781718501850")]) + .queue("test_tracker_can_send_progress_update") + .build(); + // NB! Jobs are trackable by default, so we need to unset the "track" flag. + assert_eq!(job_ordinary.custom.remove("track"), Some(Value::from(1))); + + // let's remember this job's id: + let job_id = job_tackable.id().to_owned(); + let job_id_captured = job_id.clone(); + + p.enqueue(job_tackable).expect("enqueued"); + + let mut c = ConsumerBuilder::default(); + c.register("order", move |job| -> io::Result<_> { + // trying to set progress on a community edition of Faktory will give: + // 'an internal server error occurred: tracking subsystem is only available in Faktory Enterprise' + assert!(t_captured + .lock() + .expect("lock acquired") + .set_progress( + ProgressUpdate::builder(&job_id_captured) + .desc("Still processing...".to_owned()) + .percent(32) + .build(), + ) + .is_ok()); + // Let's update the progress once again, to check the 'set_progress' shortcut: + assert!(t_captured + .lock() + .unwrap() + .set_progress(ProgressUpdate::set(&job_id_captured, 33)) + .is_ok()); + + // let's sleep for a while ... + thread::sleep(time::Duration::from_secs(2)); + + // ... 
and read the progress info
+        let result = t_captured
+            .lock()
+            .expect("lock acquired")
+            .get_progress(job_id_captured.clone())
+            .expect("Retrieved progress update over the wire");
+
+        assert!(result.is_some());
+        let result = result.unwrap();
+        assert_eq!(result.jid, job_id_captured.clone());
+        match result.state {
+            JobState::Working => {}
+            _ => panic!("expected job's state to be 'working'"),
+        }
+        assert!(result.updated_at.is_some());
+        assert_eq!(result.percent, Some(33));
+        // considering the job done
+        Ok(eprintln!("{:?}", job))
+    });
+
+    let mut c = c
+        .connect(Some(&url))
+        .expect("Successfully ran a handshake with 'Faktory'");
+    assert_had_one!(&mut c, "test_tracker_can_send_progress_update");
+
+    let progress = t
+        .lock()
+        .expect("lock acquired successfully")
+        .get_progress(job_id.clone())
+        .expect("Retrieved progress update over the wire once again")
+        .expect("Some progress");
+
+    assert_eq!(progress.jid, job_id);
+    // 'Faktory' will be keeping the last known update for at least 30 minutes:
+    assert_eq!(progress.percent, Some(33));
+
+    // But it actually knows the job's real status, since the consumer (worker)
+    // informed it immediately after finishing with the job:
+    assert_eq!(progress.state, JobState::Success);
+
+    // Let's update the status once again to verify the 'update_builder' method
+    // on the `Progress` struct works as expected:
+    let upd = progress
+        .update_builder()
+        .desc("Final stage.".to_string())
+        .percent(99)
+        .build();
+    assert!(t.lock().unwrap().set_progress(upd).is_ok());
+
+    let progress = t
+        .lock()
+        .unwrap()
+        .get_progress(job_id)
+        .expect("Retrieved progress update over the wire once again")
+        .expect("Some progress");
+
+    if progress.percent != Some(100) {
+        let upd = progress.update_percent(100);
+        assert_eq!(upd.desc, progress.desc);
+        assert!(t.lock().unwrap().set_progress(upd).is_ok())
+    }
+
+    // What about an 'ordinary' job?
+    let job_id = job_ordinary.id().to_owned().clone();
+
+    // Sending it ...
+    p.enqueue(job_ordinary)
+        .expect("Successfully sent to Faktory");
+
+    // ... and asking for its progress
+    let progress = t
+        .lock()
+        .expect("lock acquired")
+        .get_progress(job_id.clone())
+        .expect("Retrieved progress update over the wire once again")
+        .expect("Some progress");
+
+    // From the docs:
+    // There are several reasons why a job's state might be unknown:
+    //    The JID is invalid or was never actually enqueued.
+    //    The job was not tagged with the track variable in the job's custom attributes: custom:{"track":1}.
+    //    The job's tracking structure has expired in Redis. It lives for 30 minutes and a big queue backlog can lead to expiration.
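For context, the docs excerpt above means calling code cannot assume that tracking data exists for a given JID; the assertions that follow verify exactly this "unknown" shape. Outside a test, a caller might fold that case into an `Option` along the following lines. This is only a sketch written against the tracking API exercised in this test (`Client::get_progress`, `Progress`, `JobState` from the crate's `ent` module, behind the "ent" feature); the helper name `known_progress` and the `Read + Write` bound are illustrative assumptions, not something established by this diff:

    use faktory::ent::{JobState, Progress};
    use faktory::{Client, Error};
    use std::io::{Read, Write};

    // Fold the "no tracking data" cases quoted above (invalid JID, job not enqueued
    // with custom {"track":1}, or an expired tracking structure) into `None`.
    fn known_progress<S: Read + Write>(
        tracker: &mut Client<S>,
        jid: String,
    ) -> Result<Option<Progress>, Error> {
        Ok(match tracker.get_progress(jid)? {
            Some(progress) if progress.state != JobState::Unknown => Some(progress),
            _ => None,
        })
    }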
+    assert_eq!(progress.jid, job_id);
+
+    // Returned from Faktory: '{"jid":"f7APFzrS2RZi9eaA","state":"unknown","updated_at":""}'
+    assert_eq!(progress.state, JobState::Unknown);
+    assert!(progress.updated_at.is_none());
+    assert!(progress.percent.is_none());
+    assert!(progress.desc.is_none());
+}
+
+#[test]
+fn test_batch_of_jobs_can_be_initiated() {
+    skip_if_not_enterprise!();
+    let url = learn_faktory_url();
+
+    let mut p = Producer::connect(Some(&url)).unwrap();
+    let mut c = ConsumerBuilder::default();
+    c.register("thumbnail", move |_job| -> io::Result<_> { Ok(()) });
+    c.register("clean_up", move |_job| -> io::Result<_> { Ok(()) });
+    let mut c = c.connect(Some(&url)).unwrap();
+    let mut t = Client::connect(Some(&url)).expect("job progress tracker created successfully");
+
+    let job_1 = Job::builder("thumbnail")
+        .args(vec!["path/to/original/image1"])
+        .queue("test_batch_of_jobs_can_be_initiated")
+        .build();
+    let job_2 = Job::builder("thumbnail")
+        .args(vec!["path/to/original/image2"])
+        .queue("test_batch_of_jobs_can_be_initiated")
+        .build();
+    let job_3 = Job::builder("thumbnail")
+        .args(vec!["path/to/original/image3"])
+        .queue("test_batch_of_jobs_can_be_initiated")
+        .add_to_custom_data("bid", "check-check")
+        .build();
+
+    let cb_job = Job::builder("clean_up")
+        .queue("test_batch_of_jobs_can_be_initiated__CALLBACKs")
+        .build();
+
+    let batch = Batch::builder()
+        .description("Image resizing workload")
+        .with_complete_callback(cb_job);
+
+    let time_just_before_batch_init = Utc::now();
+
+    let mut b = p.start_batch(batch).unwrap();
+
+    // let's remember the batch id:
+    let bid = b.id().to_string();
+
+    assert!(b.add(job_1).unwrap().is_none());
+    assert!(b.add(job_2).unwrap().is_none());
+    assert_eq!(b.add(job_3).unwrap().unwrap(), "check-check");
+    b.commit().unwrap();
+
+    // The batch has been committed; let's see its status:
+    let time_just_before_getting_status = Utc::now();
+
+    let s = t
+        .get_batch_status(bid.clone())
+        .expect("successfully fetched batch status from server...")
+        .expect("...and it's not none");
+
+    // Just to make a meaningful assertion about the BatchStatus's 'created_at' field:
+    assert!(s.created_at > time_just_before_batch_init);
+    assert!(s.created_at < time_just_before_getting_status);
+    assert_eq!(s.bid, bid);
+    assert_eq!(s.description, Some("Image resizing workload".into()));
+    assert_eq!(s.total, 3); // three jobs registered
+    assert_eq!(s.pending, 3); // and none executed just yet
+    assert_eq!(s.failed, 0);
+    // Docs do not mention it, but the golang client does:
+    // https://github.com/contribsys/faktory/blob/main/client/batch.go#L17-L19
+    assert_eq!(s.success_callback_state, CallbackState::Pending); // we did not even provide the 'success' callback
+    assert_eq!(s.complete_callback_state, CallbackState::Pending);
+
+    // consume and execute job 1 ...
+    assert_had_one!(&mut c, "test_batch_of_jobs_can_be_initiated");
+    // ... and try consuming from the "callback" queue:
+    assert_is_empty!(&mut c, "test_batch_of_jobs_can_be_initiated__CALLBACKs");
+
+    // let's ask the Faktory server about the batch status after
+    // we have consumed one job from this batch:
+    let s = t
+        .get_batch_status(bid.clone())
+        .expect("successfully fetched batch status from server...")
+        .expect("...and it's not none");
+
+    // this is because we have just consumed and executed 1 of 3 jobs:
+    assert_eq!(s.total, 3);
+    assert_eq!(s.pending, 2);
+    assert_eq!(s.failed, 0);
+
+    // now, consume and execute job 2
+    assert_had_one!(&mut c, "test_batch_of_jobs_can_be_initiated");
+    // ... and check the callback queue again:
+    assert_is_empty!(&mut c, "test_batch_of_jobs_can_be_initiated__CALLBACKs"); // not just yet ...
+
+    // let's check batch status once again:
+    let s = t
+        .get_batch_status(bid.clone())
+        .expect("successfully fetched batch status from server...")
+        .expect("...and it's not none");
+
+    // this is because we have just consumed and executed 2 of 3 jobs:
+    assert_eq!(s.total, 3);
+    assert_eq!(s.pending, 1);
+    assert_eq!(s.failed, 0);
+
+    // finally, consume and execute job 3 - the last one from the batch
+    assert_had_one!(&mut c, "test_batch_of_jobs_can_be_initiated");
+
+    // let's check batch status to see what happens after
+    // all the jobs from the batch have been executed:
+    let s = t
+        .get_batch_status(bid.clone())
+        .expect("successfully fetched batch status from server...")
+        .expect("...and it's not none");
+
+    // this is because we have now consumed and executed all 3 jobs of the batch:
+    assert_eq!(s.total, 3);
+    assert_eq!(s.pending, 0);
+    assert_eq!(s.failed, 0);
+    assert_eq!(s.complete_callback_state, CallbackState::Enqueued);
+
+    // let's now successfully consume from the "callback" queue:
+    assert_had_one!(&mut c, "test_batch_of_jobs_can_be_initiated__CALLBACKs");
+
+    // let's check batch status one last time:
+    let s = t
+        .get_batch_status(bid.clone())
+        .expect("successfully fetched batch status from server...")
+        .expect("...and it's not none");
+
+    // this is because the 'complete' callback job has now been consumed and executed:
+    assert_eq!(s.complete_callback_state, CallbackState::FinishedOk);
+}
+
+#[test]
+fn test_batches_can_be_nested() {
+    skip_if_not_enterprise!();
+    let url = learn_faktory_url();
+
+    // Set up 'producer', 'consumer', and 'tracker':
+    let mut p = Producer::connect(Some(&url)).unwrap();
+    let mut c = ConsumerBuilder::default();
+    c.register("jobtype", move |_job| -> io::Result<_> { Ok(()) });
+    let mut _c = c.connect(Some(&url)).unwrap();
+    let mut t = Client::connect(Some(&url)).expect("job progress tracker created successfully");
+
+    // Prepare some jobs:
+    let parent_job1 = Job::builder("jobtype")
+        .queue("test_batches_can_be_nested")
+        .build();
+    let child_job_1 = Job::builder("jobtype")
+        .queue("test_batches_can_be_nested")
+        .build();
+    let child_job_2 = Job::builder("jobtype")
+        .queue("test_batches_can_be_nested")
+        .build();
+    let grand_child_job_1 = Job::builder("jobtype")
+        .queue("test_batches_can_be_nested")
+        .build();
+
+    // According to the Faktory docs:
+    // "The callback for a parent batch will not enqueue until the callback for the child batch has finished."
+ // See: https://github.com/contribsys/faktory/wiki/Ent-Batches#guarantees + let parent_cb_job = Job::builder("clean_up") + .queue("test_batches_can_be_nested__CALLBACKs") + .build(); + let child_cb_job = Job::builder("clean_up") + .queue("test_batches_can_be_nested__CALLBACKs") + .build(); + let grandchild_cb_job = Job::builder("clean_up") + .queue("test_batches_can_be_nested__CALLBACKs") + .build(); + + // batches start + let parent_batch = Batch::builder() + .description("Parent batch") + .with_success_callback(parent_cb_job); + let mut parent_batch = p.start_batch(parent_batch).unwrap(); + let parent_batch_id = parent_batch.id().to_owned(); + parent_batch.add(parent_job1).unwrap(); + + let child_batch = Batch::builder() + .description("Child batch") + .with_success_callback(child_cb_job); + let mut child_batch = parent_batch.start_batch(child_batch).unwrap(); + let child_batch_id = child_batch.id().to_owned(); + child_batch.add(child_job_1).unwrap(); + child_batch.add(child_job_2).unwrap(); + + let grandchild_batch = Batch::builder() + .description("Grandchild batch") + .with_success_callback(grandchild_cb_job); + let mut grandchild_batch = child_batch.start_batch(grandchild_batch).unwrap(); + let grandchild_batch_id = grandchild_batch.id().to_owned(); + grandchild_batch.add(grand_child_job_1).unwrap(); + + grandchild_batch.commit().unwrap(); + child_batch.commit().unwrap(); + parent_batch.commit().unwrap(); + // batches finish + + let parent_status = t + .get_batch_status(parent_batch_id.clone()) + .unwrap() + .unwrap(); + assert_eq!(parent_status.description, Some("Parent batch".to_string())); + assert_eq!(parent_status.total, 1); + assert_eq!(parent_status.parent_bid, None); + + let child_status = t.get_batch_status(child_batch_id.clone()).unwrap().unwrap(); + assert_eq!(child_status.description, Some("Child batch".to_string())); + assert_eq!(child_status.total, 2); + assert_eq!(child_status.parent_bid, Some(parent_batch_id)); + + let grandchild_status = t.get_batch_status(grandchild_batch_id).unwrap().unwrap(); + assert_eq!( + grandchild_status.description, + Some("Grandchild batch".to_string()) + ); + assert_eq!(grandchild_status.total, 1); + assert_eq!(grandchild_status.parent_bid, Some(child_batch_id)); +} + +#[test] +fn test_callback_will_not_be_queued_unless_batch_gets_committed() { + skip_if_not_enterprise!(); + let url = learn_faktory_url(); + + // prepare a producer, a consumer of 'order' jobs, and a tracker: + let mut p = Producer::connect(Some(&url)).unwrap(); + let mut c = ConsumerBuilder::default(); + c.register("order", move |_job| -> io::Result<_> { Ok(()) }); + c.register("order_clean_up", move |_job| -> io::Result<_> { Ok(()) }); + let mut c = c.connect(Some(&url)).unwrap(); + let mut t = Client::connect(Some(&url)).unwrap(); + + let mut jobs = some_jobs( + "order", + "test_callback_will_not_be_queued_unless_batch_gets_committed", + 3, + ); + let mut callbacks = some_jobs( + "order_clean_up", + "test_callback_will_not_be_queued_unless_batch_gets_committed__CALLBACKs", + 1, + ); + + // start a 'batch': + let mut b = p + .start_batch( + Batch::builder() + .description("Orders processing workload") + .with_success_callback(callbacks.next().unwrap()), + ) + .unwrap(); + let bid = b.id().to_string(); + + // push 3 jobs onto this batch, but DO NOT commit the batch: + for _ in 0..3 { + b.add(jobs.next().unwrap()).unwrap(); + } + + // check this batch's status: + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.total, 3); + assert_eq!(s.pending, 
3); + assert_eq!(s.success_callback_state, CallbackState::Pending); + + // consume those 3 jobs successfully; + for _ in 0..3 { + assert_had_one!( + &mut c, + "test_callback_will_not_be_queued_unless_batch_gets_committed" + ); + } + + // verify the queue is drained: + assert_is_empty!( + &mut c, + "test_callback_will_not_be_queued_unless_batch_gets_committed" + ); + + // check this batch's status again: + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.total, 3); + assert_eq!(s.pending, 0); + assert_eq!(s.failed, 0); + assert_eq!(s.success_callback_state, CallbackState::Pending); // not just yet; + + // to double-check, let's assert the success callbacks queue is empty: + assert_is_empty!( + &mut c, + "test_callback_will_not_be_queued_unless_batch_gets_committed__CALLBACKs" + ); + + // now let's COMMIT the batch ... + b.commit().unwrap(); + + // ... and check batch status: + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.success_callback_state, CallbackState::Enqueued); + + // finally, let's consume from the success callbacks queue ... + assert_had_one!( + &mut c, + "test_callback_will_not_be_queued_unless_batch_gets_committed__CALLBACKs" + ); + + // ... and see the final status: + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.success_callback_state, CallbackState::FinishedOk); +} + +#[test] +fn test_callback_will_be_queued_upon_commit_even_if_batch_is_empty() { + use std::{thread, time}; + + skip_if_not_enterprise!(); + let url = learn_faktory_url(); + let mut p = Producer::connect(Some(&url)).unwrap(); + let mut t = Client::connect(Some(&url)).unwrap(); + let q_name = "test_callback_will_be_queued_upon_commit_even_if_batch_is_empty"; + let complete_cb_jobtype = "complete_callback_jobtype"; + let success_cb_jobtype = "success_cb_jobtype"; + let complete_cb = some_jobs(complete_cb_jobtype, q_name, 1).next().unwrap(); + let success_cb = some_jobs(success_cb_jobtype, q_name, 1).next().unwrap(); + let b = p + .start_batch( + Batch::builder() + .description("Orders processing workload") + .with_callbacks(success_cb, complete_cb), + ) + .unwrap(); + let bid = b.id().to_owned(); + + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.total, 0); // no jobs in the batch; + assert_eq!(s.success_callback_state, CallbackState::Pending); + assert_eq!(s.complete_callback_state, CallbackState::Pending); + + b.commit().unwrap(); + + // let's give the Faktory server some time: + thread::sleep(time::Duration::from_secs(2)); + + let s = t.get_batch_status(bid.clone()).unwrap().unwrap(); + assert_eq!(s.total, 0); // again, there are no jobs in the batch ... + + // The docs say "If you don't push any jobs into the batch, any callbacks will fire immediately upon BATCH COMMIT." 
+    // and "the success callback for a batch will always enqueue after the complete callback"
+    assert_eq!(s.complete_callback_state, CallbackState::Enqueued);
+    assert_eq!(s.success_callback_state, CallbackState::Pending);
+
+    let mut c = ConsumerBuilder::default();
+    c.register(complete_cb_jobtype, move |_job| -> io::Result<_> { Ok(()) });
+    c.register(success_cb_jobtype, move |_job| -> io::Result<_> {
+        Err(io::Error::new(
+            io::ErrorKind::Other,
+            "we want this one to fail to test the 'CallbackState' behavior",
+        ))
+    });
+
+    let mut c = c.connect(Some(&url)).unwrap();
+
+    assert_had_one!(&mut c, q_name); // complete callback consumed
+
+    let s = t.get_batch_status(bid.clone()).unwrap().unwrap();
+    assert_eq!(s.total, 0);
+    match s.complete_callback_state {
+        CallbackState::FinishedOk => {}
+        _ => panic!("Expected the callback to have been successfully executed"),
+    }
+    match s.success_callback_state {
+        CallbackState::Enqueued => {}
+        _ => panic!("Expected the callback to have been enqueued, since the `complete` callback has already executed"),
+    }
+    assert_had_one!(&mut c, q_name); // success callback consumed
+
+    let s = t.get_batch_status(bid.clone()).unwrap().unwrap();
+    assert_eq!(s.total, 0);
+    assert_eq!(s.complete_callback_state, CallbackState::FinishedOk);
+    // Still `Enqueued`, because the success callback job failed rather than finishing successfully.
+    // Had we registered a handler for `success_cb_jobtype` returning Ok(()) rather than Err(),
+    // the state would be `FinishedOk`, just as is the case with the `complete` callback.
+    assert_eq!(s.success_callback_state, CallbackState::Enqueued);
+}
+
+#[test]
+fn test_batch_can_be_reopened_add_extra_jobs_and_batches_added() {
+    skip_if_not_enterprise!();
+    let url = learn_faktory_url();
+    let mut p = Producer::connect(Some(&url)).unwrap();
+    let mut t = Client::connect(Some(&url)).unwrap();
+    let mut jobs = some_jobs("order", "test_batch_can_be_reopned_add_extra_jobs_added", 4);
+    let mut callbacks = some_jobs(
+        "order_clean_up",
+        "test_batch_can_be_reopned_add_extra_jobs_added__CALLBACKs",
+        1,
+    );
+
+    let b = Batch::builder()
+        .description("Orders processing workload")
+        .with_success_callback(callbacks.next().unwrap());
+
+    let mut b = p.start_batch(b).unwrap();
+    let bid = b.id().to_string();
+    b.add(jobs.next().unwrap()).unwrap(); // 1 job
+    b.add(jobs.next().unwrap()).unwrap(); // 2 jobs
+
+    let status = t.get_batch_status(bid.clone()).unwrap().unwrap();
+    assert_eq!(status.total, 2);
+    assert_eq!(status.pending, 2);
+
+    // ############################## SUBTEST 0 ##########################################
+    // Let's try to open/reopen a batch we have never declared:
+    let b = p.open_batch(String::from("non-existent-batch-id")).unwrap();
+    // The server will error back on this with "No such batch ", but
+    // we are handling this case for the end-user and returning `Ok(None)` instead,
+    // indicating that there is no such batch.
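As an aside, the `Ok(None)` convention asserted right below lends itself to a simple calling pattern: try to reopen a batch, and fall back gracefully when the server no longer knows it. The following is only a sketch written against the batch API used in these tests (`Producer::open_batch` plus `add`/`commit` on the returned handle, all behind the "ent" feature); the helper name `append_if_open` and the `Read + Write` bound are illustrative assumptions, not something established by this diff:

    use faktory::{Error, Job, Producer};
    use std::io::{Read, Write};

    // Returns Ok(true) if the batch could be reopened and the job was appended to it,
    // or Ok(false) if the server reported "No such batch" (surfaced by the crate as `None`).
    fn append_if_open<S: Read + Write>(
        p: &mut Producer<S>,
        bid: String,
        job: Job,
    ) -> Result<bool, Error> {
        match p.open_batch(bid)? {
            Some(mut batch) => {
                batch.add(job)?;
                batch.commit()?;
                Ok(true)
            }
            None => Ok(false),
        }
    }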
+    assert!(b.is_none());
+    // ########################## END OF SUBTEST 0 #######################################
+
+    // ############################## SUBTEST 1 ##########################################
+    // Let's first of all try to open the batch we have not committed yet:
+    // [We can use `producer::open_batch` specifying a bid, OR - since we previously retrieved
+    // a status for this batch - we can go with `status::open`, providing an exclusive ref to the producer]
+    let mut b = status.open(&mut p).unwrap().unwrap();
+    b.add(jobs.next().unwrap()).unwrap(); // 3 jobs
+
+    b.commit().unwrap(); // committing the batch
+
+    let status = t.get_batch_status(bid.clone()).unwrap().unwrap();
+    assert_eq!(status.total, 3);
+    assert_eq!(status.pending, 3);
+
+    // Subtest 1 result:
+    // The Faktory server lets us open the uncommitted batch. This is something not mentioned
+    // in the docs, but still worth checking.
+    // ########################### END OF SUBTEST 1 ######################################
+
+    // ############################## SUBTEST 2 ##########################################
+    // From the docs:
+    // """Note that, once committed, only a job within the batch may reopen it.
+    // Faktory will return an error if you dynamically add jobs from "outside" the batch;
+    // this is to prevent a race condition between callbacks firing and an outsider adding more jobs."""
+    // Ref: https://github.com/contribsys/faktory/wiki/Ent-Batches#batch-open-bid (Jan 10, 2024)
+
+    // Let's try to open an already committed batch:
+    let mut b = p
+        .open_batch(bid.clone())
+        .expect("no error")
+        .expect("is some");
+    b.add(jobs.next().unwrap()).unwrap(); // 4 jobs
+    b.commit().unwrap(); // committing the batch again!
+
+    let s = t.get_batch_status(bid.clone()).unwrap().unwrap();
+    assert_eq!(s.total, 4);
+    assert_eq!(s.pending, 4);
+
+    // Subtest 2 result:
+    // We managed to open a batch "from outside" and the server accepted the job INSTEAD OF ERRORING BACK.
+    // ############################ END OF SUBTEST 2 #######################################
+
+    // ############################## SUBTEST 3 ############################################
+    // Let's see if we will be able to - again - open the committed batch "from outside" and
+    // add a nested batch to it.
+    let mut b = p.open_batch(bid.clone()).unwrap().expect("is some");
+    let mut nested_callbacks = some_jobs(
+        "order_clean_up__NESTED",
+        "test_batch_can_be_reopned_add_extra_jobs_added__CALLBACKs__NESTED",
+        2,
+    );
+    let nested_batch_declaration = Batch::builder()
+        .description("Orders processing workload. Nested stage")
+        .with_callbacks(
+            nested_callbacks.next().unwrap(),
+            nested_callbacks.next().unwrap(),
+        );
+    let nested_batch = b.start_batch(nested_batch_declaration).unwrap();
+    let nested_bid = nested_batch.id().to_string();
+    // committing the nested batch without any jobs
+    // since those are just not relevant for this test:
+    nested_batch.commit().unwrap();
+
+    let s = t.get_batch_status(nested_bid.clone()).unwrap().unwrap();
+    assert_eq!(s.total, 0);
+    assert_eq!(s.parent_bid, Some(bid)); // this is really our child batch
+    assert_eq!(s.complete_callback_state, CallbackState::Enqueued);
+
+    // Subtest 3 result:
+    // We managed to open an already committed batch "from outside" and the server accepted
+    // a nested batch INSTEAD OF ERRORING BACK.
+    // ############################ END OF SUBTEST 3 #######################################
+
+    // The following subtest assertions should be adjusted once fixes are introduced to
+    // Faktory as per https://github.com/contribsys/faktory/issues/465
+    // The idea is that we should not be able to push to a batch for which the server has already
+    // enqueued a callback.
+    //
+    // ############################## SUBTEST 4 ############################################
+    // From the docs:
+    // """Once a callback has enqueued for a batch, you may not add anything to the batch."""
+    // ref: https://github.com/contribsys/faktory/wiki/Ent-Batches#guarantees (Jan 10, 2024)
+
+    // Let's try to re-open the nested batch that we have already committed and add some jobs to it.
+    let mut b = p
+        .open_batch(nested_bid.clone())
+        .expect("no error")
+        .expect("is some");
+    let mut more_jobs = some_jobs(
+        "order_clean_up__NESTED",
+        "test_batch_can_be_reopned_add_extra_jobs_added__NESTED",
+        2,
+    );
+    b.add(more_jobs.next().unwrap()).unwrap();
+    b.add(more_jobs.next().unwrap()).unwrap();
+    b.commit().unwrap();
+
+    let s = t.get_batch_status(nested_bid.clone()).unwrap().unwrap();
+    match s.complete_callback_state {
+        CallbackState::Enqueued => {}
+        _ => panic!("Expected the callback to have been enqueued"),
+    }
+    assert_eq!(s.pending, 2); // ... though there are pending jobs
+    assert_eq!(s.total, 2);
+
+    // Subtest 4 result:
+    // We were able to add more jobs to the batch for which the Faktory server had already
+    // queued the callback.
+    // ############################## END OF SUBTEST 4 #####################################
+
+    // ############################## OVERALL RESULTS ######################################
+    // The guarantees that definitely hold:
+    //
+    // 1) the callbacks will fire immediately after the jobs of this batch have been executed, provided the batch has been committed;
+    //
+    // 2) the callbacks will fire immediately for an empty batch, provided it has been committed;
+    //
+    // 3) the 'complete' callback will always be queued first
+    // (this is shown as part of the test 'test_callback_will_be_queued_upon_commit_even_if_batch_is_empty');
+}
diff --git a/tests/tls.rs b/tests/tls.rs
index eb7e79cb..3260ea7c 100644
--- a/tests/tls.rs
+++ b/tests/tls.rs
@@ -1,13 +1,10 @@
-extern crate faktory;
-extern crate serde_json;
-extern crate url;
+#![cfg(feature = "tls")]
 
 use faktory::*;
 use serde_json::Value;
 use std::{env, fs, io, sync};
 
 #[test]
-#[cfg(feature = "tls")]
 fn roundtrip_tls() {
     use native_tls::{Certificate, TlsConnector};