Skip to content

Commit

Permalink
chore: Waits for kill processes to exit
Browse files Browse the repository at this point in the history
  • Loading branch information
andyquinterom committed Oct 24, 2024
1 parent 61d822a commit 8429a4b
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 66 deletions.
88 changes: 56 additions & 32 deletions src/client/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ use crate::{
networking::get_available_sockets,
server::FaucetServerConfig,
};
use std::{ffi::OsStr, net::SocketAddr, path::Path, sync::atomic::AtomicBool, time::Duration};
use std::{
ffi::OsStr,
net::SocketAddr,
path::Path,
sync::atomic::{AtomicBool, AtomicU32, Ordering},
time::Duration,
};
use tokio::{process::Child, task::JoinHandle};
use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, LinesCodec};
Expand Down Expand Up @@ -221,56 +227,74 @@ async fn check_if_online(addr: SocketAddr) -> bool {
const RECHECK_INTERVAL: Duration = Duration::from_millis(250);

pub struct WorkerChild {
_handle: JoinHandle<FaucetResult<()>>,
handle: Option<JoinHandle<FaucetResult<()>>>,
stopper: tokio::sync::mpsc::Sender<()>,
}

impl WorkerChild {
pub fn kill(&self) {
pub async fn kill(&mut self) {
let _ = self.stopper.try_send(());
self.wait_until_done().await;
}
pub async fn wait_until_done(&mut self) {
if let Some(handle) = self.handle.take() {
let _ = handle.await;
}
}
}

fn spawn_worker_task(config: WorkerConfig) -> WorkerChild {
let (stopper, mut rx) = tokio::sync::mpsc::channel(1);
let handle = tokio::spawn(async move {
let pid = AtomicU32::new(0);
loop {
let mut child = config.spawn_process(config);
let pid = child.id().expect("Failed to get plumber worker PID");
log::info!(target: "faucet", "Starting process {pid} for {target} on port {port}", port = config.addr.port(), target = config.target);
loop {
// Try to connect to the socket
let check_status = check_if_online(config.addr).await;
// If it's online, we can break out of the loop and start serving connections
if check_status {
log::info!(target: "faucet", "{target} is online and ready to serve connections", target = config.target);
config
.is_online
.store(check_status, std::sync::atomic::Ordering::SeqCst);
break;
}
// If it's not online but the child process has exited, we should break out of the loop
// and restart the process
if child.try_wait()?.is_some() {
break;
let child_manage_closure = || async {
let mut child = config.spawn_process(config);
pid.store(
child.id().expect("Failed to get plumber worker PID"),
Ordering::SeqCst,
);
log::info!(target: "faucet", "Starting process {pid} for {target} on port {port}", port = config.addr.port(), target = config.target, pid = pid.load(Ordering::SeqCst));
loop {
// Try to connect to the socket
let check_status = check_if_online(config.addr).await;
// If it's online, we can break out of the loop and start serving connections
if check_status {
log::info!(target: "faucet", "{target} is online and ready to serve connections", target = config.target);
config.is_online.store(check_status, Ordering::SeqCst);
break;
}
// If it's not online but the child process has exited, we should break out of the loop
// and restart the process
if child.try_wait()?.is_some() {
break;
}

tokio::time::sleep(RECHECK_INTERVAL).await;
}

tokio::time::sleep(RECHECK_INTERVAL).await;
}
child.wait().await
};

tokio::select! {
_ = child.wait() => (),
_ = rx.recv() => return FaucetResult::Ok(()),
status = child_manage_closure() => {
config
.is_online
.store(false, std::sync::atomic::Ordering::SeqCst);
log::error!(target: "faucet", "{target}'s process ({}) exited with status {}", pid.load(Ordering::SeqCst), status?, target = config.target);
},
_ = rx.recv() => break,
}
}
match pid.load(Ordering::SeqCst) {
0 => (), // If PID is 0 that means the process has not even started
pid => {
log::info!(target: "faucet", "{target}'s process ({pid}) killed", target = config.target)
}
let status = child.wait().await?;
config
.is_online
.store(false, std::sync::atomic::Ordering::SeqCst);
log::error!(target: "faucet", "{target}'s process ({}) exited with status {}", pid, status, target = config.target);
}
FaucetResult::Ok(())
});
WorkerChild {
_handle: handle,
handle: Some(handle),
stopper,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub enum FaucetError {
InvalidHeaderValues(hyper::header::InvalidHeaderValue),
Http(hyper::http::Error),
MissingArgument(&'static str),
DuplicateRoute(&'static str),
DuplicateRoute(String),
Utf8Coding,
BufferCapacity(tokio_tungstenite::tungstenite::error::CapacityError),
ProtocolViolation(tokio_tungstenite::tungstenite::error::ProtocolError),
Expand Down
40 changes: 20 additions & 20 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ fn determine_strategy(server_type: WorkerType, strategy: Option<Strategy>) -> St
match server_type {
WorkerType::Plumber =>
strategy.unwrap_or_else(|| {
log::info!(target: "faucet", "No load balancing strategy specified. Defaulting to round robin for plumber.");
log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to round robin for plumber.");
Strategy::RoundRobin
}),
WorkerType::Shiny | WorkerType::QuartoShiny => match strategy {
None => {
log::info!(target: "faucet", "No load balancing strategy specified. Defaulting to IP hash for shiny.");
log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to IP hash for shiny.");
Strategy::IpHash
},
Some(Strategy::RoundRobin) => {
log::info!(target: "faucet", "Round robin load balancing strategy specified for shiny, switching to IP hash.");
log::debug!(target: "faucet", "Round robin load balancing strategy specified for shiny, switching to IP hash.");
Strategy::IpHash
},
Some(Strategy::IpHash) => Strategy::IpHash,
Expand Down Expand Up @@ -84,22 +84,22 @@ impl FaucetServerBuilder {
self
}
pub fn strategy(mut self, strategy: Option<Strategy>) -> Self {
log::info!(target: "faucet", "Using load balancing strategy: {:?}", strategy);
log::debug!(target: "faucet", "Using load balancing strategy: {:?}", strategy);
self.strategy = strategy;
self
}
pub fn bind(mut self, bind: SocketAddr) -> Self {
log::info!(target: "faucet", "Will bind to: {}", bind);
log::debug!(target: "faucet", "Will bind to: {}", bind);
self.bind = Some(bind);
self
}
pub fn extractor(mut self, extractor: load_balancing::IpExtractor) -> Self {
log::info!(target: "faucet", "Using IP extractor: {:?}", extractor);
log::debug!(target: "faucet", "Using IP extractor: {:?}", extractor);
self.extractor = Some(extractor);
self
}
pub fn workers(mut self, n: usize) -> Self {
log::info!(target: "faucet", "Will spawn {} workers", n);
log::debug!(target: "faucet", "Will spawn {} workers", n);
self.n_workers = match n.try_into() {
Ok(n) => Some(n),
Err(_) => {
Expand All @@ -110,22 +110,22 @@ impl FaucetServerBuilder {
self
}
pub fn server_type(mut self, server_type: WorkerType) -> Self {
log::info!(target: "faucet", "Using worker type: {:?}", server_type);
log::debug!(target: "faucet", "Using worker type: {:?}", server_type);
self.server_type = Some(server_type);
self
}
pub fn workdir(mut self, workdir: impl AsRef<Path>) -> Self {
log::info!(target: "faucet", "Using workdir: {:?}", workdir.as_ref());
log::debug!(target: "faucet", "Using workdir: {:?}", workdir.as_ref());
self.workdir = Some(workdir.as_ref().into());
self
}
pub fn rscript(mut self, rscript: impl AsRef<OsStr>) -> Self {
log::info!(target: "faucet", "Using Rscript command: {:?}", rscript.as_ref());
log::debug!(target: "faucet", "Using Rscript command: {:?}", rscript.as_ref());
self.rscript = Some(rscript.as_ref().into());
self
}
pub fn quarto(mut self, quarto: impl AsRef<OsStr>) -> Self {
log::info!(target: "faucet", "Using quarto command: {:?}", quarto.as_ref());
log::debug!(target: "faucet", "Using quarto command: {:?}", quarto.as_ref());
self.quarto = Some(quarto.as_ref().into());
self
}
Expand All @@ -140,27 +140,27 @@ impl FaucetServerBuilder {
let strategy = determine_strategy(server_type, self.strategy);
let bind = self.bind;
let n_workers = self.n_workers.unwrap_or_else(|| {
log::info!(target: "faucet", "No number of workers specified. Defaulting to the number of logical cores.");
log::debug!(target: "faucet", "No number of workers specified. Defaulting to the number of logical cores.");
num_cpus::get().try_into().expect("num_cpus::get() returned 0")
});
let workdir = self.workdir
.map(|wd| leak!(wd, Path))
.unwrap_or_else(|| {
log::info!(target: "faucet", "No workdir specified. Defaulting to the current directory.");
log::debug!(target: "faucet", "No workdir specified. Defaulting to the current directory.");
Path::new(".")
});
let rscript = self.rscript.map(|wd| leak!(wd, OsStr)).unwrap_or_else(|| {
log::info!(target: "faucet", "No Rscript command specified. Defaulting to `Rscript`.");
log::debug!(target: "faucet", "No Rscript command specified. Defaulting to `Rscript`.");
OsStr::new("Rscript")
});
let extractor = self.extractor.unwrap_or_else(|| {
log::info!(target: "faucet", "No IP extractor specified. Defaulting to client address.");
log::debug!(target: "faucet", "No IP extractor specified. Defaulting to client address.");
load_balancing::IpExtractor::ClientAddr
});
let app_dir = self.app_dir.map(|app_dir| leak!(app_dir, str));
let qmd = self.qmd.map(|qmd| leak!(qmd, Path));
let quarto = self.quarto.map(|qmd| leak!(qmd, OsStr)).unwrap_or_else(|| {
log::info!(target: "faucet", "No quarto command specified. Defaulting to `quarto`.");
log::debug!(target: "faucet", "No quarto command specified. Defaulting to `quarto`.");
OsStr::new("quarto")
});
Ok(FaucetServerConfig {
Expand Down Expand Up @@ -200,7 +200,7 @@ pub struct FaucetServerConfig {

impl FaucetServerConfig {
pub async fn run(self, shutdown: ShutdownSignal) -> FaucetResult<()> {
let workers = Workers::new(self, "").await?;
let mut workers = Workers::new(self, "").await?;
let targets = workers.get_workers_config();
let load_balancer = LoadBalancer::new(self.strategy, self.extractor, &targets)?;
let bind = self.bind.ok_or(FaucetError::MissingArgument("bind"))?;
Expand Down Expand Up @@ -253,10 +253,10 @@ impl FaucetServerConfig {
}

// Kill child process
workers.workers.iter().for_each(|w| {
for w in &mut workers.workers {
log::info!(target: w.config.target, "Killing child process");
w.child.kill()
});
w.child.kill().await;
}

FaucetResult::Ok(())
}
Expand Down
23 changes: 10 additions & 13 deletions src/server/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct RouterConfig {

#[derive(Copy, Clone)]
struct RouterService {
routes: &'static [&'static str],
routes: &'static [String],
clients: &'static [FaucetServerService],
}

Expand Down Expand Up @@ -99,7 +99,7 @@ impl Service<hyper::Request<Incoming>> for RouterService {
) -> Result<Self::Response, Self::Error> {
let mut client = None;
for i in 0..self.routes.len() {
let route = self.routes[i];
let route = &self.routes[i];
if let Some(new_uri) = strip_prefix(req.uri(), route) {
client = Some(&self.clients[i]);
*req.uri_mut() = new_uri;
Expand Down Expand Up @@ -128,11 +128,10 @@ impl RouterConfig {
let mut clients = Vec::with_capacity(self.route.len());
let mut routes_set = HashSet::with_capacity(self.route.len());
for route_conf in self.route.into_iter() {
let route: &'static str = route_conf.route.leak();
if !routes_set.insert(route) {
return Err(FaucetError::DuplicateRoute(route));
let route = route_conf.route.as_str();
if !routes_set.insert(route.to_owned()) {
return Err(FaucetError::DuplicateRoute(route_conf.route));
}
routes.push(route);
let (client, workers) = FaucetServerBuilder::new()
.workdir(route_conf.config.workdir)
.server_type(route_conf.config.server_type)
Expand All @@ -146,6 +145,7 @@ impl RouterConfig {
.build()?
.extract_service(&format!("[{route}]::"))
.await?;
routes.push(route_conf.route);
all_workers.push(workers);
clients.push(client);
}
Expand All @@ -165,7 +165,7 @@ impl RouterConfig {
addr: SocketAddr,
shutdown: ShutdownSignal,
) -> FaucetResult<()> {
let (service, all_workers) = self.into_service(rscript, quarto, ip_from).await?;
let (service, mut all_workers) = self.into_service(rscript, quarto, ip_from).await?;
// Bind to the port and listen for incoming TCP connections
let listener = TcpListener::bind(addr).await?;
log::info!(target: "faucet", "Listening on http://{}", addr);
Expand Down Expand Up @@ -208,12 +208,9 @@ impl RouterConfig {
}

// Kill child process
all_workers.iter().for_each(|workers| {
workers.workers.iter().for_each(|w| {
log::info!(target: w.config.target, "Killing child process");
w.child.kill();
});
});
for w in all_workers.iter_mut().flat_map(|ws| &mut ws.workers) {
w.child.kill().await;
}

FaucetResult::Ok(())
}
Expand Down

0 comments on commit 8429a4b

Please sign in to comment.