Skip to content

Commit

Permalink
Revert Logger to use OS thread
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Sep 2, 2024
1 parent 839d2ee commit 8b33d3c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 53 deletions.
1 change: 0 additions & 1 deletion RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ Released on TBD (UTC).
- Improved `@customdataclass` decorator with `date` field (#1900), thanks @faysou
- Refactored `RedisMessageBusDatabase` to tokio tasks
- Refactored `RedisCacheDatabase` to tokio tasks
- Refactored `Logger` to tokio tasks
- Upgraded `tokio` crate to v1.40.0

### Breaking Changes
Expand Down
91 changes: 39 additions & 52 deletions nautilus_core/common/src/logging/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{collections::HashMap, env, fmt::Display, str::FromStr, sync::atomic::Ordering};
use std::{
collections::HashMap,
env,
fmt::Display,
str::FromStr,
sync::{atomic::Ordering, mpsc::SendError},
};

use indexmap::IndexMap;
use log::{
Expand All @@ -34,7 +40,6 @@ use super::{LOGGING_BYPASSED, LOGGING_REALTIME};
use crate::{
enums::{LogColor, LogLevel},
logging::writer::{FileWriter, FileWriterConfig, LogWriter, StderrWriter, StdoutWriter},
runtime::get_runtime,
};

#[cfg_attr(
Expand Down Expand Up @@ -142,14 +147,14 @@ impl LoggerConfig {

/// A high-performance logger utilizing a MPSC channel under the hood.
///
/// A separate task is spawned at initialization which receives [`LogEvent`] structs over the
/// A separate thread is spawned at initialization which receives [`LogEvent`] structs over the
/// channel.
#[derive(Debug)]
pub struct Logger {
/// Configure maximum levels for components and IO.
pub config: LoggerConfig,
/// Send log events to a separate task.
tx: tokio::sync::mpsc::UnboundedSender<LogEvent>,
/// Send log events to a separate thread.
tx: std::sync::mpsc::Sender<LogEvent>,
}

/// Represents a type of log event.
Expand Down Expand Up @@ -278,31 +283,19 @@ impl Log for Logger {
component,
message: format!("{}", record.args()),
};

// Check if channel closed first so we can log the line without cloning
if self.tx.is_closed() {
log_send_error(line.to_string());
} else if let Err(e) = self.tx.send(LogEvent::Log(line)) {
log_send_error(e.to_string());
if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
eprintln!("Error sending log event: {line}");
}
}
}

fn flush(&self) {
if let Err(e) = self.tx.send(LogEvent::Flush) {
log_flush_error(e.to_string());
eprintln!("Error sending flush log event: {e}");
}
}
}

fn log_send_error(msg: String) {
eprintln!("Error sending log event: {msg}");
}

fn log_flush_error(msg: String) {
eprintln!("Error sending flush log event: {msg}");
}

#[allow(clippy::too_many_arguments)]
impl Logger {
#[must_use]
Expand All @@ -322,7 +315,7 @@ impl Logger {
config: LoggerConfig,
file_config: FileWriterConfig,
) -> LogGuard {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<LogEvent>();
let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();

let logger = Self {
tx,
Expand All @@ -335,19 +328,23 @@ impl Logger {
println!("Logger initialized with {config:?} {file_config:?}");
}

let mut handle: Option<tokio::task::JoinHandle<()>> = None;
let mut handle: Option<std::thread::JoinHandle<()>> = None;
match set_boxed_logger(Box::new(logger)) {
Ok(()) => {
handle = Some(get_runtime().spawn(async move {
Self::handle_messages(
trader_id.to_string(),
instance_id.to_string(),
config,
file_config,
rx,
)
.await;
}));
handle = Some(
std::thread::Builder::new()
.name("logging".to_string())
.spawn(move || {
Self::handle_messages(
trader_id.to_string(),
instance_id.to_string(),
config,
file_config,
rx,
);
})
.expect("Error spawning `logging` thread"),
);

let max_level = log::LevelFilter::Trace;
set_max_level(max_level);
Expand All @@ -363,15 +360,15 @@ impl Logger {
LogGuard::new(handle)
}

async fn handle_messages(
fn handle_messages(
trader_id: String,
instance_id: String,
config: LoggerConfig,
file_config: FileWriterConfig,
mut rx: tokio::sync::mpsc::UnboundedReceiver<LogEvent>,
rx: std::sync::mpsc::Receiver<LogEvent>,
) {
if config.print_config {
println!("Logger task `handle_messages` initialized");
println!("Logger `handle_messages` initialized");
}

let LoggerConfig {
Expand All @@ -396,7 +393,7 @@ impl Logger {
};

// Continue to receive and handle log events until channel is hung up
while let Some(event) = rx.recv().await {
while let Ok(event) = rx.recv() {
match event {
LogEvent::Flush => {
break;
Expand Down Expand Up @@ -483,13 +480,13 @@ pub fn log(level: LogLevel, color: LogColor, component: Ustr, message: &str) {
)]
#[derive(Debug)]
pub struct LogGuard {
handle: Option<tokio::task::JoinHandle<()>>,
handle: Option<std::thread::JoinHandle<()>>,
}

impl LogGuard {
/// Creates a new [`LogGuard`] instance.
#[must_use]
pub const fn new(handle: Option<tokio::task::JoinHandle<()>>) -> Self {
pub const fn new(handle: Option<std::thread::JoinHandle<()>>) -> Self {
Self { handle }
}
}
Expand All @@ -504,15 +501,8 @@ impl Default for LogGuard {
impl Drop for LogGuard {
fn drop(&mut self) {
log::logger().flush();

if let Some(handle) = self.handle.take() {
tokio::task::block_in_place(|| {
get_runtime().block_on(async {
if let Err(e) = handle.await {
eprintln!("Error awaiting logging task: {e:?}");
}
});
});
handle.join().expect("Error joining logging handle");
}
}
}
Expand Down Expand Up @@ -591,8 +581,7 @@ mod tests {
}

#[rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_logging_to_file() {
fn test_logging_to_file() {
let config = LoggerConfig {
fileout_level: LevelFilter::Debug,
..Default::default()
Expand Down Expand Up @@ -656,8 +645,7 @@ mod tests {
}

#[rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_log_component_level_filtering() {
fn test_log_component_level_filtering() {
let config = LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error");

let temp_dir = tempdir().expect("Failed to create temporary directory");
Expand Down Expand Up @@ -711,8 +699,7 @@ mod tests {
}

#[rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_logging_to_file_in_json_format() {
fn test_logging_to_file_in_json_format() {
let config =
LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info");

Expand Down

0 comments on commit 8b33d3c

Please sign in to comment.