diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java index 7108503f75..46a32f89d1 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java @@ -40,6 +40,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.regions.Region; @@ -50,6 +51,8 @@ import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.lenient; @@ -88,6 +91,9 @@ public class CloudWatchLogsIT { @Mock private ThresholdConfig thresholdConfig; + @Mock + private EventHandle eventHandle; + @Mock private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig; @@ -127,6 +133,7 @@ void setUp() { requestsFailedCount = new AtomicInteger(0); dlqSuccessCount = new AtomicInteger(0); objectMapper = new ObjectMapper(); + eventHandle = mock(EventHandle.class); pluginSetting = mock(PluginSetting.class); pluginFactory = mock(PluginFactory.class); when(pluginSetting.getPipelineName()).thenReturn("pipeline"); @@ -208,6 +215,7 @@ void setUp() { when(thresholdConfig.getFlushInterval()).thenReturn(60L); when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L); when(cloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig); + when(cloudWatchLogsSinkConfig.getWorkers()).thenReturn(3); } private List listObjectsWithPrefix(String bucketName, String prefix) { @@ -297,7 +305,7 @@ void TestSinkOperationWithLogSendInterval() throws Exception { assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); assertThat(requestsSuccessCount.get(), equalTo(1)); assertThat(dlqSuccessCount.get(), equalTo(0)); - + verify(eventHandle, times(NUM_RECORDS)).release(true); } @Test @@ -333,6 +341,7 @@ void TestSinkOperationWithBatchSize() throws Exception { assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); assertThat(requestsSuccessCount.get(), equalTo(NUM_RECORDS)); assertThat(dlqSuccessCount.get(), equalTo(0)); + verify(eventHandle, times(NUM_RECORDS)).release(true); } @@ -368,6 +377,7 @@ void TestSinkOperationWithMaxRequestSize() throws Exception { assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); assertThat(requestsSuccessCount.get(), equalTo(1)); assertThat(dlqSuccessCount.get(), equalTo(0)); + verify(eventHandle, times(NUM_RECORDS)).release(true); } @@ -423,6 +433,7 @@ void testWithLargeSingleMessagesSentToDLQ() { assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); assertThat(requestsSuccessCount.get(), equalTo(1)); assertThat(dlqSuccessCount.get(), equalTo(1)); + verify(eventHandle, times(NUM_RECORDS+1)).release(true); } @@ -461,6 +472,7 @@ void testWithBadCredentials_AllEventsToDLQ() { assertThat(eventsSuccessCount.get(), equalTo(0)); assertThat(requestsSuccessCount.get(), equalTo(0)); assertThat(dlqSuccessCount.get(), equalTo(NUM_RECORDS)); + verify(eventHandle, times(NUM_RECORDS)).release(true); } @@ -468,14 +480,20 @@ private Collection> getRecordList(int numberOfRecords) { final Collection> recordList = new ArrayList<>(); List records = generateRecords(numberOfRecords); for (int i = 0; i < numberOfRecords; i++) { - final Event event = JacksonLog.builder().withData(records.get(i)).build(); + final Event event = JacksonLog.builder() + .withData(records.get(i)) + .withEventHandle(eventHandle) + .build(); recordList.add(new Record<>(event)); } return recordList; } private Record getLargeRecord(int size) { - final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.randomAlphabetic(size))).build(); + final Event event = JacksonLog.builder() + .withData(Map.of("key", RandomStringUtils.randomAlphabetic(size))) + .withEventHandle(eventHandle) + .build(); return new Record<>(event); }