Skip to content

Commit

Permalink
add all tonic (#914)
Browse files Browse the repository at this point in the history
  • Loading branch information
vvzxy authored Sep 24, 2024
1 parent dc7dd50 commit 3062b1e
Show file tree
Hide file tree
Showing 104 changed files with 22,473 additions and 3 deletions.
2,143 changes: 2,143 additions & 0 deletions eBPF_Supermarket/Auto_Cluster_Deployer/tonic/Cargo.lock

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions eBPF_Supermarket/Auto_Cluster_Deployer/tonic/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[workspace]
members = [
"tonic",
"tonic-build",
"tonic-web", # Non-published crates
"server_tonic",
"tonic-web/tests/integration",
]
resolver = "2"
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
[package]
authors = ["ya!"]
name = "sacontrol-tonic"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "grpc-web-server"
path = "src/server.rs"

[[bin]]
name = "grpc-web-client"
path = "src/client.rs"

[features]
gcp = ["dep:prost-types", "tonic/tls"]
routeguide = ["dep:async-stream", "tokio-stream", "dep:rand", "dep:serde", "dep:serde_json"]
autoreload = ["tokio-stream/net", "dep:listenfd"]
grpc-web = ["dep:tonic-web", "dep:bytes", "dep:http", "dep:hyper", "dep:hyper-util", "dep:tracing-subscriber", "dep:tower"]
tracing = ["dep:tracing", "dep:tracing-subscriber"]
uds = ["tokio-stream/net", "dep:tower", "dep:hyper", "dep:hyper-util"]
streaming = ["tokio-stream", "dep:h2"]
mock = ["tokio-stream", "dep:tower", "dep:hyper-util"]
tower = ["dep:hyper", "dep:hyper-util", "dep:tower", "tower?/timeout", "dep:http"]
json-codec = ["dep:serde", "dep:serde_json", "dep:bytes"]
compression = ["tonic/gzip"]
tls = ["tonic/tls"]
tls-rustls = ["dep:http", "dep:hyper", "dep:hyper-util", "dep:hyper-rustls", "dep:tower", "tower-http/util", "tower-http/add-extension", "dep:rustls-pemfile", "dep:tokio-rustls", "dep:pin-project", "dep:http-body-util"]
dynamic-load-balance = ["dep:tower"]
timeout = ["tokio/time", "dep:tower", "tower?/timeout"]
tls-client-auth = ["tonic/tls"]
h2c = ["dep:hyper", "dep:tower", "dep:http", "dep:hyper-util"]
cancellation = ["dep:tokio-util"]

full = ["gcp", "routeguide", "autoreload", "grpc-web", "tracing", "uds", "streaming", "mock", "tower", "json-codec", "compression", "tls", "tls-rustls", "dynamic-load-balance", "timeout", "tls-client-auth", "cancellation", "h2c"]
default = ["full"]

[dependencies]
tokio = { version = "1.0", features = ["full"] }
tokio-postgres = "0.7"
prost = "0.13"
tonic = { path = "../tonic" }
tonic-web = { path = "../tonic-web", optional = true }
async-stream = { version = "0.3", optional = true }
tokio-stream = { version = "0.1", optional = true }
tokio-util = { version = "0.7.8", optional = true }
tower = { version = "0.4", optional = true }
toml = "0.5"
rand = { version = "0.8", optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = { version = "1.0", optional = true }
tracing = { version = "0.1.16", optional = true }
tracing-subscriber = { version = "0.3", features = ["tracing-log", "fmt"], optional = true }
prost-types = { version = "0.13", optional = true }
http = { version = "1", optional = true }
http-body = { version = "1", optional = true }
http-body-util = { version = "0.1", optional = true }
hyper = { version = "1", optional = true }
hyper-util = { version = "0.1.4", optional = true }
listenfd = { version = "1.0", optional = true }
bytes = { version = "1", optional = true }
h2 = { version = "0.4", optional = true }
tokio-rustls = { version = "0.26", optional = true, features = ["ring", "tls12"], default-features = false }
hyper-rustls = { version = "0.27.0", features = ["http2", "ring", "tls12"], optional = true, default-features = false }
rustls-pemfile = { version = "2.0.0", optional = true }
tower-http = { version = "0.5", optional = true }
pin-project = { version = "1.0.11", optional = true }

[build-dependencies]
tonic-build = { path = "../tonic-build", features = ["prost"] }
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use std::{env, path::PathBuf};

fn main() {
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
tonic_build::configure()
.file_descriptor_set_path(out_dir.join("helloworld_descriptor.bin"))
.compile(&["proto/server_tonic.proto"], &["proto"])
.unwrap();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";

package sacontrol;

service SAControl {
rpc DeployPackages (Empty) returns (Ack);
rpc SARegist (SAInfo) returns (Ack);
rpc GetPackageInfoByIP (IpRequest) returns (PackageInfoResponse);
rpc SendPackageFile(PackageRequest) returns (stream FileChunk);
}

message Empty {}

message IpRequest {
string ip_address = 1;
}

message SAInfo {
string server_ip = 1;
int32 category = 2;
}

message Ack {
bool success = 1;
}

message ProgramDirectory {
string name = 1;
repeated string subdirectories = 2;
}

message TasksList {
repeated string tasks = 1;
}

message TaskRequest {
repeated string tasks = 1;
}

message Data {
string data = 1;
}

message VersionInfo {
string version = 1;
}

message PackageRequest {
int32 category = 1;
}

message FileChunk {
bytes content = 1;
string file_type = 2;
}

message PackageInfoResponse {
int32 id = 1;
string version = 2;
string software_name = 3;
string description = 4;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
use sacontrol::{Empty, SaInfo, PackageRequest, IpRequest};
use sacontrol::sa_control_client::SaControlClient;
use tonic::Request;
use std::error::Error;
use std::time::Duration;
use tokio::time;
use tokio::process::Command;
use std::str;
use tokio::fs;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use std::sync::Arc;
use serde::Deserialize;
use tokio::sync::Mutex;
use hyper_util::rt::TokioExecutor;
use tonic_web::GrpcWebClientLayer;
use tokio_stream::StreamExt;

pub mod sacontrol {
tonic::include_proto!("sacontrol");
}

#[derive(Debug, Deserialize)]
struct Version {
number: String,
}

#[derive(Debug, Deserialize)]
struct Config {
version: Version,
}

// 获取本机 IP 地址的异步辅助函数
async fn get_local_ip() -> Option<String> {
let output = Command::new("hostname")
.arg("-I")
.output()
.await.ok()?;

let ip = String::from_utf8_lossy(&output.stdout)
.trim()
.split_whitespace()
.next()?
.to_string();

Some(ip)
}

// 解析 deb 包中的版本号
async fn read_deb_version(file_path: &str) -> Result<String, Box<dyn Error>> {
// 检查 deb 文件是否存在
if fs::metadata(file_path).await.is_err() {
println!("未找到 .deb 文件,使用默认版本 0.0");
return Ok("0.0".to_string());
}

// 使用 dpkg-deb 命令提取版本信息
let output = Command::new("dpkg-deb")
.arg("-f")
.arg(file_path)
.arg("Version")
.output()
.await; // 使用 await 等待命令执行完成

match output {
Ok(output) if output.status.success() => {
let version = String::from_utf8_lossy(&output.stdout);
Ok(version.trim().to_string())
}
_ => {
println!("无法读取 .deb 文件版本,使用默认版本 0.0");
Ok("0.0".to_string())
}
}
}

// 解析配置文件并返回 PackageInfo
async fn check_current_version(config_path: &str) -> Result<String, Box<dyn Error>> {
// 尝试读取配置文件内容
let config_content = fs::read_to_string(config_path)
.await
.unwrap_or_else(|_| {
// 打印错误信息
println!("未找到配置文件,使用默认版本 0.0");
"version = '0.0'".to_string()
});

println!("Task config file content: {:?}", config_content);

// 尝试解析配置内容
let config: Config = toml::from_str(&config_content)
.unwrap_or_else(|err| {
// 打印错误信息
println!("Failed to parse task config: {}", err);
// 返回默认版本号
Config {
version: Version {
number: "0.0".to_string(),
},
}
});

// 返回版本号
Ok(config.version.number)
}


#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<(), Box<dyn Error>> {
let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build_http();

let svc = tower::ServiceBuilder::new()
.layer(GrpcWebClientLayer::new())
.service(client);

let client = SaControlClient::with_origin(svc, "http://192.168.1.158:3000".try_into()?);
let client = Arc::new(Mutex::new(client)); // 将 client 包装在 Arc<Mutex> 中

// 预先检查 sudo 权限
Command::new("sudo")
.arg("-v")
.output()
.await?;

// 循环处理版本检查和更新
loop {
let request = Request::new(Empty {});

let response = {
let mut client_guard = client.lock().await;
client_guard.deploy_packages(request).await?
};
println!("Deploy_packages RESPONSE={:?}", response);

// 获取本机 IP 地址
let local_ip_str = get_local_ip().await.expect("Failed to get local IP address");

// 创建 IpRequest 请求
let request = Request::new(IpRequest { ip_address: local_ip_str.clone() });
let response = {
let mut client_guard = client.lock().await;
client_guard.get_package_info_by_ip(request).await?
};

println!("Get_package_info_by_ip RESPONSE={:?}", response);

let package_info = response.into_inner();

// 如果 id 是 -1,则跳过后续操作,继续下一轮循环
if package_info.id == -1 {
println!("获取到的 id 为 -1,无需执行后续操作,继续下一轮循环。");
continue;
}

let server_version = package_info.version;
let category = package_info.id;

// 获取代理信息
let agent = SaInfo {
server_ip: local_ip_str.clone(),
category: category.clone(),
};

let client_clone = Arc::clone(&client); // 克隆 Arc 以在任务中共享

// 注册代理
let response = {
let mut client_guard = client_clone.lock().await;
client_guard.sa_regist(Request::new(agent)).await?
};
println!("SARegist RESPONSE={:?}", response);

println!("服务器上的包版本: {}", server_version);

// 获取本地版本
let local_version = check_current_version("/usr/local/bin/config.toml").await?;

println!("本地包版本: {}", local_version);

// 比较本地版本与服务器版本
if local_version < server_version {
println!("本地版本较旧,检查接收到的 version 文件中的版本...");

// 读取传输的 version 文件内容
let transmitted_version = read_deb_version("package.deb").await?;

println!("接收到的 deb 文件中的版本: {}", transmitted_version);

// 比较接收到的 version 文件中的版本
if transmitted_version < server_version {
println!("接收到的版本文件版本较旧,准备请求更新包...");

let request = Request::new(PackageRequest { category: category.clone() });
let mut stream = {
let mut client_guard = client.lock().await;
client_guard.send_package_file(request).await?.into_inner()
};

// 创建文件保存接收到的更新包
let mut file_deb = File::create("package.deb").await?;
let mut file_sh = File::create("install.sh").await?;

while let Some(file_chunk) = stream.next().await {
let file_chunk = file_chunk?;
let content = &file_chunk.content;

// 按文件类型保存内容
match file_chunk.file_type.as_str() {
"deb" => file_deb.write_all(content).await?,
"sh" => file_sh.write_all(content).await?,
_ => println!("未知的文件类型: {}", file_chunk.file_type),
}
}

println!("更新包已接收完毕,准备执行安装脚本...");

} else {
println!("传输的 deb 文件中的版本不小于服务器版本,无需更新。");
}
} else {
println!("本地版本已是最新,无需更新。");
}

// 等待一段时间后再进行下一轮检查
time::sleep(Duration::from_secs(5)).await;
}
}
Loading

0 comments on commit 3062b1e

Please sign in to comment.