Skip to content

Commit

Permalink
split the connection reservoir logic off from the main file
Browse files Browse the repository at this point in the history
  • Loading branch information
Lev Walkin committed Jan 28, 2016
1 parent 218b2b3 commit 94edfd8
Show file tree
Hide file tree
Showing 8 changed files with 432 additions and 281 deletions.
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ tcpkali_SOURCES = \
tcpkali_logging.c tcpkali_logging.h \
tcpkali_traffic_stats.h \
tcpkali_common.h tcpkali_rate.h \
tcpkali_statsd.c tcpkali_statsd.h \
tcpkali_run.c tcpkali_run.h \
tcpkali.c tcpkali.h
tcpkali_LDFLAGS = -L$(libdir) $(LIBUV)
tcpkali_LDADD = $(top_builddir)/deps/libev/libev.la \
Expand Down
282 changes: 2 additions & 280 deletions src/tcpkali.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@
#include <statsd.h>

#include "tcpkali.h"
#include "tcpkali_run.h"
#include "tcpkali_mavg.h"
#include "tcpkali_data.h"
#include "tcpkali_pacefier.h"
#include "tcpkali_events.h"
#include "tcpkali_signals.h"
#include "tcpkali_terminfo.h"
#include "tcpkali_websocket.h"
#include "tcpkali_transport.h"
#include "tcpkali_syslimits.h"
#include "tcpkali_terminfo.h"
#include "tcpkali_logging.h"

/*
Expand Down Expand Up @@ -129,28 +129,6 @@ static struct tcpkali_config {
.statsd_port = 8125,
.statsd_namespace = "tcpkali"};

struct stats_checkpoint {
double epoch_start; /* Start of current checkpoint epoch */
double last_update; /* Last we updated the checkpoint structure */
non_atomic_traffic_stats initial_traffic_stats; /* Ramp-up phase traffic */
non_atomic_traffic_stats last_traffic_stats;
};

enum work_phase { PHASE_ESTABLISHING_CONNECTIONS, PHASE_STEADY_STATE };

/*
* What we are sending to statsd?
*/
typedef struct {
size_t opened;
size_t conns_in;
size_t conns_out;
size_t bps_in;
size_t bps_out;
non_atomic_traffic_stats traffic_delta;
struct latency_snapshot *latency;
} statsd_feedback;

/*
* Bunch of utility functions defined at the end of this file.
*/
Expand All @@ -164,12 +142,6 @@ static double parse_with_multipliers(const char *, char *str,
struct multiplier *, int n);
static int parse_array_of_doubles(const char *option, char *str,
struct array_of_doubles *array);
static int open_connections_until_maxed_out(
struct engine *eng, double connect_rate, int max_connections,
double epoch_end, struct stats_checkpoint *, mavg traffic_mavgs[2],
Statsd *statsd, int *term_flag, enum work_phase phase, int print_stats);
static void print_connections_line(int conns, int max_conns, int conns_counter);
static void report_to_statsd(Statsd *statsd, statsd_feedback *opt);

/* clang-format off */
static struct multiplier km_multiplier[] = { { "k", 1000 }, { "m", 1000000 } };
Expand Down Expand Up @@ -825,256 +797,6 @@ main(int argc, char **argv) {
return 0;
}

static const char *
time_progress(double start, double now, double stop) {
const char *clocks[] = {"🕛 ", "🕐 ", "🕑 ", "🕒 ", "🕓 ", "🕔 ",
"🕕 ", "🕖 ", "🕗 ", "🕘 ", "🕙 ", "🕚 "};
if(!tcpkali_is_utf8()) return "";
double span = (stop - start) / (sizeof(clocks) / sizeof(clocks[0]));
int pos = (now - start) / span;
if(pos < 0)
pos = 0;
else if(pos > 11)
pos = 11;
return clocks[pos];
}

static void
format_latencies(char *buf, size_t size, struct latency_snapshot *latency) {
if(latency->connect_histogram || latency->firstbyte_histogram
|| latency->marker_histogram) {
char *p = buf;
p += snprintf(p, size, " (");
if(latency->connect_histogram)
p += snprintf(
p, size - (p - buf), "c=%.1f ",
hdr_value_at_percentile(latency->connect_histogram, 95.0)
/ 10.0);
if(latency->firstbyte_histogram)
p += snprintf(
p, size - (p - buf), "fb=%.1f ",
hdr_value_at_percentile(latency->firstbyte_histogram, 95.0)
/ 10.0);
if(latency->marker_histogram)
p += snprintf(
p, size - (p - buf), "m=%.1f ",
hdr_value_at_percentile(latency->marker_histogram, 95.0)
/ 10.0);
snprintf(p, size - (p - buf), "ms⁹⁵ᵖ)");
} else {
buf[0] = '\0';
}
}

static int
open_connections_until_maxed_out(struct engine *eng, double connect_rate,
int max_connections, double epoch_end,
struct stats_checkpoint *checkpoint,
mavg traffic_mavgs[2], Statsd *statsd,
int *term_flag, enum work_phase phase,
int print_stats) {
tk_now_update(TK_DEFAULT);
double now = tk_now(TK_DEFAULT);

/*
* It is a little bit better to batch the starts by issuing several
* start commands per small time tick. Ends up doing less write()
* operations per batch.
* Therefore, we round the timeout_us upwards to the nearest millisecond.
*/
long timeout_us = 1000 * ceil(1000.0 / connect_rate);
if(timeout_us > 250000) timeout_us = 250000;

struct pacefier keepup_pace;
pacefier_init(&keepup_pace, now);

ssize_t conn_deficit = 1; /* Assume connections still have to be est. */

while(now < epoch_end && !*term_flag
/* ...until we have all connections established or
* we're in a steady state. */
&& (phase == PHASE_STEADY_STATE || conn_deficit > 0)) {
usleep(timeout_us);
tk_now_update(TK_DEFAULT);
now = tk_now(TK_DEFAULT);
int update_stats = (now - checkpoint->last_update) >= 0.25;

size_t connecting, conns_in, conns_out, conns_counter;
engine_get_connection_stats(eng, &connecting, &conns_in, &conns_out,
&conns_counter);
conn_deficit = max_connections - (connecting + conns_out);

size_t allowed = pacefier_allow(&keepup_pace, connect_rate, now);
size_t to_start = allowed;
if(conn_deficit <= 0) {
to_start = 0;
}
if(to_start > (size_t)conn_deficit) {
to_start = conn_deficit;
}
engine_initiate_new_connections(eng, to_start);
pacefier_moved(&keepup_pace, connect_rate, allowed, now);

/* Do not update/print checkpoint stats too often. */
if(update_stats) {
checkpoint->last_update = now;
/* Fall through and do the chekpoint update. */
} else {
continue;
}

/*
* traffic_delta.* contains traffic observed within the last
* period (now - checkpoint->last_stats_sent).
*/
non_atomic_traffic_stats _last = checkpoint->last_traffic_stats;
checkpoint->last_traffic_stats = engine_traffic(eng);
non_atomic_traffic_stats traffic_delta =
subtract_traffic_stats(checkpoint->last_traffic_stats, _last);

mavg_bump(&traffic_mavgs[0], now, (double)traffic_delta.bytes_rcvd);
mavg_bump(&traffic_mavgs[1], now, (double)traffic_delta.bytes_sent);

double bps_in = 8 * mavg_per_second(&traffic_mavgs[0], now);
double bps_out = 8 * mavg_per_second(&traffic_mavgs[1], now);

engine_prepare_latency_snapshot(eng);
struct latency_snapshot *latency = engine_collect_latency_snapshot(eng);
report_to_statsd(statsd,
&(statsd_feedback){.opened = to_start,
.conns_in = conns_in,
.conns_out = conns_out,
.bps_in = bps_in,
.bps_out = bps_out,
.traffic_delta = traffic_delta,
.latency = latency});

if(print_stats) {
if(phase == PHASE_ESTABLISHING_CONNECTIONS) {
print_connections_line(conns_out, max_connections,
conns_counter);
} else {
char latency_buf[256];
format_latencies(latency_buf, sizeof(latency_buf), latency);

fprintf(stderr,
"%sTraffic %.3f↓, %.3f↑ Mbps "
"(conns %ld↓ %ld↑ %ld⇡; seen %ld)%s%s\r",
time_progress(checkpoint->epoch_start, now, epoch_end),
bps_in / 1000000.0, bps_out / 1000000.0, (long)conns_in,
(long)conns_out, (long)connecting, (long)conns_counter,
latency_buf, tcpkali_clear_eol());
}
}

engine_free_latency_snapshot(latency);
}

return (now >= epoch_end || *term_flag) ? -1 : 0;
}

static void
report_to_statsd(Statsd *statsd, statsd_feedback *sf) {
static statsd_feedback empty_feedback;

if(!statsd) return;
if(!sf) sf = &empty_feedback;

statsd_resetBatch(statsd);

#define SBATCH(t, str, value) \
do { \
int ret = statsd_addToBatch(statsd, t, str, value, 1); \
if(ret == STATSD_BATCH_FULL) { \
statsd_sendBatch(statsd); \
ret = statsd_addToBatch(statsd, t, str, value, 1); \
} \
assert(ret == STATSD_SUCCESS); \
} while(0)

SBATCH(STATSD_COUNT, "connections.opened", sf->opened);
SBATCH(STATSD_GAUGE, "connections.total", sf->conns_in + sf->conns_out);
SBATCH(STATSD_GAUGE, "connections.total.in", sf->conns_in);
SBATCH(STATSD_GAUGE, "connections.total.out", sf->conns_out);
SBATCH(STATSD_GAUGE, "traffic.bitrate", sf->bps_in + sf->bps_out);
SBATCH(STATSD_GAUGE, "traffic.bitrate.in", sf->bps_in);
SBATCH(STATSD_GAUGE, "traffic.bitrate.out", sf->bps_out);
SBATCH(STATSD_COUNT, "traffic.data",
sf->traffic_delta.bytes_rcvd + sf->traffic_delta.bytes_sent);
SBATCH(STATSD_COUNT, "traffic.data.rcvd", sf->traffic_delta.bytes_rcvd);
SBATCH(STATSD_COUNT, "traffic.data.sent", sf->traffic_delta.bytes_sent);
SBATCH(STATSD_COUNT, "traffic.data.reads", sf->traffic_delta.num_reads);
SBATCH(STATSD_COUNT, "traffic.data.writes", sf->traffic_delta.num_writes);

if(sf->latency->marker_histogram || sf == &empty_feedback) {
struct {
unsigned p50;
unsigned p95;
unsigned p99;
unsigned p99_5;
unsigned mean;
unsigned max;
} lat;

if(sf->latency->marker_histogram) {
struct hdr_histogram *hist = sf->latency->marker_histogram;
lat.p50 = hdr_value_at_percentile(hist, 50.0) / 10.0;
lat.p95 = hdr_value_at_percentile(hist, 95.0) / 10.0;
lat.p99 = hdr_value_at_percentile(hist, 99.0) / 10.0;
lat.p99_5 = hdr_value_at_percentile(hist, 99.5) / 10.0;
lat.mean = hdr_mean(hist) / 10.0;
lat.max = hdr_max(hist) / 10.0;
assert(lat.p95 < 1000000);
assert(lat.mean < 1000000);
assert(lat.max < 1000000);
} else {
memset(&lat, 0, sizeof(lat));
}

SBATCH(STATSD_GAUGE, "latency.mean", lat.mean);
SBATCH(STATSD_GAUGE, "latency.50", lat.p50);
SBATCH(STATSD_GAUGE, "latency.95", lat.p95);
SBATCH(STATSD_GAUGE, "latency.99", lat.p99);
SBATCH(STATSD_GAUGE, "latency.99.5", lat.p99_5);
SBATCH(STATSD_GAUGE, "latency.max", lat.max);
}

statsd_sendBatch(statsd);
}

static void
print_connections_line(int conns, int max_conns, int conns_counter) {
int terminal_width = tcpkali_terminal_width();

char info[terminal_width + 1];
ssize_t info_width = snprintf(info, sizeof(info), "| %d of %d (%d)", conns,
max_conns, conns_counter);

int ribbon_width = terminal_width - info_width - 1;
if(ribbon_width > 0.6 * terminal_width) ribbon_width = 0.6 * terminal_width;
if(ribbon_width > 50) ribbon_width = 50;

if(info_width > terminal_width || ribbon_width < 5) {
/* Can't fit stuff on the screen, make dumb print-outs */
printf("| %d of %d (%d)\n", conns, max_conns, conns_counter);
return;
}

char ribbon[ribbon_width + 1];
ribbon[0] = '|';
int at = 1 + ((ribbon_width - 2) * conns) / max_conns;
for(int i = 1; i < ribbon_width; i++) {
if(i < at)
ribbon[i] = '=';
else if(i > at)
ribbon[i] = '-';
else if(i == at)
ribbon[i] = '>';
}
ribbon[ribbon_width] = 0;
fprintf(stderr, "%s%s%s\r", ribbon, info, tcpkali_clear_eol());
}

static double
parse_with_multipliers(const char *option, char *str, struct multiplier *ms,
int n) {
Expand Down
1 change: 0 additions & 1 deletion src/tcpkali.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#define TCPKALI_H

#include "tcpkali_common.h"
#include "tcpkali_dns.h"
#include "tcpkali_engine.h"

#endif /* TCPKALI_H */
1 change: 1 addition & 0 deletions src/tcpkali_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "tcpkali_atomic.h"
#include "tcpkali_rate.h"
#include "tcpkali_expr.h"
#include "tcpkali_dns.h"

long number_of_cpus();

Expand Down
Loading

0 comments on commit 94edfd8

Please sign in to comment.