29
29
30
30
import javax .annotation .Nullable ;
31
31
import java .util .Iterator ;
32
+ import java .util .NoSuchElementException ;
33
+ import java .util .concurrent .atomic .AtomicBoolean ;
32
34
33
35
import static co .elastic .apm .agent .awssdk .common .AbstractSQSInstrumentationHelper .MESSAGE_PROCESSING_ACTION ;
34
36
import static co .elastic .apm .agent .awssdk .common .AbstractSQSInstrumentationHelper .MESSAGING_TYPE ;
@@ -43,6 +45,7 @@ public abstract class AbstractMessageIteratorWrapper<Message> implements Iterato
43
45
private final String queueName ;
44
46
private final AbstractSQSInstrumentationHelper <?, ?, Message > sqsInstrumentationHelper ;
45
47
private final TextHeaderGetter <Message > textHeaderGetter ;
48
+ private final AtomicBoolean iterationEnded ;
46
49
47
50
public AbstractMessageIteratorWrapper (Iterator <Message > delegate , Tracer tracer ,
48
51
String queueName ,
@@ -53,21 +56,25 @@ public AbstractMessageIteratorWrapper(Iterator<Message> delegate, Tracer tracer,
53
56
this .queueName = queueName ;
54
57
this .sqsInstrumentationHelper = sqsInstrumentationHelper ;
55
58
this .textHeaderGetter = textHeaderGetter ;
59
+ this .iterationEnded = new AtomicBoolean (false );
56
60
}
57
61
58
62
@ Override
59
63
public boolean hasNext () {
60
- endCurrentTransaction ();
61
- endMessageProcessingSpan ();
62
- return delegate .hasNext ();
64
+ boolean hasNext = delegate .hasNext ();
65
+ if (!hasNext && iterationEnded .compareAndSet (false , true )) {
66
+ endCurrentTransaction ();
67
+ endMessageProcessingSpan ();
68
+ }
69
+ return hasNext ;
63
70
}
64
71
65
72
@ Nullable
66
73
public Transaction <?> endCurrentTransaction () {
67
74
Transaction <?> transaction = null ;
68
75
try {
69
76
transaction = tracer .currentTransaction ();
70
- if (transaction != null && MESSAGING_TYPE .equals (transaction .getType ())) {
77
+ if (transaction != null && ! transaction . isFinished () && MESSAGING_TYPE .equals (transaction .getType ())) {
71
78
transaction .deactivate ().end ();
72
79
return null ;
73
80
}
@@ -93,12 +100,21 @@ public void endMessageProcessingSpan() {
93
100
94
101
@ Override
95
102
public Message next () {
96
- Transaction <?> currentTransaction = endCurrentTransaction ();
97
- Message sqsMessage = delegate .next ();
98
- if (currentTransaction == null ) {
99
- sqsInstrumentationHelper .startTransactionOnMessage (sqsMessage , queueName , textHeaderGetter );
103
+ try {
104
+ Transaction <?> currentTransaction = endCurrentTransaction ();
105
+ Message sqsMessage = delegate .next ();
106
+ if (currentTransaction == null ) {
107
+ sqsInstrumentationHelper .startTransactionOnMessage (sqsMessage , queueName , textHeaderGetter );
108
+ }
109
+ return sqsMessage ;
110
+ } catch (NoSuchElementException e ) {
111
+ // end the transaction when the caller loops with a try/catch until an exception is thrown
112
+ if (iterationEnded .compareAndSet (false , true )) {
113
+ endCurrentTransaction ();
114
+ endMessageProcessingSpan ();
115
+ }
116
+ throw e ;
100
117
}
101
- return sqsMessage ;
102
118
}
103
119
104
120
0 commit comments