diff --git a/skywalking.c b/skywalking.c index 24eb3810..8cd8e4bc 100644 --- a/skywalking.c +++ b/skywalking.c @@ -85,7 +85,7 @@ static void php_skywalking_init_globals(zend_skywalking_globals *skywalking_glob skywalking_globals->oap_version = "9.0.0"; skywalking_globals->oap_cross_process_protocol = "3.0"; - skywalking_globals->oap_authentication = NULL; + skywalking_globals->oap_authentication = ""; // tls skywalking_globals->grpc_address = NULL; diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index 63f7ed59..47e545bd 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -24,6 +24,9 @@ use tokio::signal::{self, unix::SignalKind}; use tonic::{ transport::{Channel, Endpoint}, Request, + metadata::{ + MetadataMap, MetadataValue + } }; use std::os::raw::c_char; use std::thread; @@ -114,7 +117,7 @@ pub struct Reporter { pub service_instance: *const c_char, } -pub fn init(address: String, service: String, mut service_instance: String, log_level: String, log_path: String) -> anyhow::Result<()> { +pub fn init(address: String, service: String, mut service_instance: String, log_level: String, log_path: String, authentication: String) -> anyhow::Result<()> { let mut level = simplelog::LevelFilter::Debug; if log_level == "disable" { @@ -147,15 +150,15 @@ pub fn init(address: String, service: String, mut service_instance: String, log_ .worker_threads(4) .enable_all() .build()?; - rt.block_on(worker(address, service, service_instance)); + rt.block_on(worker(address, service, service_instance, authentication)); Ok(()) } -pub async fn worker(address: String, service: String, service_instance: String) { +pub async fn worker(address: String, service: String, service_instance: String, authentication: String) { let (segment_sender, segment_receiver) = counted_channel(5000); spawn(do_connect(address.to_owned())); - spawn(login(service.to_owned(), service_instance.to_owned())); - spawn(keep_alive(service.to_owned(), service_instance.to_owned())); + spawn(login(service.to_owned(), service_instance.to_owned(), authentication.to_owned())); + spawn(keep_alive(service.to_owned(), service_instance.to_owned(), authentication.to_owned())); spawn(sender(segment_receiver)); spawn(receive(segment_sender)); @@ -191,7 +194,7 @@ pub async fn do_connect(address: String) { GRPC_CHANNEL.set(channel); } -pub async fn login(service: String, service_instance: String) { +pub async fn login(service: String, service_instance: String, authentication: String) { let props = vec![ KeyStringValuePair { key: "os_name".to_owned(), @@ -229,7 +232,7 @@ pub async fn login(service: String, service_instance: String) { None => continue, }; log::debug!("login instance {:?}", instance); - match do_login(channel.clone(), instance.clone()).await { + match do_login(channel.clone(), instance.clone(), authentication.clone()).await { Ok(r) => { REGISTER.set(true); break; @@ -242,13 +245,15 @@ pub async fn login(service: String, service_instance: String) { } } -pub async fn do_login(channel: Channel, instance: InstanceProperties) -> anyhow::Result<()> { +pub async fn do_login(channel: Channel, instance: InstanceProperties, authentication: String) -> anyhow::Result<()> { let mut client = ManagementServiceClient::new(channel); - client.report_instance_properties(Request::new(instance)).await?; + let mut request = Request::new(instance); + request.metadata_mut().insert("Authentication", authentication.as_str().parse()?); + client.report_instance_properties(request).await?; Ok(()) } -pub async fn keep_alive(service: String, service_instance: String) { +pub async fn keep_alive(service: String, service_instance: String, authentication: String) { let instance = InstancePingPkg { service: service.clone(), service_instance: service_instance.clone(), @@ -267,16 +272,18 @@ pub async fn keep_alive(service: String, service_instance: String) { None => continue, }; log::debug!("keep alive instance {:?}", instance); - match do_keep_alive(channel.clone(), instance.clone()).await { + match do_keep_alive(channel.clone(), instance.clone(), authentication.clone()).await { Ok(r) => continue, Err(e) => continue }; } } -pub async fn do_keep_alive(channel: Channel, instance: InstancePingPkg) -> anyhow::Result<()> { +pub async fn do_keep_alive(channel: Channel, instance: InstancePingPkg, authentication: String) -> anyhow::Result<()> { let mut client = ManagementServiceClient::new(channel); - client.keep_alive(Request::new(instance)).await?; + let mut request = Request::new(instance); + request.metadata_mut().insert("Authentication", authentication.as_str().parse()?); + client.keep_alive(request).await?; Ok(()) } diff --git a/src/sky_core_report.h b/src/sky_core_report.h index 5fd6956e..413b929f 100644 --- a/src/sky_core_report.h +++ b/src/sky_core_report.h @@ -28,7 +28,14 @@ bool sky_core_report_ipc_init(); bool sky_core_report_ipc_send(char *data, size_t len); -bool sky_core_report_new(char *address, char *service, char *service_instance, char *log_level, char *log_path); +bool sky_core_report_new( + char *address, + char *service, + char *service_instance, + char *log_level, + char *log_path, + char *oap_authentication + ); char *sky_core_report_trace_id(); diff --git a/src/sky_core_report.rs b/src/sky_core_report.rs index 6662a4aa..0ef0ba6b 100644 --- a/src/sky_core_report.rs +++ b/src/sky_core_report.rs @@ -93,6 +93,7 @@ extern "C" fn sky_core_report_new( service_instance: *const c_char, log_level: *const c_char, log_path: *const c_char, + oap_authentication: *const c_char ) -> bool { let f = || unsafe { let address = CStr::from_ptr(address).to_str()?; @@ -100,12 +101,14 @@ extern "C" fn sky_core_report_new( let service_instance = CStr::from_ptr(service_instance).to_str()?; let log_level = CStr::from_ptr(log_level).to_str()?; let log_path = CStr::from_ptr(log_path).to_str()?; + let oap_authentication = CStr::from_ptr(oap_authentication).to_str()?; grpc::init( address.to_string(), service.to_string(), service_instance.to_string(), log_level.to_string(), log_path.to_string(), + oap_authentication.to_string(), )?; Ok::<_, anyhow::Error>(()) };