Skip to content

Commit fe138f5

Browse files
authored
New metrics on the S3 source - Succeeded Count and Read Time Elapsed (opensearch-project#1505)
New metrics on the S3 source - S3 Objects succeeded and the read time elapsed to read and process an Object. Signed-off-by: David Venable <[email protected]>
1 parent cfa4048 commit fe138f5

File tree

3 files changed

+124
-4
lines changed

3 files changed

+124
-4
lines changed

data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerIT.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
import com.amazon.dataprepper.plugins.source.compression.CompressionEngine;
1414
import com.amazon.dataprepper.plugins.source.compression.NoneCompressionEngine;
1515
import io.micrometer.core.instrument.Counter;
16+
import io.micrometer.core.instrument.Meter;
17+
import io.micrometer.core.instrument.Tags;
18+
import io.micrometer.core.instrument.Timer;
19+
import io.micrometer.core.instrument.noop.NoopTimer;
1620
import org.junit.jupiter.api.BeforeEach;
1721
import org.junit.jupiter.api.extension.ExtensionContext;
1822
import org.junit.jupiter.params.ParameterizedTest;
@@ -30,14 +34,14 @@
3034
import java.util.function.Consumer;
3135
import java.util.stream.Stream;
3236

33-
import static com.amazon.dataprepper.plugins.source.S3ObjectWorker.S3_OBJECTS_FAILED_METRIC_NAME;
3437
import static org.hamcrest.CoreMatchers.equalTo;
3538
import static org.hamcrest.CoreMatchers.notNullValue;
3639
import static org.hamcrest.MatcherAssert.assertThat;
3740
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3841
import static org.junit.jupiter.params.provider.Arguments.arguments;
3942
import static org.mockito.ArgumentMatchers.anyCollection;
4043
import static org.mockito.ArgumentMatchers.anyInt;
44+
import static org.mockito.ArgumentMatchers.anyString;
4145
import static org.mockito.ArgumentMatchers.eq;
4246
import static org.mockito.Mockito.doAnswer;
4347
import static org.mockito.Mockito.mock;
@@ -69,7 +73,9 @@ void setUp() {
6973

7074
pluginMetrics = mock(PluginMetrics.class);
7175
final Counter counter = mock(Counter.class);
72-
when(pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME)).thenReturn(counter);
76+
final Timer timer = new NoopTimer(new Meter.Id("test", Tags.empty(), null, null, Meter.Type.TIMER));
77+
when(pluginMetrics.counter(anyString())).thenReturn(counter);
78+
when(pluginMetrics.timer(anyString())).thenReturn(timer);
7379
}
7480

7581
private void stubBufferWriter(final Consumer<Event> additionalEventAssertions) throws Exception {

data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3ObjectWorker.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.amazon.dataprepper.plugins.source.codec.Codec;
1313
import com.amazon.dataprepper.plugins.source.compression.CompressionEngine;
1414
import io.micrometer.core.instrument.Counter;
15+
import io.micrometer.core.instrument.Timer;
1516
import org.slf4j.Logger;
1617
import org.slf4j.LoggerFactory;
1718
import software.amazon.awssdk.core.ResponseInputStream;
@@ -22,6 +23,7 @@
2223
import java.io.IOException;
2324
import java.io.InputStream;
2425
import java.time.Duration;
26+
import java.util.concurrent.Callable;
2527

2628
/**
2729
* Class responsible for taking an {@link S3ObjectReference} and creating all the necessary {@link Event}
@@ -30,6 +32,8 @@
3032
class S3ObjectWorker {
3133
private static final Logger LOG = LoggerFactory.getLogger(S3ObjectWorker.class);
3234
static final String S3_OBJECTS_FAILED_METRIC_NAME = "s3ObjectsFailed";
35+
static final String S3_OBJECTS_SUCCEEDED_METRIC_NAME = "s3ObjectsSucceeded";
36+
static final String S3_OBJECTS_TIME_ELAPSED_METRIC_NAME = "s3ObjectReadTimeElapsed";
3337

3438
private final S3Client s3Client;
3539
private final Buffer<Record<Event>> buffer;
@@ -38,6 +42,8 @@ class S3ObjectWorker {
3842
private final Duration bufferTimeout;
3943
private final int numberOfRecordsToAccumulate;
4044
private final Counter s3ObjectsFailedCounter;
45+
private final Counter s3ObjectsSucceededCounter;
46+
private final Timer s3ObjectReadTimer;
4147

4248
public S3ObjectWorker(final S3Client s3Client,
4349
final Buffer<Record<Event>> buffer,
@@ -54,6 +60,8 @@ public S3ObjectWorker(final S3Client s3Client,
5460
this.numberOfRecordsToAccumulate = numberOfRecordsToAccumulate;
5561

5662
s3ObjectsFailedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME);
63+
s3ObjectsSucceededCounter = pluginMetrics.counter(S3_OBJECTS_SUCCEEDED_METRIC_NAME);
64+
s3ObjectReadTimer = pluginMetrics.timer(S3_OBJECTS_TIME_ELAPSED_METRIC_NAME);
5765
}
5866

5967
void parseS3Object(final S3ObjectReference s3ObjectReference) throws IOException {
@@ -65,8 +73,24 @@ void parseS3Object(final S3ObjectReference s3ObjectReference) throws IOException
6573
.build();
6674

6775
final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, numberOfRecordsToAccumulate, bufferTimeout);
76+
try {
77+
s3ObjectReadTimer.recordCallable((Callable<Void>) () -> {
78+
doParseObject(s3ObjectReference, getObjectRequest, bufferAccumulator);
6879

80+
return null;
81+
});
82+
} catch (final IOException | RuntimeException e) {
83+
throw e;
84+
} catch (final Exception e) {
85+
// doParseObject does not throw Exception, only IOException or RuntimeException. But, Callable has Exception as a checked
86+
// exception on the interface. This catch block thus should not be reached, but in case it is, wrap it.
87+
throw new RuntimeException(e);
88+
}
89+
90+
s3ObjectsSucceededCounter.increment();
91+
}
6992

93+
private void doParseObject(final S3ObjectReference s3ObjectReference, final GetObjectRequest getObjectRequest, final BufferAccumulator<Record<Event>> bufferAccumulator) throws IOException {
7094
try (final ResponseInputStream<GetObjectResponse> responseInputStream = s3Client.getObject(getObjectRequest);
7195
final InputStream inputStream = compressionEngine.createInputStream(getObjectRequest.key(), responseInputStream)) {
7296
codec.parse(inputStream, record -> {

data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerTest.java

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.amazon.dataprepper.plugins.source.codec.Codec;
1313
import com.amazon.dataprepper.plugins.source.compression.CompressionEngine;
1414
import io.micrometer.core.instrument.Counter;
15+
import io.micrometer.core.instrument.Timer;
1516
import org.junit.jupiter.api.BeforeEach;
1617
import org.junit.jupiter.api.Test;
1718
import org.junit.jupiter.api.extension.ExtendWith;
@@ -31,10 +32,12 @@
3132
import java.util.Optional;
3233
import java.util.Random;
3334
import java.util.UUID;
35+
import java.util.concurrent.Callable;
3436
import java.util.function.Consumer;
3537

3638
import static org.hamcrest.CoreMatchers.equalTo;
3739
import static org.hamcrest.CoreMatchers.notNullValue;
40+
import static org.hamcrest.CoreMatchers.nullValue;
3841
import static org.hamcrest.CoreMatchers.sameInstance;
3942
import static org.hamcrest.MatcherAssert.assertThat;
4043
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -45,6 +48,7 @@
4548
import static org.mockito.Mockito.mock;
4649
import static org.mockito.Mockito.mockStatic;
4750
import static org.mockito.Mockito.verify;
51+
import static org.mockito.Mockito.verifyNoInteractions;
4852
import static org.mockito.Mockito.when;
4953

5054
@ExtendWith(MockitoExtension.class)
@@ -72,13 +76,19 @@ class S3ObjectWorkerTest {
7276
private PluginMetrics pluginMetrics;
7377
@Mock
7478
private Counter s3ObjectsFailedCounter;
79+
@Mock
80+
private Counter s3ObjectsSucceededCounter;
81+
@Mock
82+
private Timer s3ObjectReadTimer;
7583

7684
private String bucketName;
7785
private String key;
7886
private ResponseInputStream<GetObjectResponse> objectInputStream;
7987

88+
private Exception exceptionThrownByCallable;
89+
8090
@BeforeEach
81-
void setUp() {
91+
void setUp() throws Exception {
8292
final Random random = new Random();
8393
bufferTimeout = Duration.ofMillis(random.nextInt(100) + 100);
8494
recordsToAccumulate = random.nextInt(10) + 2;
@@ -88,7 +98,21 @@ void setUp() {
8898
when(s3ObjectReference.getBucketName()).thenReturn(bucketName);
8999
when(s3ObjectReference.getKey()).thenReturn(key);
90100

101+
exceptionThrownByCallable = null;
102+
when(s3ObjectReadTimer.recordCallable(any(Callable.class)))
103+
.thenAnswer(a -> {
104+
try {
105+
a.getArgument(0, Callable.class).call();
106+
} catch (final Exception ex) {
107+
exceptionThrownByCallable = ex;
108+
throw ex;
109+
}
110+
return null;
111+
});
112+
91113
when(pluginMetrics.counter(S3ObjectWorker.S3_OBJECTS_FAILED_METRIC_NAME)).thenReturn(s3ObjectsFailedCounter);
114+
when(pluginMetrics.counter(S3ObjectWorker.S3_OBJECTS_SUCCEEDED_METRIC_NAME)).thenReturn(s3ObjectsSucceededCounter);
115+
when(pluginMetrics.timer(S3ObjectWorker.S3_OBJECTS_TIME_ELAPSED_METRIC_NAME)).thenReturn(s3ObjectReadTimer);
92116

93117
objectInputStream = mock(ResponseInputStream.class);
94118
}
@@ -189,6 +213,20 @@ void parseS3Object_calls_BufferAccumulator_flush_after_Codec_parse() throws Exce
189213
inOrder.verify(bufferAccumulator).flush();
190214
}
191215

216+
@Test
217+
void parseS3Object_increments_success_counter_after_parsing_S3_object() throws IOException {
218+
final ResponseInputStream<GetObjectResponse> objectInputStream = mock(ResponseInputStream.class);
219+
when(s3Client.getObject(any(GetObjectRequest.class)))
220+
.thenReturn(objectInputStream);
221+
222+
final S3ObjectWorker objectUnderTest = createObjectUnderTest();
223+
objectUnderTest.parseS3Object(s3ObjectReference);
224+
225+
verify(s3ObjectsSucceededCounter).increment();
226+
verifyNoInteractions(s3ObjectsFailedCounter);
227+
assertThat(exceptionThrownByCallable, nullValue());
228+
}
229+
192230
@Test
193231
void parseS3Object_throws_Exception_and_increments_counter_when_unable_to_get_S3_object() {
194232
final RuntimeException expectedException = mock(RuntimeException.class);
@@ -201,10 +239,12 @@ void parseS3Object_throws_Exception_and_increments_counter_when_unable_to_get_S3
201239
assertThat(actualException, sameInstance(expectedException));
202240

203241
verify(s3ObjectsFailedCounter).increment();
242+
verifyNoInteractions(s3ObjectsSucceededCounter);
243+
assertThat(exceptionThrownByCallable, sameInstance(expectedException));
204244
}
205245

206246
@Test
207-
void parseS3Object_throws_Exception_and_increments_counter_when_unable_to_parse_S3_object() throws IOException {
247+
void parseS3Object_throws_Exception_and_increments_failure_counter_when_unable_to_parse_S3_object() throws IOException {
208248
when(compressionEngine.createInputStream(key, objectInputStream)).thenReturn(objectInputStream);
209249
when(s3Client.getObject(any(GetObjectRequest.class)))
210250
.thenReturn(objectInputStream);
@@ -218,5 +258,55 @@ void parseS3Object_throws_Exception_and_increments_counter_when_unable_to_parse_
218258
assertThat(actualException, sameInstance(expectedException));
219259

220260
verify(s3ObjectsFailedCounter).increment();
261+
verifyNoInteractions(s3ObjectsSucceededCounter);
262+
assertThat(exceptionThrownByCallable, sameInstance(expectedException));
263+
}
264+
265+
@Test
266+
void parseS3Object_throws_Exception_and_increments_failure_counter_when_unable_to_GetObject_from_S3() {
267+
final RuntimeException expectedException = mock(RuntimeException.class);
268+
when(s3Client.getObject(any(GetObjectRequest.class)))
269+
.thenThrow(expectedException);
270+
271+
final S3ObjectWorker objectUnderTest = createObjectUnderTest();
272+
final RuntimeException actualException = assertThrows(RuntimeException.class, () -> objectUnderTest.parseS3Object(s3ObjectReference));
273+
274+
assertThat(actualException, sameInstance(expectedException));
275+
276+
verify(s3ObjectsFailedCounter).increment();
277+
verifyNoInteractions(s3ObjectsSucceededCounter);
278+
assertThat(exceptionThrownByCallable, sameInstance(expectedException));
279+
}
280+
281+
@Test
282+
void parseS3Object_throws_Exception_and_increments_failure_counter_when_CompressionEngine_fails() throws IOException {
283+
when(s3Client.getObject(any(GetObjectRequest.class)))
284+
.thenReturn(objectInputStream);
285+
final IOException expectedException = mock(IOException.class);
286+
when(compressionEngine.createInputStream(key, objectInputStream)).thenThrow(expectedException);
287+
288+
final S3ObjectWorker objectUnderTest = createObjectUnderTest();
289+
final IOException actualException = assertThrows(IOException.class, () -> objectUnderTest.parseS3Object(s3ObjectReference));
290+
291+
assertThat(actualException, sameInstance(expectedException));
292+
293+
verify(s3ObjectsFailedCounter).increment();
294+
verifyNoInteractions(s3ObjectsSucceededCounter);
295+
assertThat(exceptionThrownByCallable, sameInstance(expectedException));
296+
}
297+
298+
@Test
299+
void parseS3Object_calls_GetObject_after_Callable() throws Exception {
300+
final ResponseInputStream<GetObjectResponse> objectInputStream = mock(ResponseInputStream.class);
301+
when(s3Client.getObject(any(GetObjectRequest.class)))
302+
.thenReturn(objectInputStream);
303+
304+
final S3ObjectWorker objectUnderTest = createObjectUnderTest();
305+
objectUnderTest.parseS3Object(s3ObjectReference);
306+
307+
final InOrder inOrder = inOrder(s3ObjectReadTimer, s3Client);
308+
309+
inOrder.verify(s3ObjectReadTimer).recordCallable(any(Callable.class));
310+
inOrder.verify(s3Client).getObject(any(GetObjectRequest.class));
221311
}
222312
}

0 commit comments

Comments
 (0)