Skip to content

Commit

Permalink
chore: Improves from clippy suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
andyquinterom committed Oct 23, 2024
1 parent e34116e commit 61d822a
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 122 deletions.
11 changes: 0 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ serde = { version = "1.0.204", features = ["derive"] }
fxhash = "0.2.1"
toml = "0.8.19"
nix = "0.29.0"
signal-hook = "0.3.17"
ctrlc = { version = "3.4.5", features = ["termination"] }

[dev-dependencies]
Expand Down
2 changes: 0 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ impl From<IpFrom> for load_balancing::IpExtractor {
}
}

#[cfg(unix)]
#[derive(clap::ValueEnum, Debug, Clone, Copy, Default)]
pub enum Shutdown {
Graceful,
Expand Down Expand Up @@ -178,7 +177,6 @@ pub struct Args {
pub command: Commands,

/// The strategy for shutting down faucet
#[cfg(unix)]
#[arg(long, env = "FAUCET_SHUTDOWN", default_value = "immediate")]
pub shutdown: Shutdown,
}
Expand Down
41 changes: 19 additions & 22 deletions src/client/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,8 @@ use crate::{
networking::get_available_sockets,
server::FaucetServerConfig,
};
use std::{
ffi::OsStr,
net::SocketAddr,
path::Path,
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
use tokio::{process::Child, sync::Mutex, task::JoinHandle};
use std::{ffi::OsStr, net::SocketAddr, path::Path, sync::atomic::AtomicBool, time::Duration};
use tokio::{process::Child, task::JoinHandle};
use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, LinesCodec};

Expand Down Expand Up @@ -60,17 +54,17 @@ fn log_stdio(mut child: Child, target: &'static str) -> FaucetResult<Child> {
}

#[derive(Copy, Clone)]
pub(crate) struct WorkerConfig {
pub(crate) wtype: WorkerType,
pub(crate) app_dir: Option<&'static str>,
pub(crate) rscript: &'static OsStr,
pub(crate) quarto: &'static OsStr,
pub(crate) workdir: &'static Path,
pub(crate) addr: SocketAddr,
pub(crate) target: &'static str,
pub(crate) worker_id: usize,
pub(crate) is_online: &'static AtomicBool,
pub(crate) qmd: Option<&'static Path>,
pub struct WorkerConfig {
pub wtype: WorkerType,
pub app_dir: Option<&'static str>,
pub rscript: &'static OsStr,
pub quarto: &'static OsStr,
pub workdir: &'static Path,
pub addr: SocketAddr,
pub target: &'static str,
pub worker_id: usize,
pub is_online: &'static AtomicBool,
pub qmd: Option<&'static Path>,
}

impl WorkerConfig {
Expand Down Expand Up @@ -227,7 +221,7 @@ async fn check_if_online(addr: SocketAddr) -> bool {
const RECHECK_INTERVAL: Duration = Duration::from_millis(250);

pub struct WorkerChild {
handle: JoinHandle<FaucetResult<()>>,
_handle: JoinHandle<FaucetResult<()>>,
stopper: tokio::sync::mpsc::Sender<()>,
}

Expand Down Expand Up @@ -275,7 +269,10 @@ fn spawn_worker_task(config: WorkerConfig) -> WorkerChild {
log::error!(target: "faucet", "{target}'s process ({}) exited with status {}", pid, status, target = config.target);
}
});
WorkerChild { handle, stopper }
WorkerChild {
_handle: handle,
stopper,
}
}

impl Worker {
Expand All @@ -285,7 +282,7 @@ impl Worker {
}
}

pub(crate) struct Workers {
pub struct Workers {
pub workers: Box<[Worker]>,
}

Expand Down
48 changes: 12 additions & 36 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,36 +206,28 @@ mod tests {
.body(())
.unwrap_err();

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_io_error() {
let err = std::io::Error::new(std::io::ErrorKind::Other, "test");

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_pool_error() {
let err = deadpool::managed::PoolError::Backend(FaucetError::unknown("test"));

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_pool_build_error() {
let err = deadpool::managed::BuildError::NoRuntimeSpecified;

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
Expand All @@ -244,18 +236,14 @@ mod tests {
deadpool::managed::TimeoutType::Create,
);

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_pool_closed_error() {
let err = deadpool::managed::PoolError::<FaucetError>::Closed;

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
Expand All @@ -264,50 +252,38 @@ mod tests {
deadpool::managed::HookError::message("test"),
);

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_pool_no_runtime_specified_error() {
let err = deadpool::managed::PoolError::<FaucetError>::NoRuntimeSpecified;

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_hyper_invalid_header_value_error() {
let err = hyper::header::HeaderValue::from_bytes([0x00].as_ref()).unwrap_err();

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_addr_parse_error() {
let err = "INVALID".parse::<std::net::SocketAddr>().unwrap_err();

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_displat_missing_header() {
let err = FaucetError::BadRequest(BadRequestReason::MissingHeader("test"));
format!("{:?}", err);
format!("{}", err);
let _err = FaucetError::BadRequest(BadRequestReason::MissingHeader("test"));
}

#[test]
fn test_faucet_error_displat_invalid_header() {
let err = FaucetError::BadRequest(BadRequestReason::InvalidHeader("test"));
format!("{:?}", err);
format!("{}", err);
let _err = FaucetError::BadRequest(BadRequestReason::InvalidHeader("test"));
}

#[test]
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub mod error;
pub mod global_conn;
pub(crate) mod networking;
pub mod server;
#[cfg(unix)]
pub mod shutdown;

macro_rules! leak {
Expand Down
1 change: 0 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ pub async fn main() -> FaucetResult<()> {

let signal = match cli_args.shutdown {
Shutdown::Immediate => shutdown::immediate(),
#[cfg(unix)]
Shutdown::Graceful => shutdown::graceful(),
};

Expand Down
40 changes: 23 additions & 17 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,28 +216,34 @@ impl FaucetServerConfig {
log::info!(target: "faucet", "Listening on http://{}", bind);
let main_loop = || async {
loop {
let (tcp, client_addr) = listener.accept().await?;
let tcp = TokioIo::new(tcp);
log::debug!(target: "faucet", "Accepted TCP connection from {}", client_addr);
match listener.accept().await {
Err(e) => {
log::error!(target: "faucet", "Unable to accept TCP connection: {e}");
return;
}
Ok((tcp, client_addr)) => {
let tcp = TokioIo::new(tcp);
log::debug!(target: "faucet", "Accepted TCP connection from {}", client_addr);

tokio::task::spawn(async move {
let mut conn = http1::Builder::new()
.serve_connection(
tcp,
service_fn(|req: Request<Incoming>| {
service.call(req, Some(client_addr.ip()))
}),
)
.with_upgrades();
tokio::task::spawn(async move {
let mut conn = http1::Builder::new()
.serve_connection(
tcp,
service_fn(|req: Request<Incoming>| {
service.call(req, Some(client_addr.ip()))
}),
)
.with_upgrades();

let conn = pin!(&mut conn);
let conn = pin!(&mut conn);

if let Err(e) = conn.await {
log::error!(target: "faucet", "Connection error: {}", e);
if let Err(e) = conn.await {
log::error!(target: "faucet", "Connection error: {}", e);
}
});
}
});
};
}
FaucetResult::Ok(())
};

// Race the shutdown vs the main loop
Expand Down
46 changes: 26 additions & 20 deletions src/server/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,28 +171,34 @@ impl RouterConfig {
log::info!(target: "faucet", "Listening on http://{}", addr);
let main_loop = || async {
loop {
let (tcp, client_addr) = listener.accept().await?;
let tcp = TokioIo::new(tcp);
log::debug!(target: "faucet", "Accepted TCP connection from {}", client_addr);

tokio::task::spawn(async move {
let mut conn = http1::Builder::new()
.serve_connection(
tcp,
service_fn(|req: Request<Incoming>| {
service.call(req, Some(client_addr.ip()))
}),
)
.with_upgrades();

let conn = pin!(&mut conn);

if let Err(e) = conn.await {
log::error!(target: "faucet", "Connection error: {}", e);
match listener.accept().await {
Err(e) => {
log::error!(target: "faucet", "Unable to accept TCP connection: {e}");
return;
}
});
Ok((tcp, client_addr)) => {
let tcp = TokioIo::new(tcp);
log::debug!(target: "faucet", "Accepted TCP connection from {}", client_addr);

tokio::task::spawn(async move {
let mut conn = http1::Builder::new()
.serve_connection(
tcp,
service_fn(|req: Request<Incoming>| {
service.call(req, Some(client_addr.ip()))
}),
)
.with_upgrades();

let conn = pin!(&mut conn);

if let Err(e) = conn.await {
log::error!(target: "faucet", "Connection error: {}", e);
}
});
}
}
}
FaucetResult::Ok(())
};

// Race the shutdown vs the main loop
Expand Down
Loading

0 comments on commit 61d822a

Please sign in to comment.