-
Notifications
You must be signed in to change notification settings - Fork 198
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
remove bias from moving average estimator
- Loading branch information
Alexey Pyalling
authored and
lwalkin
committed
Jan 17, 2017
1 parent
ffd2a4e
commit 22d3ed3
Showing
3 changed files
with
95 additions
and
72 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright (c) 2014 Machine Zone, Inc. | ||
* Copyright (c) 2014, 2017 Machine Zone, Inc. | ||
* | ||
* Original author: Lev Walkin <[email protected]> | ||
* | ||
|
@@ -38,95 +38,117 @@ | |
#include <assert.h> | ||
|
||
typedef struct { | ||
double | ||
unprocessed_events; /* Events not incorporated into historic_average. */ | ||
double smoothing_window; /* Smooth (produce moving average) over the given | ||
time. */ | ||
double last_update_ts; /* Last time we updated the moving average. */ | ||
double historic_average; /* Number of events in the smoothing window. */ | ||
} mavg; | ||
const double decay_factor; | ||
double accumulator; | ||
} exp_moving_average; | ||
|
||
/* | ||
* Figure out how often we aggregate data. | ||
* Aggregate the events at least 8 times per smoothing window | ||
* (if the window is tiny), but no less frequently than 8 times a second. | ||
*/ | ||
static double __attribute__((unused)) mavg_aggregate_over(mavg *m) { | ||
return m->smoothing_window > 1.0 ? 0.125 : m->smoothing_window / 8; | ||
static void __attribute__((unused)) | ||
exp_moving_average_init(exp_moving_average *ema, double decay) { | ||
assert(decay > 0 && decay <= 1); | ||
*(double *)&ema->decay_factor = decay; | ||
ema->accumulator = 0 / 0.0; | ||
} | ||
|
||
static void __attribute__((unused)) | ||
exp_moving_average_add(exp_moving_average *ema, double x) { | ||
if(isfinite(ema->accumulator) == 0) | ||
ema->accumulator = x; | ||
else | ||
ema->accumulator = | ||
ema->decay_factor * x + (1 - ema->decay_factor) * ema->accumulator; | ||
} | ||
|
||
typedef struct { | ||
const double aggregate_window; /* Time window to aggregate data in. */ | ||
double events; /* Aggregated events for rate calculation. */ | ||
double right_edge_ts; /* Time to aggregate a new rate point. */ | ||
double update_ts; /* Last time we got an update. */ | ||
exp_moving_average storage; /* Aggregated measurement results. */ | ||
} mavg; | ||
|
||
/* | ||
* Initialize the empty moving average structure. Keep the smoothing window | ||
* within the structure. | ||
* Typical size of the aggregate_window is (1/8.0) seconds. | ||
* Typical size of the smoothing window is 3.0 seconds. | ||
*/ | ||
static void __attribute__((unused)) | ||
mavg_init(mavg *m, double now, double window) { | ||
assert(window > 0.0); | ||
memset(m, 0, sizeof(*m)); | ||
m->smoothing_window = window; | ||
m->last_update_ts = now; | ||
m->historic_average = 0 / 0.0; | ||
mavg_init(mavg *m, double start_time, double aggregate_window, | ||
double smoothing_window) { | ||
assert(aggregate_window <= smoothing_window); | ||
exp_moving_average_init(&m->storage, | ||
aggregate_window / smoothing_window); | ||
m->events = 0; | ||
*(double *)&m->aggregate_window = aggregate_window; | ||
m->right_edge_ts = start_time + aggregate_window; | ||
m->update_ts = start_time; | ||
} | ||
|
||
static double __attribute__((unused)) | ||
mavg_smoothing_window_s(mavg *m) { | ||
return m->aggregate_window / m->storage.decay_factor; | ||
} | ||
|
||
/* | ||
* Bump the moving average with the specified number of events. | ||
*/ | ||
static void __attribute__((unused)) | ||
mavg_bump(mavg *m, double now, double events) { | ||
double aggregate_over = mavg_aggregate_over(m); | ||
double elapsed = now - m->last_update_ts; | ||
mavg_add(mavg *m, double now, double new_events) { | ||
|
||
if(events == 0.0 && m->unprocessed_events == 0.0) return; | ||
if(new_events == 0.0 && m->events == 0.0) | ||
return; | ||
|
||
if(elapsed < aggregate_over) { | ||
m->unprocessed_events += events; | ||
} else { | ||
assert(!(new_events < 0)); | ||
|
||
if(m->right_edge_ts > now) { | ||
/* | ||
* Start fast: do not slowly ramp up over smoothing_window. | ||
* Just extrapolate with what we know already. | ||
* We are still aggregating results and has not reached | ||
* the edge of the update window | ||
*/ | ||
if(isfinite(m->historic_average) == 0) { | ||
m->historic_average = | ||
aggregate_over * (m->unprocessed_events + events) / elapsed; | ||
m->unprocessed_events = 0; | ||
m->last_update_ts = now; | ||
return; | ||
} | ||
|
||
double window = m->smoothing_window; | ||
double unproc = m->unprocessed_events; | ||
double prev_win_avg = | ||
unproc | ||
+ (m->historic_average - unproc) * exp(-aggregate_over / window); | ||
m->historic_average = | ||
prev_win_avg * exp((aggregate_over - elapsed) / window); | ||
m->unprocessed_events = events; /* Save these new events for later. */ | ||
m->last_update_ts = now; | ||
m->events += new_events; | ||
m->update_ts = now; | ||
return; | ||
} | ||
|
||
if(m->right_edge_ts + mavg_smoothing_window_s(m) < now) { | ||
m->right_edge_ts = now + m->aggregate_window; | ||
m->update_ts = now; | ||
m->events = 0; | ||
m->storage.accumulator = new_events / m->aggregate_window; | ||
return; | ||
} | ||
|
||
/* | ||
* Using a model where events arrive at the same rate, | ||
* distribute them between timeframe to measure/store, | ||
* and future events. | ||
*/ | ||
double old_events = new_events * (m->right_edge_ts - m->update_ts) | ||
/ (now - m->update_ts); | ||
exp_moving_average_add(&m->storage, | ||
(m->events + old_events) / m->aggregate_window); | ||
m->update_ts = m->right_edge_ts; | ||
m->right_edge_ts += m->aggregate_window; | ||
m->events = 0; | ||
|
||
/* If last event was long ago - we want to make a set of updates */ | ||
mavg_add(m, now, new_events - old_events); | ||
} | ||
|
||
static double __attribute__((unused)) mavg_per_second(mavg *m, double now) { | ||
double elapsed = now - m->last_update_ts; | ||
if(elapsed > m->smoothing_window) { | ||
double elapsed = now - m->update_ts; | ||
if(elapsed > mavg_smoothing_window_s(m)) { | ||
/* | ||
* If we stopped for too long, we report zero. | ||
* Otherwise we'll never reach zero (traffic, hits/s), | ||
* just asymptotically approach it. | ||
* This is confusing for humans. So we report zero. | ||
*/ | ||
return 0.0; | ||
} else if(isfinite(m->historic_average)) { | ||
if(m->unprocessed_events) { | ||
mavg_bump(m, now, 0); | ||
} | ||
double aggregate_over = mavg_aggregate_over(m); | ||
double window = m->smoothing_window; | ||
double unproc = m->unprocessed_events; | ||
double prev_win_avg = | ||
unproc | ||
+ (m->historic_average - unproc) * exp(-aggregate_over / window); | ||
double avg = prev_win_avg * exp((aggregate_over - elapsed) / window); | ||
return avg / aggregate_over; | ||
} else if(isfinite(m->storage.accumulator)) { | ||
return m->storage.accumulator | ||
* pow(1 - m->storage.decay_factor, | ||
(now - m->update_ts) / m->aggregate_window); | ||
} else { | ||
return 0.0; | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters