Skip to content
Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
a0b2dc9
Add tracker, add support for batch jobs
rustworthy Jan 17, 2024
b143490
Run fmt
rustworthy Jan 17, 2024
030484f
Merge branch 'main' into feat/ent-batch-jobs
rustworthy Jan 20, 2024
f3c8348
Use doc-cfg for new ent features
rustworthy Jan 20, 2024
143319c
Merge branch 'main' into feat/ent-batch-jobs
rustworthy Jan 30, 2024
268ffec
Do not error if batch does not exist when re-opening
rustworthy Jan 30, 2024
6b4770c
Update batch and job names in docs
rustworthy Jan 31, 2024
b69ed1a
Expect no args on batch::builder
rustworthy Jan 31, 2024
3e27bf1
Avoid unwraps in docs
rustworthy Jan 31, 2024
1ca4159
Update src/proto/batch/mod.rs
rustworthy Jan 31, 2024
a8048ee
Consume batchbuilder
rustworthy Jan 31, 2024
6653c50
Accept Into<String> in BatchBuilder description
rustworthy Jan 31, 2024
c0e0176
Default description to None
rustworthy Jan 31, 2024
1cda1dd
Update src/proto/batch/mod.rs
rustworthy Jan 31, 2024
d3e4505
Update src/proto/batch/mod.rs
rustworthy Jan 31, 2024
87bf6c7
Update src/proto/single/ent.rs
rustworthy Jan 31, 2024
304f290
Update comment to 'parse_datetime' utility
rustworthy Jan 31, 2024
31bef3e
Update src/proto/single/ent.rs
rustworthy Jan 31, 2024
f554817
Typo in 'known'
rustworthy Jan 31, 2024
c2db5ea
Update docs for 'reserve_until'
rustworthy Jan 31, 2024
effdd81
Update docs, make 'BatchHandle' public
rustworthy Jan 31, 2024
7c7e1c3
Update comment in e2e test.
rustworthy Feb 1, 2024
c279922
Add JobState enum. Update tests accordingly
rustworthy Feb 1, 2024
fa906df
App 'open' method on 'BatchStatus'
rustworthy Feb 2, 2024
73c11fd
Add 'set_progress' shortcut
rustworthy Feb 2, 2024
2dd891f
Add 'update_percet' method
rustworthy Feb 2, 2024
d745549
Add 'update_percet' method
rustworthy Feb 2, 2024
54f78f6
Add 'update_percet' method
rustworthy Feb 2, 2024
2f7231d
Make get_random_wid and get_random_jid public within crate
rustworthy Feb 2, 2024
120978b
Update tests for 'update_percent' shortcut
rustworthy Feb 2, 2024
01fd744
Make jobs trackable by deafult
rustworthy Feb 2, 2024
c8b7450
Make jobs trackable by deafult - update tests
rustworthy Feb 2, 2024
0edbd83
Add 'CallbackState' enum. Use for complete and success callback type
rustworthy Feb 4, 2024
06d208e
Bring 'Tracker' into scope for docs
rustworthy Feb 4, 2024
e13f0d6
Update assertion on batch test
rustworthy Feb 4, 2024
d10ceee
Make BatchHandle::add return option of old bid
rustworthy Feb 4, 2024
ff3fd5f
Make BatchHandle::add return option of old bid as serde_json::Value
rustworthy Feb 4, 2024
489ca0f
Run formatter
rustworthy Feb 4, 2024
18f1593
Double-check the batch callback status
rustworthy Feb 4, 2024
1dfb8d1
Restore assertions after verifying on CI
rustworthy Feb 4, 2024
0958e31
Update src/producer/mod.rs
rustworthy Feb 5, 2024
e2f57b6
Update src/proto/single/ent.rs
rustworthy Feb 5, 2024
0d721cf
Update src/proto/single/mod.rs
rustworthy Feb 5, 2024
5bd8616
Update import grouping in producer module
rustworthy Feb 5, 2024
c9c8bd0
Turn free func 'set_progress' into unbound set of ProgressUpdate
rustworthy Feb 5, 2024
1560c21
Add comment to batch guarantees subtest
rustworthy Feb 7, 2024
4fd12a6
Rm Tracker construct
rustworthy Feb 12, 2024
07ab901
Re-use client logic
rustworthy Feb 12, 2024
637a2ae
Update docs for 'is_worker'
rustworthy Feb 12, 2024
c7d7d62
make checks pass
rustworthy Feb 12, 2024
629e6b1
Fix PR review threads
rustworthy Feb 12, 2024
c1a82fc
Split single::ent mod
rustworthy Feb 12, 2024
50ef9c3
Update src/lib.rs
rustworthy Feb 17, 2024
fde63fc
Update src/proto/mod.rs
rustworthy Feb 17, 2024
62da8fd
Update src/proto/mod.rs
rustworthy Feb 17, 2024
9bd191c
Update src/proto/mod.rs
rustworthy Feb 17, 2024
8a87e50
Update src/proto/mod.rs
rustworthy Feb 17, 2024
fb56236
Update src/proto/mod.rs
rustworthy Feb 17, 2024
6ce6bc0
Update src/proto/mod.rs
rustworthy Feb 17, 2024
1dd3981
Update imports in faktory ent tests
rustworthy Feb 17, 2024
65ac269
Run cargo fmt
rustworthy Feb 17, 2024
f8dae7d
Update docs
rustworthy Feb 17, 2024
80749d4
Merge branch 'main' into feat/ent-batch-jobs
jonhoo Feb 18, 2024
6c98127
Remove group imports format checking
jonhoo Feb 18, 2024
62ec997
nit on module ordering
jonhoo Feb 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 6 additions & 7 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::error::Error;
use crate::proto::{self, Client, ClientOptions, HeartbeatStatus, Reconnect};
use crate::proto::{
self, parse_provided_or_from_env, Ack, Client, ClientOptions, Fail, HeartbeatStatus, Job,
Reconnect,
};
use fnv::FnvHashMap;
use std::error::Error as StdError;
use std::io::prelude::*;
use std::net::TcpStream;
use std::sync::{atomic, Arc, Mutex};

use crate::proto::{Ack, Fail, Job};

const STATUS_RUNNING: usize = 0;
const STATUS_QUIET: usize = 1;
const STATUS_TERMINATING: usize = 2;
Expand Down Expand Up @@ -213,10 +214,7 @@ impl<E> ConsumerBuilder<E> {
///
/// If `url` is given, but does not specify a port, it defaults to 7419.
pub fn connect(self, url: Option<&str>) -> Result<Consumer<TcpStream, E>, Error> {
let url = match url {
Some(url) => proto::url_parse(url),
None => proto::url_parse(&proto::get_env_url()),
}?;
let url = parse_provided_or_from_env(url)?;
let stream = TcpStream::connect(proto::host_from_url(&url))?;
Self::connect_with(self, stream, url.password().map(|p| p.to_string()))
}
Expand All @@ -228,6 +226,7 @@ impl<E> ConsumerBuilder<E> {
pwd: Option<String>,
) -> Result<Consumer<S, E>, Error> {
self.opts.password = pwd;
self.opts.is_worker = true;
Ok(Consumer::new(
Client::new(stream, self.opts)?,
self.workers,
Expand Down
10 changes: 8 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,11 @@ pub use tls::TlsStream;
pub use crate::consumer::{Consumer, ConsumerBuilder};
pub use crate::error::Error;
pub use crate::producer::Producer;
pub use crate::proto::Reconnect;
pub use crate::proto::{Job, JobBuilder};
pub use crate::proto::{Client, Job, JobBuilder, Reconnect};

#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
pub use crate::proto::{
Batch, BatchBuilder, BatchHandle, BatchStatus, CallbackState, JobState, Progress,
ProgressUpdate, ProgressUpdateBuilder,
};
42 changes: 32 additions & 10 deletions src/producer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::error::Error;
use crate::proto::{self, Client, Info, Job, Push, QueueAction, QueueControl};
use crate::proto::{Client, Info, Job, Push, QueueAction, QueueControl};
use std::io::prelude::*;
use std::net::TcpStream;

#[cfg(feature = "ent")]
use crate::proto::{Batch, BatchHandle, CommitBatch, OpenBatch};

/// `Producer` is used to enqueue new jobs that will in turn be processed by Faktory workers.
///
/// # Connecting to Faktory
Expand Down Expand Up @@ -82,21 +85,16 @@ impl Producer<TcpStream> {
///
/// If `url` is given, but does not specify a port, it defaults to 7419.
pub fn connect(url: Option<&str>) -> Result<Self, Error> {
let url = match url {
Some(url) => proto::url_parse(url),
None => proto::url_parse(&proto::get_env_url()),
}?;
let stream = TcpStream::connect(proto::host_from_url(&url))?;
Self::connect_with(stream, url.password().map(|p| p.to_string()))
let c = Client::connect(url)?;
Ok(Producer { c })
}
}

impl<S: Read + Write> Producer<S> {
/// Connect to a Faktory server with a non-standard stream.
pub fn connect_with(stream: S, pwd: Option<String>) -> Result<Producer<S>, Error> {
Ok(Producer {
c: Client::new_producer(stream, pwd)?,
})
let c = Client::connect_with(stream, pwd)?;
Ok(Producer { c })
}

/// Enqueue the given job on the Faktory server.
Expand Down Expand Up @@ -129,6 +127,30 @@ impl<S: Read + Write> Producer<S> {
.issue(&QueueControl::new(QueueAction::Resume, queues))?
.await_ok()
}

/// Initiate a new batch of jobs.
#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
pub fn start_batch(&mut self, batch: Batch) -> Result<BatchHandle<'_, S>, Error> {
let bid = self.c.issue(&batch)?.read_bid()?;
Ok(BatchHandle::new(bid, self))
}

/// Open an already existing batch of jobs.
///
/// This will not error if a batch with the provided `bid` does not exist,
/// rather `Ok(None)` will be returned.
#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
pub fn open_batch(&mut self, bid: String) -> Result<Option<BatchHandle<'_, S>>, Error> {
let bid = self.c.issue(&OpenBatch::from(bid))?.maybe_bid()?;
Ok(bid.map(|bid| BatchHandle::new(bid, self)))
}

#[cfg(feature = "ent")]
pub(crate) fn commit_batch(&mut self, bid: String) -> Result<(), Error> {
self.c.issue(&CommitBatch::from(bid))?.await_ok()
}
}

#[cfg(test)]
Expand Down
65 changes: 65 additions & 0 deletions src/proto/batch/cmd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::proto::single::FaktoryCommand;
use crate::{Batch, Error};
use std::io::Write;

impl FaktoryCommand for Batch {
fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"BATCH NEW ")?;
serde_json::to_writer(&mut *w, self).map_err(Error::Serialization)?;
Ok(w.write_all(b"\r\n")?)
}
}

// ----------------------------------------------

pub struct CommitBatch(String);

impl From<String> for CommitBatch {
fn from(value: String) -> Self {
CommitBatch(value)
}
}

impl FaktoryCommand for CommitBatch {
fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"BATCH COMMIT ")?;
w.write_all(self.0.as_bytes())?;
Ok(w.write_all(b"\r\n")?)
}
}

// ----------------------------------------------

pub struct GetBatchStatus(String);

impl From<String> for GetBatchStatus {
fn from(value: String) -> Self {
GetBatchStatus(value)
}
}

impl FaktoryCommand for GetBatchStatus {
fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"BATCH STATUS ")?;
w.write_all(self.0.as_bytes())?;
Ok(w.write_all(b"\r\n")?)
}
}

// ----------------------------------------------

pub struct OpenBatch(String);

impl From<String> for OpenBatch {
fn from(value: String) -> Self {
OpenBatch(value)
}
}

impl FaktoryCommand for OpenBatch {
fn issue<W: Write>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"BATCH OPEN ")?;
w.write_all(self.0.as_bytes())?;
Ok(w.write_all(b"\r\n")?)
}
}
Loading