Skip to content

Commit

Permalink
add expression parsing for --latency-marker
Browse files Browse the repository at this point in the history
  • Loading branch information
lwalkin committed Apr 18, 2015
1 parent 7a086c2 commit 5767368
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 52 deletions.
5 changes: 5 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@

0.6: ??
* Parse \{connection.uid} type expressions in --first-message,
--message, --latency-marker parameters, making sent data unique per
connection.

0.5: 2015-Apr-14
* --enable-asan and --enable-tsan flags to enable address/thread sanitizer.
* Do not account latencies for --first-message.
Expand Down
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
AC_INIT(tcpkali, 0.5, [email protected])
AC_INIT(tcpkali, 0.6, [email protected])

AC_CONFIG_SRCDIR([src/tcpkali.c])
AC_CONFIG_AUX_DIR(config)
Expand Down
19 changes: 12 additions & 7 deletions src/tcpkali.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,16 +420,21 @@ int main(int argc, char **argv) {
conf.websocket_enable = 1;
engine_params.websocket_enable = 1;
break;
case 'L':
engine_params.latency_marker_data = (uint8_t *)strdup(optarg);
engine_params.latency_marker_size = strlen(optarg);
case 'L': { /* --latency-marker */
char *data = strdup(optarg);
size_t size = strlen(optarg);
if(unescape_message_data)
unescape(engine_params.latency_marker_data,
&engine_params.latency_marker_size);
if(engine_params.latency_marker_size == 0) {
unescape(data, &size);
if(size == 0) {
fprintf(stderr, "--latency-marker: Non-empty marker expected\n");
exit(EX_USAGE);
}
if(parse_expression(&engine_params.latency_marker, data, size, 0)
== -1) {
fprintf(stderr, "--latency-marker: Failed to parse expression\n");
exit(EX_USAGE);
}
}
break;
default:
fprintf(stderr, "%s: unknown option\n", option);
Expand Down Expand Up @@ -519,7 +524,7 @@ int main(int argc, char **argv) {
* Check that we will actually send messages
* if we are also told to measure latency.
*/
if(engine_params.latency_marker_data) {
if(engine_params.latency_marker) {
if(engine_params.data_template.once_size
== engine_params.data_template.total_size
|| (argc - optind == 0)) {
Expand Down
87 changes: 55 additions & 32 deletions src/tcpkali_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ struct connection {
unsigned message_bytes_credit; /* See (EXPL:1) below. */
/* Boyer-Moore-Horspool substring search algorithm data */
struct StreamBMH *sbmh_ctx;
/* The following fields might be shared across connections. */
int sbmh_shared;
struct StreamBMH_Occ *sbmh_occ;
const uint8_t *sbmh_data;
size_t sbmh_size;
} latency;
};

Expand Down Expand Up @@ -296,11 +301,14 @@ struct engine *engine_start(struct engine_params params) {
eng->global_feedback_pipe_rd = gfbk_pipe_rd;

/*
* Initialize the Boyer-Moore-Horspool occurrences table once.
* Initialize the Boyer-Moore-Horspool occurrences table once,
* if it can be shared between connections. It can only be shared
* if it is trivial (does not depend on dynamic \{expressions}).
*/
if(params.latency_marker_data) {
sbmh_init(NULL, &params.sbmh_occ,
params.latency_marker_data, params.latency_marker_size);
if(params.latency_marker && EXPR_IS_TRIVIAL(params.latency_marker)) {
sbmh_init(NULL, &params.sbmh_shared_occ,
(void *)params.latency_marker->u.data.data,
params.latency_marker->u.data.size);
}

for(int n = 0; n < eng->n_workers; n++) {
Expand All @@ -311,7 +319,7 @@ struct engine *engine_start(struct engine_params params) {
largs->remote_stats = calloc(params.remote_addresses.n_addrs, sizeof(largs->remote_stats[0]));
largs->address_offset = n;
largs->thread_no = n;
if(params.latency_marker_data) {
if(params.latency_marker) {
int decims_in_1s = 10 * 1000; /* decimilliseconds, 1/10 ms */
int ret = hdr_init(
1, /* 1/10 milliseconds is the lowest storable value. */
Expand Down Expand Up @@ -947,6 +955,14 @@ explode_data_template(struct transport_data_spec *data, struct loop_arguments *l
return *data;
}

static void
explode_string_expression(char **buf_p, size_t *size, tk_expr_t *expr, struct loop_arguments *largs UNUSED, struct connection *conn) {
*buf_p = 0;
ssize_t s = eval_expression(buf_p, 0, expr, expr_callback, conn);
assert(s >= 0);
*size = s;
}

static void start_new_connection(TK_P) {
struct loop_arguments *largs = tk_userdata(TK_A);
struct remote_stats *remote_stats;
Expand Down Expand Up @@ -1045,20 +1061,35 @@ static void start_new_connection(TK_P) {
largs->params.channel_send_rate,
conn->data.single_message_size);

if(largs->params.latency_marker_data
&& conn->data.single_message_size) {
if(largs->params.latency_marker && conn->data.single_message_size) {
conn->latency.message_bytes_credit /* See (EXPL:1) below. */
= conn->data.single_message_size - 1;

/*
* Initialize the Boyer-Moore-Horspool context for substring search.
*/
conn->latency.sbmh_ctx
= malloc(SBMH_SIZE(largs->params.latency_marker_size));
struct StreamBMH_Occ *init_occ = NULL;
if(EXPR_IS_TRIVIAL(largs->params.latency_marker)) {
/* Shared search table and expression */
conn->latency.sbmh_shared = 1;
conn->latency.sbmh_occ = &largs->params.sbmh_shared_occ;
conn->latency.sbmh_data = (uint8_t *)largs->params.latency_marker->u.data.data;
conn->latency.sbmh_size = largs->params.latency_marker->u.data.size;
} else {
/* Individual search table. */
conn->latency.sbmh_shared = 0;
conn->latency.sbmh_occ = malloc(sizeof(*conn->latency.sbmh_occ));
assert(conn->latency.sbmh_occ);
init_occ = conn->latency.sbmh_occ;
explode_string_expression((char **)&conn->latency.sbmh_data,
&conn->latency.sbmh_size,
largs->params.latency_marker,
largs, conn);
}
conn->latency.sbmh_ctx = malloc(SBMH_SIZE(conn->latency.sbmh_size));
assert(conn->latency.sbmh_ctx);
sbmh_init(conn->latency.sbmh_ctx, NULL,
largs->params.latency_marker_data,
largs->params.latency_marker_size);
sbmh_init(conn->latency.sbmh_ctx, init_occ,
conn->latency.sbmh_data, conn->latency.sbmh_size);

/*
* Initialize the latency histogram by copying out the template
Expand Down Expand Up @@ -1389,21 +1420,10 @@ static void update_io_interest(TK_P_ struct connection *conn, int events) {
#endif
}

static void latency_record_outgoing_ts(TK_P_ struct connection *conn, struct transport_data_spec *data, const void *ptr, size_t wrote) {
static void latency_record_outgoing_ts(TK_P_ struct connection *conn, size_t wrote) {
if(!conn->latency.sent_timestamps)
return;

void *data_start = data->ptr + data->once_size;
if(ptr < data_start) {
/* Ignore the --first-message in our calculations. */
if(wrote > (size_t)(data_start - ptr)) {
wrote -= (data_start - ptr);
ptr = data_start;
} else {
return;
}
}

struct loop_arguments *largs = tk_userdata(TK_A);

/*
Expand Down Expand Up @@ -1450,15 +1470,13 @@ static void latency_record_incoming_ts(TK_P_ struct connection *conn, char *buf,
if(!conn->latency.sent_timestamps)
return;

struct loop_arguments *largs = tk_userdata(TK_A);

uint8_t *lm = largs->params.latency_marker_data;
size_t lm_size = largs->params.latency_marker_size;
const uint8_t *lm = conn->latency.sbmh_data;
size_t lm_size = conn->latency.sbmh_size;
int num_markers_found = 0;

for(; size > 0; ) {
size_t analyzed = sbmh_feed(conn->latency.sbmh_ctx,
&largs->params.sbmh_occ,
conn->latency.sbmh_occ,
lm, lm_size, (unsigned char *)buf, size);
if(conn->latency.sbmh_ctx->found == sbmh_true) {
buf += analyzed;
Expand Down Expand Up @@ -1666,8 +1684,7 @@ static void connection_cb(TK_P_ tk_io *w, int revents) {
wrote -= available_header;
if(wrote > 0) {
/* Record latencies for the body only, not headers */
latency_record_outgoing_ts(TK_A_ conn,
&conn->data, position, wrote);
latency_record_outgoing_ts(TK_A_ conn, wrote);
}
}
}
Expand Down Expand Up @@ -1730,8 +1747,14 @@ static void connection_free_internals(struct connection *conn) {
free(conn->latency.histogram);

/* Remove Boyer-Moore-Horspool string search context. */
if(conn->latency.sbmh_ctx)
if(conn->latency.sbmh_ctx) {
free(conn->latency.sbmh_ctx);
if(conn->latency.sbmh_shared == 0) {
free(conn->latency.sbmh_occ);
free((void *)conn->latency.sbmh_data);
}
}

}

/*
Expand Down
6 changes: 3 additions & 3 deletions src/tcpkali_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "tcpkali_transport.h"
#include "tcpkali_atomic.h"
#include "tcpkali_rate.h"
#include "tcpkali_expr.h"

long number_of_cpus();

Expand Down Expand Up @@ -65,9 +66,8 @@ struct engine_params {
int websocket_enable; /* Enable Websocket responder on (-l) */
/* Pre-computed message data template */
struct transport_data_spec data_template;
uint8_t *latency_marker_data; /* --latency-marker */
size_t latency_marker_size;
struct StreamBMH_Occ sbmh_occ; /* Streaming Boyer-Moore-Horspool */
tk_expr_t *latency_marker; /* --latency-marker */
struct StreamBMH_Occ sbmh_shared_occ; /* Streaming Boyer-Moore-Horspool */
};

struct engine *engine_start(struct engine_params);
Expand Down
27 changes: 19 additions & 8 deletions src/tcpkali_expr.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,31 @@ parse_payload_data(struct transport_data_spec *data, int debug) {
switch(parse_expression(&expr,
(char *)data->ptr + data->ws_hdr_size,
data->once_size - data->ws_hdr_size, debug)) {
case 0: break;
default: return -1;
case 0:
free_expr(expr);
break;
case 1:
data->expr_head = expr;
data->flags |= TDS_FLAG_EXPRESSION;
break;
default:
return -1;
}

/*
* Attempt to find expression in body.
*/
switch(parse_expression(&expr, (char *)data->ptr + data->once_size,
data->total_size - data->once_size, debug)) {
case 0: break;
default: return -1;
case 0:
free_expr(expr);
break;
case 1:
data->expr_body = expr;
data->flags |= TDS_FLAG_EXPRESSION;
break;
default:
return -1;
}

return (data->flags & TDS_FLAG_EXPRESSION) ? 1 : 0;
Expand All @@ -122,12 +128,17 @@ parse_expression(tk_expr_t **expr_p, const char *buf, size_t size, int debug) {
if(ret == 0) {
assert(expr);
if(expr->type == EXPR_DATA) {
free_expr(expr);
/* Trivial expression found, should exactly match input. */
assert(expr->u.data.size == size);
assert(memcmp(expr->u.data.data, buf, size) == 0);
if(expr_p) *expr_p = expr;
else free_expr(expr);
return 0; /* No expression found */
} else {
if(expr_p) *expr_p = expr;
else free_expr(expr);
return 1;
}
if(expr_p) *expr_p = expr;
else free_expr(expr);
return 1;
} else {
char tmp[PRINTABLE_DATA_SUGGESTED_BUFFER_SIZE(size)];
DEBUG("Failed to parse \"%s\"\n",
Expand Down
16 changes: 15 additions & 1 deletion src/tcpkali_expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,23 @@ typedef struct tk_expr {
size_t estimate_size;
} tk_expr_t;

int parse_payload_data(struct transport_data_spec *data, int debug);
/*
* Trivial expression is expression which does not contain any
* interesting (dynamically computable) values and is basically
* a constant string.
*/
#define EXPR_IS_TRIVIAL(e) ((e)->type == EXPR_DATA)

/*
* Return values:
* 0: No expression was found, returns a simple EXPR_DATA node.
* 1: An expression was found, returned.
* -1: Expression parsing failed.
*/
int parse_expression(tk_expr_t **, const char *expr_buf, size_t size, int debug);

int parse_payload_data(struct transport_data_spec *data, int debug);

typedef ssize_t (expr_callback_f)(char *buf, size_t size, tk_expr_t *, void *key);
ssize_t eval_expression(char **buf_p, size_t size, tk_expr_t *, expr_callback_f, void *key);

Expand Down

0 comments on commit 5767368

Please sign in to comment.