Skip to content

Commit

Permalink
feat: Takes care of child process cleaup
Browse files Browse the repository at this point in the history
  • Loading branch information
andyquinterom committed Oct 23, 2024
1 parent 0481893 commit e34116e
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 110 deletions.
30 changes: 25 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ fxhash = "0.2.1"
toml = "0.8.19"
nix = "0.29.0"
signal-hook = "0.3.17"
ctrlc = { version = "3.4.5", features = ["termination"] }

[dev-dependencies]
rand = "0.8.5"
48 changes: 34 additions & 14 deletions src/client/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@ use crate::{
networking::get_available_sockets,
server::FaucetServerConfig,
};
use std::{ffi::OsStr, net::SocketAddr, path::Path, sync::atomic::AtomicBool, time::Duration};
use tokio::{process::Child, task::JoinHandle};
use std::{
ffi::OsStr,
net::SocketAddr,
path::Path,
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
use tokio::{process::Child, sync::Mutex, task::JoinHandle};
use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, LinesCodec};

Expand Down Expand Up @@ -206,11 +212,11 @@ impl WorkerConfig {
}
}

struct Worker {
pub struct Worker {
/// Whether the worker should be stopped
_worker_task: JoinHandle<FaucetResult<()>>,
pub child: WorkerChild,
/// The address of the worker's socket.
config: WorkerConfig,
pub config: WorkerConfig,
}

async fn check_if_online(addr: SocketAddr) -> bool {
Expand All @@ -220,8 +226,20 @@ async fn check_if_online(addr: SocketAddr) -> bool {

const RECHECK_INTERVAL: Duration = Duration::from_millis(250);

fn spawn_worker_task(config: WorkerConfig) -> JoinHandle<FaucetResult<()>> {
tokio::spawn(async move {
pub struct WorkerChild {
handle: JoinHandle<FaucetResult<()>>,
stopper: tokio::sync::mpsc::Sender<()>,
}

impl WorkerChild {
pub fn kill(&self) {
let _ = self.stopper.try_send(());
}
}

fn spawn_worker_task(config: WorkerConfig) -> WorkerChild {
let (stopper, mut rx) = tokio::sync::mpsc::channel(1);
let handle = tokio::spawn(async move {
loop {
let mut child = config.spawn_process(config);
let pid = child.id().expect("Failed to get plumber worker PID");
Expand All @@ -246,27 +264,29 @@ fn spawn_worker_task(config: WorkerConfig) -> JoinHandle<FaucetResult<()>> {
tokio::time::sleep(RECHECK_INTERVAL).await;
}

tokio::select! {
_ = child.wait() => (),
_ = rx.recv() => return FaucetResult::Ok(()),
}
let status = child.wait().await?;
config
.is_online
.store(false, std::sync::atomic::Ordering::SeqCst);
log::error!(target: "faucet", "{target}'s process ({}) exited with status {}", pid, status, target = config.target);
}
})
});
WorkerChild { handle, stopper }
}

impl Worker {
pub fn from_config(config: WorkerConfig) -> FaucetResult<Self> {
let worker_task = spawn_worker_task(config);
Ok(Self {
_worker_task: worker_task,
config,
})
let child = spawn_worker_task(config);
Ok(Self { child, config })
}
}

pub(crate) struct Workers {
workers: Box<[Worker]>,
pub workers: Box<[Worker]>,
}

impl Workers {
Expand Down
13 changes: 6 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@ use faucet_server::cli::{Args, Commands};
use faucet_server::error::FaucetResult;
use faucet_server::server::logger::build_logger;
use faucet_server::server::{FaucetServerBuilder, RouterConfig};

#[cfg(unix)]
use faucet_server::{cli::Shutdown, shutdown};

#[tokio::main]
pub async fn main() -> FaucetResult<()> {
let cli_args = Args::parse();

#[cfg(unix)]
match cli_args.shutdown {
Shutdown::Graceful => shutdown::graceful(),
let signal = match cli_args.shutdown {
Shutdown::Immediate => shutdown::immediate(),
}
#[cfg(unix)]
Shutdown::Graceful => shutdown::graceful(),
};

match cli_args.command {
Commands::Start(start_args) => {
Expand All @@ -42,7 +40,7 @@ pub async fn main() -> FaucetResult<()> {
.quarto(start_args.quarto)
.qmd(start_args.qmd)
.build()?
.run()
.run(signal)
.await?;
}
Commands::Router(router_args) => {
Expand All @@ -64,6 +62,7 @@ pub async fn main() -> FaucetResult<()> {
router_args.quarto,
router_args.ip_from.into(),
router_args.host.parse()?,
signal,
)
.await?;
}
Expand Down
65 changes: 42 additions & 23 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
},
error::{FaucetError, FaucetResult},
leak,
shutdown::ShutdownSignal,
};
use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request};
use hyper_util::rt::TokioIo;
Expand Down Expand Up @@ -198,7 +199,7 @@ pub struct FaucetServerConfig {
}

impl FaucetServerConfig {
pub async fn run(self) -> FaucetResult<()> {
pub async fn run(self, shutdown: ShutdownSignal) -> FaucetResult<()> {
let workers = Workers::new(self, "").await?;
let targets = workers.get_workers_config();
let load_balancer = LoadBalancer::new(self.strategy, self.extractor, &targets)?;
Expand All @@ -213,41 +214,59 @@ impl FaucetServerConfig {
// Bind to the port and listen for incoming TCP connections
let listener = TcpListener::bind(bind).await?;
log::info!(target: "faucet", "Listening on http://{}", bind);
loop {
let (tcp, client_addr) = listener.accept().await?;
let tcp = TokioIo::new(tcp);
log::debug!(target: "faucet", "Accepted TCP connection from {}", client_addr);
let main_loop = || async {
loop {
let (tcp, client_addr) = listener.accept().await?;
let tcp = TokioIo::new(tcp);
log::debug!(target: "faucet", "Accepted TCP connection from {}", client_addr);

tokio::task::spawn(async move {
let mut conn = http1::Builder::new()
.serve_connection(
tcp,
service_fn(|req: Request<Incoming>| {
service.call(req, Some(client_addr.ip()))
}),
)
.with_upgrades();
tokio::task::spawn(async move {
let mut conn = http1::Builder::new()
.serve_connection(
tcp,
service_fn(|req: Request<Incoming>| {
service.call(req, Some(client_addr.ip()))
}),
)
.with_upgrades();

let conn = pin!(&mut conn);
let conn = pin!(&mut conn);

if let Err(e) = conn.await {
log::error!(target: "faucet", "Connection error: {}", e);
}
});
if let Err(e) = conn.await {
log::error!(target: "faucet", "Connection error: {}", e);
}
});
}
FaucetResult::Ok(())
};

// Race the shutdown vs the main loop
tokio::select! {
_ = shutdown.wait() => (),
_ = main_loop() => (),
}

// Kill child process
workers.workers.iter().for_each(|w| {
log::info!(target: w.config.target, "Killing child process");
w.child.kill()
});

FaucetResult::Ok(())
}
pub async fn extract_service(self, prefix: &str) -> FaucetResult<FaucetServerService> {
pub async fn extract_service(
self,
prefix: &str,
) -> FaucetResult<(FaucetServerService, Workers)> {
let workers = Workers::new(self, prefix).await?;
let targets = workers.get_workers_config();
let load_balancer = LoadBalancer::new(self.strategy, self.extractor, &targets)?;

let load_balancer = load_balancer.clone();
let service: &'static _ = leak!(ServiceBuilder::new(ProxyService)
.layer(logging::LogLayer)
.layer(AddStateLayer::new(load_balancer))
.build());

Ok(FaucetServerService { inner: service })
Ok((FaucetServerService { inner: service }, workers))
}
}

Expand Down
Loading

0 comments on commit e34116e

Please sign in to comment.