Skip to content
This repository has been archived by the owner on Oct 12, 2022. It is now read-only.

fix #523 #525

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion skywalking.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ static void php_skywalking_init_globals(zend_skywalking_globals *skywalking_glob

skywalking_globals->oap_version = "9.0.0";
skywalking_globals->oap_cross_process_protocol = "3.0";
skywalking_globals->oap_authentication = NULL;
skywalking_globals->oap_authentication = "";

// tls
skywalking_globals->grpc_address = NULL;
Expand Down
33 changes: 20 additions & 13 deletions src/reporter/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use tokio::signal::{self, unix::SignalKind};
use tonic::{
transport::{Channel, Endpoint},
Request,
metadata::{
MetadataMap, MetadataValue
}
};
use std::os::raw::c_char;
use std::thread;
Expand Down Expand Up @@ -114,7 +117,7 @@ pub struct Reporter {
pub service_instance: *const c_char,
}

pub fn init(address: String, service: String, mut service_instance: String, log_level: String, log_path: String) -> anyhow::Result<()> {
pub fn init(address: String, service: String, mut service_instance: String, log_level: String, log_path: String, authentication: String) -> anyhow::Result<()> {
let mut level = simplelog::LevelFilter::Debug;

if log_level == "disable" {
Expand Down Expand Up @@ -147,15 +150,15 @@ pub fn init(address: String, service: String, mut service_instance: String, log_
.worker_threads(4)
.enable_all()
.build()?;
rt.block_on(worker(address, service, service_instance));
rt.block_on(worker(address, service, service_instance, authentication));
Ok(())
}

pub async fn worker(address: String, service: String, service_instance: String) {
pub async fn worker(address: String, service: String, service_instance: String, authentication: String) {
let (segment_sender, segment_receiver) = counted_channel(5000);
spawn(do_connect(address.to_owned()));
spawn(login(service.to_owned(), service_instance.to_owned()));
spawn(keep_alive(service.to_owned(), service_instance.to_owned()));
spawn(login(service.to_owned(), service_instance.to_owned(), authentication.to_owned()));
spawn(keep_alive(service.to_owned(), service_instance.to_owned(), authentication.to_owned()));
spawn(sender(segment_receiver));
spawn(receive(segment_sender));

Expand Down Expand Up @@ -191,7 +194,7 @@ pub async fn do_connect(address: String) {
GRPC_CHANNEL.set(channel);
}

pub async fn login(service: String, service_instance: String) {
pub async fn login(service: String, service_instance: String, authentication: String) {
let props = vec![
KeyStringValuePair {
key: "os_name".to_owned(),
Expand Down Expand Up @@ -229,7 +232,7 @@ pub async fn login(service: String, service_instance: String) {
None => continue,
};
log::debug!("login instance {:?}", instance);
match do_login(channel.clone(), instance.clone()).await {
match do_login(channel.clone(), instance.clone(), authentication.clone()).await {
Ok(r) => {
REGISTER.set(true);
break;
Expand All @@ -242,13 +245,15 @@ pub async fn login(service: String, service_instance: String) {
}
}

pub async fn do_login(channel: Channel, instance: InstanceProperties) -> anyhow::Result<()> {
pub async fn do_login(channel: Channel, instance: InstanceProperties, authentication: String) -> anyhow::Result<()> {
let mut client = ManagementServiceClient::new(channel);
client.report_instance_properties(Request::new(instance)).await?;
let mut request = Request::new(instance);
request.metadata_mut().insert("Authentication", authentication.as_str().parse()?);
client.report_instance_properties(request).await?;
Ok(())
}

pub async fn keep_alive(service: String, service_instance: String) {
pub async fn keep_alive(service: String, service_instance: String, authentication: String) {
let instance = InstancePingPkg {
service: service.clone(),
service_instance: service_instance.clone(),
Expand All @@ -267,16 +272,18 @@ pub async fn keep_alive(service: String, service_instance: String) {
None => continue,
};
log::debug!("keep alive instance {:?}", instance);
match do_keep_alive(channel.clone(), instance.clone()).await {
match do_keep_alive(channel.clone(), instance.clone(), authentication.clone()).await {
Ok(r) => continue,
Err(e) => continue
};
}
}

pub async fn do_keep_alive(channel: Channel, instance: InstancePingPkg) -> anyhow::Result<()> {
pub async fn do_keep_alive(channel: Channel, instance: InstancePingPkg, authentication: String) -> anyhow::Result<()> {
let mut client = ManagementServiceClient::new(channel);
client.keep_alive(Request::new(instance)).await?;
let mut request = Request::new(instance);
request.metadata_mut().insert("Authentication", authentication.as_str().parse()?);
client.keep_alive(request).await?;
Ok(())
}

Expand Down
9 changes: 8 additions & 1 deletion src/sky_core_report.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ bool sky_core_report_ipc_init();

bool sky_core_report_ipc_send(char *data, size_t len);

bool sky_core_report_new(char *address, char *service, char *service_instance, char *log_level, char *log_path);
bool sky_core_report_new(
char *address,
char *service,
char *service_instance,
char *log_level,
char *log_path,
char *oap_authentication
);

char *sky_core_report_trace_id();

Expand Down
3 changes: 3 additions & 0 deletions src/sky_core_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,22 @@ extern "C" fn sky_core_report_new(
service_instance: *const c_char,
log_level: *const c_char,
log_path: *const c_char,
oap_authentication: *const c_char
) -> bool {
let f = || unsafe {
let address = CStr::from_ptr(address).to_str()?;
let service = CStr::from_ptr(service).to_str()?;
let service_instance = CStr::from_ptr(service_instance).to_str()?;
let log_level = CStr::from_ptr(log_level).to_str()?;
let log_path = CStr::from_ptr(log_path).to_str()?;
let oap_authentication = CStr::from_ptr(oap_authentication).to_str()?;
grpc::init(
address.to_string(),
service.to_string(),
service_instance.to_string(),
log_level.to_string(),
log_path.to_string(),
oap_authentication.to_string(),
)?;
Ok::<_, anyhow::Error>(())
};
Expand Down