Skip to content

Commit

Permalink
quick detection of the marker token in output stream
Browse files Browse the repository at this point in the history
  • Loading branch information
lwalkin committed Jan 19, 2017
1 parent f0af1db commit 7462580
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 8 deletions.
37 changes: 30 additions & 7 deletions src/tcpkali_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -2255,17 +2255,28 @@ scan_incoming_bytes(TK_P_ struct connection *conn, char *buf, size_t size) {
}


void update_timestamps(char *ptr, size_t size) {
static void override_timestamp(char *ptr, size_t size, unsigned long long ts) {
assert(size >= (sizeof(MESSAGE_MARKER_TOKEN)-1) + 16 + 1);
assert(ptr[0] == MESSAGE_MARKER_TOKEN[0]);
ptr += sizeof(MESSAGE_MARKER_TOKEN) - 1;
snprintf(ptr, size, "%016llx", ts);
ptr[16] = '!';
}

static void update_timestamps(char *ptr, size_t size) {
struct timeval tp;
gettimeofday(&tp, NULL);
unsigned long long ts = (unsigned long long)tp.tv_sec * 1000000 + tp.tv_usec;
char *end = ptr + size;
while((ptr = memmem(ptr, end - ptr, MESSAGE_MARKER_TOKEN, sizeof(MESSAGE_MARKER_TOKEN) - 1))) {
if(end - ptr < 15 + 16 + 1) break;
ptr += sizeof(MESSAGE_MARKER_TOKEN) - 1;
sprintf(ptr, "%016llx", ts);
ptr += 16;
*ptr++ = '!';
size_t remaining = end - ptr;
const size_t full_marker = (sizeof(MESSAGE_MARKER_TOKEN)-1) + 16 + 1;
if(remaining >= full_marker) {
override_timestamp(ptr, remaining, ts);
ptr += full_marker;
} else {
break;
}
}
}

Expand Down Expand Up @@ -2330,7 +2341,19 @@ largest_contiguous_chunk(struct loop_arguments *largs, struct connection *conn,
}

if(largs->params.message_marker) {
update_timestamps((char*) *position, *available_body);
if(conn->data.marker_token_ptr >= *position) {
/* Short-circquit search: we know where marker is, directly. */
struct timeval tp;
gettimeofday(&tp, NULL);
unsigned long long ts =
(unsigned long long)tp.tv_sec * 1000000 + tp.tv_usec;
size_t offset = conn->data.marker_token_ptr - *position;
override_timestamp(conn->data.marker_token_ptr,
(*available_body) - offset, ts);
} else {
/* Do a string search to find our markers and update timestamps */
update_timestamps((char *)*position, *available_body);
}
}
}

Expand Down
49 changes: 48 additions & 1 deletion src/tcpkali_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ replicate_payload(struct transport_data_spec *data, size_t target_size) {
size_t payload_size = data->total_size - data->once_size;

assert(!(data->flags & TDS_FLAG_REPLICATED));
assert(data->marker_token_ptr == 0); /* Can't be replicated */

if(!payload_size) {
/* Can't blow up an empty buffer. */
Expand Down Expand Up @@ -327,6 +328,31 @@ message_collection_has(const struct message_collection *mc, enum tk_expr_type t)
return 0;
}

typedef struct {
expr_callback_f *original_callback;
void *original_key;
void **marker_ptr_ptr;
int multiple_message_markers;
} callback_wrapper_key_t;
static ssize_t
callback_wrapper(char *buf, size_t size, tk_expr_t *expr, void *key,
long *output_value) {
callback_wrapper_key_t *wkey = key;

if(expr->type == EXPR_MESSAGE_MARKER) {
if(!*wkey->marker_ptr_ptr && !wkey->multiple_message_markers) {
*wkey->marker_ptr_ptr = buf;
} else {
/* Use more generic (slow) scanning algorithm later */
wkey->multiple_message_markers = 1;
*wkey->marker_ptr_ptr = 0;
}
}

return wkey->original_callback(buf, size, expr, wkey->original_key,
output_value);
}


struct transport_data_spec *
transport_spec_from_message_collection(struct transport_data_spec *out_spec,
Expand Down Expand Up @@ -361,6 +387,9 @@ transport_spec_from_message_collection(struct transport_data_spec *out_spec,
data_spec->total_size = data_spec->once_size;
}

callback_wrapper_key_t callback_key = {.original_callback = optional_cb,
.original_key = expr_cb_key};

enum websocket_side ws_side =
(tws_side == TWS_SIDE_CLIENT) ? WS_SIDE_CLIENT : WS_SIDE_SERVER;

Expand All @@ -383,6 +412,7 @@ transport_spec_from_message_collection(struct transport_data_spec *out_spec,
}

size_t estimate_ws_frame_size = 0;
void *marker_ptr = 0;

if(snip->flags & MSK_EXPRESSION_FOUND) {
ssize_t reified_size;
Expand All @@ -402,12 +432,25 @@ transport_spec_from_message_collection(struct transport_data_spec *out_spec,
snip->expr->estimate_size);
tptr += estimate_ws_frame_size;
}
callback_key.marker_ptr_ptr = &marker_ptr;
reified_size = eval_expression(
(char **)&tptr,
data_spec->allocated_size
- (data_spec->total_size + estimate_ws_frame_size),
snip->expr, optional_cb, expr_cb_key, 0,
snip->expr, callback_wrapper, &callback_key, 0,
(tws_side == TWS_SIDE_CLIENT));
if(callback_key.multiple_message_markers) {
data_spec->marker_token_ptr = 0;
marker_ptr = 0;
} else if(marker_ptr) {
if(data_spec->marker_token_ptr) {
callback_key.multiple_message_markers = 1;
data_spec->marker_token_ptr = 0;
marker_ptr = 0;
} else {
data_spec->marker_token_ptr = marker_ptr;
}
}
assert(reified_size >= 0);
data = 0;
size = reified_size;
Expand Down Expand Up @@ -451,6 +494,10 @@ transport_spec_from_message_collection(struct transport_data_spec *out_spec,
+ data_spec->total_size
+ estimate_ws_frame_size,
size);
if(marker_ptr) {
marker_ptr -= estimate_ws_frame_size - ws_frame_size;
data_spec->marker_token_ptr = marker_ptr;
}
}
} else {
ws_frame_size = websocket_frame_header(
Expand Down
1 change: 1 addition & 0 deletions src/tcpkali_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ struct transport_data_spec {
size_t total_size;
size_t allocated_size;
size_t single_message_size;
void *marker_token_ptr;
enum transport_data_flags {
TDS_FLAG_NONE = 0x00,
TDS_FLAG_PTR_SHARED = 0x01, /* Disallow freeing .ptr field */
Expand Down
1 change: 1 addition & 0 deletions test/check-tcpkali-operation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,4 @@ done
rm_testfile

check 24 "Packet rate estimate: (19|20)" ${TCPKALI} -m 'Foo\{message.marker}' -r10
check 25 "Packet rate estimate: (19|20)" ${TCPKALI} --ws -m '\{message.marker}\{re [a-z]{1,300}}' -r10

0 comments on commit 7462580

Please sign in to comment.