@@ -15,7 +15,9 @@ import {
15
15
FumeOffset ,
16
16
FumeShardIdx ,
17
17
FumeSlotStatus ,
18
+ Slot ,
18
19
} from "./state-machine" ;
20
+ import { Runtime } from "inspector/promises" ;
19
21
20
22
export class CompletedDownloadBlockTask {
21
23
constructor (
@@ -64,28 +66,34 @@ export abstract class AsyncSlotDownloader {
64
66
) : Promise < DownloadTaskResult > ;
65
67
}
66
68
67
- type TaskName =
68
- | "dragonsmouth_bidi"
69
- | "control_plane_rx"
70
- | "download_task"
71
- | "commit_tick" ;
72
-
73
-
74
-
75
69
export type Tick = { } ;
76
70
77
- export type ControlPlaneResp = { readonly response : ControlPlaneResp }
71
+ export type ControlPlaneResp = { readonly response : ControlResponse }
78
72
79
73
export type DownloadTaskCompleted = { readonly result : DownloadTaskResult }
80
74
81
75
export type SubscribeRequestUpdate = { readonly new_subscribe_request : SubscribeRequest }
82
76
77
+ export type RuntimeEventKind =
78
+ | 'tick'
79
+ | 'subscribe_request_update'
80
+ | 'download_completed'
81
+ | 'control_plane_response' ;
82
+
83
+ // export type RuntimeEvent = {
84
+ // _kind: RuntimeEventKind,
85
+ // tick: Tick | undefined,
86
+ // subscribe_request_update: SubscribeRequestUpdate | undefined,
87
+ // download_completed: DownloadTaskCompleted | undefined,
88
+ // control_plane_response: ControlPlaneResp | undefined
89
+ // }
90
+
91
+ export type RuntimeEvent =
92
+ | { _kind : 'tick' ; tick : Tick }
93
+ | { _kind : 'subscribe_request_update' ; subscribe_request_update : SubscribeRequestUpdate }
94
+ | { _kind : 'download_completed' ; download_completed : DownloadTaskCompleted }
95
+ | { _kind : 'control_plane_response' ; control_plane_response : ControlPlaneResp } ;
83
96
84
- export type RuntimeEvent =
85
- | { _kind : 'tick' , value : Tick }
86
- | { _kind : 'subscribe_request_update' , value : SubscribeRequestUpdate }
87
- | { _kind : 'download_completed' , value : DownloadTaskCompleted }
88
- | { _kind : 'control_plane_response' , value : ControlPlaneResp }
89
97
90
98
export class FumeDragonsmouthRuntime {
91
99
public stateMachine : FumaroleSM ;
@@ -426,11 +434,91 @@ export class FumeDragonsmouthRuntime {
426
434
427
435
428
436
429
- type RuntimeContext = {
437
+ type FumaroleRuntimeArgs = {
430
438
download_task_observer : Observer < DownloadTaskArgs > ,
431
439
control_plane_observer : Observer < ControlCommand > ,
432
- state : FumaroleSM ,
440
+ dragonsmouth_observer : Observer < SubscribeUpdate > ,
441
+ sm : FumaroleSM ,
442
+ download_task_result_observable : Observable < DownloadTaskResult > ,
443
+ }
444
+
445
+
446
+
447
+ type FumaroleRuntimeCtx = {
448
+ sm : FumaroleSM ,
449
+ commitInterval : number ; // in seconds
450
+ gcInterval : number ;
451
+ maxConcurrentDownload : number ;
452
+ lastCommit : number ;
453
+ inflight_downloads : Map < Slot , FumeDownloadRequest > ,
454
+ subscribeRequest : SubscribeRequest ,
455
+ }
456
+
457
+
458
+ function onControlPlaneResponse ( this : FumaroleRuntimeCtx , resp : ControlResponse ) {
459
+ if ( resp . pollHist ) {
460
+ const pollHist = resp . pollHist ;
461
+ console . log ( `Received poll history ${ pollHist . events . length } events` ) ;
462
+ this . sm . queueBlockchainEvent ( pollHist . events ) ;
463
+ } else if ( resp . commitOffset ) {
464
+ const commitOffset = resp . commitOffset ;
465
+ console . log ( `Received commit offset: ${ JSON . stringify ( commitOffset ) } ` ) ;
466
+ this . sm . updateCommittedOffset ( commitOffset . offset ) ;
467
+ } else if ( resp . pong ) {
468
+ console . log ( "Received pong" ) ;
469
+ } else {
470
+ throw new Error ( "Unexpected control response" ) ;
471
+ }
433
472
}
434
473
474
+ function onDownloadCompleted ( this : FumaroleRuntimeCtx , result : DownloadTaskResult ) {
475
+ console . log ( "Download completed:" , result ) ;
476
+ if ( result . kind === "Ok" ) {
477
+ const completed = result . completed ! ;
478
+ console . log (
479
+ `Download completed for slot ${ completed . slot } , shard ${ completed . shardIdx } , ${ completed . totalEventDownloaded } total events`
480
+ ) ;
435
481
482
+ this . sm . makeSlotDownloadProgress (
483
+ completed . slot ,
484
+ completed . shardIdx
485
+ ) ;
486
+ } else {
487
+ const slot = result . slot ;
488
+ const err = result . err ;
489
+ throw new Error ( `Failed to download slot ${ slot } : ${ err ! . message } ` ) ;
490
+ }
491
+ }
492
+
493
+ function onSubscribeRequestUpdate ( this : FumaroleRuntimeCtx , update : SubscribeRequestUpdate ) {
494
+ this . subscribeRequest = update . new_subscribe_request ;
495
+ }
496
+
497
+ function runtime_next ( this : FumaroleRuntimeCtx , ev : RuntimeEvent ) {
498
+ switch ( ev . _kind ) {
499
+ case 'tick' :
500
+ return ;
501
+ case 'subscribe_request_update' :
502
+ onSubscribeRequestUpdate . call ( this , ev . subscribe_request_update ) ;
503
+ case 'download_completed' :
504
+ onDownloadCompleted . call ( this , ev . download_completed ) ;
505
+ case 'control_plane_response' :
506
+ onControlPlaneResponse . call ( this , ev . control_plane_response ) ;
507
+ }
508
+ }
509
+
510
+
511
+ export function runtimeObserverFactory ( args : FumaroleRuntimeArgs ) : Observer < RuntimeEvent > {
512
+ const {
513
+ download_task_observer,
514
+ control_plane_observer,
515
+ dragonsmouth_observer,
516
+ sm,
517
+ download_task_result_observable,
518
+ } = args ;
519
+
520
+ return {
521
+
522
+ }
523
+ }
436
524
0 commit comments