Commit 81f2941
authored
Fix send offset to tx, when no active tx (#4096)
Fixes #4088
**Auto-cherry-pick to `3.3.x`**
Fix `IllegalStateException` when record filtered by interceptor in transactional listener
When using a transactional KafkaMessageListenerContainer with an early `RecordInterceptor`
that filters out records, the container attempted to send offsets to a transaction even when no
active transaction existed. This resulted in an `IllegalStateException` from the Kafka producer:
"Cannot send offsets if a transaction is not in progress (currentState=READY)"
The issue occurred because:
- Container has a `KafkaTransactionManager` configured
- Interceptor filters a record before listener execution
- No transaction is started (listener never executes)
- Container still attempts to send offsets via `sendOffsetsToTransaction()`
- Kafka producer rejects the call as no transaction is active
Solution:
Add a guard clause in `sendOffsetsToTransaction()` to check for an active transaction resource in `TransactionSynchronizationManager`. If either `kafkaTxManager` is null or there's no active transaction resource,
the method returns early without attempting to send offsets.
The fix includes an integration test in `TransactionalContainerTests` that:
- Uses `RecordInterceptor` to filter records
- Tracks transaction commits via `TransactionExecutionListener`
- Verifies only records that execute listeners trigger transactions
- Confirms filtered records don't cause exceptions
Signed-off-by: moonyougnCHAE <[email protected]>1 parent d0e92d6 commit 81f2941
File tree
2 files changed
+87
-0
lines changed- spring-kafka/src
- main/java/org/springframework/kafka/listener
- test/java/org/springframework/kafka/listener
2 files changed
+87
-0
lines changedLines changed: 3 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
3080 | 3080 | | |
3081 | 3081 | | |
3082 | 3082 | | |
| 3083 | + | |
| 3084 | + | |
| 3085 | + | |
3083 | 3086 | | |
3084 | 3087 | | |
3085 | 3088 | | |
| |||
Lines changed: 84 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
47 | 47 | | |
48 | 48 | | |
49 | 49 | | |
| 50 | + | |
50 | 51 | | |
51 | 52 | | |
52 | 53 | | |
| |||
75 | 76 | | |
76 | 77 | | |
77 | 78 | | |
| 79 | + | |
| 80 | + | |
| 81 | + | |
78 | 82 | | |
79 | 83 | | |
80 | 84 | | |
| |||
140 | 144 | | |
141 | 145 | | |
142 | 146 | | |
| 147 | + | |
| 148 | + | |
143 | 149 | | |
144 | 150 | | |
145 | 151 | | |
| |||
1148 | 1154 | | |
1149 | 1155 | | |
1150 | 1156 | | |
| 1157 | + | |
| 1158 | + | |
| 1159 | + | |
| 1160 | + | |
| 1161 | + | |
| 1162 | + | |
| 1163 | + | |
| 1164 | + | |
| 1165 | + | |
| 1166 | + | |
| 1167 | + | |
| 1168 | + | |
| 1169 | + | |
| 1170 | + | |
| 1171 | + | |
| 1172 | + | |
| 1173 | + | |
| 1174 | + | |
| 1175 | + | |
| 1176 | + | |
| 1177 | + | |
| 1178 | + | |
| 1179 | + | |
| 1180 | + | |
| 1181 | + | |
| 1182 | + | |
| 1183 | + | |
| 1184 | + | |
| 1185 | + | |
| 1186 | + | |
| 1187 | + | |
| 1188 | + | |
| 1189 | + | |
| 1190 | + | |
| 1191 | + | |
| 1192 | + | |
| 1193 | + | |
| 1194 | + | |
| 1195 | + | |
| 1196 | + | |
| 1197 | + | |
| 1198 | + | |
| 1199 | + | |
| 1200 | + | |
| 1201 | + | |
| 1202 | + | |
| 1203 | + | |
| 1204 | + | |
| 1205 | + | |
| 1206 | + | |
| 1207 | + | |
| 1208 | + | |
| 1209 | + | |
| 1210 | + | |
| 1211 | + | |
| 1212 | + | |
| 1213 | + | |
| 1214 | + | |
| 1215 | + | |
| 1216 | + | |
| 1217 | + | |
| 1218 | + | |
| 1219 | + | |
| 1220 | + | |
| 1221 | + | |
| 1222 | + | |
| 1223 | + | |
| 1224 | + | |
| 1225 | + | |
| 1226 | + | |
| 1227 | + | |
| 1228 | + | |
| 1229 | + | |
| 1230 | + | |
| 1231 | + | |
| 1232 | + | |
| 1233 | + | |
| 1234 | + | |
1151 | 1235 | | |
0 commit comments