Skip to content

Commit

Permalink
feat: Adds log file rotation and limit
Browse files Browse the repository at this point in the history
  • Loading branch information
andyquinterom committed Nov 17, 2024
1 parent b0addb0 commit 6d33eb1
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 91 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
tarpaulin-report.html
venv/
.Rproj.user
*.log
*.log.bak
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ tokio-postgres-rustls = "0.13.0"
rustls = "0.23.16"
chrono = "0.4.38"
uuid = { version = "1.11.0", features = ["v7"] }
parse-size = "1.0.0"

[dev-dependencies]
rand = "0.8.5"
2 changes: 2 additions & 0 deletions examples/quarto/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.html
*_files/
20 changes: 20 additions & 0 deletions examples/quarto/example.qmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
title: "Old Faithful"
format: html
server: shiny
---

```{r}
sliderInput("bins", "Number of bins:",
min = 1, max = 50, value = 30)
plotOutput("distPlot")
```

```{r}
#| context: server
output$distPlot <- renderPlot({
x <- faithful[, 2] # Old Faithful Geyser data
bins <- seq(min(x), max(x), length.out = input$bins + 1)
hist(x, breaks = bins, col = 'darkgray', border = 'white')
})
```
67 changes: 25 additions & 42 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ pub enum Shutdown {

#[derive(Parser, Debug)]
pub struct StartArgs {
/// The host to bind to.
#[arg(long, env = "FAUCET_HOST", default_value = "127.0.0.1:3838")]
pub host: String,

/// The number of threads to use to handle requests.
#[arg(short, long, env = "FAUCET_WORKERS", default_value_t = num_cpus::get())]
pub workers: usize,
Expand All @@ -92,55 +88,17 @@ pub struct StartArgs {
#[arg(short, long, env = "FAUCET_DIR", default_value = ".")]
pub dir: PathBuf,

/// The IP address to extract from.
/// Defaults to client address.
#[arg(short, long, env = "FAUCET_IP_FROM", default_value = "client")]
pub ip_from: IpFrom,

/// Command, path, or executable to run Rscript.
#[arg(long, short, env = "FAUCET_RSCRIPT", default_value = "Rscript")]
pub rscript: OsString,

/// Command, path, or executable to run quarto.
#[arg(long, env = "FAUCET_QUARTO", default_value = "quarto")]
pub quarto: OsString,

/// Argument passed on to `appDir` when running Shiny.
#[arg(long, short, env = "FAUCET_APP_DIR", default_value = None)]
pub app_dir: Option<String>,

/// Quarto Shiny file path.
#[arg(long, short, env = "FAUCET_QMD", default_value = None)]
pub qmd: Option<PathBuf>,

/// Save logs to a file. Will disable colors!
#[arg(long, short, env = "FAUCET_LOG_FILE", default_value = None)]
pub log_file: Option<PathBuf>,
}

#[derive(Parser, Debug)]
pub struct RouterArgs {
/// The host to bind to.
#[arg(long, env = "FAUCET_HOST", default_value = "127.0.0.1:3838")]
pub host: String,

/// The IP address to extract from.
/// Defaults to client address.
#[arg(short, long, env = "FAUCET_IP_FROM", default_value = "client")]
pub ip_from: IpFrom,

/// Command, path, or executable to run Rscript.
#[arg(long, short, env = "FAUCET_RSCRIPT", default_value = "Rscript")]
pub rscript: OsString,

/// Command, path, or executable to run quarto.
#[arg(long, short, env = "FAUCET_QUARTO", default_value = "quarto")]
pub quarto: OsString,

/// Save logs to a file. Will disable colors!
#[arg(long, short, env = "FAUCET_LOG_FILE", default_value = None)]
pub log_file: Option<PathBuf>,

/// Router config file.
#[arg(
long,
Expand Down Expand Up @@ -176,6 +134,31 @@ pub struct Args {
#[command(subcommand)]
pub command: Commands,

/// The host to bind to.
#[arg(long, env = "FAUCET_HOST", default_value = "127.0.0.1:3838")]
pub host: String,

/// The IP address to extract from.
/// Defaults to client address.
#[arg(short, long, env = "FAUCET_IP_FROM", default_value = "client")]
pub ip_from: IpFrom,

/// Command, path, or executable to run Rscript.
#[arg(long, short, env = "FAUCET_RSCRIPT", default_value = "Rscript")]
pub rscript: OsString,

/// Command, path, or executable to run quarto.
#[arg(long, short, env = "FAUCET_QUARTO", default_value = "quarto")]
pub quarto: OsString,

/// Save logs to a file. Will disable colors!
#[arg(long, short, env = "FAUCET_LOG_FILE", default_value = None)]
pub log_file: Option<PathBuf>,

#[arg(long, short, env = "FAUCET_MAX_LOG_FILE_SIZE", default_value = None, value_parser = |s: &str| parse_size::parse_size(s))]
/// The maximum size of the log file. (Ex. 10M, 1GB)
pub max_log_file_size: Option<u64>,

/// The strategy for shutting down faucet
#[arg(long, env = "FAUCET_SHUTDOWN", default_value = "immediate")]
pub shutdown: Shutdown,
Expand Down
55 changes: 26 additions & 29 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use faucet_server::{cli::Shutdown, shutdown};
pub async fn main() -> FaucetResult<()> {
let cli_args = Args::parse();

let signal = match cli_args.shutdown {
let shutdown_signal = match cli_args.shutdown {
Shutdown::Immediate => shutdown::immediate(),
Shutdown::Graceful => shutdown::graceful(),
};
Expand All @@ -29,55 +29,48 @@ pub async fn main() -> FaucetResult<()> {
}
});

let log_thread_handle = build_logger(
cli_args
.log_file
.as_ref()
.map_or(faucet_server::server::logger::Target::Stderr, |file| {
faucet_server::server::logger::Target::File(file.to_path_buf())
}),
cli_args.max_log_file_size,
shutdown_signal.clone(),
);

match cli_args.command {
Commands::Start(start_args) => {
build_logger(
start_args
.log_file
.as_ref()
.map_or(faucet_server::server::logger::Target::Stderr, |file| {
faucet_server::server::logger::Target::File(file.to_path_buf())
}),
);

log::info!(target: "faucet", "Building the faucet server...");

FaucetServerBuilder::new()
.strategy(Some(start_args.strategy.into()))
.workers(start_args.workers)
.server_type(start_args.server_type())
.extractor(start_args.ip_from.into())
.bind(start_args.host.parse()?)
.extractor(cli_args.ip_from.into())
.bind(cli_args.host.parse()?)
.workdir(start_args.dir)
.rscript(start_args.rscript)
.rscript(cli_args.rscript)
.app_dir(start_args.app_dir)
.quarto(start_args.quarto)
.quarto(cli_args.quarto)
.qmd(start_args.qmd)
.telemetry(telemetry.as_ref())
.build()?
.run(signal)
.run(shutdown_signal)
.await?;
}
Commands::Router(router_args) => {
build_logger(
router_args
.log_file
.as_ref()
.map_or(faucet_server::server::logger::Target::Stderr, |file| {
faucet_server::server::logger::Target::File(file.to_path_buf())
}),
);

let config: RouterConfig =
toml::from_str(&std::fs::read_to_string(router_args.conf).unwrap()).unwrap();

config
.run(
router_args.rscript,
router_args.quarto,
router_args.ip_from.into(),
router_args.host.parse()?,
signal,
cli_args.rscript,
cli_args.quarto,
cli_args.ip_from.into(),
cli_args.host.parse()?,
shutdown_signal,
telemetry.as_ref(),
)
.await?;
Expand All @@ -90,5 +83,9 @@ pub async fn main() -> FaucetResult<()> {
let _ = telemetry.http_events_join_handle.await;
}

if let Some(handle) = log_thread_handle {
let _ = handle.await;
}

Ok(())
}
89 changes: 71 additions & 18 deletions src/server/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,65 +8,118 @@ pub mod logger {
use std::{io::BufWriter, io::Write, path::PathBuf};

use hyper::body::Bytes;
use tokio::task::JoinHandle;

use crate::shutdown::ShutdownSignal;

pub enum Target {
Stderr,
File(PathBuf),
}

struct LogFileWriter {
sender: std::sync::mpsc::Sender<Bytes>,
sender: tokio::sync::mpsc::Sender<Bytes>,
}

impl std::io::Write for LogFileWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let _ = self.sender.send(Bytes::copy_from_slice(buf));
let _ = self.sender.try_send(Bytes::copy_from_slice(buf));
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}

fn start_log_writer_thread(path: PathBuf) -> LogFileWriter {
fn start_log_writer_thread(
path: PathBuf,
max_file_size: Option<u64>,
shutdown: ShutdownSignal,
) -> (LogFileWriter, JoinHandle<()>) {
let max_file_size = max_file_size.unwrap_or(u64::MAX);
let mut current_file_size = match std::fs::metadata(&path) {
Ok(md) => md.len(),
Err(_) => 0,
};
let file = std::fs::File::options()
.create(true)
.append(true)
.truncate(false)
.open(&path)
.expect("Unable to open or create log file");

// Create a file path to a backup of the previous logs with MAX file size
let mut copy_path = path.clone();
copy_path.as_mut_os_string().push(".bak");

let mut writer = BufWriter::new(file);
let mut stderr = BufWriter::new(std::io::stderr());
let (sender, receiver) = std::sync::mpsc::channel::<Bytes>();
std::thread::spawn(move || {
while let Ok(bytes) = receiver.recv() {
if let Err(e) = stderr.write_all(bytes.as_ref()) {
eprintln!("Unable to write to stderr: {e}");
};
if let Err(e) = writer.write_all(bytes.as_ref()) {
eprintln!("Unable to write to {path:?}: {e}");
};
let (sender, mut receiver) = tokio::sync::mpsc::channel::<Bytes>(1000);
let writer_thread = tokio::task::spawn(async move {
loop {
tokio::select! {
bytes = receiver.recv() => {
match bytes {
Some(bytes) => {
if let Err(e) = stderr.write_all(bytes.as_ref()) {
eprintln!("Unable to write to stderr: {e}");
};

if let Err(e) = writer.write_all(bytes.as_ref()) {
eprintln!("Unable to write to {path:?}: {e}");
};

current_file_size += bytes.len() as u64;
if current_file_size > max_file_size {
// Flush the writer
let _ = writer.flush();
let file = writer.get_mut();

// Copy the current file to the backup
if let Err(e) = std::fs::copy(&path, &copy_path) {
log::error!("Unable to copy logs to backup file: {e}");
}

// Truncate the logs file
if let Err(e) = file.set_len(0) {
log::error!("Unable to truncate logs file: {e}");
}

current_file_size = 0;
}
},
None => break
}
},
_ = shutdown.wait() => break
}
}
let _ = writer.flush();
let _ = stderr.flush();
});
LogFileWriter { sender }
(LogFileWriter { sender }, writer_thread)
}

pub fn build_logger(target: Target) {
let target = match target {
pub fn build_logger(
target: Target,
max_file_size: Option<u64>,
shutdown: ShutdownSignal,
) -> Option<JoinHandle<()>> {
let (target, handle) = match target {
Target::File(path) => {
let writer = start_log_writer_thread(path);
env_logger::Target::Pipe(Box::new(writer))
let (writer, handle) = start_log_writer_thread(path, max_file_size, shutdown);
(env_logger::Target::Pipe(Box::new(writer)), Some(handle))
}
Target::Stderr => env_logger::Target::Stderr,
Target::Stderr => (env_logger::Target::Stderr, None),
};

let mut env_builder = env_logger::Builder::new();
env_builder
.parse_env(env_logger::Env::new().filter_or("FAUCET_LOG", "info"))
.target(target)
.init();

handle
}
}

Expand Down
Loading

0 comments on commit 6d33eb1

Please sign in to comment.