Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class LeaderOnlyTokenCrawler implements Crawler<PaginationCrawlerWorkerPr

private static final String METRIC_BATCHES_FAILED = "batchesFailed";
private static final String METRIC_BUFFER_WRITE_TIME = "bufferWriteTime";
private static final String WORKER_PARTITION_WAIT_TIME = "WorkerPartitionWaitTime";
private static final String WORKER_PARTITION_PROCESS_LATENCY = "WorkerPartitionProcessLatency";
public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";

Expand All @@ -55,6 +57,8 @@ public class LeaderOnlyTokenCrawler implements Crawler<PaginationCrawlerWorkerPr
private final Counter acknowledgementSetSuccesses;
private final Counter acknowledgementSetFailures;
private final Timer bufferWriteTimer;
private final Timer partitionWaitTimeTimer;
private final Timer partitionProcessLatencyTimer;

private String lastToken;
private Duration noAckTimeout;
Expand All @@ -67,6 +71,8 @@ public LeaderOnlyTokenCrawler(
this.crawlingTimer = pluginMetrics.timer("crawlingTime");
this.batchesFailedCounter = pluginMetrics.counter(METRIC_BATCHES_FAILED);
this.bufferWriteTimer = pluginMetrics.timer(METRIC_BUFFER_WRITE_TIME);
this.partitionWaitTimeTimer = pluginMetrics.timer(WORKER_PARTITION_WAIT_TIME);
this.partitionProcessLatencyTimer = pluginMetrics.timer(WORKER_PARTITION_PROCESS_LATENCY);
this.acknowledgementSetSuccesses = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME);
this.acknowledgementSetFailures = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME);
this.noAckTimeout = NO_ACK_TIME_OUT_SECONDS;
Expand Down Expand Up @@ -123,7 +129,8 @@ public Instant crawl(LeaderPartition leaderPartition,

@Override
public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer buffer, AcknowledgementSet acknowledgementSet) {
client.executePartition(state, buffer, acknowledgementSet);
partitionWaitTimeTimer.record(Duration.between(state.getExportStartTime(), Instant.now()));
partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet));
}

private List<ItemInfo> collectBatch(Iterator<ItemInfo> iterator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.Counter;

import java.time.Duration;
import java.time.Instant;
Expand All @@ -42,6 +44,9 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.doNothing;
import static org.mockito.internal.verification.VerificationModeFactory.times;

@ExtendWith(MockitoExtension.class)
Expand All @@ -62,6 +67,8 @@ class LeaderOnlyTokenCrawlerTest {
private AcknowledgementSet acknowledgementSet;
@Mock
private Buffer<Record<Event>> buffer;
@Mock
private PaginationCrawlerWorkerProgressState workerState;

private LeaderOnlyTokenCrawler crawler;
private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("CrawlerTest", "crawler");
Expand Down Expand Up @@ -241,6 +248,50 @@ void testAcknowledgmentTimeout() {
verify(coordinator).createPartition(any());
}

@Test
public void testExecutePartitionMetrics() {
reset(leaderPartition);

// mock timers and counters
Timer mockCrawlingTimer = mock(Timer.class);
Timer partitionWaitTimeTimer = mock(Timer.class);
Timer partitionProcessLatencyTimer = mock(Timer.class);
Timer mockBufferWriteTimer = mock(Timer.class);
Counter mockBatchesFailedCounter = mock(Counter.class);
Counter mockAcknowledgementSetSuccesses = mock(Counter.class);
Counter mockAcknowledgementSetFailures = mock(Counter.class);

// setup mock plugin metrics
PluginMetrics mockPluginMetrics = mock(PluginMetrics.class);
when(mockPluginMetrics.timer("crawlingTime")).thenReturn(mockCrawlingTimer);
when(mockPluginMetrics.timer("WorkerPartitionWaitTime")).thenReturn(partitionWaitTimeTimer);
when(mockPluginMetrics.timer("WorkerPartitionProcessLatency")).thenReturn(partitionProcessLatencyTimer);
when(mockPluginMetrics.timer("bufferWriteTime")).thenReturn(mockBufferWriteTimer);
when(mockPluginMetrics.counter("batchesFailed")).thenReturn(mockBatchesFailedCounter);
when(mockPluginMetrics.counter("acknowledgementSetSuccesses")).thenReturn(mockAcknowledgementSetSuccesses);
when(mockPluginMetrics.counter("acknowledgementSetFailures")).thenReturn(mockAcknowledgementSetFailures);

LeaderOnlyTokenCrawler testCrawler = new LeaderOnlyTokenCrawler(client, mockPluginMetrics);

// test executePartition with metrics
when(workerState.getExportStartTime()).thenReturn(Instant.now().minusSeconds(1));

// make latency timer execute the runnable so client.executePartition() gets called
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
}).when(partitionProcessLatencyTimer).record(any(Runnable.class));

doNothing().when(partitionWaitTimeTimer).record(any(Duration.class));

testCrawler.executePartition(workerState, buffer, acknowledgementSet);

// verify metrics are recorded
verify(partitionProcessLatencyTimer).record(any(Runnable.class));
verify(partitionWaitTimeTimer).record(any(Duration.class));
verify(client).executePartition(workerState, buffer, acknowledgementSet);
}

private List<ItemInfo> createTestItems(int count) {
List<ItemInfo> items = new ArrayList<>();
Expand All @@ -249,4 +300,4 @@ private List<ItemInfo> createTestItems(int count) {
}
return items;
}
}
}