Skip to content

Commit

Permalink
feat: Allows saving of HTTP events into PostgreSQL (#150)
Browse files Browse the repository at this point in the history
* feat: Allows saving of HTTP events into PostgreSQL

* chore: Changes fields in postgresql schema

* chore: Adds better plumber example

* chore: Adds UUID to http events table

* chore: Adds optional version to the namespace
  • Loading branch information
andyquinterom authored Oct 30, 2024
1 parent 56f4430 commit b215aef
Show file tree
Hide file tree
Showing 18 changed files with 1,534 additions and 133 deletions.
850 changes: 850 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ fxhash = "0.2.1"
toml = "0.8.19"
nix = "0.29.0"
ctrlc = { version = "3.4.5", features = ["termination"] }
tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4", "with-uuid-1"] }
deadpool-postgres = { version = "0.14.0", features = ["rt_tokio_1"] }
bytes = "1.8.0"
tokio-postgres-rustls = "0.13.0"
rustls = "0.23.15"
chrono = "0.4.38"
uuid = { version = "1.11.0", features = ["v7"] }

[dev-dependencies]
rand = "0.8.5"
55 changes: 55 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,61 @@ git clone https://github.com/ixpantia/faucet.git
cargo install --path .
```

## HTTP Telemetry

faucet now offers the option of saving HTTP events to a PostgreSQL database.
This can be very helpful for tracking latency, total API calls and other
important information.

In order to use this feature you will need a PostgreSQL database with a table
called `faucet_http_events`. You can create the table with the following SQL
query:

```sql
-- Table that faucet writes its HTTP events to.
CREATE TABLE faucet_http_events (
    request_uuid UUID,
    namespace TEXT,
    version TEXT,
    target TEXT,
    worker_route TEXT,
    worker_id INT,
    ip_addr INET,
    method TEXT,
    path TEXT,
    query_params TEXT,
    http_version TEXT,
    status SMALLINT,
    user_agent TEXT,
    elapsed BIGINT,
    -- NOT NULL keeps this definition identical to the repository's postgres.sql.
    time TIMESTAMPTZ NOT NULL
);
```

### Connection Strings

In order to connect to the database you will need to pass the
`FAUCET_TELEMETRY_POSTGRES_STRING` environment variable or the
`--pg-con-string` CLI argument.

This should be either a key-value connection string or a URL with the
`postgres://` or `postgresql://` scheme.

#### Example connection strings

```sh
FAUCET_TELEMETRY_POSTGRES_STRING="host=localhost user=postgres connect_timeout=10 keepalives=0"
FAUCET_TELEMETRY_POSTGRES_STRING="host=/var/lib/postgresql,localhost port=1234 user=postgres password='password with spaces'"
FAUCET_TELEMETRY_POSTGRES_STRING="postgresql://user@localhost"
FAUCET_TELEMETRY_POSTGRES_STRING="postgresql://user:password@127.0.0.1/mydb?connect_timeout=10"
```

### Telemetry Namespaces

You will likely want to track several services in the same database. You can
set the value of the `namespace` column with the `FAUCET_TELEMETRY_NAMESPACE`
environment variable or the `--telemetry-namespace` CLI argument. By
default, this value is `"faucet"`.

## Contributing

If you want to contribute to `faucet` please read the
Expand Down
47 changes: 47 additions & 0 deletions examples/shiny/app.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
library(shiny)
library(bslib)

# UI definition: a sidebar page with one slider input and one plot output ----
ui <- page_sidebar(
  # Title shown at the top of the page ----
  title = "Hello Shiny!",
  # Sidebar: slider that sets the number of histogram bins (1 to 50, starts at 30) ----
  sidebar = sidebar(
    sliderInput("bins", "Number of bins:", min = 1, max = 50, value = 30)
  ),
  # Main area: placeholder filled by the server's `distPlot` output ----
  plotOutput("distPlot")
)

# Server logic: renders the histogram shown in the `distPlot` output ----
server <- function(input, output) {
  # Histogram of the Old Faithful waiting times, split into `input$bins` bins.
  # Wrapping the expression in renderPlot makes it reactive, so it is
  # re-executed whenever input$bins changes, and marks the output as a plot.
  output$distPlot <- renderPlot({
    x <- faithful$waiting

    # n bins need n + 1 evenly spaced edges between the smallest and
    # largest waiting time
    n_edges <- input$bins + 1
    bin_edges <- seq(from = min(x), to = max(x), length.out = n_edges)

    hist(
      x,
      breaks = bin_edges,
      col = "#007bc2",
      border = "white",
      xlab = "Waiting time to next eruption (in mins)",
      main = "Histogram of waiting times"
    )
  })
}

shinyApp(ui, server)
35 changes: 32 additions & 3 deletions examples/simple/plumber.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,33 @@
#* @get /
function() {
"Hello, world!"
# plumber.R

#* @param n Seconds to sleep
#* @get /sleep
function(n = 1L) {
  # Coerce `n` to numeric before handing it to Sys.sleep
  seconds <- as.numeric(n)
  Sys.sleep(seconds)
}

#* Echo the parameter that was sent in
#* @param msg The message to echo back.
#* @get /echo
function(msg = "") {
  # Wrap the incoming message in single quotes inside a fixed sentence
  reply <- paste0("The message is: '", msg, "'")
  list(msg = reply)
}

#* Plot out data from the iris dataset
#* @param spec If provided, filter the data to only this species (e.g. 'setosa')
#* @get /plot
#* @serializer png
function(spec) {
  if (missing(spec)) {
    # No species requested: plot the whole dataset
    plot_title <- "All Species"
    plot_data <- iris
  } else {
    # Keep only the rows of the requested species
    plot_title <- paste0("Only the '", spec, "' Species")
    plot_data <- subset(iris, Species == spec)
  }

  # Scatter plot of petal length against sepal length
  plot(
    plot_data$Sepal.Length,
    plot_data$Petal.Length,
    main = plot_title,
    xlab = "Sepal Length",
    ylab = "Petal Length"
  )
}
20 changes: 20 additions & 0 deletions postgres.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Storage for the HTTP events that faucet saves to PostgreSQL.
-- `time` is the only column that rejects NULLs.
CREATE TABLE faucet_http_events (
    request_uuid  UUID,
    namespace     TEXT,
    version       TEXT,
    target        TEXT,
    worker_route  TEXT,
    worker_id     INT,
    ip_addr       INET,
    method        TEXT,
    path          TEXT,
    query_params  TEXT,
    http_version  TEXT,
    status        SMALLINT,
    user_agent    TEXT,
    elapsed       BIGINT,
    time          TIMESTAMPTZ NOT NULL
);

-- For use in timescale
-- SELECT create_hypertable('faucet_http_events', by_range('time'));
12 changes: 12 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,18 @@ pub struct Args {
/// The strategy for shutting down faucet
#[arg(long, env = "FAUCET_SHUTDOWN", default_value = "immediate")]
pub shutdown: Shutdown,

/// Connection string to a PostgreSQL database for saving HTTP events.
#[arg(long, env = "FAUCET_TELEMETRY_POSTGRES_STRING", default_value = None)]
pub pg_con_string: Option<String>,

/// Save HTTP events on PostgreSQL under a specific namespace.
#[arg(long, env = "FAUCET_TELEMETRY_NAMESPACE", default_value = "faucet")]
pub telemetry_namespace: String,

/// Represents the source code version of the service to run. This is useful for telemetry.
#[arg(long, env = "FAUCET_TELEMETRY_VERSION", default_value = None)]
pub telemetry_version: Option<String>,
}

impl StartArgs {
Expand Down
81 changes: 36 additions & 45 deletions src/client/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use crate::{
leak,
networking::get_available_sockets,
server::FaucetServerConfig,
shutdown::ShutdownSignal,
};
use std::{
ffi::OsStr,
net::SocketAddr,
path::Path,
sync::atomic::{AtomicBool, AtomicU32, Ordering},
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use tokio::{process::Child, task::JoinHandle};
Expand Down Expand Up @@ -69,23 +70,20 @@ pub struct WorkerConfig {
pub addr: SocketAddr,
pub target: &'static str,
pub worker_id: usize,
pub worker_route: Option<&'static str>,
pub is_online: &'static AtomicBool,
pub qmd: Option<&'static Path>,
}

impl WorkerConfig {
fn new(
worker_id: usize,
addr: SocketAddr,
server_config: FaucetServerConfig,
target_prefix: &str,
) -> Self {
fn new(worker_id: usize, addr: SocketAddr, server_config: &FaucetServerConfig) -> Self {
Self {
addr,
worker_id,
is_online: leak!(AtomicBool::new(false)),
workdir: server_config.workdir,
target: leak!(format!("{}Worker::{}", target_prefix, worker_id)),
worker_route: server_config.route,
target: leak!(format!("Worker::{}", worker_id)),
app_dir: server_config.app_dir,
wtype: server_config.server_type,
rscript: server_config.rscript,
Expand All @@ -100,6 +98,7 @@ impl WorkerConfig {
is_online: leak!(AtomicBool::new(online)),
addr: addr.parse().unwrap(),
app_dir: None,
worker_route: None,
rscript: OsStr::new(""),
wtype: crate::client::worker::WorkerType::Shiny,
worker_id: 1,
Expand Down Expand Up @@ -228,39 +227,33 @@ const RECHECK_INTERVAL: Duration = Duration::from_millis(250);

pub struct WorkerChild {
handle: Option<JoinHandle<FaucetResult<()>>>,
stopper: tokio::sync::mpsc::Sender<()>,
}

impl WorkerChild {
pub async fn kill(&mut self) {
let _ = self.stopper.try_send(());
self.wait_until_done().await;
}
pub async fn wait_until_done(&mut self) {
if let Some(handle) = self.handle.take() {
let _ = handle.await;
}
}
}

fn spawn_worker_task(config: WorkerConfig) -> WorkerChild {
let (stopper, mut rx) = tokio::sync::mpsc::channel(1);
fn spawn_worker_task(config: WorkerConfig, shutdown: ShutdownSignal) -> WorkerChild {
let handle = tokio::spawn(async move {
let pid = AtomicU32::new(0);
loop {
let child_manage_closure = || async {
let mut child = config.spawn_process(config);
pid.store(
child.id().expect("Failed to get plumber worker PID"),
Ordering::SeqCst,
);
log::info!(target: "faucet", "Starting process {pid} for {target} on port {port}", port = config.addr.port(), target = config.target, pid = pid.load(Ordering::SeqCst));
'outer: loop {
let mut child = config.spawn_process(config);
let pid = child.id().expect("Failed to get plumber worker PID");

// We will run this loop asynchronously on this same thread.
// We will use this to wait for either the stop signal
// or the child exiting
let child_loop = async {
log::info!(target: "faucet", "Starting process {pid} for {target} on port {port}", port = config.addr.port(), target = config.target);
loop {
// Try to connect to the socket
let check_status = check_if_online(config.addr).await;
// If it's online, we can break out of the loop and start serving connections
if check_status {
log::info!(target: "faucet", "{target} is online and ready to serve connections", target = config.target);
log::info!(target: "faucet", "{target} is online and ready to serve connections at {route}", target = config.target, route = config.worker_route.unwrap_or("/"));
config.is_online.store(check_status, Ordering::SeqCst);
break;
}
Expand All @@ -272,36 +265,36 @@ fn spawn_worker_task(config: WorkerConfig) -> WorkerChild {

tokio::time::sleep(RECHECK_INTERVAL).await;
}
child.wait().await
FaucetResult::Ok(child.wait().await?)
};

tokio::select! {
status = child_manage_closure() => {
// If we receive a stop signal that means we will stop the outer loop
// and kill the process
_ = shutdown.wait() => {
let _ = child.kill().await;
log::info!(target: "faucet", "{target}'s process ({pid}) killed", target = config.target);
break 'outer;
},
// If our child loop stops that means the process crashed. We will restart it
status = child_loop => {
config
.is_online
.store(false, std::sync::atomic::Ordering::SeqCst);
log::error!(target: "faucet", "{target}'s process ({}) exited with status {}", pid.load(Ordering::SeqCst), status?, target = config.target);
},
_ = rx.recv() => break,
}
}
match pid.load(Ordering::SeqCst) {
0 => (), // If PID is 0 that means the process has not even started
pid => {
log::info!(target: "faucet", "{target}'s process ({pid}) killed", target = config.target)
log::error!(target: "faucet", "{target}'s process ({}) exited with status {}", pid, status?, target = config.target);
continue 'outer;
}
}
}
FaucetResult::Ok(())
});
WorkerChild {
handle: Some(handle),
stopper,
}
}

impl Worker {
pub fn from_config(config: WorkerConfig) -> FaucetResult<Self> {
let child = spawn_worker_task(config);
pub fn from_config(config: WorkerConfig, shutdown: ShutdownSignal) -> FaucetResult<Self> {
let child = spawn_worker_task(config, shutdown);
Ok(Self { child, config })
}
}
Expand All @@ -313,15 +306,13 @@ pub struct Workers {
impl Workers {
pub(crate) async fn new(
server_config: FaucetServerConfig,
target_prefix: &str,
shutdown: ShutdownSignal,
) -> FaucetResult<Self> {
let workers = get_available_sockets(server_config.n_workers.get())
.await
.enumerate()
.map(|(id, socket_addr)| {
WorkerConfig::new(id + 1, socket_addr, server_config, target_prefix)
})
.map(Worker::from_config)
.map(|(id, socket_addr)| WorkerConfig::new(id + 1, socket_addr, &server_config))
.map(|config| Worker::from_config(config, shutdown.clone()))
.collect::<FaucetResult<Box<[Worker]>>>()?;
Ok(Self { workers })
}
Expand Down
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,16 @@ pub enum FaucetError {
BufferCapacity(tokio_tungstenite::tungstenite::error::CapacityError),
ProtocolViolation(tokio_tungstenite::tungstenite::error::ProtocolError),
WSWriteBufferFull(tokio_tungstenite::tungstenite::Message),
PostgreSQL(tokio_postgres::Error),
AttackAttempt,
}

impl From<tokio_postgres::Error> for FaucetError {
fn from(value: tokio_postgres::Error) -> Self {
Self::PostgreSQL(value)
}
}

impl From<tokio_tungstenite::tungstenite::Error> for FaucetError {
fn from(value: tokio_tungstenite::tungstenite::Error) -> Self {
use tokio_tungstenite::tungstenite::error::UrlError;
Expand Down Expand Up @@ -141,6 +148,7 @@ impl std::fmt::Display for FaucetError {
Self::Utf8Coding => write!(f, "Utf8 Coding error"),
Self::BufferCapacity(cap_err) => write!(f, "Buffer Capacity: {cap_err}"),
Self::WSWriteBufferFull(buf) => write!(f, "Web Socket Write buffer full, {buf}"),
Self::PostgreSQL(value) => write!(f, "PostgreSQL error: {value}"),
Self::BadRequest(r) => match r {
BadRequestReason::UnsupportedUrlScheme => {
write!(f, "UnsupportedUrlScheme use ws:// os wss://")
Expand Down
Loading

0 comments on commit b215aef

Please sign in to comment.