Skip to content

Commit 724cefc

Browse files
authored
Consistently prefer job param name to j (riverqueue#69)
When using a job parameter name for a worker, we weren't consistent about whether to use `j` or `job` and have mixed convention all over the place: func (w *Worker) Work(ctx context.Context, j *river.Job[ErroringArgs]) error { func (w *Worker) Work(ctx context.Context, job *river.Job[ErroringArgs]) error { Generally in non-core-team Go, a convention of more than single character variable names is preferred except in specific cases (e.g. `i`), so here, move everything over to use the more explicit `job`.
1 parent 4d141b3 commit 724cefc

11 files changed

+66
-66
lines changed

client_test.go

+43-43
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type noOpWorker struct {
6363
WorkerDefaults[noOpArgs]
6464
}
6565

66-
func (w *noOpWorker) Work(ctx context.Context, j *Job[noOpArgs]) error { return nil }
66+
func (w *noOpWorker) Work(ctx context.Context, job *Job[noOpArgs]) error { return nil }
6767

6868
type periodicJobArgs struct{}
6969

@@ -73,18 +73,18 @@ type periodicJobWorker struct {
7373
WorkerDefaults[periodicJobArgs]
7474
}
7575

76-
func (w *periodicJobWorker) Work(ctx context.Context, j *Job[periodicJobArgs]) error {
76+
func (w *periodicJobWorker) Work(ctx context.Context, job *Job[periodicJobArgs]) error {
7777
return nil
7878
}
7979

8080
type callbackFunc func(context.Context, *Job[callbackArgs]) error
8181

8282
func makeAwaitCallback(startedCh chan<- int64, doneCh chan struct{}) callbackFunc {
83-
return func(ctx context.Context, j *Job[callbackArgs]) error {
83+
return func(ctx context.Context, job *Job[callbackArgs]) error {
8484
select {
8585
case <-ctx.Done():
8686
return ctx.Err()
87-
case startedCh <- j.ID:
87+
case startedCh <- job.ID:
8888
}
8989

9090
// await done signal, or context cancellation:
@@ -108,8 +108,8 @@ type callbackWorker struct {
108108
fn callbackFunc
109109
}
110110

111-
func (w *callbackWorker) Work(ctx context.Context, j *Job[callbackArgs]) error {
112-
return w.fn(ctx, j)
111+
func (w *callbackWorker) Work(ctx context.Context, job *Job[callbackArgs]) error {
112+
return w.fn(ctx, job)
113113
}
114114

115115
func newTestConfig(t *testing.T, callback callbackFunc) *Config {
@@ -397,11 +397,11 @@ func Test_Client_Stop(t *testing.T) {
397397
jobDoneChan := make(chan struct{})
398398
jobStartedChan := make(chan int64)
399399

400-
callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error {
400+
callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error {
401401
select {
402402
case <-ctx.Done():
403403
return ctx.Err()
404-
case jobStartedChan <- j.ID:
404+
case jobStartedChan <- job.ID:
405405
}
406406

407407
select {
@@ -446,9 +446,9 @@ func Test_Client_Stop(t *testing.T) {
446446
t.Parallel()
447447

448448
startedCh := make(chan int64)
449-
callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error {
449+
callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error {
450450
select {
451-
case startedCh <- j.ID:
451+
case startedCh <- job.ID:
452452
default:
453453
}
454454
return nil
@@ -473,7 +473,7 @@ func Test_Client_Stop(t *testing.T) {
473473
t.Run("WithSubscriber", func(t *testing.T) {
474474
t.Parallel()
475475

476-
callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error { return nil }
476+
callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error { return nil }
477477

478478
client := runNewTestClient(ctx, t, newTestConfig(t, callbackFunc))
479479

@@ -503,14 +503,14 @@ func Test_Client_StopAndCancel(t *testing.T) {
503503
jobDoneChan := make(chan struct{})
504504
jobStartedChan := make(chan int64)
505505

506-
callbackFunc := func(ctx context.Context, j *Job[callbackArgs]) error {
506+
callbackFunc := func(ctx context.Context, job *Job[callbackArgs]) error {
507507
defer close(jobDoneChan)
508508

509509
// indicate the job has started, unless context is already done:
510510
select {
511511
case <-ctx.Done():
512512
return ctx.Err()
513-
case jobStartedChan <- j.ID:
513+
case jobStartedChan <- job.ID:
514514
}
515515

516516
t.Logf("Job waiting for context cancellation")
@@ -571,12 +571,12 @@ type callbackWorkerWithCustomTimeout struct {
571571
fn func(context.Context, *Job[callbackWithCustomTimeoutArgs]) error
572572
}
573573

574-
func (w *callbackWorkerWithCustomTimeout) Work(ctx context.Context, j *Job[callbackWithCustomTimeoutArgs]) error {
575-
return w.fn(ctx, j)
574+
func (w *callbackWorkerWithCustomTimeout) Work(ctx context.Context, job *Job[callbackWithCustomTimeoutArgs]) error {
575+
return w.fn(ctx, job)
576576
}
577577

578-
func (w *callbackWorkerWithCustomTimeout) Timeout(j *Job[callbackWithCustomTimeoutArgs]) time.Duration {
579-
return j.Args.TimeoutValue
578+
func (w *callbackWorkerWithCustomTimeout) Timeout(job *Job[callbackWithCustomTimeoutArgs]) time.Duration {
579+
return job.Args.TimeoutValue
580580
}
581581

582582
func Test_Client_JobContextInheritsFromProvidedContext(t *testing.T) {
@@ -589,7 +589,7 @@ func Test_Client_JobContextInheritsFromProvidedContext(t *testing.T) {
589589
doneCh := make(chan struct{})
590590
close(doneCh)
591591

592-
callbackFunc := func(ctx context.Context, j *Job[callbackWithCustomTimeoutArgs]) error {
592+
callbackFunc := func(ctx context.Context, job *Job[callbackWithCustomTimeoutArgs]) error {
593593
// indicate the job has started, unless context is already done:
594594
select {
595595
case <-ctx.Done():
@@ -1073,7 +1073,7 @@ func Test_Client_ErrorHandler(t *testing.T) {
10731073
t.Parallel()
10741074

10751075
handlerErr := fmt.Errorf("job error")
1076-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1076+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
10771077
return handlerErr
10781078
})
10791079

@@ -1127,7 +1127,7 @@ func Test_Client_ErrorHandler(t *testing.T) {
11271127
t.Run("PanicHandler", func(t *testing.T) {
11281128
t.Parallel()
11291129

1130-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1130+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
11311131
panic("panic val")
11321132
})
11331133

@@ -1459,7 +1459,7 @@ func Test_Client_RetryPolicy(t *testing.T) {
14591459
t.Run("RetryUntilDiscarded", func(t *testing.T) {
14601460
t.Parallel()
14611461

1462-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1462+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
14631463
return fmt.Errorf("job error")
14641464
})
14651465

@@ -1583,8 +1583,8 @@ func Test_Client_Subscribe(t *testing.T) {
15831583

15841584
// Fail/succeed jobs based on their name so we can get a mix of both to
15851585
// verify.
1586-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1587-
if strings.HasPrefix(j.Args.Name, "failed") {
1586+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
1587+
if strings.HasPrefix(job.Args.Name, "failed") {
15881588
return fmt.Errorf("job error")
15891589
}
15901590
return nil
@@ -1649,8 +1649,8 @@ func Test_Client_Subscribe(t *testing.T) {
16491649
t.Run("CompletedOnly", func(t *testing.T) {
16501650
t.Parallel()
16511651

1652-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1653-
if strings.HasPrefix(j.Args.Name, "failed") {
1652+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
1653+
if strings.HasPrefix(job.Args.Name, "failed") {
16541654
return fmt.Errorf("job error")
16551655
}
16561656
return nil
@@ -1690,8 +1690,8 @@ func Test_Client_Subscribe(t *testing.T) {
16901690
t.Run("FailedOnly", func(t *testing.T) {
16911691
t.Parallel()
16921692

1693-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1694-
if strings.HasPrefix(j.Args.Name, "failed") {
1693+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
1694+
if strings.HasPrefix(job.Args.Name, "failed") {
16951695
return fmt.Errorf("job error")
16961696
}
16971697
return nil
@@ -1731,7 +1731,7 @@ func Test_Client_Subscribe(t *testing.T) {
17311731
t.Run("EventsDropWithNoListeners", func(t *testing.T) {
17321732
t.Parallel()
17331733

1734-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1734+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
17351735
return nil
17361736
})
17371737

@@ -1775,7 +1775,7 @@ func Test_Client_Subscribe(t *testing.T) {
17751775
t.Run("PanicOnUnknownKind", func(t *testing.T) {
17761776
t.Parallel()
17771777

1778-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1778+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
17791779
return nil
17801780
})
17811781

@@ -1789,7 +1789,7 @@ func Test_Client_Subscribe(t *testing.T) {
17891789
t.Run("SubscriptionCancellation", func(t *testing.T) {
17901790
t.Parallel()
17911791

1792-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1792+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
17931793
return nil
17941794
})
17951795

@@ -1883,7 +1883,7 @@ func Test_Client_JobCompletion(t *testing.T) {
18831883
t.Parallel()
18841884

18851885
require := require.New(t)
1886-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1886+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
18871887
return nil
18881888
})
18891889

@@ -1909,8 +1909,8 @@ func Test_Client_JobCompletion(t *testing.T) {
19091909
require := require.New(t)
19101910
var dbPool *pgxpool.Pool
19111911
now := time.Now().UTC()
1912-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1913-
_, err := queries.JobSetCompleted(ctx, dbPool, dbsqlc.JobSetCompletedParams{ID: j.ID, FinalizedAt: now})
1912+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
1913+
_, err := queries.JobSetCompleted(ctx, dbPool, dbsqlc.JobSetCompletedParams{ID: job.ID, FinalizedAt: now})
19141914
require.NoError(err)
19151915
return nil
19161916
})
@@ -1936,7 +1936,7 @@ func Test_Client_JobCompletion(t *testing.T) {
19361936
t.Parallel()
19371937

19381938
require := require.New(t)
1939-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1939+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
19401940
return errors.New("oops")
19411941
})
19421942

@@ -1961,7 +1961,7 @@ func Test_Client_JobCompletion(t *testing.T) {
19611961
t.Parallel()
19621962

19631963
require := require.New(t)
1964-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1964+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
19651965
return JobCancel(errors.New("oops"))
19661966
})
19671967

@@ -1988,9 +1988,9 @@ func Test_Client_JobCompletion(t *testing.T) {
19881988
require := require.New(t)
19891989
var dbPool *pgxpool.Pool
19901990
now := time.Now().UTC()
1991-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
1991+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
19921992
_, err := queries.JobSetDiscarded(ctx, dbPool, dbsqlc.JobSetDiscardedParams{
1993-
ID: j.ID,
1993+
ID: job.ID,
19941994
Error: []byte("{\"error\": \"oops\"}"),
19951995
FinalizedAt: now,
19961996
})
@@ -2023,11 +2023,11 @@ func Test_Client_JobCompletion(t *testing.T) {
20232023
now := time.Now().UTC()
20242024
var updatedJob *Job[callbackArgs]
20252025

2026-
config := newTestConfig(t, func(ctx context.Context, j *Job[callbackArgs]) error {
2026+
config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error {
20272027
tx, err := dbPool.Begin(ctx)
20282028
require.NoError(err)
20292029

2030-
updatedJob, err = JobCompleteTx[*riverpgxv5.Driver](ctx, tx, j)
2030+
updatedJob, err = JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
20312031
require.NoError(err)
20322032

20332033
return tx.Commit(ctx)
@@ -2150,8 +2150,8 @@ func Test_NewClient_ClientIDWrittenToJobAttemptedByWhenFetched(t *testing.T) {
21502150
doneCh := make(chan struct{})
21512151
startedCh := make(chan *Job[callbackArgs])
21522152

2153-
callback := func(ctx context.Context, j *Job[callbackArgs]) error {
2154-
startedCh <- j
2153+
callback := func(ctx context.Context, job *Job[callbackArgs]) error {
2154+
startedCh <- job
21552155
<-doneCh
21562156
return nil
21572157
}
@@ -2529,8 +2529,8 @@ type timeoutTestWorker struct {
25292529
doneCh chan testWorkerDeadline
25302530
}
25312531

2532-
func (w *timeoutTestWorker) Timeout(j *Job[timeoutTestArgs]) time.Duration {
2533-
return j.Args.TimeoutValue
2532+
func (w *timeoutTestWorker) Timeout(job *Job[timeoutTestArgs]) time.Duration {
2533+
return job.Args.TimeoutValue
25342534
}
25352535

25362536
func (w *timeoutTestWorker) Work(ctx context.Context, job *Job[timeoutTestArgs]) error {

example_error_handler_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ type ErroringWorker struct {
4545
river.WorkerDefaults[ErroringArgs]
4646
}
4747

48-
func (w *ErroringWorker) Work(ctx context.Context, j *river.Job[ErroringArgs]) error {
48+
func (w *ErroringWorker) Work(ctx context.Context, job *river.Job[ErroringArgs]) error {
4949
switch {
50-
case j.Args.ShouldError:
50+
case job.Args.ShouldError:
5151
return fmt.Errorf("this job errored")
52-
case j.Args.ShouldPanic:
52+
case job.Args.ShouldPanic:
5353
panic("this job panicked")
5454
}
5555
return nil

example_job_cancel_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ type CancellingWorker struct {
2323
river.WorkerDefaults[CancellingArgs]
2424
}
2525

26-
func (w *CancellingWorker) Work(ctx context.Context, j *river.Job[CancellingArgs]) error {
27-
if j.Args.ShouldCancel {
26+
func (w *CancellingWorker) Work(ctx context.Context, job *river.Job[CancellingArgs]) error {
27+
if job.Args.ShouldCancel {
2828
fmt.Println("cancelling job")
2929
return river.JobCancel(fmt.Errorf("this wrapped error message will be persisted to DB"))
3030
}

example_job_snooze_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ type SnoozingWorker struct {
2424
river.WorkerDefaults[SnoozingArgs]
2525
}
2626

27-
func (w *SnoozingWorker) Work(ctx context.Context, j *river.Job[SnoozingArgs]) error {
28-
if j.Args.ShouldSnooze {
27+
func (w *SnoozingWorker) Work(ctx context.Context, job *river.Job[SnoozingArgs]) error {
28+
if job.Args.ShouldSnooze {
2929
fmt.Println("snoozing job for 5 minutes")
3030
return river.JobSnooze(5 * time.Minute)
3131
}

example_work_func_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ func Example_workFunc() {
3737
}
3838

3939
workers := river.NewWorkers()
40-
river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, j *river.Job[WorkFuncArgs]) error {
41-
fmt.Printf("Message: %s", j.Args.Message)
40+
river.AddWorker(workers, river.WorkFunc(func(ctx context.Context, job *river.Job[WorkFuncArgs]) error {
41+
fmt.Printf("Message: %s", job.Args.Message)
4242
return nil
4343
}))
4444

internal/cmd/producersample/main.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,10 @@ type MyJobWorker struct {
214214
logger *slog.Logger
215215
}
216216

217-
func (w *MyJobWorker) Work(ctx context.Context, j *river.Job[MyJobArgs]) error {
217+
func (w *MyJobWorker) Work(ctx context.Context, job *river.Job[MyJobArgs]) error {
218218
atomic.AddUint64(&w.jobsPerformed, 1)
219-
// fmt.Printf("performing job %d with args %+v\n", j.ID, j.Args)
220-
if j.ID%1000 == 0 {
219+
// fmt.Printf("performing job %d with args %+v\n", job.ID, job.Args)
220+
if job.ID%1000 == 0 {
221221
w.logger.Info("simulating stalled job, blocked on ctx.Done()")
222222
<-ctx.Done()
223223
w.logger.Info("stalled job exiting")

job_args_reflect_kind_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import "reflect"
1313
// Message `json:"message"`
1414
// }
1515
//
16-
// AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, j *Job[WorkFuncArgs]) error {
16+
// AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[WorkFuncArgs]) error {
1717
// ...
1818
//
1919
// Its major downside compared to a normal JobArgs implementation is that it's

job_executor_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func (w *customRetryPolicyWorker) NextRetry(job *Job[callbackArgs]) time.Time {
3535
return time.Time{}
3636
}
3737

38-
func (w *customRetryPolicyWorker) Work(ctx context.Context, j *Job[callbackArgs]) error {
38+
func (w *customRetryPolicyWorker) Work(ctx context.Context, job *Job[callbackArgs]) error {
3939
return w.f()
4040
}
4141

rivertest/rivertest_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type Job1Worker struct {
2929
river.WorkerDefaults[Job1Args]
3030
}
3131

32-
func (w *Job1Worker) Work(ctx context.Context, j *river.Job[Job1Args]) error { return nil }
32+
func (w *Job1Worker) Work(ctx context.Context, job *river.Job[Job1Args]) error { return nil }
3333

3434
type Job2Args struct {
3535
Int int `json:"int"`
@@ -41,7 +41,7 @@ type Job2Worker struct {
4141
river.WorkerDefaults[Job2Args]
4242
}
4343

44-
func (w *Job2Worker) Work(ctx context.Context, j *river.Job[Job2Args]) error { return nil }
44+
func (w *Job2Worker) Work(ctx context.Context, job *river.Job[Job2Args]) error { return nil }
4545

4646
// The tests for this function are quite minimal because it uses the same
4747
// implementation as the `*Tx` variant, so most of the test happens below.

0 commit comments

Comments
 (0)