Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,13 @@ func main() {
//you can move the job to permanent storage if you can't handle it
return bgjob.MoveToDlq(errors.New("move to dlq"))
})

cli := bgjob.NewClient(bgjob.NewPgStore(db))

ctx := context.Background()
store, err := bgjob.NewPgStoreV2(ctx, db)
if err != nil {
panic(err)
}
cli := bgjob.NewClient(store)

//if handler for job type wasn't provided, job will be moved to dlq
handler := bgjob.NewMux().
Expand All @@ -98,7 +103,6 @@ func main() {
bgjob.WithConcurrency(runtime.NumCPU()), //default 1
bgjob.WithPollInterval(500*time.Millisecond), //default 1s
)
ctx := context.Background()
worker.Run(ctx) //call ones, non-blocking

err = cli.Enqueue(ctx, bgjob.EnqueueRequest{
Expand Down
4 changes: 3 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ func prepareTest(t *testing.T) (*require.Assertions, *db, *bgjob.Client) {
err = applyMigration(db.DB)
asserter.NoError(err)

cli := bgjob.NewClient(bgjob.NewPgStore(db.DB))
store, err := bgjob.NewPgStoreV2(context.Background(), db.DB)
Comment thread
d1slike marked this conversation as resolved.
asserter.NoError(err)
cli := bgjob.NewClient(store)

return asserter, db, cli
}
Expand Down
10 changes: 6 additions & 4 deletions enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type ExecerContext interface {
ExecContext(ctx context.Context, s string, args ...interface{}) (sql.Result, error)
ExecContext(ctx context.Context, s string, args ...any) (sql.Result, error)
}

func Enqueue(ctx context.Context, e ExecerContext, req EnqueueRequest) error {
Expand Down Expand Up @@ -61,6 +61,7 @@ func requestsToJobs(list []EnqueueRequest) ([]Job, error) {
Queue: req.Queue,
Type: req.Type,
Arg: req.Arg,
RequestId: req.RequestId,
Attempt: 0,
LastError: nil,
NextRunAt: now.Add(req.Delay).Unix(),
Expand All @@ -75,11 +76,11 @@ func requestsToJobs(list []EnqueueRequest) ([]Job, error) {

func bulkInsert(ctx context.Context, e ExecerContext, jobs []Job) error {
valueStrings := make([]string, 0, len(jobs))
valueArgs := make([]interface{}, 0, len(jobs)*8)
valueArgs := make([]any, 0, len(jobs)*9)
placeholderNum := 0
for _, job := range jobs {
placeholders := make([]string, 0)
for i := 0; i < 8; i++ {
for i := 0; i < 9; i++ {
placeholderNum++
placeholders = append(placeholders, fmt.Sprintf("$%d", placeholderNum))
}
Expand All @@ -94,9 +95,10 @@ func bulkInsert(ctx context.Context, e ExecerContext, jobs []Job) error {
job.NextRunAt,
job.CreatedAt,
job.UpdatedAt,
job.RequestId,
)
}
query := fmt.Sprintf("INSERT INTO bgjob_job (id, queue, type, arg, attempt, next_run_at, created_at, updated_at) VALUES %s",
query := fmt.Sprintf("INSERT INTO bgjob_job (id, queue, type, arg, attempt, next_run_at, created_at, updated_at, request_id) VALUES %s",
strings.Join(valueStrings, ","))
_, err := e.ExecContext(ctx, query, valueArgs...)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions enqueue_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
)

type EnqueueRequest struct {
Id string //optional
Queue string //required
Type string //required
Arg []byte //optional
Delay time.Duration //optional
Id string //optional
Queue string //required
Type string //required
Arg []byte //optional
Delay time.Duration //optional
RequestId string //optional
}
8 changes: 6 additions & 2 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ func main() {
return bgjob.MoveToDlq(errors.New("move to dlq"))
})

cli := bgjob.NewClient(bgjob.NewPgStore(db))
ctx := context.Background()
store, err := bgjob.NewPgStoreV2(ctx, db)
if err != nil {
panic(err)
}
cli := bgjob.NewClient(store)

//if handler for job type wasn't provided, job will be moved to dlq
handler := bgjob.NewMux().
Expand All @@ -62,7 +67,6 @@ func main() {
bgjob.WithConcurrency(runtime.NumCPU()), //default 1
bgjob.WithPollInterval(500*time.Millisecond), //default 1s
)
ctx := context.Background()
worker.Run(ctx) //call ones, non-blocking

err = cli.Enqueue(ctx, bgjob.EnqueueRequest{
Expand Down
1 change: 1 addition & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ type Job struct {
NextRunAt int64
CreatedAt time.Time
UpdatedAt time.Time
RequestId string
}
21 changes: 21 additions & 0 deletions migration/add_request_id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
alter table bgjob_job
add column request_id text;

alter table bgjob_dead_job
add column request_id text;

update bgjob_job
set request_id = ''
where request_id is null;

update bgjob_dead_job
set request_id = ''
where request_id is null;

alter table bgjob_job
alter column request_id set default '',
alter column request_id set not null;

alter table bgjob_dead_job
alter column request_id set default '',
alter column request_id set not null;
6 changes: 4 additions & 2 deletions migration/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ create table bgjob_job
last_error text,
next_run_at int8 not null,
created_at timestamp not null,
updated_at timestamp not null
updated_at timestamp not null,
request_id text not null default ''
);

create index ix_bgjob_job__queue_next_run_at_created_at on bgjob_job (queue, next_run_at, created_at);
Expand All @@ -25,5 +26,6 @@ create table bgjob_dead_job
next_run_at int8 not null,
job_created_at timestamp not null,
job_updated_at timestamp not null,
moved_at timestamp not null
moved_at timestamp not null,
request_id text not null default ''
)
39 changes: 33 additions & 6 deletions pg_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,21 @@ type pgStore struct {
db *sql.DB
}

func NewPgStore(db *sql.DB) *pgStore {
return &pgStore{
db: db,
func NewPgStoreV2(ctx context.Context, db *sql.DB) (*pgStore, error) {
err := assertHasColumn(ctx, db, "bgjob_job", "request_id")
if err != nil {
return nil, fmt.Errorf("%w; please apply 'add_request_id.sql' migration adding column `request_id` to `bgjob_job`", err)
}

err = assertHasColumn(ctx, db, "bgjob_dead_job", "request_id")
if err != nil {
return nil, fmt.Errorf("%w; please apply 'add_request_id.sql' migration adding column `request_id` to `bgjob_dead_job`", err)
}

return &pgStore{
db: db,
},
nil
}

func (p *pgStore) BulkInsert(ctx context.Context, jobs []Job) error {
Expand All @@ -23,7 +34,7 @@ func (p *pgStore) BulkInsert(ctx context.Context, jobs []Job) error {
func (p *pgStore) Acquire(ctx context.Context, queue string, handler func(tx Tx) error) error {
return runTx(ctx, p.db, func(ctx context.Context, tx *sql.Tx) error {
query := `
SELECT id, queue, type, arg, attempt, last_error, next_run_at, created_at, updated_at
SELECT id, queue, type, arg, attempt, last_error, next_run_at, created_at, updated_at, request_id
Comment thread
d1slike marked this conversation as resolved.
FROM bgjob_job
WHERE queue = $1 AND next_run_at <= $2
ORDER BY next_run_at, created_at
Expand All @@ -41,6 +52,7 @@ LIMIT 1 FOR UPDATE SKIP LOCKED
&job.NextRunAt,
&job.CreatedAt,
&job.UpdatedAt,
&job.RequestId,
)
if err == sql.ErrNoRows {
return ErrEmptyQueue
Expand Down Expand Up @@ -106,8 +118,8 @@ func (p *pgTx) Delete(ctx context.Context, id string) error {

func (p *pgTx) SaveInDlq(ctx context.Context, job Job) error {
query := `INSERT INTO bgjob_dead_job
(job_id, queue, type, arg, attempt, next_run_at, last_error, job_created_at, job_updated_at, moved_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
(job_id, queue, type, arg, attempt, next_run_at, last_error, job_created_at, job_updated_at, moved_at, request_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
`
_, err := p.tx.ExecContext(
ctx,
Expand All @@ -122,6 +134,7 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
job.CreatedAt,
job.UpdatedAt,
timeNow(),
job.RequestId,
)
return err
}
Expand All @@ -131,3 +144,17 @@ func (p *pgTx) UpdateArg(ctx context.Context, id string, arg []byte) error {
_, err := p.tx.ExecContext(ctx, query, arg, timeNow(), id)
return err
}

func assertHasColumn(ctx context.Context, db *sql.DB, table, column string) error {
query := fmt.Sprintf("SELECT %s FROM %s LIMIT 1", column, table)

err := db.QueryRowContext(ctx, query).Scan(new(any))
if err != nil {
if err == sql.ErrNoRows {
Comment thread
d1slike marked this conversation as resolved.
return nil
}
return fmt.Errorf("bgjob: schema check error for %s.%s: %w", table, column, err)
}

return nil
}