Skip to content

Commit 213da3c

Browse files
committed
Delay 5 minutes in DimensionalTimeSliceCrawler for partition creation on live event
This commit adds a 5-minute delay to partition creation for live events in DimensionalTimeSliceCrawler. In general, newly generated events become queryable after 30 to 120 seconds. A 5-minute delay gives newly generated events enough time to become queryable, which greatly reduces the possibility of losing events due to eventual consistency on the vendor API side. Signed-off-by: Wenjie Yao <[email protected]>
1 parent 118c303 commit 213da3c

File tree

2 files changed

+145
-60
lines changed

2 files changed

+145
-60
lines changed

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawler.java

Lines changed: 47 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
@Named
3737
public class DimensionalTimeSliceCrawler implements Crawler<DimensionalTimeSliceWorkerProgressState> {
3838
private static final Logger log = LoggerFactory.getLogger(DimensionalTimeSliceCrawler.class);
39+
// Delay partition creation for the most recent time window by five minutes so that newly generated events are queryable.
40+
// In general, newly generated events become queryable after 30 to 120 seconds.
41+
protected static final long WAIT_SECONDS_BEFORE_PARTITION_CREATION = 300;
3942
private static final String DIMENSIONAL_TIME_SLICE_WORKER_PARTITIONS_CREATED = "DimensionalTimeSliceWorkerPartitionsCreated";
4043
private static final String WORKER_PARTITION_WAIT_TIME = "WorkerPartitionWaitTime";
4144
private static final String WORKER_PARTITION_PROCESS_LATENCY = "WorkerPartitionProcessLatency";
@@ -73,10 +76,9 @@ public void initialize(List<String> dimensionTypes) {
7376
*/
7477
@Override
7578
public Instant crawl(LeaderPartition leaderPartition, EnhancedSourceCoordinator coordinator) {
76-
Instant latestModifiedTime = Instant.now();
7779
double startCount = partitionsCreatedCounter.count();
7880

79-
createPartitionsForDimensionTypes(leaderPartition, coordinator, latestModifiedTime, dimensionTypes);
81+
Instant latestModifiedTime = createPartitions(leaderPartition, coordinator);
8082

8183
double partitionsInThisCrawl = partitionsCreatedCounter.count() - startCount;
8284
log.info("Total partitions created in this crawl: {}", partitionsInThisCrawl);
@@ -89,82 +91,85 @@ public void executePartition(DimensionalTimeSliceWorkerProgressState state, Buff
8991
partitionProcessLatencyTimer.record(() -> client.executePartition(state, buffer, acknowledgementSet));
9092
}
9193

92-
private void createPartitionsForDimensionTypes(LeaderPartition leaderPartition,
93-
EnhancedSourceCoordinator coordinator,
94-
Instant latestModifiedTime,
95-
List<String> dimensionTypes) {
94+
private Instant createPartitions(LeaderPartition leaderPartition,
95+
EnhancedSourceCoordinator coordinator) {
9696
DimensionalTimeSliceLeaderProgressState leaderProgressState =
9797
(DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get();
9898

9999
if (leaderProgressState.getRemainingHours() == 0) {
100-
createPartitionForIncrementalSync(leaderPartition, coordinator,
101-
latestModifiedTime, dimensionTypes);
100+
return createPartitionsForIncrementalSync(leaderPartition, coordinator);
102101
} else {
103-
createPartitionForHistoricalPull(leaderPartition, coordinator,
104-
latestModifiedTime, dimensionTypes);
102+
return createPartitionsForHistoricalPull(leaderPartition, coordinator);
105103
}
106104
}
107105

108106
/**
109107
* Creates partitions for historical data pull. Creates hourly partitions
110108
* for each dimension type, working backwards from the current time.
111109
*/
112-
private void createPartitionForHistoricalPull(LeaderPartition leaderPartition,
113-
EnhancedSourceCoordinator coordinator,
114-
Instant latestModifiedTime,
115-
List<String> dimensionTypes) {
110+
private Instant createPartitionsForHistoricalPull(LeaderPartition leaderPartition,
111+
EnhancedSourceCoordinator coordinator) {
116112
DimensionalTimeSliceLeaderProgressState leaderProgressState =
117113
(DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get();
118114
int remainingHours = leaderProgressState.getRemainingHours();
119115
Instant initialTime = leaderProgressState.getLastPollTime();
120-
Instant nowUtc = initialTime.truncatedTo(ChronoUnit.HOURS);
121-
for (int i = remainingHours; i > 0; i-- ) {
122-
Instant startTime = nowUtc.minus(Duration.ofHours(i));;
116+
Instant latestHour = initialTime.truncatedTo(ChronoUnit.HOURS);
117+
for (int i = remainingHours; i > 1; i--) {
118+
Instant startTime = latestHour.minus(Duration.ofHours(i));
123119
Instant endTime = startTime.plus(HOUR_DURATION);
124120

125-
for (String dimensionType : dimensionTypes) {
126-
createWorkerPartition(startTime, endTime, dimensionType, coordinator);
127-
}
121+
createWorkerPartitionsForDimensionTypes(startTime, endTime, coordinator);
128122
}
129123

130-
// Create final partitions from last hour to now
131-
for (String dimensionType : dimensionTypes) {
132-
createWorkerPartition(nowUtc, latestModifiedTime, dimensionType, coordinator);
124+
Instant latestModifiedTime = initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION);
125+
if (latestModifiedTime.isAfter(latestHour)) {
126+
// if checkpointing time is after the latest hour, create one partition for the last hour
127+
// and one from latest hour to checkpointing time
128+
createWorkerPartitionsForDimensionTypes(latestHour.minus(Duration.ofHours(1)), latestHour, coordinator);
129+
createWorkerPartitionsForDimensionTypes(latestHour, latestModifiedTime, coordinator);
130+
} else {
131+
// if checkpointing time is not later than the latest hour, create one partition from 1 hour ago to checkpointing time
132+
createWorkerPartitionsForDimensionTypes(latestHour.minus(Duration.ofHours(1)), latestModifiedTime, coordinator);
133133
}
134134

135135
updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator);
136+
137+
return latestModifiedTime;
136138
}
137139

138140
/**
139141
* Creates partitions for incremental sync. Creates one partition per dimension type
140142
* from the last poll time to the current time minus WAIT_SECONDS_BEFORE_PARTITION_CREATION.
141143
*/
142-
private void createPartitionForIncrementalSync(LeaderPartition leaderPartition,
143-
EnhancedSourceCoordinator coordinator,
144-
Instant latestModifiedTime,
145-
List<String> dimensionTypes) {
144+
private Instant createPartitionsForIncrementalSync(LeaderPartition leaderPartition,
145+
EnhancedSourceCoordinator coordinator) {
146+
Instant latestModifiedTime = Instant.now().minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION);
146147
LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
147148
Instant lastPollTime = leaderProgressState.getLastPollTime();
148149

149-
// Create one partition from lastPollTime to latestModifiedTime for each type
150-
for (String dimensionType : dimensionTypes) {
151-
createWorkerPartition(lastPollTime, latestModifiedTime, dimensionType, coordinator);
150+
if (lastPollTime.isBefore(latestModifiedTime)) {
151+
// Create one partition from lastPollTime to latestModifiedTime for each type
152+
createWorkerPartitionsForDimensionTypes(lastPollTime, latestModifiedTime, coordinator);
153+
154+
updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator);
155+
return latestModifiedTime;
152156
}
153157

154-
updateLeaderProgressState(leaderPartition, 0, latestModifiedTime, coordinator);
158+
return lastPollTime;
155159
}
156160

157-
void createWorkerPartition(Instant startTime, Instant endTime,
158-
String dimensionType, EnhancedSourceCoordinator coordinator) {
159-
DimensionalTimeSliceWorkerProgressState workerState = new DimensionalTimeSliceWorkerProgressState();
160-
workerState.setPartitionCreationTime(Instant.now());
161-
workerState.setStartTime(startTime);
162-
workerState.setEndTime(endTime);
163-
workerState.setDimensionType(dimensionType);
164-
165-
SaasSourcePartition partition = new SaasSourcePartition(workerState, LAST_UPDATED_KEY + UUID.randomUUID());
166-
coordinator.createPartition(partition);
167-
partitionsCreatedCounter.increment();
161+
void createWorkerPartitionsForDimensionTypes(Instant startTime, Instant endTime, EnhancedSourceCoordinator coordinator) {
162+
for (String dimensionType : dimensionTypes) {
163+
DimensionalTimeSliceWorkerProgressState workerState = new DimensionalTimeSliceWorkerProgressState();
164+
workerState.setPartitionCreationTime(Instant.now());
165+
workerState.setStartTime(startTime);
166+
workerState.setEndTime(endTime);
167+
workerState.setDimensionType(dimensionType);
168+
169+
SaasSourcePartition partition = new SaasSourcePartition(workerState, LAST_UPDATED_KEY + UUID.randomUUID());
170+
coordinator.createPartition(partition);
171+
partitionsCreatedCounter.increment();
172+
}
168173
}
169174

170175
/**

data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/DimensionalTimeSliceCrawlerTest.java

Lines changed: 98 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Arrays;
2828
import java.util.List;
2929

30+
import static org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler.WAIT_SECONDS_BEFORE_PARTITION_CREATION;
3031
import static org.junit.jupiter.api.Assertions.assertEquals;
3132
import static org.junit.jupiter.api.Assertions.assertNotNull;
3233
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -36,6 +37,7 @@
3637
import static org.mockito.Mockito.atLeastOnce;
3738
import static org.mockito.Mockito.mock;
3839
import static org.mockito.Mockito.times;
40+
import static org.mockito.Mockito.never;
3941
import static org.mockito.Mockito.verify;
4042
import static org.mockito.Mockito.when;
4143
import static org.mockito.Mockito.doAnswer;
@@ -80,8 +82,8 @@ void setUp() {
8082
}
8183

8284
@Test
83-
void testCrawl_withIncrementalSync() {
84-
Instant lastPollTime = Instant.now().minus(Duration.ofHours(1));
85+
void testCrawl_withIncrementalSync_lastModificationTimeBefore5MinutesAgo() {
86+
Instant lastPollTime = Instant.now().minusSeconds(400);
8587
DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, 0);
8688
LeaderPartition leaderPartition = new LeaderPartition(state);
8789

@@ -105,10 +107,64 @@ void testCrawl_withIncrementalSync() {
105107
}
106108

107109
@Test
108-
void testCrawl_withHistoricalSync() {
109-
Instant now = Instant.now().truncatedTo(ChronoUnit.HOURS);
110-
int lookbackHours = 3;
111-
DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(now, lookbackHours);
110+
void testCrawl_withIncrementalSync_lastModificationTimeAfter5MinutesAgo() {
111+
Instant lastPollTime = Instant.now().minusSeconds(10);
112+
DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(lastPollTime, 0);
113+
LeaderPartition leaderPartition = new LeaderPartition(state);
114+
115+
Instant latest = crawler.crawl(leaderPartition, coordinator);
116+
117+
assertNotNull(latest);
118+
verify(coordinator, never()).createPartition(partitionCaptor.capture());
119+
verify(coordinator, never()).saveProgressStateForPartition(eq(leaderPartition), any());
120+
verify(partitionsCreatedCounter, never()).increment();
121+
}
122+
123+
@Test
124+
void testCrawl_withHistoricalSync_initialTimeInTheFirst5MinutesOfTheHOur() {
125+
Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS);
126+
Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION -1);
127+
int lookbackHours = 2;
128+
DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackHours);
129+
LeaderPartition leaderPartition = new LeaderPartition(state);
130+
131+
Instant latest = crawler.crawl(leaderPartition, coordinator);
132+
133+
assertNotNull(latest);
134+
// Expecting lookbackHours * LOG_TYPES.size() partitions
135+
int expectedPartitions = (lookbackHours) * LOG_TYPES.size();
136+
verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture());
137+
verify(coordinator, atLeastOnce()).saveProgressStateForPartition(eq(leaderPartition), any());
138+
verify(partitionsCreatedCounter, times(expectedPartitions)).increment();
139+
140+
List<SaasSourcePartition> createdPartitions = partitionCaptor.getAllValues();
141+
assertEquals(expectedPartitions, createdPartitions.size());
142+
143+
// Verify first hour's partitions
144+
for (int i = 0; i < LOG_TYPES.size(); i++) {
145+
DimensionalTimeSliceWorkerProgressState workerState =
146+
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get();
147+
assertEquals(latestHour.minus(Duration.ofHours(lookbackHours)), workerState.getStartTime());
148+
assertEquals(latestHour.minus(Duration.ofHours(lookbackHours - 1)), workerState.getEndTime());
149+
assertEquals(LOG_TYPES.get(i), workerState.getDimensionType());
150+
}
151+
152+
// Verify previous hour's partitions
153+
for (int i = LOG_TYPES.size(); i < LOG_TYPES.size() * 2; i++) {
154+
DimensionalTimeSliceWorkerProgressState workerState =
155+
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get();
156+
assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getStartTime());
157+
assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime());
158+
assertEquals(LOG_TYPES.get(i - LOG_TYPES.size()), workerState.getDimensionType());
159+
}
160+
}
161+
162+
@Test
163+
void testCrawl_withHistoricalSync_initialTimeNotInTheFirst5MinutesOfTheHOur() {
164+
Instant latestHour = Instant.now().truncatedTo(ChronoUnit.HOURS);
165+
Instant initialTime = latestHour.plusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION + 1);
166+
int lookbackHours = 2;
167+
DimensionalTimeSliceLeaderProgressState state = new DimensionalTimeSliceLeaderProgressState(initialTime, lookbackHours);
112168
LeaderPartition leaderPartition = new LeaderPartition(state);
113169

114170
Instant latest = crawler.crawl(leaderPartition, coordinator);
@@ -127,29 +183,53 @@ void testCrawl_withHistoricalSync() {
127183
for (int i = 0; i < LOG_TYPES.size(); i++) {
128184
DimensionalTimeSliceWorkerProgressState workerState =
129185
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get();
130-
assertEquals(now.minus(Duration.ofHours(lookbackHours)), workerState.getStartTime());
131-
assertEquals(now.minus(Duration.ofHours(lookbackHours-1)), workerState.getEndTime());
186+
assertEquals(latestHour.minus(Duration.ofHours(lookbackHours)), workerState.getStartTime());
187+
assertEquals(latestHour.minus(Duration.ofHours(lookbackHours - 1)), workerState.getEndTime());
132188
assertEquals(LOG_TYPES.get(i), workerState.getDimensionType());
133189
}
190+
191+
// Verify previous hour's partitions
192+
for (int i = LOG_TYPES.size(); i < LOG_TYPES.size() * 2; i++) {
193+
DimensionalTimeSliceWorkerProgressState workerState =
194+
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get();
195+
assertEquals(latestHour.minus(Duration.ofHours(1)), workerState.getStartTime());
196+
assertEquals(latestHour, workerState.getEndTime());
197+
assertEquals(LOG_TYPES.get(i - LOG_TYPES.size()), workerState.getDimensionType());
198+
}
199+
200+
// Verify latest hour's partitions
201+
for (int i = LOG_TYPES.size() * 2; i < LOG_TYPES.size() * 3; i++) {
202+
DimensionalTimeSliceWorkerProgressState workerState =
203+
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get();
204+
assertEquals(latestHour, workerState.getStartTime());
205+
assertEquals(initialTime.minusSeconds(WAIT_SECONDS_BEFORE_PARTITION_CREATION), workerState.getEndTime());
206+
assertEquals(LOG_TYPES.get(i - LOG_TYPES.size() * 2), workerState.getDimensionType());
207+
}
134208
}
135209

136210
@Test
137-
void testCreateWorkerPartition() {
211+
void createWorkerPartitionsForDimensionTypes() {
138212
Instant start = Instant.parse("2024-10-30T00:00:00Z");
139213
Instant end = start.plus(Duration.ofHours(1));
140214
String logType = "Exchange";
141215

142-
crawler.createWorkerPartition(start, end, logType, coordinator);
216+
crawler.createWorkerPartitionsForDimensionTypes(start, end, coordinator);
143217

144-
verify(coordinator).createPartition(partitionCaptor.capture());
145-
verify(partitionsCreatedCounter).increment();
218+
int expectedPartitions = LOG_TYPES.size();
219+
verify(coordinator, times(expectedPartitions)).createPartition(partitionCaptor.capture());
220+
verify(partitionsCreatedCounter, times(expectedPartitions)).increment();
221+
222+
List<SaasSourcePartition> createdPartitions = partitionCaptor.getAllValues();
223+
assertEquals(expectedPartitions, createdPartitions.size());
146224

147-
SaasSourcePartition partition = partitionCaptor.getValue();
148-
DimensionalTimeSliceWorkerProgressState state =
149-
(DimensionalTimeSliceWorkerProgressState) partition.getProgressState().get();
150-
assertEquals(start, state.getStartTime());
151-
assertEquals(end, state.getEndTime());
152-
assertEquals(logType, state.getDimensionType());
225+
// Verify one partition per dimension type, each covering [start, end]
226+
for (int i = 0; i < LOG_TYPES.size(); i++) {
227+
DimensionalTimeSliceWorkerProgressState workerState =
228+
(DimensionalTimeSliceWorkerProgressState) createdPartitions.get(i).getProgressState().get();
229+
assertEquals(start, workerState.getStartTime());
230+
assertEquals(end, workerState.getEndTime());
231+
assertEquals(LOG_TYPES.get(i), workerState.getDimensionType());
232+
}
153233
}
154234

155235
@Test

0 commit comments

Comments
 (0)