Skip to content

Commit

Permalink
Type erase response api
Browse files Browse the repository at this point in the history
  • Loading branch information
TrueDoctor committed Nov 24, 2023
1 parent 6b4acd6 commit 6e9f012
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 42 deletions.
49 changes: 47 additions & 2 deletions chai/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::{collections::HashMap, ops::ControlFlow};
use std::io::Write;
use std::{collections::HashMap, fs, ops::ControlFlow};

use kvv::LineDepartures;
use tracing::{info, log::warn};
use ws_message::Team;
use ws_message::{GameState, Team};

use crate::ws_message::ClientMessage;

pub mod kvv;
pub mod point;
pub mod unique_id;
pub mod ws_message;
const LOG_FILE: &str = "log.csv";

#[derive(Debug)]
pub enum InputMessage {
Expand All @@ -23,6 +25,12 @@ pub enum ServerMessage {
ClientDisconnected(u32),
}

#[derive(Debug)]
pub enum ServerResponse {
Broadcast(String),
P2P(String, u32),
}

#[derive(Debug)]
pub struct Connection {
pub id: u32,
Expand Down Expand Up @@ -107,3 +115,40 @@ pub fn process_message(
}
ControlFlow::Continue(())
}

pub fn generate_respone(
departures: &HashMap<String, kvv::Journey>,
state: &mut AppState,
) -> ServerResponse {
let time = chrono::Utc::now();
let mut trains = kvv::train_positions(departures, time);
trains.retain(|x| !x.line_id.contains("bus"));
let mut log_file = fs::OpenOptions::new()
.append(true)
.create(true)
.open(LOG_FILE)
.unwrap();

// update positions for players on trains
for team in state.teams.iter_mut() {
if let Some(train_id) = &team.on_train {
if let Some(train) = trains.iter().find(|x| &x.line_id == train_id) {
team.long = train.long;
team.lat = train.lat;
}
}
}

let game_state = GameState {
teams: state.teams.clone(),
trains,
};
writeln!(
log_file,
"{}, {}",
time.with_timezone(&chrono_tz::Europe::Berlin).to_rfc3339(),
serde_json::to_string(&game_state).unwrap()
)
.unwrap();
ServerResponse::Broadcast(serde_json::to_string(&game_state).unwrap())
}
61 changes: 21 additions & 40 deletions robusta/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fs;
use std::io::Write;
use std::time::Duration;
use std::{collections::HashMap, ops::ControlFlow};

Expand All @@ -14,9 +13,9 @@ use axum::{
routing::{get, get_service, post},
Json, Router,
};
use chai::kvv;
use chai::unique_id::UniqueIdGen;
use chai::ws_message::{GameState, Team};
use chai::ws_message::Team;
use chai::{kvv, ServerResponse};
use chai::{ws_message, InputMessage, ServerMessage};
use futures_util::SinkExt;
use reqwest::StatusCode;
Expand All @@ -28,7 +27,6 @@ use tower_http::{
};
use tracing::{error, info, warn, Level};

const LOG_FILE: &str = "log.csv";
const TEAMS_FILE: &str = "teams.json";

/// The name used for the Mr. X team.
Expand All @@ -43,6 +41,7 @@ struct Client {

#[derive(Debug)]
struct ClientConnection {
id: u32,
send: tokio::sync::mpsc::Sender<String>,
}

Expand Down Expand Up @@ -74,7 +73,7 @@ async fn handler(ws: WebSocketUpgrade, State(state): State<SharedState>) -> Resp
let client = {
let mut state = state.lock().await;
let id = state.client_id_gen.next();
let client_connection = ClientConnection { send };
let client_connection = ClientConnection { id, send };
state.connections.push(client_connection);
Client {
recv: rec,
Expand Down Expand Up @@ -306,60 +305,42 @@ async fn main() {

async fn run_game_loop(mut recv: tokio::sync::mpsc::Receiver<InputMessage>, state: SharedState) {
let mut departures = HashMap::new();
let mut log_file = fs::OpenOptions::new()
.append(true)
.create(true)
.open(LOG_FILE)
.unwrap();
let mut interval = tokio::time::interval(Duration::from_millis(500));
loop {
interval.tick().await;

let mut state = state.lock().await;
while let Ok(msg) = recv.try_recv() {
// TODO: this does not do anything yet
if let ControlFlow::Break(_) =
chai::process_message(msg, &mut state.teams_state, &mut departures)
{
continue;
}
}

let time = chrono::Utc::now();
let mut trains = kvv::train_positions(&departures, time);
trains.retain(|x| !x.line_id.contains("bus"));

// update positions for players on trains
for team in state.teams_state.teams.iter_mut() {
if let Some(train_id) = &team.on_train {
if let Some(train) = trains.iter().find(|x| &x.line_id == train_id) {
team.long = train.long;
team.lat = train.lat;
}
}
}

let game_state = GameState {
teams: state.teams_state.teams.clone(),
trains,
};
let msg = chai::generate_respone(&departures, &mut state.teams_state);

writeln!(
log_file,
"{}, {}",
time.with_timezone(&chrono_tz::Europe::Berlin).to_rfc3339(),
serde_json::to_string(&game_state).unwrap()
)
.unwrap();
fs::write(
TEAMS_FILE,
serde_json::to_string_pretty(&game_state.teams).unwrap(),
serde_json::to_string_pretty(&state.teams_state.teams).unwrap(),
)
.unwrap();

let msg = serde_json::to_string(&game_state).unwrap();
for connection in state.connections.iter_mut() {
if connection.send.send(msg.clone()).await.is_err() {
continue;
match msg {
ServerResponse::Broadcast(msg) => {
for connection in state.connections.iter_mut() {
if connection.send.send(msg.clone()).await.is_err() {
continue;
}
}
}
ServerResponse::P2P(msg, id) => {
if let Some(connection) = state.connections.iter_mut().find(|x| x.id == id) {
if connection.send.send(msg).await.is_err() {
continue;
}
}
}
}
}
Expand Down

0 comments on commit 6e9f012

Please sign in to comment.