Skip to content

Commit

Permalink
emit renamed into move; we'll get to limit the inbound bandwidth
Browse files Browse the repository at this point in the history
  • Loading branch information
lwalkin committed Feb 12, 2015
1 parent 8122011 commit 5e93a81
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/tcpkali.c
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ int main(int argc, char **argv) {
}
engine_params.channel_bandwidth_Bps = message_bandwidth;
/* Write in msize blocks unless they're large, then use default. */
engine_params.minimal_write_size = msize < 1460 ? msize : 0;
engine_params.minimal_move_size = msize < 1460 ? msize : 0;
}

if(optind == argc && conf.listen_port == 0) {
Expand Down Expand Up @@ -712,7 +712,7 @@ static int open_connections_until_maxed_out(struct engine *eng, double connect_r
to_start = conn_deficit;
}
engine_initiate_new_connections(eng, to_start);
pacefier_emitted(&keepup_pace, connect_rate, allowed, now);
pacefier_moved(&keepup_pace, connect_rate, allowed, now);

/* Do not update/print checkpoint stats too often. */
if(update_stats) {
Expand Down
8 changes: 4 additions & 4 deletions src/tcpkali_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ struct engine *engine_start(struct engine_params params) {
* instead of sending short messages one by one.
*/
replicate_payload(&params.data, 64*1024);
if(params.minimal_write_size == 0)
params.minimal_write_size = 1460; /* ~MTU */
if(params.minimal_move_size == 0)
params.minimal_move_size = 1460; /* ~MTU */
params.epoch = tk_now(TK_DEFAULT); /* Single epoch for all threads */

struct engine *eng = calloc(1, sizeof(*eng));
Expand Down Expand Up @@ -1304,7 +1304,7 @@ static void connection_cb(TK_P_ tk_io *w, int revents) {
size_t bw = largs->params.channel_bandwidth_Bps;
if(bw != 0) {
size_t bytes = pacefier_allow(&conn->bw_pace, bw, tk_now(TK_A));
size_t smallest_block_to_send = largs->params.minimal_write_size;
size_t smallest_block_to_send = largs->params.minimal_move_size;
if(bytes < smallest_block_to_send) {
double delay = (double)(smallest_block_to_send-bytes)/bw;
if(delay > 1.0) delay = 1.0;
Expand Down Expand Up @@ -1341,7 +1341,7 @@ static void connection_cb(TK_P_ tk_io *w, int revents) {
} else {
conn->write_offset += wrote;
conn->data_sent += wrote;
if(bw) pacefier_emitted(&conn->bw_pace, bw, wrote, tk_now(TK_A));
if(bw) pacefier_moved(&conn->bw_pace, bw, wrote, tk_now(TK_A));
latency_record_outgoing_ts(TK_A_ conn, &largs->params.data, position, wrote);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/tcpkali_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct engine_params {
struct addresses listen_addresses;
size_t requested_workers; /* Number of threads to start */
size_t channel_bandwidth_Bps; /* Single channel bw, bytes per second. */
size_t minimal_write_size;
size_t minimal_move_size; /* Number of bytes to read/write at once */
enum {
DBG_ALWAYS,
DBG_ERROR,
Expand Down
16 changes: 8 additions & 8 deletions src/tcpkali_pacefier.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Machine Zone, Inc.
* Copyright (c) 2014, 2015 Machine Zone, Inc.
*
* Original author: Lev Walkin <[email protected]>
*
Expand Down Expand Up @@ -37,29 +37,29 @@ pacefier_init(struct pacefier *p, double now) {
}

/*
* Get the number of events we can emit now, since we've advanced our time
* Get the number of events we can move now, since we've advanced our time
* forward a little.
*/
static inline size_t
pacefier_allow(struct pacefier *p, double events_per_second, double now) {
double elapsed = now - p->previous_ts;
ssize_t emit_events = elapsed * events_per_second; /* Implicit rounding */
if(emit_events > 0)
return emit_events;
ssize_t move_events = elapsed * events_per_second; /* Implicit rounding */
if(move_events > 0)
return move_events;
else
return 0;
}

/*
* Record the actually emitted events.
* Record the actually moved events.
*/
static inline void
pacefier_emitted(struct pacefier *p, double events_per_second, size_t emitted, double now) {
pacefier_moved(struct pacefier *p, double events_per_second, size_t moved, double now) {
/*
* The number of allowed events is almost always less
* than what's actually computed.
*/
p->previous_ts += emitted/events_per_second;
p->previous_ts += moved/events_per_second;
/*
* If the process cannot keep up with the pace, it will result in
* previous_ts shifting in the past more and more with time.
Expand Down

0 comments on commit 5e93a81

Please sign in to comment.