1212import org .opensearch .dataprepper .plugins .source .source_crawler .base .Crawler ;
1313import org .opensearch .dataprepper .plugins .source .source_crawler .base .CrawlerSourceConfig ;
1414import org .opensearch .dataprepper .plugins .source .source_crawler .base .SaasWorkerProgressState ;
15+ import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .state .DimensionalTimeSliceWorkerProgressState ;
1516import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .partition .SaasSourcePartition ;
17+ import org .opensearch .dataprepper .plugins .source .source_crawler .exception .SaaSCrawlerException ;
1618
1719import com .google .common .annotations .VisibleForTesting ;
1820import org .slf4j .Logger ;
1921import org .slf4j .LoggerFactory ;
2022
2123import java .time .Duration ;
2224import java .util .Optional ;
25+ import java .time .Instant ;
2326
2427/**
2528 * Worker class for executing the partitioned work created while crawling a source.
@@ -75,10 +78,10 @@ public void run() {
7578 log .info ("Worker thread started" );
7679 log .info ("Processing Partitions" );
7780 while (!Thread .currentThread ().isInterrupted ()) {
81+ Optional <EnhancedSourcePartition > partition = Optional .empty ();
7882 try {
7983 // Get the next available partition from the coordinator
80- Optional <EnhancedSourcePartition > partition =
81- sourceCoordinator .acquireAvailablePartition (SaasSourcePartition .PARTITION_TYPE );
84+ partition = sourceCoordinator .acquireAvailablePartition (SaasSourcePartition .PARTITION_TYPE );
8285 if (partition .isPresent ()) {
8386 // Process the partition (source extraction logic)
8487 processPartition (partition .get (), buffer );
@@ -94,8 +97,18 @@ public void run() {
9497 }
9598 }
9699 } catch (Exception e ) {
97- // TODO: will be in a followup to handle retry strategy differently for non-retryable exceptions
98- backoffRetry (e );
100+ this .parititionsFailedCounter .increment ();
101+ // always default to backoffRetry strategy
102+ boolean shouldUseBackoffRetry = true ;
103+ if (e instanceof SaaSCrawlerException ) {
104+ SaaSCrawlerException saasException = (SaaSCrawlerException ) e ;
105+ if (!saasException .isRetryable ()) {
106+ shouldUseBackoffRetry = delayRetry (partition , e );
107+ }
108+ }
109+ if (shouldUseBackoffRetry ) {
110+ backoffRetry (e );
111+ }
99112 }
100113 }
101114 log .warn ("SourceItemWorker Scheduler is interrupted, looks like shutdown has triggered" );
@@ -106,15 +119,67 @@ public void run() {
106119 * @param e - exception thrown by workerScheduler
107120 */
108121 private void backoffRetry (Exception e ) {
109- this .parititionsFailedCounter .increment ();
110- log .error ("Error processing partition" , e );
122+ log .error ("[Retryable Exception] Error processing partition" , e );
111123 try {
112124 Thread .sleep (RETRY_BACKOFF_ON_EXCEPTION_MILLIS );
113125 } catch (InterruptedException ex ) {
114126 log .warn ("Thread interrupted while waiting to retry due to {}" , ex .getMessage ());
115127 }
116128 }
117129
130+ /**
131+ * Delay retry by X Duration (current default = 1 day) for all non-retryble exceptions up to X days (current default = 30 days)
132+ * @param sourcePartition - information on WorkerPartition state
133+ * @param ex - exception thrown by workerScheduler
134+ * @return boolean: true if we should fallback to backoffRetry
135+ */
136+ private boolean delayRetry (Optional <EnhancedSourcePartition > sourcePartition , Exception ex ) {
137+ log .error ("[Non-Retryable Exception] Error processing worker partition. Will delay retry with the configured duration" , ex );
138+ try {
139+ SaasSourcePartition workerPartition = (SaasSourcePartition ) sourcePartition .get ();
140+ boolean isWorkerPartitionLeaseExtended = false ;
141+ if (workerPartition != null ) {
142+ SaasWorkerProgressState progressState = (SaasWorkerProgressState ) workerPartition .getProgressState ().get ();
143+ // TODO: ideally we should add partitionCreationTime for all type of SaasWorkerProgressState
144+ if (progressState instanceof DimensionalTimeSliceWorkerProgressState ) {
145+ DimensionalTimeSliceWorkerProgressState workerProgressState = (DimensionalTimeSliceWorkerProgressState ) progressState ;
146+ updateWorkerPartition (workerProgressState .getPartitionCreationTime (), workerPartition );
147+ isWorkerPartitionLeaseExtended = true ;
148+ }
149+ }
150+
151+ // other SaasWorkerProgressState types (not DimensionalTimeSliceWorkerProgressState) should never use delayRetry()
152+ // to be safe, fallback to default retry strategy
153+ if (!isWorkerPartitionLeaseExtended ) {
154+ return true ;
155+ }
156+ return false ;
157+ } catch (Exception e ) {
158+ log .error ("Error updating workerPartition " , e );
159+ // on exception, do not interrupt thread and retry again
160+ return false ;
161+ }
162+ }
163+
164+
165+ /**
166+ * Update the workerPartition if the partitionCreationTime <= max days to keep retrying (current default = 30 days) on nonretryable exceptions.
167+ * Otherwise, give up the workerPartition.
168+ * @param partitionCreationTime - timestamp in epoch when the worker partition was first created
169+ * @param workerPartition - information on WorkerPartition state
170+ */
171+ private void updateWorkerPartition (Instant partitionCreationTime , SaasSourcePartition workerPartition ) {
172+ log .info ("Updating workerPartition {}" , workerPartition .getPartitionKey ());
173+ Duration age = Duration .between (partitionCreationTime , Instant .now ());
174+ if (age .compareTo (this .sourceConfig .getDurationToGiveUpRetry ()) <= 0 ) {
175+ log .info ("Partition {} is within or equal to the configured max duration, scheduling retry" , workerPartition .getPartitionKey ());
176+ sourceCoordinator .saveProgressStateForPartition (workerPartition , this .sourceConfig .getDurationToDelayRetry ());
177+ } else {
178+ log .info ("Partition {} is older than the configured max duration, giving up" , workerPartition .getPartitionKey ());
179+ sourceCoordinator .giveUpPartition (workerPartition );
180+ }
181+ }
182+
118183 private void processPartition (EnhancedSourcePartition partition , Buffer <Record <Event >> buffer ) {
119184 // Implement your source extraction logic here
120185 // Update the partition state or commit the partition as needed
0 commit comments