Skip to content

Commit

Permalink
window based latency measurement
Browse files Browse the repository at this point in the history
  • Loading branch information
lwalkin committed Sep 21, 2016
1 parent ea3f415 commit d553055
Show file tree
Hide file tree
Showing 13 changed files with 262 additions and 110 deletions.
1 change: 1 addition & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
0.9.*:
* Export --latency-connect and --latency-first-bytes to statsd.
* --latency-percentiles now affect --statsd reporting as well.
* --statsd-latency-window alternative to end-to-end latency measurements.

0.9: 2016-Aug-24
* Added -r@<Latency> form to measure message rate at a given latency.
Expand Down
23 changes: 23 additions & 0 deletions deps/HdrHistogram/hdr_histogram.c
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,29 @@ int64_t hdr_add(struct hdr_histogram* h, struct hdr_histogram* from)
return dropped;
}

struct hdr_histogram *hdr_diff(struct hdr_histogram *base, struct hdr_histogram *update) {
struct hdr_histogram *h;

if(hdr_init(base->lowest_trackable_value, base->highest_trackable_value,
base->significant_figures, &h) != 0) {
return NULL;
} else {
assert(update->lowest_trackable_value == base->lowest_trackable_value);
assert(update->highest_trackable_value == base->highest_trackable_value);
assert(update->significant_figures == base->significant_figures);

struct hdr_iter iter;
int32_t ct;
for(int32_t ct = 0; ct < update->counts_len; ct++) {
h->counts[ct] = update->counts[ct] - base->counts[ct];
}
h->total_count = update->total_count - base->total_count;
hdr_reset_internal_counters(h); /* Re-sets min and max */

return h;
}
}


// ## ## ### ## ## ## ######## ######
// ## ## ## ## ## ## ## ## ## ##
Expand Down
5 changes: 5 additions & 0 deletions deps/HdrHistogram/hdr_histogram.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ bool hdr_record_corrected_value(struct hdr_histogram* h, int64_t value, int64_t
*/
int64_t hdr_add(struct hdr_histogram* h, struct hdr_histogram* from);

/*
* Create a new histogram representing the delta between 'base' and 'update'.
*/
struct hdr_histogram *hdr_diff(struct hdr_histogram *base, struct hdr_histogram *update);

int64_t hdr_min(struct hdr_histogram* h);
int64_t hdr_max(struct hdr_histogram* h);
int64_t hdr_value_at_percentile(struct hdr_histogram* h, double percentile);
Expand Down
71 changes: 49 additions & 22 deletions src/tcpkali.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ static struct option cli_long_options[] = {
{"statsd-host", 1, 0, CLI_STATSD_OFFSET + 'h'},
{"statsd-port", 1, 0, CLI_STATSD_OFFSET + 'p'},
{"statsd-namespace", 1, 0, CLI_STATSD_OFFSET + 'n'},
{"statsd-latency-window", 1, 0, CLI_STATSD_OFFSET + 'w'},
{"unescape-message-args", 0, 0, 'e'},
{"version", 0, 0, 'V'},
{"verbose", 1, 0, CLI_VERBOSE_OFFSET + 'v'},
Expand All @@ -115,6 +116,7 @@ static struct tcpkali_config {
int max_connections;
double connect_rate; /* New connects per second. */
double test_duration; /* Seconds for the full test. */
double latency_window; /* Seconds */
int statsd_enable;
char *statsd_host;
int statsd_port;
Expand Down Expand Up @@ -466,6 +468,15 @@ main(int argc, char **argv) {
exit(EX_USAGE);
}
break;
case CLI_STATSD_OFFSET + 'w':
conf.latency_window = parse_with_multipliers(
option, optarg, s_multiplier,
sizeof(s_multiplier) / sizeof(s_multiplier[0]));
if(conf.latency_window <= 0) {
fprintf(stderr, "Expected positive --statsd-latency-window=%s\n", optarg);
exit(EX_USAGE);
}
break;
case 'l':
conf.listen_port = atoi(optarg);
if(conf.listen_port <= 0 || conf.listen_port >= 65535) {
Expand Down Expand Up @@ -596,6 +607,26 @@ main(int argc, char **argv) {
if(!engine_params.requested_workers)
engine_params.requested_workers = number_of_cpus();


/*
* Check that we'll have a chance to report latency
*/
if(conf.latency_window) {
if(conf.latency_window > conf.test_duration) {
fprintf(stderr, "--statsd-latency-window=%gs exceeds --duration=%gs.\n",
conf.latency_window, conf.test_duration);
exit(EX_USAGE);
}
if(conf.latency_window >= conf.test_duration / 2) {
warning("--statsd-latency-window=%gs might result in too few latency reports.\n", conf.latency_window);
}
if(conf.latency_window < 0.5) {
fprintf(stderr, "--statsd-latency-window=%gs is too small. Try 0.5s.\n",
conf.latency_window);
exit(EX_USAGE);
}
}

/*
* Check that the system environment is prepared to handle high load.
*/
Expand Down Expand Up @@ -816,33 +847,28 @@ main(int argc, char **argv) {

struct engine *eng = engine_start(engine_params);

/*
* Convert SIGINT into change of a flag.
* Has to be run after all other threads are run, otherwise
* a signal can be delivered to a wrong thread.
*/
sig_atomic_t term_flag = 0;
flagify_term_signals(&term_flag);

/*
* Traffic in/out moving average, smoothing period is 3 seconds.
*/
mavg traffic_mavgs[2];
mavg_init(&traffic_mavgs[0], tk_now(TK_DEFAULT), 3.0);
mavg_init(&traffic_mavgs[1], tk_now(TK_DEFAULT), 3.0);
struct stats_checkpoint checkpoint = {0, 0, {0, 0, 0, 0}, {0, 0, 0, 0}};
struct oc_args oc_args = {
.eng = eng,
.connect_rate = conf.connect_rate,
.max_connections = conf.max_connections,
.checkpoint = &checkpoint,
.traffic_mavgs = traffic_mavgs,
.connect_rate = conf.connect_rate,
.latency_window = conf.latency_window,
.statsd = statsd,
.term_flag = &term_flag,
.rate_modulator = &rate_modulator,
.latency_percentiles = &latency_percentiles,
.print_stats = print_stats
};
mavg_init(&oc_args.traffic_mavgs[0], tk_now(TK_DEFAULT), 3.0);
mavg_init(&oc_args.traffic_mavgs[1], tk_now(TK_DEFAULT), 3.0);

/*
* Convert SIGINT into change of a flag.
* Has to be run after all other threads are run, otherwise
* a signal can be delivered to a wrong thread.
*/
flagify_term_signals(&oc_args.term_flag);

/*
* Ramp up to the specified number of connections by opening them at a
Expand All @@ -851,7 +877,7 @@ main(int argc, char **argv) {
if(conf.max_connections) {
oc_args.epoch_end = tk_now(TK_DEFAULT) + conf.test_duration;
if(open_connections_until_maxed_out(PHASE_ESTABLISHING_CONNECTIONS,
oc_args) == OC_CONNECTED) {
&oc_args) == OC_CONNECTED) {
fprintf(stderr, "%s", tcpkali_clear_eol());
fprintf(stderr, "Ramped up to %d connections.\n",
conf.max_connections);
Expand All @@ -874,16 +900,16 @@ main(int argc, char **argv) {
* (initial_traffic_stats) contain traffic numbers accumulated duing
* ramp-up time.
*/
checkpoint.initial_traffic_stats = engine_traffic(eng);
checkpoint.epoch_start = tk_now(TK_DEFAULT);
oc_args.checkpoint.initial_traffic_stats = engine_traffic(eng);
oc_args.checkpoint.epoch_start = tk_now(TK_DEFAULT);

/* Reset the test duration after ramp-up. */
enum oc_return_value orv = open_connections_until_maxed_out(
PHASE_STEADY_STATE, oc_args);
PHASE_STEADY_STATE, &oc_args);

fprintf(stderr, "%s", tcpkali_clear_eol());
engine_terminate(eng, checkpoint.epoch_start,
checkpoint.initial_traffic_stats, &latency_percentiles);
engine_terminate(eng, oc_args.checkpoint.epoch_start,
oc_args.checkpoint.initial_traffic_stats, &latency_percentiles);

/* Send zeroes, otherwise graphs would continue showing non-zeroes... */
report_to_statsd(statsd, 0, requested_latency_types, &latency_percentiles);
Expand Down Expand Up @@ -1072,6 +1098,7 @@ usage_long(char *argv0, struct tcpkali_config *conf) {
" --statsd-host <host> StatsD host to send data (default is localhost)\n"
" --statsd-port <port> StatsD port to use (default is %d)\n"
" --statsd-namespace <string> Metric namespace (default is \"%s\")\n"
" --statsd-latency-window <T> Aggregate latencies in discrete windows\n"
"\n"
"Variable units and recognized multipliers:\n"
" <N>, <Rate>: k (1000, as in \"5k\" is 5000), m (1000000)\n"
Expand Down
9 changes: 9 additions & 0 deletions src/tcpkali_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ typedef enum {
SLT_MARKER = (1 << 2)
} statsd_report_latency_types;

/*
* Snapshot of the current latency.
*/
struct latency_snapshot {
struct hdr_histogram *connect_histogram;
struct hdr_histogram *firstbyte_histogram;
struct hdr_histogram *marker_histogram;
};

/*
* Array of doubles used in e.g. overriding reported latency percentiles.
*/
Expand Down
22 changes: 22 additions & 0 deletions src/tcpkali_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,28 @@ engine_collect_latency_snapshot(struct engine *eng) {
return latency;
}

struct latency_snapshot *
engine_diff_latency_snapshot(struct latency_snapshot *base, struct latency_snapshot *update) {

assert(base);
assert(update);

struct latency_snapshot *diff = calloc(1, sizeof(*diff));
assert(diff);

if(base->connect_histogram)
diff->connect_histogram =
hdr_diff(base->connect_histogram, update->connect_histogram);
if(base->firstbyte_histogram)
diff->firstbyte_histogram =
hdr_diff(base->firstbyte_histogram, update->firstbyte_histogram);
if(base->marker_histogram)
diff->marker_histogram =
hdr_diff(base->marker_histogram, update->marker_histogram);

return diff;
}

non_atomic_traffic_stats
engine_traffic(struct engine *eng) {
non_atomic_traffic_stats traffic = {0, 0, 0, 0};
Expand Down
8 changes: 2 additions & 6 deletions src/tcpkali_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,11 @@ void engine_get_connection_stats(struct engine *, size_t *connecting,
size_t *counter);

/*
* Snapshot of the current latency.
* Create snapshot of the current latency histogram.
*/
struct latency_snapshot {
struct hdr_histogram *connect_histogram;
struct hdr_histogram *firstbyte_histogram;
struct hdr_histogram *marker_histogram;
};
void engine_prepare_latency_snapshot(struct engine *);
struct latency_snapshot *engine_collect_latency_snapshot(struct engine *);
struct latency_snapshot *engine_diff_latency_snapshot(struct latency_snapshot *base, struct latency_snapshot *update);
void engine_free_latency_snapshot(struct latency_snapshot *);

size_t engine_initiate_new_connections(struct engine *, size_t n);
Expand Down
Loading

0 comments on commit d553055

Please sign in to comment.