Skip to content

Commit 20de877

Browse files
authored
Add ccJobLogData to ConsumerConfig to improve logging (#30)
1 parent 1fe7401 commit 20de877

File tree

6 files changed

+25
-32
lines changed

6 files changed

+25
-32
lines changed

Diff for: consumers/CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# consumers-2.3.3.0 (2025-??-??)
2+
* Add `ccJobLogData` to `ConsumerConfig` to improve logging.
3+
14
# consumers-2.3.2.0 (2024-08-30)
25
* Use prepared queries to improve parsing and planning time.
36
* Prevent the consumer from crashlooping if `ccOnException` throws.

Diff for: consumers/consumers.cabal

+3-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
cabal-version: 3.0
22
name: consumers
3-
version: 2.3.2.0
3+
version: 2.3.3.0
44
synopsis: Concurrent PostgreSQL data consumers
55

66
description: Library for setting up concurrent consumers of data
@@ -54,6 +54,7 @@ library
5454
Database.PostgreSQL.Consumers.Utils
5555

5656
build-depends: base >= 4.14 && < 5
57+
, aeson >= 2.0
5758
, containers >= 0.5
5859
, exceptions >= 0.10
5960
, hpqtypes >= 1.11
@@ -79,9 +80,7 @@ test-suite consumers-example
7980
hpqtypes,
8081
hpqtypes-extras,
8182
log-base,
82-
text,
83-
text-show,
84-
transformers
83+
text
8584

8685
hs-source-dirs: example
8786

@@ -107,7 +106,6 @@ test-suite consumers-test
107106
mtl,
108107
stm,
109108
text,
110-
text-show,
111109
time,
112110
transformers,
113111
transformers-base

Diff for: consumers/example/Example.hs

+2-8
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import Log
2121
import Log.Backend.StandardOutput
2222
import System.Environment
2323
import System.Exit
24-
import TextShow
2524

2625
-- | Main application monad. See the 'log-base' and the 'hpqtypes'
2726
-- packages for documentation on 'DBT' and 'LogT'.
@@ -118,6 +117,7 @@ main = do
118117
, ccMaxRunningJobs = 1
119118
, ccProcessJob = processJob
120119
, ccOnException = handleException
120+
, ccJobLogData = \(i, _) -> ["job_id" .= i]
121121
}
122122

123123
-- Add a job to the consumer's queue.
@@ -141,13 +141,7 @@ main = do
141141
-- failure in different ways, such as: remove the job from the
142142
-- queue, mark it as processed, or schedule it for rerun.
143143
handleException :: SomeException -> (Int64, T.Text) -> AppM Action
144-
handleException exc (idx, _msg) = do
145-
logAttention "Job failed" $
146-
object
147-
[ "job_id" .= showt idx
148-
, "exception" .= showt exc
149-
]
150-
pure . RerunAfter $ imicroseconds 500000
144+
handleException _ _ = pure . RerunAfter $ imicroseconds 500000
151145

152146
-- | Table where jobs are stored. See
153147
-- 'Database.PostgreSQL.Consumers.Config.ConsumerConfig'.

Diff for: consumers/src/Database/PostgreSQL/Consumers/Components.hs

+12-11
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,17 @@ runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal
123123
where
124124
cc =
125125
cc0
126-
{ ccOnException = \ex job -> do
126+
{ ccOnException = \ex job -> localData (ccJobLogData cc0 job) $ do
127+
let doOnException = do
128+
action <- ccOnException cc0 ex job
129+
logInfo "Unexpected exception caught while processing job" $
130+
object
131+
[ "exception" .= show ex
132+
, "action" .= show action
133+
]
134+
pure action
127135
-- Let asynchronous exceptions through (StopExecution in particular).
128-
ccOnException cc0 ex job `ES.catchAny` \handlerEx -> do
136+
doOnException `ES.catchAny` \handlerEx -> do
129137
-- Arbitrary delay, but better than letting exceptions from the
130138
-- handler through and potentially crashlooping the consumer:
131139
--
@@ -140,8 +148,7 @@ runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal
140148
let action = RerunAfter $ idays 1
141149
logAttention "ccOnException threw an exception" $
142150
object
143-
[ "job_id" .= show (ccJobIndex cc0 job)
144-
, "exception" .= show handlerEx
151+
[ "exception" .= show handlerEx
145152
, "action" .= show action
146153
]
147154
pure action
@@ -391,7 +398,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs
391398
(_, joinFork) <- mask $ \restore -> T.fork $ do
392399
tid <- myThreadId
393400
bracket_ (registerJob tid) (unregisterJob tid) . restore $ do
394-
ccProcessJob job
401+
localData (ccJobLogData job) $ ccProcessJob job
395402
pure (job, joinFork)
396403
where
397404
registerJob tid = atomically $ do
@@ -406,12 +413,6 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs
406413
Right result -> pure (ccJobIndex job, result)
407414
Left ex -> do
408415
action <- ccOnException ex job
409-
logAttention "Unexpected exception caught while processing job" $
410-
object
411-
[ "job_id" .= show (ccJobIndex job)
412-
, "exception" .= show ex
413-
, "action" .= show action
414-
]
415416
pure (ccJobIndex job, Failed action)
416417

417418
-- Update status of the jobs.

Diff for: consumers/src/Database/PostgreSQL/Consumers/Config.hs

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ module Database.PostgreSQL.Consumers.Config
55
) where
66

77
import Control.Exception (SomeException)
8+
import Data.Aeson.Types qualified as A
89
import Data.Time
910
import Database.PostgreSQL.PQTypes.FromRow
1011
import Database.PostgreSQL.PQTypes.Interval
@@ -113,4 +114,6 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig
113114
-- ^ Action taken if a job processing function throws an exception. For
114115
-- robustness it's best to ensure that it doesn't throw. If it does, the
115116
-- exception will be logged and the job in question postponed by a day.
117+
, ccJobLogData :: !(job -> [A.Pair])
118+
-- ^ Data to attach to each log message while processing a job.
116119
}

Diff for: consumers/test/Test.hs

+2-8
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import Log.Backend.StandardOutput
2121
import System.Environment
2222
import System.Exit
2323
import Test.HUnit qualified as T
24-
import TextShow
2524

2625
data TestEnvSt = TestEnvSt
2726
{ teCurrentTime :: UTCTime
@@ -162,6 +161,7 @@ test = do
162161
, ccMaxRunningJobs = 20
163162
, ccProcessJob = processJob
164163
, ccOnException = handleException
164+
, ccJobLogData = \(i, _) -> ["job_id" .= i]
165165
}
166166

167167
putJob :: Int32 -> TestEnv ()
@@ -184,13 +184,7 @@ test = do
184184
pure (Ok Remove)
185185

186186
handleException :: SomeException -> (Int64, Int32) -> TestEnv Action
187-
handleException exc (idx, _countdown) = do
188-
logAttention "Job failed" $
189-
object
190-
[ "job_id" .= showt idx
191-
, "exception" .= showt exc
192-
]
193-
pure . RerunAfter $ imicroseconds 500000
187+
handleException _ _ = pure . RerunAfter $ imicroseconds 500000
194188

195189
jobsTable :: Table
196190
jobsTable =

0 commit comments

Comments
 (0)