1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15+ use std:: fmt:: Debug ;
1516use std:: path:: Path ;
1617use std:: sync:: Arc ;
1718use std:: time:: Duration ;
1819
1920use cache:: { build_fundamental_cache_registry, with_default_composite_cache_registry} ;
21+ use catalog:: CatalogManagerRef ;
2022use catalog:: information_extension:: DistributedInformationExtension ;
2123use catalog:: kvbackend:: { CachedKvBackendBuilder , KvBackendCatalogManagerBuilder , MetaKvBackend } ;
2224use clap:: Parser ;
2325use client:: client_manager:: NodeClients ;
2426use common_base:: Plugins ;
2527use common_config:: { Configurable , DEFAULT_DATA_HOME } ;
2628use common_grpc:: channel_manager:: ChannelConfig ;
29+ use common_meta:: FlownodeId ;
2730use common_meta:: cache:: { CacheRegistryBuilder , LayeredCacheRegistryBuilder } ;
2831use common_meta:: heartbeat:: handler:: HandlerGroupExecutor ;
2932use common_meta:: heartbeat:: handler:: invalidate_table_cache:: InvalidateCacheHandler ;
3033use common_meta:: heartbeat:: handler:: parse_mailbox_message:: ParseMailboxMessageHandler ;
3134use common_meta:: key:: TableMetadataManager ;
3235use common_meta:: key:: flow:: FlowMetadataManager ;
36+ use common_meta:: kv_backend:: KvBackendRef ;
3337use common_stat:: ResourceStatImpl ;
3438use common_telemetry:: info;
3539use common_telemetry:: logging:: { DEFAULT_LOGGING_DIR , TracingOptions } ;
@@ -39,12 +43,13 @@ use flow::{
3943 get_flow_auth_options,
4044} ;
4145use meta_client:: { MetaClientOptions , MetaClientType } ;
46+ use servers:: configurator:: GrpcBuilderConfiguratorRef ;
4247use snafu:: { OptionExt , ResultExt , ensure} ;
4348use tracing_appender:: non_blocking:: WorkerGuard ;
4449
4550use crate :: error:: {
4651 BuildCacheRegistrySnafu , InitMetadataSnafu , LoadLayeredConfigSnafu , MetaClientInitSnafu ,
47- MissingConfigSnafu , Result , ShutdownFlownodeSnafu , StartFlownodeSnafu ,
52+ MissingConfigSnafu , OtherSnafu , Result , ShutdownFlownodeSnafu , StartFlownodeSnafu ,
4853} ;
4954use crate :: options:: { GlobalOptions , GreptimeOptions } ;
5055use crate :: { App , create_resource_limit_metrics, log_versions, maybe_activate_heap_profile} ;
@@ -55,33 +60,14 @@ type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
5560
5661pub struct Instance {
5762 flownode : FlownodeInstance ,
58-
59- // The components of flownode, which make it easier to expand based
60- // on the components.
61- #[ cfg( feature = "enterprise" ) ]
62- components : Components ,
63-
6463 // Keep the logging guard to prevent the worker from being dropped.
6564 _guard : Vec < WorkerGuard > ,
6665}
6766
68- #[ cfg( feature = "enterprise" ) ]
69- pub struct Components {
70- pub catalog_manager : catalog:: CatalogManagerRef ,
71- pub fe_client : Arc < FrontendClient > ,
72- pub kv_backend : common_meta:: kv_backend:: KvBackendRef ,
73- }
74-
7567impl Instance {
76- pub fn new (
77- flownode : FlownodeInstance ,
78- #[ cfg( feature = "enterprise" ) ] components : Components ,
79- guard : Vec < WorkerGuard > ,
80- ) -> Self {
68+ pub fn new ( flownode : FlownodeInstance , guard : Vec < WorkerGuard > ) -> Self {
8169 Self {
8270 flownode,
83- #[ cfg( feature = "enterprise" ) ]
84- components,
8571 _guard : guard,
8672 }
8773 }
@@ -94,11 +80,6 @@ impl Instance {
9480 pub fn flownode_mut ( & mut self ) -> & mut FlownodeInstance {
9581 & mut self . flownode
9682 }
97-
98- #[ cfg( feature = "enterprise" ) ]
99- pub fn components ( & self ) -> & Components {
100- & self . components
101- }
10283}
10384
10485#[ async_trait:: async_trait]
@@ -396,7 +377,7 @@ impl StartCommand {
396377 let frontend_client = Arc :: new ( frontend_client) ;
397378 let flownode_builder = FlownodeBuilder :: new (
398379 opts. clone ( ) ,
399- plugins,
380+ plugins. clone ( ) ,
400381 table_metadata_manager,
401382 catalog_manager. clone ( ) ,
402383 flow_metadata_manager,
@@ -405,8 +386,29 @@ impl StartCommand {
405386 . with_heartbeat_task ( heartbeat_task) ;
406387
407388 let mut flownode = flownode_builder. build ( ) . await . context ( StartFlownodeSnafu ) ?;
389+
390+ let builder =
391+ FlownodeServiceBuilder :: grpc_server_builder ( & opts, flownode. flownode_server ( ) ) ;
392+ let builder = if let Some ( configurator) =
393+ plugins. get :: < GrpcBuilderConfiguratorRef < GrpcConfigureContext > > ( )
394+ {
395+ let context = GrpcConfigureContext {
396+ kv_backend : cached_meta_backend. clone ( ) ,
397+ fe_client : frontend_client. clone ( ) ,
398+ flownode_id : member_id,
399+ catalog_manager : catalog_manager. clone ( ) ,
400+ } ;
401+ configurator
402+ . configure ( builder, context)
403+ . await
404+ . context ( OtherSnafu ) ?
405+ } else {
406+ builder
407+ } ;
408+ let grpc_server = builder. build ( ) ;
409+
408410 let services = FlownodeServiceBuilder :: new ( & opts)
409- . with_default_grpc_server ( flownode . flownode_server ( ) )
411+ . with_grpc_server ( grpc_server )
410412 . enable_http_service ( )
411413 . build ( )
412414 . context ( StartFlownodeSnafu ) ?;
@@ -430,16 +432,14 @@ impl StartCommand {
430432 . set_frontend_invoker ( invoker)
431433 . await ;
432434
433- #[ cfg( feature = "enterprise" ) ]
434- let components = Components {
435- catalog_manager : catalog_manager. clone ( ) ,
436- fe_client : frontend_client,
437- kv_backend : cached_meta_backend,
438- } ;
439-
440- #[ cfg( not( feature = "enterprise" ) ) ]
441- return Ok ( Instance :: new ( flownode, guard) ) ;
442- #[ cfg( feature = "enterprise" ) ]
443- Ok ( Instance :: new ( flownode, components, guard) )
435+ Ok ( Instance :: new ( flownode, guard) )
444436 }
445437}
438+
439+ /// The context for [`GrpcBuilderConfiguratorRef`] in flownode.
440+ pub struct GrpcConfigureContext {
441+ pub kv_backend : KvBackendRef ,
442+ pub fe_client : Arc < FrontendClient > ,
443+ pub flownode_id : FlownodeId ,
444+ pub catalog_manager : CatalogManagerRef ,
445+ }
0 commit comments