@@ -5,7 +5,7 @@ import type { StorageType } from '@rudderstack/analytics-js-common/types/Storage
5
5
import type { Nullable } from '@rudderstack/analytics-js-common/types/Nullable' ;
6
6
import type { ILogger } from '@rudderstack/analytics-js-common/types/Logger' ;
7
7
import type { BatchOpts , QueueOpts } from '@rudderstack/analytics-js-common/types/LoadOptions' ;
8
- import { isDefined } from '@rudderstack/analytics-js-common/utilities/checks' ;
8
+ import { isDefined , isNullOrUndefined } from '@rudderstack/analytics-js-common/utilities/checks' ;
9
9
import { LOCAL_STORAGE } from '@rudderstack/analytics-js-common/constants/storages' ;
10
10
import { generateUUID } from '@rudderstack/analytics-js-common/utilities/uuId' ;
11
11
import type {
@@ -32,6 +32,8 @@ import {
32
32
DEFAULT_RECLAIM_TIMEOUT_MS ,
33
33
DEFAULT_RECLAIM_WAIT_MS ,
34
34
DEFAULT_BATCH_FLUSH_INTERVAL_MS ,
35
+ MIN_TIMER_SCALE_FACTOR ,
36
+ MAX_TIMER_SCALE_FACTOR ,
35
37
} from './constants' ;
36
38
37
39
const sortByTime = ( a : QueueItem , b : QueueItem ) => a . time - b . time ;
@@ -64,6 +66,8 @@ class RetryQueue implements IQueue<QueueItemData> {
64
66
flushBatchTaskId ?: string ;
65
67
batchingInProgress ?: boolean ;
66
68
batchSizeCalcCb ?: QueueBatchItemsSizeCalculatorCallback < QueueItemData > ;
69
+ reclaimStartVal ?: Nullable < string > ;
70
+ reclaimEndVal ?: Nullable < string > ;
67
71
68
72
constructor (
69
73
name : string ,
@@ -95,12 +99,21 @@ class RetryQueue implements IQueue<QueueItemData> {
95
99
jitter : options . backoffJitter || DEFAULT_BACKOFF_JITTER ,
96
100
} ;
97
101
102
+ // Limit the timer scale factor to the minimum value
103
+ let timerScaleFactor = Math . max (
104
+ options . timerScaleFactor ?? MIN_TIMER_SCALE_FACTOR ,
105
+ MIN_TIMER_SCALE_FACTOR ,
106
+ ) ;
107
+
108
+ // Limit the timer scale factor to the maximum value
109
+ timerScaleFactor = Math . min ( timerScaleFactor , MAX_TIMER_SCALE_FACTOR ) ;
110
+
98
111
// painstakingly tuned. that's why they're not "easily" configurable
99
112
this . timeouts = {
100
- ackTimer : DEFAULT_ACK_TIMER_MS ,
101
- reclaimTimer : DEFAULT_RECLAIM_TIMER_MS ,
102
- reclaimTimeout : DEFAULT_RECLAIM_TIMEOUT_MS ,
103
- reclaimWait : DEFAULT_RECLAIM_WAIT_MS ,
113
+ ackTimer : Math . round ( timerScaleFactor * DEFAULT_ACK_TIMER_MS ) ,
114
+ reclaimTimer : Math . round ( timerScaleFactor * DEFAULT_RECLAIM_TIMER_MS ) ,
115
+ reclaimTimeout : Math . round ( timerScaleFactor * DEFAULT_RECLAIM_TIMEOUT_MS ) ,
116
+ reclaimWait : Math . round ( timerScaleFactor * DEFAULT_RECLAIM_WAIT_MS ) ,
104
117
} ;
105
118
106
119
this . schedule = new Schedule ( ) ;
@@ -165,17 +178,21 @@ class RetryQueue implements IQueue<QueueItemData> {
165
178
}
166
179
167
180
getStorageEntry (
168
- name ? : string ,
181
+ name : string ,
169
182
) : Nullable < QueueItem < QueueItemData > [ ] | Record < string , any > | number > {
170
- return this . store . get ( name ?? this . name ) ;
183
+ return this . store . get ( name ) ;
171
184
}
172
185
173
186
// TODO: fix the type of different queues to be the same if possible
174
187
setStorageEntry (
175
- name ? : string ,
188
+ name : string ,
176
189
value ?: Nullable < QueueItem < QueueItemData > [ ] | Record < string , any > > | number ,
177
190
) {
178
- this . store . set ( name ?? this . name , value ?? [ ] ) ;
191
+ if ( isNullOrUndefined ( value ) ) {
192
+ this . store . remove ( name ) ;
193
+ } else {
194
+ this . store . set ( name , value ) ;
195
+ }
179
196
}
180
197
181
198
/**
@@ -305,7 +322,7 @@ class RetryQueue implements IQueue<QueueItemData> {
305
322
batchQueue = batchQueue . slice ( - batchQueue . length ) ;
306
323
batchQueue . push ( entry ) ;
307
324
308
- const batchDispatchInfo = this . getBatchDispInfo ( batchQueue ) ;
325
+ const batchDispatchInfo = this . getBatchDispatchInfo ( batchQueue ) ;
309
326
// if batch criteria is met, queue the batch events to the main queue and clear batch queue
310
327
if ( batchDispatchInfo . criteriaMet || batchDispatchInfo . criteriaExceeded ) {
311
328
let batchItems ;
@@ -399,7 +416,7 @@ class RetryQueue implements IQueue<QueueItemData> {
399
416
* @param batchItems Prospective batch items
400
417
* @returns Batch dispatch info
401
418
*/
402
- getBatchDispInfo ( batchItems : QueueItem [ ] ) {
419
+ getBatchDispatchInfo ( batchItems : QueueItem [ ] ) {
403
420
let lengthCriteriaMet = false ;
404
421
let lengthCriteriaExceeded = false ;
405
422
const configuredBatchMaxItems = this . batch ?. maxItems as number ;
@@ -520,8 +537,17 @@ class RetryQueue implements IQueue<QueueItemData> {
520
537
// Ack continuously to prevent other tabs from claiming our queue
521
538
ack ( ) {
522
539
this . setStorageEntry ( QueueStatuses . ACK , this . schedule . now ( ) ) ;
523
- this . setStorageEntry ( QueueStatuses . RECLAIM_START , null ) ;
524
- this . setStorageEntry ( QueueStatuses . RECLAIM_END , null ) ;
540
+
541
+ if ( this . reclaimStartVal != null ) {
542
+ this . reclaimStartVal = null ;
543
+ this . setStorageEntry ( QueueStatuses . RECLAIM_START , null ) ;
544
+ }
545
+
546
+ if ( this . reclaimEndVal != null ) {
547
+ this . reclaimEndVal = null ;
548
+ this . setStorageEntry ( QueueStatuses . RECLAIM_END , null ) ;
549
+ }
550
+
525
551
this . schedule . run ( this . ack , this . timeouts . ackTimer , ScheduleModes . ASAP ) ;
526
552
}
527
553
@@ -609,9 +635,10 @@ class RetryQueue implements IQueue<QueueItemData> {
609
635
610
636
removeStorageEntry ( store : IStore , entryIdx : number , backoff : number , attempt = 1 ) {
611
637
const maxAttempts = 2 ;
638
+ const queueEntryKeys = Object . keys ( QueueStatuses ) ;
639
+ const entry = QueueStatuses [ queueEntryKeys [ entryIdx ] as keyof typeof QueueStatuses ] ;
640
+
612
641
( globalThis as typeof window ) . setTimeout ( ( ) => {
613
- const queueEntryKeys = Object . keys ( QueueStatuses ) ;
614
- const entry = QueueStatuses [ queueEntryKeys [ entryIdx ] as keyof typeof QueueStatuses ] ;
615
642
try {
616
643
store . remove ( entry ) ;
617
644
@@ -633,7 +660,7 @@ class RetryQueue implements IQueue<QueueItemData> {
633
660
this . logger ?. error ( RETRY_QUEUE_ENTRY_REMOVE_ERROR ( RETRY_QUEUE , entry , attempt ) , err ) ;
634
661
}
635
662
636
- // clear the next entry
663
+ // clear the next entry after we've exhausted our attempts
637
664
if ( attempt === maxAttempts && entryIdx + 1 < queueEntryKeys . length ) {
638
665
this . removeStorageEntry ( store , entryIdx + 1 , backoff ) ;
639
666
}
@@ -678,45 +705,31 @@ class RetryQueue implements IQueue<QueueItemData> {
678
705
} ;
679
706
const findOtherQueues = ( name : string ) : IStore [ ] => {
680
707
const res : IStore [ ] = [ ] ;
681
- const storage = this . store . getOriginalEngine ( ) ;
682
-
683
- for ( let i = 0 ; i < storage . length ; i ++ ) {
684
- const k = storage . key ( i ) ;
685
- const parts : string [ ] = k ? k . split ( '.' ) : [ ] ;
686
-
687
- if ( parts . length !== 3 ) {
688
- // eslint-disable-next-line no-continue
689
- continue ;
690
- }
691
-
692
- if ( parts [ 0 ] !== name ) {
693
- // eslint-disable-next-line no-continue
694
- continue ;
695
- }
696
-
697
- if ( parts [ 2 ] !== QueueStatuses . ACK ) {
698
- // eslint-disable-next-line no-continue
699
- continue ;
708
+ const storageKeys = this . store . getOriginalEngine ( ) . keys ( ) ;
709
+ storageKeys . forEach ( ( k : string ) => {
710
+ const keyParts : string [ ] = k ? k . split ( '.' ) : [ ] ;
711
+
712
+ if (
713
+ keyParts . length >= 3 &&
714
+ keyParts [ 0 ] === name &&
715
+ keyParts [ 1 ] !== this . id &&
716
+ keyParts [ 2 ] === QueueStatuses . ACK
717
+ ) {
718
+ res . push (
719
+ this . storeManager . setStore ( {
720
+ id : keyParts [ 1 ] as string ,
721
+ name,
722
+ validKeys : QueueStatuses ,
723
+ type : LOCAL_STORAGE ,
724
+ } ) ,
725
+ ) ;
700
726
}
701
-
702
- res . push (
703
- this . storeManager . setStore ( {
704
- id : parts [ 1 ] as string ,
705
- name,
706
- validKeys : QueueStatuses ,
707
- type : LOCAL_STORAGE ,
708
- } ) ,
709
- ) ;
710
- }
727
+ } ) ;
711
728
712
729
return res ;
713
730
} ;
714
731
715
732
findOtherQueues ( this . name ) . forEach ( store => {
716
- if ( store . id === this . id ) {
717
- return ;
718
- }
719
-
720
733
if ( this . schedule . now ( ) - store . get ( QueueStatuses . ACK ) < this . timeouts . reclaimTimeout ) {
721
734
return ;
722
735
}
0 commit comments