@@ -27,11 +27,12 @@ pub struct AquilaGRPCServer {
27
27
sagittarius_url : String ,
28
28
nats_url : String ,
29
29
address : SocketAddr ,
30
+ with_health_service : bool ,
30
31
}
31
32
32
33
impl AquilaGRPCServer {
33
34
pub fn new ( config : & Config ) -> Self {
34
- let address = match format ! ( "127.0.0.1 :{}" , config. grpc_port) . parse ( ) {
35
+ let address = match format ! ( "{} :{}" , config . grpc_host , config. grpc_port) . parse ( ) {
35
36
Ok ( addr) => {
36
37
info ! ( "Listening on {:?}" , & addr) ;
37
38
addr
@@ -43,55 +44,61 @@ impl AquilaGRPCServer {
43
44
token : config. runtime_token . clone ( ) ,
44
45
sagittarius_url : config. backend_url . clone ( ) ,
45
46
nats_url : config. nats_url . clone ( ) ,
47
+ with_health_service : config. with_health_service ,
46
48
address,
47
49
}
48
50
}
49
51
50
- pub async fn start ( & self ) -> std :: result :: Result < ( ) , tonic:: transport:: Error > {
52
+ pub async fn start ( & self ) -> Result < ( ) , tonic:: transport:: Error > {
51
53
let data_type_service = SagittariusDataTypeServiceClient :: new_arc (
52
54
self . sagittarius_url . clone ( ) ,
53
55
self . token . clone ( ) ,
54
56
)
55
57
. await ;
56
58
57
- log :: info!( "DataTypeService started" ) ;
59
+ info ! ( "DataTypeService started" ) ;
58
60
59
61
let flow_type_service = SagittariusFlowTypeServiceClient :: new_arc (
60
62
self . sagittarius_url . clone ( ) ,
61
63
self . token . clone ( ) ,
62
64
)
63
65
. await ;
64
66
65
- log :: info!( "FlowTypeService started" ) ;
67
+ info ! ( "FlowTypeService started" ) ;
66
68
67
69
let runtime_function_service = SagittariusRuntimeFunctionServiceClient :: new_arc (
68
70
self . sagittarius_url . clone ( ) ,
69
71
self . token . clone ( ) ,
70
72
)
71
73
. await ;
72
74
73
- log:: info!( "RuntimeFunctionService started" ) ;
74
-
75
- let health_service = code0_flow:: flow_health:: HealthService :: new ( self . nats_url . clone ( ) ) ;
76
- log:: info!( "HealthService started" ) ;
75
+ info ! ( "RuntimeFunctionService started" ) ;
77
76
78
77
let data_type_server = AquilaDataTypeServiceServer :: new ( data_type_service. clone ( ) ) ;
79
78
let flow_type_server = AquilaFlowTypeServiceServer :: new ( flow_type_service. clone ( ) ) ;
80
79
let runtime_function_server =
81
80
AquilaRuntimeFunctionServiceServer :: new ( runtime_function_service. clone ( ) ) ;
82
81
83
- log:: info!( "Starting gRPC Server..." ) ;
82
+ info ! ( "Starting gRPC Server..." ) ;
83
+
84
+ if self . with_health_service {
85
+ info ! ( "Starting with HealthService" ) ;
86
+ let health_service = code0_flow:: flow_health:: HealthService :: new ( self . nats_url . clone ( ) ) ;
84
87
85
- Server :: builder ( )
86
- . add_service ( tonic_health:: pb:: health_server:: HealthServer :: new (
87
- health_service,
88
- ) )
89
- . add_service ( DataTypeServiceServer :: new ( data_type_server) )
90
- . add_service ( FlowTypeServiceServer :: new ( flow_type_server) )
91
- . add_service ( RuntimeFunctionDefinitionServiceServer :: new (
92
- runtime_function_server,
93
- ) )
94
- . serve ( self . address )
95
- . await
88
+ Server :: builder ( )
89
+ . add_service ( tonic_health:: pb:: health_server:: HealthServer :: new ( health_service) )
90
+ . add_service ( DataTypeServiceServer :: new ( data_type_server) )
91
+ . add_service ( FlowTypeServiceServer :: new ( flow_type_server) )
92
+ . add_service ( RuntimeFunctionDefinitionServiceServer :: new ( runtime_function_server) )
93
+ . serve ( self . address )
94
+ . await
95
+ } else {
96
+
97
+ Server :: builder ( )
98
+ . add_service ( DataTypeServiceServer :: new ( data_type_server) )
99
+ . add_service ( FlowTypeServiceServer :: new ( flow_type_server) )
100
+ . add_service ( RuntimeFunctionDefinitionServiceServer :: new ( runtime_function_server) ) . serve ( self . address )
101
+ . await
102
+ }
96
103
}
97
104
}
0 commit comments