@@ -28,6 +28,18 @@ import delay from 'delay';
2828
2929const { generateTxAlonzo, mockChainHistoryProvider, queryTransactionsResult, queryTransactionsResult2 } = mockProviders ;
3030
31+ const updateTransactionsBlockNo = ( transactions : Cardano . HydratedTx [ ] , blockNo = Cardano . BlockNo ( 10_050 ) ) =>
32+ transactions . map ( ( tx ) => ( {
33+ ...tx ,
34+ blockHeader : { ...tx . blockHeader , blockNo }
35+ } ) ) ;
36+
37+ const updateTransactionIds = ( transactions : Cardano . HydratedTx [ ] , tailPattern = 'aaa' ) =>
38+ transactions . map ( ( tx ) => ( {
39+ ...tx ,
40+ id : Cardano . TransactionId ( `${ tx . id . slice ( 0 , - tailPattern . length ) } ${ tailPattern } ` )
41+ } ) ) ;
42+
3143describe ( 'TransactionsTracker' , ( ) => {
3244 const logger = dummyLogger ;
3345
@@ -287,34 +299,239 @@ describe('TransactionsTracker', () => {
287299
288300 // latestStoredBlock <1 2 3>
289301 // newBlock <4 5 6>
290- // rollback$ [1 2 3 ] - transactions need to be retried
302+ // rollback$ [3 2 1 ] - transactions need to be retried
291303 // store&emit [4 5 6]
292- it . todo ( 'rolls back all transactions on completely disjoin sets' ) ;
304+ it ( 'rolls back all transactions on completely disjoin sets' , async ( ) => {
305+ const [ txId1 , txId2 , txId3 ] = updateTransactionsBlockNo ( queryTransactionsResult2 . pageResults ) ;
306+ const [ txId4 , txId5 , txId6 ] = updateTransactionIds ( [ txId1 , txId2 , txId3 ] ) ;
307+
308+ await firstValueFrom ( store . setAll ( [ txId1 , txId2 , txId3 ] ) ) ;
309+
310+ chainHistoryProvider . transactionsByAddresses = jest . fn ( ( ) => ( {
311+ pageResults : [ txId4 , txId5 , txId6 ] ,
312+ totalResultCount : 3
313+ } ) ) ;
314+
315+ const { transactionsSource$ : provider$ , rollback$ } = createAddressTransactionsProvider ( {
316+ addresses$ : of ( addresses ) ,
317+ chainHistoryProvider,
318+ logger,
319+ retryBackoffConfig,
320+ store,
321+ tipBlockHeight$
322+ } ) ;
323+
324+ const rollbacks : Cardano . HydratedTx [ ] = [ ] ;
325+ rollback$ . subscribe ( ( tx ) => rollbacks . push ( tx ) ) ;
326+
327+ expect ( await firstValueFrom ( provider$ . pipe ( bufferCount ( 2 ) ) ) ) . toEqual ( [
328+ [ txId1 , txId2 , txId3 ] , // from store
329+ [ txId4 , txId5 , txId6 ] // chain history
330+ ] ) ;
331+ expect ( rollbacks ) . toEqual ( [ txId3 , txId2 , txId1 ] ) ;
332+ expect ( store . setAll ) . toBeCalledTimes ( 2 ) ;
333+ } ) ;
293334
294335 // latestStoredBlock <1 2>
295336 // newBlock <1 2 3>
296337 // rollback$ none
297338 // store&emit [1,2,3]
298- it . todo ( 'stores new transactions when new block is superset' ) ;
339+ it ( 'stores new transactions when new block is superset' , async ( ) => {
340+ const [ txId1 , txId2 ] = updateTransactionsBlockNo ( queryTransactionsResult2 . pageResults , Cardano . BlockNo ( 10_050 ) ) ;
341+ const [ txId1OtherBlock , txId2OtherBlock , txId3 ] = updateTransactionsBlockNo (
342+ queryTransactionsResult2 . pageResults ,
343+ Cardano . BlockNo ( 10_051 )
344+ ) ;
345+
346+ await firstValueFrom ( store . setAll ( [ txId1 , txId2 ] ) ) ;
347+
348+ chainHistoryProvider . transactionsByAddresses = jest . fn ( ( ) => ( {
349+ pageResults : [ txId1OtherBlock , txId2OtherBlock , txId3 ] ,
350+ totalResultCount : 3
351+ } ) ) ;
352+
353+ const { transactionsSource$ : provider$ , rollback$ } = createAddressTransactionsProvider ( {
354+ addresses$ : of ( addresses ) ,
355+ chainHistoryProvider,
356+ logger,
357+ retryBackoffConfig,
358+ store,
359+ tipBlockHeight$
360+ } ) ;
361+
362+ const rollbacks : Cardano . HydratedTx [ ] = [ ] ;
363+ rollback$ . subscribe ( ( tx ) => rollbacks . push ( tx ) ) ;
364+
365+ expect ( await firstValueFrom ( provider$ . pipe ( bufferCount ( 2 ) ) ) ) . toEqual ( [
366+ [ txId1 , txId2 ] , // from store
367+ [ txId1OtherBlock , txId2OtherBlock , txId3 ] // chain history
368+ ] ) ;
369+ expect ( rollbacks . length ) . toBe ( 0 ) ;
370+ expect ( store . setAll ) . toBeCalledTimes ( 2 ) ;
371+ expect ( store . setAll ) . nthCalledWith ( 2 , [ txId1OtherBlock , txId2OtherBlock , txId3 ] ) ;
372+ } ) ;
299373
300374 // latestStoredBlock <1 2 3>
301375 // newBlock <1 2>
302376 // rollback$ 3
303377 // store&emit [1,2]
304- it . todo ( 'rollback some transactions when new block is subset' ) ;
378+ it ( 'rollback some transactions when new block is subset' , async ( ) => {
379+ const [ txId1 , txId2 , txId3 ] = updateTransactionsBlockNo (
380+ queryTransactionsResult2 . pageResults ,
381+ Cardano . BlockNo ( 10_050 )
382+ ) ;
383+ const [ txId1OtherBlock , txId2OtherBlock ] = updateTransactionsBlockNo ( [ txId1 , txId2 ] , Cardano . BlockNo ( 10_051 ) ) ;
384+
385+ await firstValueFrom ( store . setAll ( [ txId1 , txId2 , txId3 ] ) ) ;
386+
387+ chainHistoryProvider . transactionsByAddresses = jest . fn ( ( ) => ( {
388+ pageResults : [ txId1OtherBlock , txId2OtherBlock ] ,
389+ totalResultCount : 2
390+ } ) ) ;
391+
392+ const { transactionsSource$ : provider$ , rollback$ } = createAddressTransactionsProvider ( {
393+ addresses$ : of ( addresses ) ,
394+ chainHistoryProvider,
395+ logger,
396+ retryBackoffConfig,
397+ store,
398+ tipBlockHeight$
399+ } ) ;
400+
401+ const rollbacks : Cardano . HydratedTx [ ] = [ ] ;
402+ rollback$ . subscribe ( ( tx ) => rollbacks . push ( tx ) ) ;
403+
404+ expect ( await firstValueFrom ( provider$ . pipe ( bufferCount ( 2 ) ) ) ) . toEqual ( [
405+ [ txId1 , txId2 , txId3 ] , // from store
406+ [ txId1OtherBlock , txId2OtherBlock ] // chain history
407+ ] ) ;
408+ expect ( rollbacks ) . toEqual ( [ txId3 ] ) ;
409+ expect ( store . setAll ) . toBeCalledTimes ( 2 ) ;
410+ expect ( store . setAll ) . nthCalledWith ( 2 , [ txId1OtherBlock , txId2OtherBlock ] ) ;
411+ } ) ;
305412
306413 // latestStoredBlock <1 2>
307414 // newBlocks <3> <1> <2>
308415 // rollback$ none - transactions are on chain
309416 // store&emit [3 1 2] - re-emit all as they might have a different blockNo
310417 // Noop - produces the same result in the tx history
311- it . todo ( 'detects when latest block transactions are found in among new blocks' ) ;
418+ it ( 'detects when latest block transactions are found in among new blocks' , async ( ) => {
419+ const [ txId1 , txId2 , txId3 ] = updateTransactionsBlockNo (
420+ queryTransactionsResult2 . pageResults ,
421+ Cardano . BlockNo ( 10_000 )
422+ ) ;
423+
424+ const [ txId3OtherBlock ] = updateTransactionsBlockNo ( [ txId3 ] , Cardano . BlockNo ( 10_100 ) ) ;
425+ const [ txId1OtherBlock ] = updateTransactionsBlockNo ( [ txId1 ] , Cardano . BlockNo ( 10_200 ) ) ;
426+ const [ txId2OtherBlock ] = updateTransactionsBlockNo ( [ txId2 ] , Cardano . BlockNo ( 10_300 ) ) ;
427+
428+ await firstValueFrom ( store . setAll ( [ txId1 , txId2 , txId3 ] ) ) ;
429+
430+ chainHistoryProvider . transactionsByAddresses = jest . fn ( ( ) => ( {
431+ pageResults : [ txId3OtherBlock , txId1OtherBlock , txId2OtherBlock ] ,
432+ totalResultCount : 3
433+ } ) ) ;
434+
435+ const { transactionsSource$ : provider$ , rollback$ } = createAddressTransactionsProvider ( {
436+ addresses$ : of ( addresses ) ,
437+ chainHistoryProvider,
438+ logger,
439+ retryBackoffConfig,
440+ store,
441+ tipBlockHeight$
442+ } ) ;
443+
444+ const rollbacks : Cardano . HydratedTx [ ] = [ ] ;
445+ rollback$ . subscribe ( ( tx ) => rollbacks . push ( tx ) ) ;
446+
447+ expect ( await firstValueFrom ( provider$ . pipe ( bufferCount ( 2 ) ) ) ) . toEqual ( [
448+ [ txId1 , txId2 , txId3 ] , // from store
449+ [ txId3OtherBlock , txId1OtherBlock , txId2OtherBlock ] // chain history
450+ ] ) ;
451+ expect ( rollbacks . length ) . toBe ( 0 ) ;
452+ expect ( store . setAll ) . toBeCalledTimes ( 2 ) ;
453+ expect ( store . setAll ) . nthCalledWith ( 2 , [ txId3OtherBlock , txId1OtherBlock , txId2OtherBlock ] ) ;
454+ } ) ;
312455
313456 // latestStoredBlock <1 2>
314457 // newBlock <3 2 1>
315458 // rollback$ none - transactions are on chain
316459 // store&emit [3 2 1]
317- it . todo ( 'reversed order transactions plus new tx are re-emitted, but not considered rollbacks' ) ;
460+ it ( 'reversed order transactions plus new tx are re-emitted, but not considered rollbacks' , async ( ) => {
461+ const [ txId1 , txId2 , txId3 ] = updateTransactionsBlockNo (
462+ queryTransactionsResult2 . pageResults ,
463+ Cardano . BlockNo ( 10_000 )
464+ ) ;
465+
466+ const [ txId1OtherBlock , txId2OtherBlock , txId3OtherBlock ] = updateTransactionsBlockNo (
467+ [ txId1 , txId2 , txId3 ] ,
468+ Cardano . BlockNo ( 10_100 )
469+ ) ;
470+
471+ await firstValueFrom ( store . setAll ( [ txId1 , txId2 ] ) ) ;
472+
473+ chainHistoryProvider . transactionsByAddresses = jest . fn ( ( ) => ( {
474+ pageResults : [ txId3OtherBlock , txId2OtherBlock , txId1OtherBlock ] ,
475+ totalResultCount : 3
476+ } ) ) ;
477+
478+ const { transactionsSource$ : provider$ , rollback$ } = createAddressTransactionsProvider ( {
479+ addresses$ : of ( addresses ) ,
480+ chainHistoryProvider,
481+ logger,
482+ retryBackoffConfig,
483+ store,
484+ tipBlockHeight$
485+ } ) ;
486+
487+ const rollbacks : Cardano . HydratedTx [ ] = [ ] ;
488+ rollback$ . subscribe ( ( tx ) => rollbacks . push ( tx ) ) ;
489+
490+ expect ( await firstValueFrom ( provider$ . pipe ( bufferCount ( 2 ) ) ) ) . toEqual ( [
491+ [ txId1 , txId2 ] , // from store
492+ [ txId3OtherBlock , txId2OtherBlock , txId1OtherBlock ] // chain history
493+ ] ) ;
494+ expect ( rollbacks . length ) . toBe ( 0 ) ;
495+ expect ( store . setAll ) . toBeCalledTimes ( 2 ) ;
496+ expect ( store . setAll ) . nthCalledWith ( 2 , [ txId3OtherBlock , txId2OtherBlock , txId1OtherBlock ] ) ;
497+ } ) ;
498+
499+ // latestStoredBlock <1 2 3>
500+ // newBlock <1 2 3>
501+ // rollback$ none
502+ // store&emit none
503+ it ( 'does not emit when newBlock transactions are identical to stored transactions' , async ( ) => {
504+ const [ txId1 , txId2 , txId3 ] = updateTransactionsBlockNo (
505+ queryTransactionsResult2 . pageResults ,
506+ Cardano . BlockNo ( 10_000 )
507+ ) ;
508+
509+ await firstValueFrom ( store . setAll ( [ txId1 , txId2 , txId3 ] ) ) ;
510+
511+ chainHistoryProvider . transactionsByAddresses = jest . fn ( ( ) => ( {
512+ pageResults : [ txId1 , txId2 , txId3 ] ,
513+ totalResultCount : 3
514+ } ) ) ;
515+
516+ const { transactionsSource$ : provider$ , rollback$ } = createAddressTransactionsProvider ( {
517+ addresses$ : of ( addresses ) ,
518+ chainHistoryProvider,
519+ logger,
520+ retryBackoffConfig,
521+ store,
522+ tipBlockHeight$
523+ } ) ;
524+
525+ const rollbacks : Cardano . HydratedTx [ ] = [ ] ;
526+ rollback$ . subscribe ( ( tx ) => rollbacks . push ( tx ) ) ;
527+
528+ expect ( await firstValueFrom ( provider$ . pipe ( bufferCount ( 1 ) ) ) ) . toEqual ( [
529+ [ txId1 , txId2 , txId3 ] // from store
530+ ] ) ;
531+ expect ( rollbacks . length ) . toBe ( 0 ) ;
532+ expect ( store . setAll ) . toBeCalledTimes ( 1 ) ;
533+ expect ( store . setAll ) . nthCalledWith ( 1 , [ txId1 , txId2 , txId3 ] ) ;
534+ } ) ;
318535 } ) ;
319536 } ) ;
320537
0 commit comments