forked from riverqueue/river
-
Notifications
You must be signed in to change notification settings - Fork 0
/
example_graceful_shutdown_test.go
185 lines (151 loc) · 5.77 KB
/
example_graceful_shutdown_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package river_test
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/internal/util/slogutil"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)
// WaitsForCancelOnlyArgs are the (empty) job arguments for jobs handled by
// WaitsForCancelOnlyWorker. The job carries no payload.
type WaitsForCancelOnlyArgs struct{}

// Kind reports the kind string that identifies jobs using these args.
func (WaitsForCancelOnlyArgs) Kind() string {
	const kind = "waits_for_cancel_only"
	return kind
}
// WaitsForCancelOnlyWorker is a worker that will never finish jobs until its
// context is cancelled.
type WaitsForCancelOnlyWorker struct {
// Embedded River worker defaults, parameterized on this worker's args type.
// NOTE(review): presumably supplies default implementations of the optional
// worker methods so only Work needs to be defined here — confirm against
// River's documentation.
river.WorkerDefaults[WaitsForCancelOnlyArgs]
// jobStarted is closed by Work as soon as it begins running a job, letting
// the example block until the job is actually in progress. Because it is
// closed rather than sent on, Work must run at most once per worker value; a
// second run would panic on the double close.
jobStarted chan struct{}
}
// Work announces that the job has begun, closes jobStarted to notify anyone
// waiting on it, and then parks until ctx is done.
//
// The context's error is returned (rather than nil) because in the event of
// cancellation an error should be returned so that the job goes back in the
// retry queue.
func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[WaitsForCancelOnlyArgs]) error {
	fmt.Print("Working job that doesn't finish until cancelled\n")

	// Let the example know the job is now being worked.
	close(w.jobStarted)

	// The only way out of this job is context cancellation.
	done := ctx.Done()
	<-done

	fmt.Print("Job cancelled\n")

	return ctx.Err()
}
// Example_gracefulShutdown demonstrates a realistic-looking stop loop for
// River. It listens for SIGINT/SIGTERM (like might be received by a Ctrl+C
// locally or on a platform like Heroku to stop a process) and when received,
// tries a soft stop that waits for work to finish. If it doesn't finish in
// time, a second SIGINT/SIGTERM (or a 10 second timeout) will initiate a hard
// stop that cancels all jobs using context cancellation. A third signal (or a
// further 10 second timeout) will give up on the stop procedure and exit
// uncleanly.
func Example_gracefulShutdown() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Required for the purpose of this test, but not necessary in real usage.
	if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
		panic(err)
	}

	// Closed by the worker once the job is being worked.
	jobStarted := make(chan struct{})

	workers := river.NewWorkers()
	river.AddWorker(workers, &WaitsForCancelOnlyWorker{jobStarted: jobStarted})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	_, err = riverClient.Insert(ctx, WaitsForCancelOnlyArgs{}, nil)
	if err != nil {
		panic(err)
	}

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// Closed when the stop goroutine below has finished (whether the stop
	// succeeded or was abandoned).
	riverClientStopped := make(chan struct{})

	sigintOrTerm := make(chan os.Signal, 1)
	signal.Notify(sigintOrTerm, syscall.SIGINT, syscall.SIGTERM)
	// Undo the Notify registration on return. Without this, SIGINT/SIGTERM
	// would keep being relayed to a channel nobody reads for the remaining
	// lifetime of the process, suppressing their default behavior.
	defer signal.Stop(sigintOrTerm)

	// This is meant to be a realistic-looking stop goroutine that might go in a
	// real program. It waits for SIGINT/SIGTERM and when received, tries to stop
	// gracefully by allowing a chance for jobs to finish. But if that isn't
	// working, a second SIGINT/SIGTERM will tell it to terminate with prejudice and
	// it'll issue a hard stop that cancels the context of all active jobs. In
	// case that doesn't work, a third SIGINT/SIGTERM ignores River's stop procedure
	// completely and exits uncleanly.
	go func() {
		defer close(riverClientStopped)

		<-sigintOrTerm
		fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n")

		softStopSucceeded := make(chan struct{})
		go func() {
			// A context.Canceled error is tolerated; anything else is fatal.
			if err := riverClient.Stop(ctx); err != nil && !errors.Is(err, context.Canceled) {
				panic(err)
			}
			close(softStopSucceeded)
		}()

		// Wait for soft stop to succeed, or another SIGINT/SIGTERM.
		select {
		case <-sigintOrTerm:
			fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n")
		case <-time.After(10 * time.Second):
			fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n")
		case <-softStopSucceeded:
			// Will never be reached in this example.
			return
		}

		hardStopSucceeded := make(chan struct{})
		go func() {
			// A context.Canceled error is tolerated; anything else is fatal.
			if err := riverClient.StopAndCancel(ctx); err != nil && !errors.Is(err, context.Canceled) {
				panic(err)
			}
			close(hardStopSucceeded)
		}()

		// As long as all jobs respect context cancellation, StopAndCancel will
		// always work. However, in the case of a bug where a job blocks despite
		// being cancelled, it may be necessary to either ignore River's stop
		// result (what's shown here) or have a supervisor kill the process.
		select {
		case <-sigintOrTerm:
			fmt.Printf("Received SIGINT/SIGTERM again; ignoring stop procedure and exiting unsafely\n")
		case <-time.After(10 * time.Second):
			fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n")
		case <-hardStopSucceeded:
		}
	}()

	// Make sure our job starts being worked before doing anything else.
	<-jobStarted

	// Cheat a little by sending a SIGTERM manually for the purpose of this
	// example (normally this will be sent by user or supervisory process). The
	// first SIGTERM tries a soft stop in which jobs are given a chance to
	// finish up.
	sigintOrTerm <- syscall.SIGTERM

	// The soft stop will never work in this example because our job only
	// respects context cancellation, but wait a short amount of time to give it
	// a chance. After it elapses, send another SIGTERM to initiate a hard stop.
	select {
	case <-riverClientStopped:
		// Will never be reached in this example because our job will only ever
		// finish on context cancellation.
		fmt.Printf("Soft stop succeeded\n")

	case <-time.After(100 * time.Millisecond):
		sigintOrTerm <- syscall.SIGTERM
		<-riverClientStopped
	}

	// Output:
	// Working job that doesn't finish until cancelled
	// Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)
	// Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)
	// Job cancelled
	// jobExecutor: Job failed
}