Skip to content

Commit a6eed3a

Browse files
author
Venkatesh Duggirala
committed
Bug#20574628: SEMI-SYNC REPLICATION PERFORMANCE DEGRADES WITH A HIGH NUMBER OF THREADS
Problem: when semi-sync replication is enabled and the number of running threads increases beyond a threshold, the performance of the system drops drastically. Analysis: A transaction in semi-sync replication waits for the ACK inside ReplSemiSyncMaster::commitTrx on a condition variable, COND_binlog_send_. In the reportReplyBinlog function, when the server receives an ACK, it sends a condition broadcast to all the threads that are waiting on COND_binlog_send_. Each transaction thread waiting for an ACK does the following (simplified): - hold a lock LOCK_binlog_ - in a loop ( while (is_on()) ) : - compare the ACK position with its own position; - if the ACK is still behind, it waits on the condition variable COND_binlog_send_ - if the ACK is ahead or equal, it exits this loop and proceeds with the next step - unlock the lock LOCK_binlog_ The problem with this design is that all the threads wake up, while it is likely that only a few will return to the application and most of them will go back to wait on the same condition variable. This creates unnecessary context switches and a lot of last-level cache (LLC) thrashing on multi-CPU systems, which reduces the throughput of the system. Fix: The approach splits the *one* condvar that all threads wait on into *several* condvars. Each of the condvars is associated with a position range. Hence only those threads that are waiting for a given acknowledged range are awakened when the ACK comes back (as opposed to waking up all transactions, regardless of whether the ACK affects them or not).
1 parent e2558f5 commit a6eed3a

2 files changed

Lines changed: 101 additions & 55 deletions

File tree

plugin/semisync/semisync_master.cc

Lines changed: 83 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
/* Copyright (C) 2007 Google Inc.
2-
Copyright (c) 2008 MySQL AB, 2008-2009 Sun Microsystems, Inc.
3-
Use is subject to license terms.
2+
Copyright (c) 2008, 2015, Oracle and/or its affiliates. All rights reserved.
43
54
This program is free software; you can redistribute it and/or modify
65
it under the terms of the GNU General Public License as published by
@@ -224,6 +223,54 @@ bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
224223
return (entry != NULL);
225224
}
226225

226+
int ActiveTranx::signal_waiting_sessions_all()
227+
{
228+
const char *kWho = "ActiveTranx::signal_waiting_sessions_all";
229+
function_enter(kWho);
230+
for (TranxNode* entry= trx_front_; entry; entry=entry->next_)
231+
mysql_cond_broadcast(&entry->cond);
232+
233+
return function_exit(kWho, 0);
234+
}
235+
236+
int ActiveTranx::signal_waiting_sessions_up_to(const char *log_file_name,
237+
my_off_t log_file_pos)
238+
{
239+
const char *kWho = "ActiveTranx::signal_waiting_sessions_up_to";
240+
function_enter(kWho);
241+
242+
TranxNode* entry= trx_front_;
243+
int cmp= ActiveTranx::compare(entry->log_name_, entry->log_pos_, log_file_name, log_file_pos) ;
244+
while (entry && cmp <= 0)
245+
{
246+
mysql_cond_broadcast(&entry->cond);
247+
entry= entry->next_;
248+
if (entry)
249+
cmp= ActiveTranx::compare(entry->log_name_, entry->log_pos_, log_file_name, log_file_pos) ;
250+
}
251+
252+
return function_exit(kWho, (entry != NULL));
253+
}
254+
255+
/**
  Walk the active transaction list from trx_front_ and return the first
  node for which
  compare((log_file_name, log_file_pos), node position) <= 0.

  @param log_file_name  binlog file name of the position to look up
  @param log_file_pos   offset inside log_file_name

  @return the first matching node, or NULL when the list is empty or no
          node satisfies the comparison.
*/
TranxNode * ActiveTranx::find_active_tranx_node(const char *log_file_name,
                                                my_off_t log_file_pos)
{
  const char *kWho = "ActiveTranx::find_active_tranx_node";
  function_enter(kWho);

  TranxNode *node= trx_front_;

  /* Skip every node for which the requested position compares greater. */
  while (node &&
         ActiveTranx::compare(log_file_name, log_file_pos,
                              node->log_name_, node->log_pos_) > 0)
    node= node->next_;

  function_exit(kWho, 0);
  return node;
}
273+
227274
int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
228275
my_off_t log_file_pos)
229276
{
@@ -238,7 +285,8 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
238285

239286
while (new_front)
240287
{
241-
if (compare(new_front, log_file_name, log_file_pos) > 0)
288+
if (compare(new_front, log_file_name, log_file_pos) > 0 ||
289+
new_front->n_waiters > 0)
242290
break;
243291
new_front = new_front->next_;
244292
}
@@ -365,8 +413,6 @@ int ReplSemiSyncMaster::initObject()
365413
/* Mutex initialization can only be done after MY_INIT(). */
366414
mysql_mutex_init(key_ss_mutex_LOCK_binlog_,
367415
&LOCK_binlog_, MY_MUTEX_INIT_FAST);
368-
mysql_cond_init(key_ss_cond_COND_binlog_send_,
369-
&COND_binlog_send_, NULL);
370416

371417
if (rpl_semi_sync_master_enabled)
372418
result = enableMaster();
@@ -442,7 +488,6 @@ ReplSemiSyncMaster::~ReplSemiSyncMaster()
442488
if (init_done_)
443489
{
444490
mysql_mutex_destroy(&LOCK_binlog_);
445-
mysql_cond_destroy(&COND_binlog_send_);
446491
}
447492

448493
delete active_tranxs_;
@@ -458,22 +503,6 @@ void ReplSemiSyncMaster::unlock()
458503
mysql_mutex_unlock(&LOCK_binlog_);
459504
}
460505

461-
void ReplSemiSyncMaster::cond_broadcast()
462-
{
463-
mysql_cond_broadcast(&COND_binlog_send_);
464-
}
465-
466-
int ReplSemiSyncMaster::cond_timewait(struct timespec *wait_time)
467-
{
468-
const char *kWho = "ReplSemiSyncMaster::cond_timewait()";
469-
int wait_res;
470-
471-
function_enter(kWho);
472-
wait_res= mysql_cond_timedwait(&COND_binlog_send_,
473-
&LOCK_binlog_, wait_time);
474-
return function_exit(kWho, wait_res);
475-
}
476-
477506
void ReplSemiSyncMaster::add_slave()
478507
{
479508
lock();
@@ -579,10 +608,6 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
579608
reply_file_pos_ = log_file_pos;
580609
reply_file_name_inited_ = true;
581610

582-
/* Remove all active transaction nodes before this point. */
583-
assert(active_tranxs_ != NULL);
584-
active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
585-
586611
if (trace_level_ & kTraceDetail)
587612
{
588613
if(!skipped_event)
@@ -612,16 +637,14 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
612637
}
613638

614639
l_end:
615-
unlock();
616640

617641
if (can_release_threads)
618642
{
619643
if (trace_level_ & kTraceDetail)
620644
sql_print_information("%s: signal all waiting threads.", kWho);
621-
622-
cond_broadcast();
645+
active_tranxs_->signal_waiting_sessions_up_to(reply_file_name_, reply_file_pos_);
623646
}
624-
647+
unlock();
625648
return function_exit(kWho, 0);
626649
}
627650

@@ -648,8 +671,18 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
648671
/* Acquire the mutex. */
649672
lock();
650673

674+
TranxNode* entry= NULL;
675+
mysql_cond_t* thd_cond= NULL;
676+
if (active_tranxs_)
677+
{
678+
entry=
679+
active_tranxs_->find_active_tranx_node(trx_wait_binlog_name,
680+
trx_wait_binlog_pos);
681+
if (entry)
682+
thd_cond= &entry->cond;
683+
}
651684
/* This must be called after acquired the lock */
652-
THD_ENTER_COND(NULL, &COND_binlog_send_, &LOCK_binlog_,
685+
THD_ENTER_COND(NULL, thd_cond, &LOCK_binlog_,
653686
& stage_waiting_for_semi_sync_ack_from_slave,
654687
& old_stage);
655688

@@ -751,7 +784,11 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
751784
kWho, wait_timeout_,
752785
wait_file_name_, (unsigned long)wait_file_pos_);
753786

754-
wait_result = cond_timewait(&abstime);
787+
/* wait for the position to be ACK'ed back */
788+
assert(entry);
789+
entry->n_waiters++;
790+
wait_result= mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
791+
entry->n_waiters--;
755792
rpl_semi_sync_master_wait_sessions--;
756793

757794
if (wait_result != 0)
@@ -790,14 +827,12 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
790827
}
791828
}
792829

793-
/*
794-
At this point, the binlog file and position of this transaction
795-
must have been removed from ActiveTranx.
796-
*/
797-
assert(!getMasterEnabled() ||
798-
!active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
799-
trx_wait_binlog_pos));
800-
l_end:
830+
/* Last waiter removes the TranxNode */
831+
if (is_on() && active_tranxs_ && entry && entry->n_waiters == 0)
832+
active_tranxs_->clear_active_tranx_nodes(trx_wait_binlog_name,
833+
trx_wait_binlog_pos);
834+
835+
l_end:
801836
/* Update the status counter. */
802837
if (is_on())
803838
rpl_semi_sync_master_yes_transactions++;
@@ -838,15 +873,17 @@ int ReplSemiSyncMaster::switch_off()
838873
function_enter(kWho);
839874
state_ = false;
840875

841-
/* Clear the active transaction list. */
842-
assert(active_tranxs_ != NULL);
843-
result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
844-
845876
rpl_semi_sync_master_off_times++;
846877
wait_file_name_inited_ = false;
847878
reply_file_name_inited_ = false;
848879
sql_print_information("Semi-sync replication switched OFF.");
849-
cond_broadcast(); /* wake up all waiting threads */
880+
881+
/* signal waiting sessions */
882+
active_tranxs_->signal_waiting_sessions_all();
883+
884+
/* Clear the active transaction list. */
885+
assert(active_tranxs_ != NULL);
886+
result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
850887

851888
return function_exit(kWho, result);
852889
}
@@ -1234,6 +1271,7 @@ int ReplSemiSyncMaster::resetMaster()
12341271
rpl_semi_sync_master_trx_wait_time = 0;
12351272
rpl_semi_sync_master_net_wait_num = 0;
12361273
rpl_semi_sync_master_net_wait_time = 0;
1274+
active_tranxs_->clear_active_tranx_nodes(NULL, 0);
12371275

12381276
unlock();
12391277

plugin/semisync/semisync_master.h

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
/* Copyright (C) 2007 Google Inc.
2-
Copyright (c) 2008 MySQL AB, 2009 Sun Microsystems, Inc.
3-
Use is subject to license terms.
2+
Copyright (c) 2008, 2015, Oracle and/or its affiliates. All rights reserved.
43
54
This program is free software; you can redistribute it and/or modify
65
it under the terms of the GNU General Public License as published by
@@ -30,7 +29,9 @@ extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
3029

3130
struct TranxNode {
3231
char log_name_[FN_REFLEN];
33-
my_off_t log_pos_;
32+
my_off_t log_pos_;
33+
mysql_cond_t cond;
34+
int n_waiters;
3435
struct TranxNode *next_; /* the next node in the sorted list */
3536
struct TranxNode *hash_next_; /* the next node during hash collision */
3637
};
@@ -128,6 +129,7 @@ class TranxNodeAllocator
128129
trx_node->log_pos_= 0;
129130
trx_node->next_= 0;
130131
trx_node->hash_next_= 0;
132+
trx_node->n_waiters= 0;
131133
return trx_node;
132134
}
133135

@@ -246,6 +248,12 @@ class TranxNodeAllocator
246248
/* New Block is always the current_block */
247249
current_block= block;
248250
++block_num;
251+
252+
for (int i=0; i< BLOCK_TRANX_NODES; i++)
253+
mysql_cond_init(key_ss_cond_COND_binlog_send_,
254+
&current_block->nodes[i].cond,
255+
NULL);
256+
249257
return 0;
250258
}
251259
return 1;
@@ -257,6 +265,8 @@ class TranxNodeAllocator
257265
*/
258266
void free_block(Block *block)
{
  /*
    Destroy the condition variable of each of the BLOCK_TRANX_NODES nodes
    before the memory that holds them is released.
  */
  int idx= 0;
  while (idx < BLOCK_TRANX_NODES)
  {
    mysql_cond_destroy(&block->nodes[idx].cond);
    ++idx;
  }

  my_free(block);
  --block_num;
}
@@ -330,6 +340,11 @@ class ActiveTranx
330340
}
331341

332342
public:
343+
int signal_waiting_sessions_all();
344+
int signal_waiting_sessions_up_to(const char *log_file_name,
345+
my_off_t log_file_pos);
346+
TranxNode* find_active_tranx_node(const char *log_file_name,
347+
my_off_t log_file_pos);
333348
ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level);
334349
~ActiveTranx();
335350

@@ -376,11 +391,6 @@ class ReplSemiSyncMaster
376391
/* True when initObject has been called */
377392
bool init_done_;
378393

379-
/* This cond variable is signaled when enough binlog has been sent to slave,
380-
* so that a waiting trx can return the 'ok' to the client for a commit.
381-
*/
382-
mysql_cond_t COND_binlog_send_;
383-
384394
/* Mutex that protects the following state variables and the active
385395
* transaction list.
386396
* Under no cirumstances we can acquire mysql_bin_log.LOCK_log if we are
@@ -434,8 +444,6 @@ class ReplSemiSyncMaster
434444

435445
void lock();
436446
void unlock();
437-
void cond_broadcast();
438-
int cond_timewait(struct timespec *wait_time);
439447

440448
/* Is semi-sync replication on? */
441449
bool is_on() {

0 commit comments

Comments
 (0)