From eabc6906fc9ec53f433ee8cf84b5de1741a66e88 Mon Sep 17 00:00:00 2001 From: heyanlong Date: Tue, 30 Aug 2022 11:23:40 +0800 Subject: [PATCH 1/3] fix #523 --- skywalking.c | 2 +- src/reporter/grpc.rs | 33 ++++++++++++++++++++------------- src/sky_core_report.h | 9 ++++++++- src/sky_core_report.rs | 3 +++ 4 files changed, 32 insertions(+), 15 deletions(-) diff --git a/skywalking.c b/skywalking.c index 32d28d66..3a0d0971 100644 --- a/skywalking.c +++ b/skywalking.c @@ -86,7 +86,7 @@ static void php_skywalking_init_globals(zend_skywalking_globals *skywalking_glob skywalking_globals->oap_version = "9.0.0"; skywalking_globals->oap_cross_process_protocol = "3.0"; - skywalking_globals->oap_authentication = NULL; + skywalking_globals->oap_authentication = ""; // tls skywalking_globals->grpc_address = NULL; diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index 63f7ed59..d54e7db5 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -24,6 +24,9 @@ use tokio::signal::{self, unix::SignalKind}; use tonic::{ transport::{Channel, Endpoint}, Request, + metadata::{ + MetadataMap, MetadataValue + } }; use std::os::raw::c_char; use std::thread; @@ -114,7 +117,7 @@ pub struct Reporter { pub service_instance: *const c_char, } -pub fn init(address: String, service: String, mut service_instance: String, log_level: String, log_path: String) -> anyhow::Result<()> { +pub fn init(address: String, service: String, mut service_instance: String, log_level: String, log_path: String, authentication: String) -> anyhow::Result<()> { let mut level = simplelog::LevelFilter::Debug; if log_level == "disable" { @@ -147,15 +150,15 @@ pub fn init(address: String, service: String, mut service_instance: String, log_ .worker_threads(4) .enable_all() .build()?; - rt.block_on(worker(address, service, service_instance)); + rt.block_on(worker(address, service, service_instance, authentication)); Ok(()) } -pub async fn worker(address: String, service: String, service_instance: String) { +pub async fn worker(address: String, service: String, service_instance: String, authentication: String) { let (segment_sender, segment_receiver) = counted_channel(5000); spawn(do_connect(address.to_owned())); - spawn(login(service.to_owned(), service_instance.to_owned())); - spawn(keep_alive(service.to_owned(), service_instance.to_owned())); + spawn(login(service.to_owned(), service_instance.to_owned(), authentication.to_owned())); + spawn(keep_alive(service.to_owned(), service_instance.to_owned(), authentication.to_owned())); spawn(sender(segment_receiver)); spawn(receive(segment_sender)); @@ -191,7 +194,7 @@ pub async fn do_connect(address: String) { GRPC_CHANNEL.set(channel); } -pub async fn login(service: String, service_instance: String) { +pub async fn login(service: String, service_instance: String, authentication: String) { let props = vec![ KeyStringValuePair { key: "os_name".to_owned(), @@ -229,7 +232,7 @@ pub async fn login(service: String, service_instance: String) { None => continue, }; log::debug!("login instance {:?}", instance); - match do_login(channel.clone(), instance.clone()).await { + match do_login(channel.clone(), instance.clone(), authentication.clone()).await { Ok(r) => { REGISTER.set(true); break; @@ -242,13 +245,15 @@ pub async fn login(service: String, service_instance: String) { } } -pub async fn do_login(channel: Channel, instance: InstanceProperties) -> anyhow::Result<()> { +pub async fn do_login(channel: Channel, instance: InstanceProperties, authentication: String) -> anyhow::Result<()> { let mut client = ManagementServiceClient::new(channel); - client.report_instance_properties(Request::new(instance)).await?; + let mut request = Request::new(instance); + request.metadata_mut().insert("Authentication", MetadataValue::from(authentication)); + client.report_instance_properties(request).await?; Ok(()) } -pub async fn keep_alive(service: String, service_instance: String) { +pub async fn keep_alive(service: String, service_instance: String, authentication: String) { let instance = InstancePingPkg { service: service.clone(), service_instance: service_instance.clone(), @@ -267,16 +272,18 @@ pub async fn keep_alive(service: String, service_instance: String) { None => continue, }; log::debug!("keep alive instance {:?}", instance); - match do_keep_alive(channel.clone(), instance.clone()).await { + match do_keep_alive(channel.clone(), instance.clone(), authentication.clone()).await { Ok(r) => continue, Err(e) => continue }; } } -pub async fn do_keep_alive(channel: Channel, instance: InstancePingPkg) -> anyhow::Result<()> { +pub async fn do_keep_alive(channel: Channel, instance: InstancePingPkg, authentication: String) -> anyhow::Result<()> { let mut client = ManagementServiceClient::new(channel); - client.keep_alive(Request::new(instance)).await?; + let mut request = Request::new(instance); + request.metadata_mut().insert("Authentication", MetadataValue::from(authentication)); + client.keep_alive(request).await?; Ok(()) } diff --git a/src/sky_core_report.h b/src/sky_core_report.h index bfd5d4da..a2e3bc3e 100644 --- a/src/sky_core_report.h +++ b/src/sky_core_report.h @@ -28,7 +28,14 @@ bool sky_core_report_ipc_init(size_t max_length); bool sky_core_report_ipc_send(char *data, size_t len); -bool sky_core_report_new(char *address, char *service, char *service_instance, char *log_level, char *log_path); +bool sky_core_report_new( + char *address, + char *service, + char *service_instance, + char *log_level, + char *log_path, + char *oap_authentication + ); char *sky_core_report_trace_id(); diff --git a/src/sky_core_report.rs b/src/sky_core_report.rs index 94a8a4be..13ba2dc7 100644 --- a/src/sky_core_report.rs +++ b/src/sky_core_report.rs @@ -93,6 +93,7 @@ extern "C" fn sky_core_report_new( service_instance: *const c_char, log_level: *const c_char, log_path: *const c_char, + oap_authentication: *const c_char ) -> bool { let f = || unsafe { let address = CStr::from_ptr(address).to_str()?; @@ -100,12 +101,14 @@ extern "C" fn sky_core_report_new( let service_instance = CStr::from_ptr(service_instance).to_str()?; let log_level = CStr::from_ptr(log_level).to_str()?; let log_path = CStr::from_ptr(log_path).to_str()?; + let oap_authentication = CStr::from_ptr(oap_authentication).to_str()?; grpc::init( address.to_string(), service.to_string(), service_instance.to_string(), log_level.to_string(), log_path.to_string(), + oap_authentication.to_string(), )?; Ok::<_, anyhow::Error>(()) }; From 0d6fbc0ffa61937d7310f2d676a7c4e930b4af1b Mon Sep 17 00:00:00 2001 From: heyanlong Date: Tue, 30 Aug 2022 17:30:37 +0800 Subject: [PATCH 2/3] authenication --- src/reporter/grpc.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index d54e7db5..beccb268 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -248,7 +248,7 @@ pub async fn login(service: String, service_instance: String, authentication: St pub async fn do_login(channel: Channel, instance: InstanceProperties, authentication: String) -> anyhow::Result<()> { let mut client = ManagementServiceClient::new(channel); let mut request = Request::new(instance); - request.metadata_mut().insert("Authentication", MetadataValue::from(authentication)); + request.metadata_mut().insert("Authentication", authentication.as_str().parse().unwrap()); client.report_instance_properties(request).await?; Ok(()) } @@ -282,7 +282,7 @@ pub async fn keep_alive(service: String, service_instance: String, authenticatio pub async fn do_keep_alive(channel: Channel, instance: InstancePingPkg, authentication: String) -> anyhow::Result<()> { let mut client = ManagementServiceClient::new(channel); let mut request = Request::new(instance); - request.metadata_mut().insert("Authentication", MetadataValue::from(authentication)); + request.metadata_mut().insert("Authentication", authentication.as_str().parse().unwrap()); client.keep_alive(request).await?; Ok(()) } From 8304ae9745ea32a96daee923794cc0fb18fe4056 Mon Sep 17 00:00:00 2001 From: heyanlong Date: Tue, 30 Aug 2022 18:54:27 +0800 Subject: [PATCH 3/3] authenication --- src/reporter/grpc.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index beccb268..47e545bd 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -248,7 +248,7 @@ pub async fn login(service: String, service_instance: String, authentication: St pub async fn do_login(channel: Channel, instance: InstanceProperties, authentication: String) -> anyhow::Result<()> { let mut client = ManagementServiceClient::new(channel); let mut request = Request::new(instance); - request.metadata_mut().insert("Authentication", authentication.as_str().parse().unwrap()); + request.metadata_mut().insert("Authentication", authentication.as_str().parse()?); client.report_instance_properties(request).await?; Ok(()) } @@ -282,7 +282,7 @@ pub async fn keep_alive(service: String, service_instance: String, authenticatio pub async fn do_keep_alive(channel: Channel, instance: InstancePingPkg, authentication: String) -> anyhow::Result<()> { let mut client = ManagementServiceClient::new(channel); let mut request = Request::new(instance); - request.metadata_mut().insert("Authentication", authentication.as_str().parse().unwrap()); + request.metadata_mut().insert("Authentication", authentication.as_str().parse()?); client.keep_alive(request).await?; Ok(()) }