Skip to content

Commit

Permalink
clean up synced cmds
Browse files Browse the repository at this point in the history
  • Loading branch information
catornot committed Nov 28, 2023
1 parent 8b31bef commit 31ec5ac
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 145 deletions.
4 changes: 2 additions & 2 deletions src/console_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ static_detour! {
pub fn hook_write_console() {
unsafe {
if !std::env::args().any(|arg| arg == "-dedicated") {
log::info!("this isn't a dedicated server; not hooking WriteConsoleA");
log::warn!("this isn't a dedicated server; not hooking WriteConsoleA");
return;
}

Expand All @@ -42,7 +42,7 @@ pub fn hook_write_console() {

pub fn hook_console_print(addr: isize) -> Option<()> {
unsafe {
if PLUGIN.wait().console_recv.try_lock().is_some() {
if PLUGIN.wait().server.is_none() {
log::warn!("rcon not running -> no Print hook");
return None;
}
Expand Down
66 changes: 7 additions & 59 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,28 @@
use bindings::{CmdSource, EngineFunctions, ENGINE_FUNCTIONS};
use bindings::{EngineFunctions, ENGINE_FUNCTIONS};
use console_hook::{hook_console_print, hook_write_console};
use parking_lot::Mutex;
use rcon::{RconServer, RconTask};
use rrplug::{mid::engine::WhichDll, prelude::*, to_c_string};
use rcon::RconServer;
use rrplug::{mid::engine::WhichDll, prelude::*};
use std::{
collections::HashMap,
env,
sync::mpsc::{self, Receiver, Sender},
sync::mpsc::{self, Sender},
};

pub mod bindings;
pub mod console;
pub mod console_hook;
pub mod rcon;

const VALID_RCON_ARGS: [&str; 2] = ["rcon_ip_port", "rcon_password"];

pub struct RconPlugin {
rcon_tasks: Mutex<Receiver<RconTask>>,
rcon_send_tasks: Mutex<Sender<RconTask>>, // mutex is not needed but it must sync so clone on each thread
console_sender: Mutex<Sender<String>>,
console_recv: Mutex<Receiver<String>>,
server: Option<Mutex<RconServer>>,
}

impl Plugin for RconPlugin {
fn new(_: &PluginData) -> Self {
let (sender, recv) = mpsc::channel();
let (console_sender, console_recv) = mpsc::channel();

let rcon_args = env::args()
Expand All @@ -47,7 +44,7 @@ impl Plugin for RconPlugin {
break 'start_server;
};

server = RconServer::try_new(&bind_ip, password)
server = RconServer::try_new(&bind_ip, password, console_recv)
.map_err(|err| log::info!("failed to connect to socket : {err:?}"))
.map(|s| {
hook_write_console();
Expand All @@ -56,13 +53,8 @@ impl Plugin for RconPlugin {
.ok();
}

// std::thread::spawn(move || _ = run_rcon(args));

Self {
rcon_tasks: Mutex::new(recv),
rcon_send_tasks: Mutex::new(sender),
console_sender: Mutex::new(console_sender),
console_recv: Mutex::new(console_recv),
server: server.map(|s| s.into()),
}
}
Expand All @@ -77,51 +69,7 @@ impl Plugin for RconPlugin {
}

fn runframe(&self) {
// can be moved somewhere else

let funcs = ENGINE_FUNCTIONS.wait();

// if let Ok(task) = self.rcon_tasks.lock().try_recv() {
if let Ok(tasks) = self
.server
.as_ref()
// .map(|s| s.lock().run(self.console_recv.lock().try_recv().ok()))
.map(|s| s.lock().run(None))
.flatten()
.ok_or(())
{
for task in tasks.into_iter() {
match task {
RconTask::Runcommand(cmd) => unsafe {
log::info!("executing command : {cmd}");

let cmd = to_c_string!(cmd);
(funcs.cbuf_add_text_type)(
(funcs.cbuf_get_current_player)(),
cmd.as_ptr(),
CmdSource::Code,
);
},
}
}
}
}
}

fn run_rcon(server: &mut RconServer) -> std::convert::Infallible {
let rcon = PLUGIN.wait();

let rcon_send_tasks = rcon.rcon_send_tasks.lock();
let console_recv = rcon.console_recv.lock();

loop {
let new_console_line = console_recv.try_recv().ok();

if let Some(tasks) = server.run(new_console_line) {
tasks
.into_iter()
.for_each(|task| rcon_send_tasks.send(task).expect("failed to send tasks"))
}
_ = self.server.as_ref().map(|s| s.lock().run());
}
}

Expand Down
157 changes: 73 additions & 84 deletions src/rcon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,22 @@ use rrplug::to_c_string;
use std::{
io::{self, Read, Write},
net::{TcpListener, TcpStream},
sync::mpsc::Receiver,
};
use thiserror::Error;

use crate::bindings::{CmdSource, ENGINE_FUNCTIONS};

#[derive(Debug, Clone)]
pub enum RconTask {
Runcommand(String),
}

// this could be fun https://discord.com/channels/920776187884732556/922663696273125387/1134900622773194782
use crate::{
bindings::{CmdSource, ENGINE_FUNCTIONS},
console::ConsoleAccess,
};

const SERVERDATA_AUTH: i32 = 3;
const SERVERDATA_EXECCOMMAND: i32 = 2;
const SERVERDATA_AUTH_RESPONSE: i32 = 2;
const SERVERDATA_RESPONSE_VALUE: i32 = 0;
const MAX_PACKET_SIZE: usize = 4096;
const MIN_PACKET_SIZE: usize = 10;
const MAX_CONTENT_SIZE: usize = MAX_PACKET_SIZE - MIN_PACKET_SIZE;
pub const MAX_CONTENT_SIZE: usize = MAX_PACKET_SIZE - MIN_PACKET_SIZE;

#[derive(Debug, Error)]
pub enum RconRequestError {
Expand Down Expand Up @@ -83,12 +80,15 @@ pub struct RconServer {
password: String,
server: TcpListener,
connections: Vec<RconStream>,
tasks: Vec<RconTask>,
cmd_buffer: Vec<String>,
console: ConsoleAccess,
}

impl RconServer {
pub fn try_new(bind_ip: &str, password: impl Into<String>) -> Result<Self, std::io::Error> {
pub fn try_new(
bind_ip: &str,
password: impl Into<String>,
console_recv: Receiver<String>,
) -> Result<Self, std::io::Error> {
let server = TcpListener::bind(bind_ip)?;

server.set_nonblocking(true)?;
Expand All @@ -97,28 +97,14 @@ impl RconServer {
password: password.into(),
server,
connections: Vec::new(),
tasks: Vec::new(),
cmd_buffer: Vec::new(),
console: ConsoleAccess::new(console_recv),
};

Ok(rcon_server)
}

pub fn run(&mut self, new_console_line: Option<String>) -> Option<Vec<RconTask>> {
if let Some(line) = new_console_line {
let line_size = line.len();
let mut buffer_size = 0;

for bline in self.cmd_buffer.drain(..).rev().collect::<Vec<String>>() {
buffer_size += bline.len();
if buffer_size + line_size > MAX_CONTENT_SIZE {
break;
}

self.cmd_buffer.insert(0, bline);
}
self.cmd_buffer.push(line);
}
pub fn run(&mut self) {
while let Some(_) = self.console.next_line_catpure() {} // string allocation could be remove

match self.server.accept() {
Ok((conn, addr)) => match conn.set_nonblocking(true) {
Expand All @@ -138,12 +124,8 @@ impl RconServer {
}

for i in 0..self.connections.len() {
match handle_connection(&mut self.connections[i], &self.password, &self.cmd_buffer) {
Ok(maybe_task) => {
if let Some(task) = maybe_task {
self.tasks.push(task)
}
}
match handle_connection(&mut self.connections[i], &self.password, &mut self.console) {
Ok(_) => {}
Err(err) => {
match &err {
RconRequestError::SocketError(err) => match err.kind() {
Expand All @@ -159,22 +141,25 @@ impl RconServer {
}
}
}

if self.tasks.is_empty() {
None
} else {
Some(self.tasks.drain(0..).collect())
}
}
}

pub fn handle_connection(
conn: &mut RconStream,
password: &str,
cmd_buffer: &[String],
) -> Result<Option<RconTask>, RconRequestError> {
let stream = &mut conn.stream;
console: &mut ConsoleAccess,
) -> Result<(), RconRequestError> {
let (client_id, request_type, content) = read_rcon_stream(&mut conn.stream)?;

let response = parse_response(conn, password, console, client_id, request_type, content)?;

let buf: Vec<u8> = response.into();
conn.stream.write_all(&buf)?;

Ok(())
}

fn read_rcon_stream(stream: &mut TcpStream) -> Result<(i32, i32, String), RconRequestError> {
let mut size_buf = vec![0; 4];
let bytes_read = stream.read(&mut size_buf)?;
if bytes_read != 4 {
Expand Down Expand Up @@ -212,35 +197,54 @@ pub fn handle_connection(
.or(Err(RconRequestError::VecToArrayError))?,
);

let content = String::from_utf8_lossy(&buf).to_string().replace('\0', "");
Ok((
client_id,
request_type,
String::from_utf8_lossy(&buf).to_string().replace('\0', ""),
)) // TODO: maybe use a struct
}

let (response, task) = match request_type {
fn parse_response(
conn: &mut RconStream,
password: &str,
console: &mut ConsoleAccess,
client_id: i32,
request_type: i32,
content: String,
) -> Result<RconResponse, RconRequestError> {
let response = match request_type {
SERVERDATA_AUTH => {
if content == password {
conn.auth = true;

log::info!("auth successful");

(
RconResponse {
id: client_id,
ty: SERVERDATA_AUTH_RESPONSE,
content: String::new(),
},
None,
)
RconResponse {
id: client_id,
ty: SERVERDATA_AUTH_RESPONSE,
content: String::new(),
}
} else {
log::warn!("auth failed");
conn.auth = false;

(
RconResponse {
id: -1,
ty: SERVERDATA_AUTH_RESPONSE,
content: String::new(),
},
None,
)
RconResponse {
id: -1,
ty: SERVERDATA_AUTH_RESPONSE,
content: String::new(),
}
}
}
SERVERDATA_EXECCOMMAND if content == "dumpconsole" => {
if !conn.auth {
Err(RconRequestError::InvalidClientID(client_id))?
}
log::info!("sending console dump");

RconResponse {
id: client_id,
ty: SERVERDATA_RESPONSE_VALUE,
content: console.get_last_console_output().iter().cloned().collect(),
}
}
SERVERDATA_EXECCOMMAND => {
Expand All @@ -262,33 +266,18 @@ pub fn handle_connection(
}

let mut response = String::new();
while let Some(console_out) = crate::PLUGIN
.get()
.unwrap()
.console_recv
.lock()
.try_recv()
.ok()
{
while let Some(console_out) = console.next_line_catpure() {
response += &console_out;
}

(
RconResponse {
id: client_id,
ty: SERVERDATA_RESPONSE_VALUE,
// content: cmd_buffer.iter().cloned().collect(),
content: response,
},
// Some(RconTask::Runcommand(content)),
None,
)
RconResponse {
id: client_id,
ty: SERVERDATA_RESPONSE_VALUE,
content: response,
}
}
request_num => Err(RconRequestError::InvalidRequestType(request_num))?,
};

let buf: Vec<u8> = response.into();
stream.write_all(&buf)?;

Ok(task)
Ok(response)
}

0 comments on commit 31ec5ac

Please sign in to comment.