remove bias from moving average estimator
Alexey Pyalling authored and lwalkin committed Jan 17, 2017
1 parent ffd2a4e commit 22d3ed3
Showing 3 changed files with 95 additions and 72 deletions.
4 changes: 2 additions & 2 deletions src/tcpkali.c
@@ -950,8 +950,8 @@ main(int argc, char **argv) {
.latency_percentiles = &latency_percentiles,
.print_stats = print_stats
};
mavg_init(&oc_args.traffic_mavgs[0], tk_now(TK_DEFAULT), 3.0);
mavg_init(&oc_args.traffic_mavgs[1], tk_now(TK_DEFAULT), 3.0);
mavg_init(&oc_args.traffic_mavgs[0], tk_now(TK_DEFAULT), 1.0 / 8, 3.0);
mavg_init(&oc_args.traffic_mavgs[1], tk_now(TK_DEFAULT), 1.0 / 8, 3.0);

/*
* Convert SIGINT into change of a flag.
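The call-site change above is the visible half of the new API: mavg_init() now takes an explicit aggregation window (1.0 / 8 of a second) ahead of the existing 3.0-second smoothing window, instead of letting the estimator pick its own aggregation interval as the old mavg_aggregate_over() did. Events are gathered into these 125 ms buckets and each bucket's rate is folded into an exponential moving average defined in the header below. A hypothetical call for some other metric, just to show the argument order (not code from the commit):

mavg rps;
mavg_init(&rps, tk_now(TK_DEFAULT), 0.25, 5.0); /* 250 ms buckets, 5 s smoothing window */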
144 changes: 83 additions & 61 deletions src/tcpkali_mavg.h
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Machine Zone, Inc.
* Copyright (c) 2014, 2017 Machine Zone, Inc.
*
* Original author: Lev Walkin <[email protected]>
*
@@ -38,95 +38,117 @@
#include <assert.h>

typedef struct {
double
unprocessed_events; /* Events not incorporated into historic_average. */
double smoothing_window; /* Smooth (produce moving average) over the given
time. */
double last_update_ts; /* Last time we updated the moving average. */
double historic_average; /* Number of events in the smoothing window. */
} mavg;
const double decay_factor;
double accumulator;
} exp_moving_average;

/*
* Figure out how often we aggregate data.
* Aggregate the events at least 8 times per smoothing window
* (if the window is tiny), but no less frequently than 8 times a second.
*/
static double __attribute__((unused)) mavg_aggregate_over(mavg *m) {
return m->smoothing_window > 1.0 ? 0.125 : m->smoothing_window / 8;
static void __attribute__((unused))
exp_moving_average_init(exp_moving_average *ema, double decay) {
assert(decay > 0 && decay <= 1);
*(double *)&ema->decay_factor = decay;
ema->accumulator = 0 / 0.0; /* NaN: no samples seen yet. */
}

static void __attribute__((unused))
exp_moving_average_add(exp_moving_average *ema, double x) {
if(isfinite(ema->accumulator) == 0)
ema->accumulator = x;
else
ema->accumulator =
ema->decay_factor * x + (1 - ema->decay_factor) * ema->accumulator;
}
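The accumulator starts out as NaN (0 / 0.0) and exp_moving_average_add() copies the very first sample in verbatim, so the discrete EMA never has to ramp up from an artificial zero. A minimal stand-alone sketch (not part of the commit) of why that seeding matters, feeding a constant 100 events/s with the 1/24 decay factor that the tcpkali.c call site produces:

#include <math.h>
#include <stdio.h>

int main(void) {
    const double decay = 1.0 / 24; /* (1/8 s aggregation) / (3 s smoothing) */
    double zero_seeded = 0.0;      /* seeded with zero: biased low at start  */
    double nan_seeded = 0 / 0.0;   /* seeded with NaN until the first sample */

    for(int bucket = 0; bucket < 8; bucket++) { /* first second of traffic */
        double sample = 100.0;                  /* constant 100 events/s   */
        zero_seeded = decay * sample + (1 - decay) * zero_seeded;
        nan_seeded = isfinite(nan_seeded)
                         ? decay * sample + (1 - decay) * nan_seeded
                         : sample;
    }
    printf("%.1f vs %.1f\n", zero_seeded, nan_seeded); /* ~28.9 vs 100.0 */
    return 0;
}

With a 1/24 decay the zero-seeded estimate needs on the order of the full 3-second smoothing window to converge; the seeded one is exact from the first bucket onward.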

typedef struct {
const double aggregate_window; /* Time window to aggregate data in. */
double events; /* Aggregated events for rate calculation. */
double right_edge_ts; /* Time to aggregate a new rate point. */
double update_ts; /* Last time we got an update. */
exp_moving_average storage; /* Aggregated measurement results. */
} mavg;

/*
* Initialize the empty moving average structure. Keep the smoothing window
* within the structure.
* Typical size of the aggregate_window is (1/8.0) seconds.
* Typical size of the smoothing window is 3.0 seconds.
*/
static void __attribute__((unused))
mavg_init(mavg *m, double now, double window) {
assert(window > 0.0);
memset(m, 0, sizeof(*m));
m->smoothing_window = window;
m->last_update_ts = now;
m->historic_average = 0 / 0.0;
mavg_init(mavg *m, double start_time, double aggregate_window,
double smoothing_window) {
assert(aggregate_window <= smoothing_window);
exp_moving_average_init(&m->storage,
aggregate_window / smoothing_window);
m->events = 0;
*(double *)&m->aggregate_window = aggregate_window;
m->right_edge_ts = start_time + aggregate_window;
m->update_ts = start_time;
}

static double __attribute__((unused))
mavg_smoothing_window_s(mavg *m) {
return m->aggregate_window / m->storage.decay_factor;
}
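mavg_smoothing_window_s() reconstructs the smoothing window from the two stored quantities instead of keeping it as a separate field: with the values passed in tcpkali.c that is 0.125 / (0.125 / 3.0) = 3.0 seconds. The same number serves as the staleness threshold further down, both in mavg_add() (full reset after a long gap) and in mavg_per_second() (report zero once the estimator has been idle for a whole smoothing window).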

/*
* Bump the moving average with the specified number of events.
*/
static void __attribute__((unused))
mavg_bump(mavg *m, double now, double events) {
double aggregate_over = mavg_aggregate_over(m);
double elapsed = now - m->last_update_ts;
mavg_add(mavg *m, double now, double new_events) {

if(events == 0.0 && m->unprocessed_events == 0.0) return;
if(new_events == 0.0 && m->events == 0.0)
return;

if(elapsed < aggregate_over) {
m->unprocessed_events += events;
} else {
assert(!(new_events < 0));

if(m->right_edge_ts > now) {
/*
* Start fast: do not slowly ramp up over smoothing_window.
* Just extrapolate with what we know already.
* We are still aggregating results and have not reached
* the edge of the update window
*/
if(isfinite(m->historic_average) == 0) {
m->historic_average =
aggregate_over * (m->unprocessed_events + events) / elapsed;
m->unprocessed_events = 0;
m->last_update_ts = now;
return;
}

double window = m->smoothing_window;
double unproc = m->unprocessed_events;
double prev_win_avg =
unproc
+ (m->historic_average - unproc) * exp(-aggregate_over / window);
m->historic_average =
prev_win_avg * exp((aggregate_over - elapsed) / window);
m->unprocessed_events = events; /* Save these new events for later. */
m->last_update_ts = now;
m->events += new_events;
m->update_ts = now;
return;
}

if(m->right_edge_ts + mavg_smoothing_window_s(m) < now) {
m->right_edge_ts = now + m->aggregate_window;
m->update_ts = now;
m->events = 0;
m->storage.accumulator = new_events / m->aggregate_window;
return;
}

/*
* Using a model where events arrive at a constant rate,
* distribute them between the time frame to measure/store
* and future events.
*/
double old_events = new_events * (m->right_edge_ts - m->update_ts)
/ (now - m->update_ts);
exp_moving_average_add(&m->storage,
(m->events + old_events) / m->aggregate_window);
m->update_ts = m->right_edge_ts;
m->right_edge_ts += m->aggregate_window;
m->events = 0;

/* If the last event was long ago, we want to make a series of updates. */
mavg_add(m, now, new_events - old_events);
}
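The proportional split at the end of mavg_add() is easiest to see with concrete numbers (illustrative values, using the 1/8-second aggregate window from the tcpkali.c call site):

update_ts = 1.000 s, right_edge_ts = 1.125 s, and 40 new events arrive at now = 1.200 s
old_events = 40 * (1.125 - 1.000) / (1.200 - 1.000) = 25   /* share of the bucket being closed  */
exp_moving_average_add(&storage, (events + 25) / 0.125)    /* that bucket's rate enters the EMA */
update_ts = 1.125 s, right_edge_ts = 1.250 s, events = 0
mavg_add(m, 1.200, 15)                                     /* 40 - 25 events carried forward    */

Because 1.250 > 1.200, the recursive call takes the first branch and simply accumulates the remaining 15 events. If the gap had spanned several aggregate windows, the recursion would close them one at a time, and a gap longer than the smoothing window hits the reset branch instead.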

static double __attribute__((unused)) mavg_per_second(mavg *m, double now) {
double elapsed = now - m->last_update_ts;
if(elapsed > m->smoothing_window) {
double elapsed = now - m->update_ts;
if(elapsed > mavg_smoothing_window_s(m)) {
/*
* If we stopped for too long, we report zero.
* Otherwise we'll never reach zero (traffic, hits/s),
* just asymptotically approach it.
* This is confusing for humans. So we report zero.
*/
return 0.0;
} else if(isfinite(m->historic_average)) {
if(m->unprocessed_events) {
mavg_bump(m, now, 0);
}
double aggregate_over = mavg_aggregate_over(m);
double window = m->smoothing_window;
double unproc = m->unprocessed_events;
double prev_win_avg =
unproc
+ (m->historic_average - unproc) * exp(-aggregate_over / window);
double avg = prev_win_avg * exp((aggregate_over - elapsed) / window);
return avg / aggregate_over;
} else if(isfinite(m->storage.accumulator)) {
return m->storage.accumulator
* pow(1 - m->storage.decay_factor,
(now - m->update_ts) / m->aggregate_window);
} else {
return 0.0;
}
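Between updates, mavg_per_second() extrapolates by pretending that exp_moving_average_add() had been fed zero-valued buckets for the idle time, which is exactly what the pow() term computes:

one idle bucket:  acc' = d * 0 + (1 - d) * acc = (1 - d) * acc
n  idle buckets:  acc' = (1 - d)^n * acc,   where n = elapsed / aggregate_window

With the call-site values (d = decay_factor = 1/24), a single idle 125 ms bucket keeps 23/24 ≈ 95.8% of the reported rate and a full 3-second silence keeps (23/24)^24 ≈ 36%; anything idle longer than the smoothing window takes the first branch and reports a clean 0.0 instead of an ever-shrinking tail.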
19 changes: 10 additions & 9 deletions src/tcpkali_run.c
@@ -93,8 +93,7 @@ format_latency(char *buf, size_t size, const char *prefix,
if(hist) {
if(hist->total_count) {
return snprintf(buf, size, "%s%.1f ", prefix,
hdr_value_at_percentile(hist, 95.0)
/ 10.0);
hdr_value_at_percentile(hist, 95.0) / 10.0);
} else {
return snprintf(buf, size, "%s? ", prefix);
}
@@ -236,7 +235,6 @@ static enum {

enum oc_return_value
open_connections_until_maxed_out(enum work_phase phase, struct oc_args *args) {

tk_now_update(TK_DEFAULT);
double now = tk_now(TK_DEFAULT);

@@ -255,11 +253,12 @@ open_connections_until_maxed_out(enum work_phase phase, struct oc_args *args) {
ssize_t conn_deficit = 1; /* Assume connections still have to be est. */

statsd_report_latency_types requested_latency_types =
engine_params(args->eng)->latency_setting;
engine_params(args->eng)->latency_setting;
if(requested_latency_types && args->latency_window
&& !args->previous_window_latency) {
engine_prepare_latency_snapshot(args->eng);
args->previous_window_latency = engine_collect_latency_snapshot(args->eng);
args->previous_window_latency =
engine_collect_latency_snapshot(args->eng);
}

while(now < args->epoch_end && !args->term_flag
@@ -298,8 +297,10 @@ open_connections_until_maxed_out(enum work_phase phase, struct oc_args *args) {
non_atomic_traffic_stats traffic_delta =
subtract_traffic_stats(args->checkpoint.last_traffic_stats, _last);

mavg_bump(&args->traffic_mavgs[0], now, (double)traffic_delta.bytes_rcvd);
mavg_bump(&args->traffic_mavgs[1], now, (double)traffic_delta.bytes_sent);
mavg_add(&args->traffic_mavgs[0], now,
(double)traffic_delta.bytes_rcvd);
mavg_add(&args->traffic_mavgs[1], now,
(double)traffic_delta.bytes_sent);

double bps_in = 8 * mavg_per_second(&args->traffic_mavgs[0], now);
double bps_out = 8 * mavg_per_second(&args->traffic_mavgs[1], now);
@@ -338,8 +339,8 @@ open_connections_until_maxed_out(enum work_phase phase, struct oc_args *args) {
} else {
/* Latencies are sent in-line with byte stats, few times a second */
feedback.latency = latency;
report_to_statsd(args->statsd, &feedback,
requested_latency_types, args->latency_percentiles);
report_to_statsd(args->statsd, &feedback, requested_latency_types,
args->latency_percentiles);
}

if(args->print_stats) {
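Putting the three call sites together, the life cycle of one estimator looks roughly like this hypothetical stand-alone sketch (not code from the commit; it assumes tcpkali_mavg.h is on the include path and pulls in the <math.h> and <assert.h> its inline functions need, and it substitutes fixed timestamps for tk_now(TK_DEFAULT)):

#include <stdio.h>
#include "tcpkali_mavg.h"

/* Feed received-byte deltas into a rate estimator and print bits per second. */
static void report_rx_rate(mavg *rx, double now, double bytes_rcvd_delta) {
    mavg_add(rx, now, bytes_rcvd_delta);
    printf("Rx %.0f bps\n", 8 * mavg_per_second(rx, now)); /* bytes/s -> bits/s */
}

int main(void) {
    mavg rx;
    mavg_init(&rx, /* start_time */ 0.0, 1.0 / 8, 3.0);
    /* Four checkpoints, 0.25 s apart, each having received 250,000 bytes. */
    for(int i = 1; i <= 4; i++) report_rx_rate(&rx, 0.25 * i, 250000);
    return 0;
}

Every reading it prints is already the steady 8000000 bps, since the very first bucket seeds the EMA directly.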
