Skip to content

Commit ec83e7c

Browse files
AliSQLAliSQL
authored and committed
[Feature] Issue#22 PARTITION LOCK_DONE AND LOCK_COND
Description: ------------ Partition m_lock_done and m_cond_done for group commit into per-slot arrays, so that each group commit leader signals only its own followers in every group commit stage.
1 parent 2645293 commit ec83e7c

3 files changed

Lines changed: 140 additions & 22 deletions

File tree

sql/binlog.cc

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,7 +1374,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
13741374

13751375

13761376
bool
1377-
Stage_manager::Mutex_queue::append(THD *first)
1377+
Stage_manager::Mutex_queue::append(THD *first, int *slot)
13781378
{
13791379
DBUG_ENTER("Stage_manager::Mutex_queue::append");
13801380
lock();
@@ -1393,6 +1393,33 @@ Stage_manager::Mutex_queue::append(THD *first)
13931393

13941394
bool empty= (m_first == NULL);
13951395
*m_last= first;
1396+
1397+
if (empty)
1398+
{
1399+
DBUG_ASSERT(m_first == first);
1400+
if (first->stage_cond_id == UNDEF_COND_SLOT)
1401+
{
1402+
if (unlikely(cond_index < 0))
1403+
{
1404+
/* adjust to zero */
1405+
cond_index= 0;
1406+
}
1407+
1408+
first->stage_cond_id= ((cond_index++)%MAX_STAGE_COND);
1409+
}
1410+
}
1411+
else
1412+
{
1413+
first->prev_to_commit= m_last_thd;
1414+
}
1415+
1416+
DBUG_ASSERT(m_first &&
1417+
(m_first->stage_cond_id != UNDEF_COND_SLOT));
1418+
1419+
/* A follower thread always waits on the slot of the leader of
1420+
the current stage. */
1421+
*slot= m_first->stage_cond_id;
1422+
13961423
DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
13971424
(ulonglong) m_first, (ulonglong) &m_first,
13981425
(ulonglong) m_last));
@@ -1404,6 +1431,9 @@ Stage_manager::Mutex_queue::append(THD *first)
14041431
while (first->next_to_commit)
14051432
first= first->next_to_commit;
14061433
m_last= &first->next_to_commit;
1434+
1435+
m_last_thd= first;
1436+
14071437
DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
14081438
(ulonglong) m_first, (ulonglong) &m_first,
14091439
(ulonglong) m_last));
@@ -1419,15 +1449,19 @@ Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex)
14191449
// If the queue was empty: we're the leader for this batch
14201450
DBUG_PRINT("debug", ("Enqueue 0x%llx to queue for stage %d",
14211451
(ulonglong) thd, stage));
1422-
bool leader= m_queue[stage].append(thd);
1452+
int slot= UNDEF_COND_SLOT;
1453+
bool leader= m_queue[stage].append(thd, &slot);
14231454

1455+
DBUG_ASSERT(slot != UNDEF_COND_SLOT);
14241456
/*
14251457
The stage mutex can be NULL if we are enrolling for the first
14261458
stage.
14271459
*/
14281460
if (stage_mutex)
14291461
mysql_mutex_unlock(stage_mutex);
14301462

1463+
if (leader)
1464+
thd->stage_leader= true;
14311465
/*
14321466
If the queue was not empty, we're a follower and wait for the
14331467
leader to process the queue. If we were holding a mutex, we have
@@ -1436,8 +1470,8 @@ Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex)
14361470
if (!leader)
14371471
{
14381472
DEBUG_SYNC(thd, "wait_as_follower");
1439-
mysql_mutex_lock(&m_lock_done);
14401473
#ifndef DBUG_OFF
1474+
mysql_mutex_lock(&m_lock_preempt);
14411475
/*
14421476
Leader can be awaiting all-clear to preempt follower's execution.
14431477
With setting the status the follower ensures it won't execute anything
@@ -1446,10 +1480,19 @@ Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex)
14461480
thd->transaction.flags.ready_preempt= 1;
14471481
if (leader_await_preempt_status)
14481482
mysql_cond_signal(&m_cond_preempt);
1483+
mysql_mutex_unlock(&m_lock_preempt);
14491484
#endif
1485+
mutex_enter_slot(slot);
14501486
while (thd->transaction.flags.pending)
1451-
mysql_cond_wait(&m_cond_done, &m_lock_done);
1452-
mysql_mutex_unlock(&m_lock_done);
1487+
enter_cond_slot(slot);
1488+
mutex_exit_slot(slot);
1489+
1490+
if (thd->stage_leader)
1491+
{
1492+
mutex_enter_slot(thd->stage_cond_id);
1493+
cond_signal_slot(thd->stage_cond_id);
1494+
mutex_exit_slot(thd->stage_cond_id);
1495+
}
14531496
}
14541497
return leader;
14551498
}
@@ -1463,7 +1506,10 @@ THD *Stage_manager::Mutex_queue::fetch_and_empty()
14631506
(ulonglong) m_first, (ulonglong) &m_first,
14641507
(ulonglong) m_last));
14651508
THD *result= m_first;
1509+
result->prev_to_commit= m_last_thd;
1510+
14661511
m_first= NULL;
1512+
m_last_thd= NULL;
14671513
m_last= &m_first;
14681514
DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
14691515
(ulonglong) m_first, (ulonglong) &m_first,
@@ -1488,14 +1534,14 @@ void Stage_manager::clear_preempt_status(THD *head)
14881534
{
14891535
DBUG_ASSERT(head);
14901536

1491-
mysql_mutex_lock(&m_lock_done);
1537+
mysql_mutex_lock(&m_lock_preempt);
14921538
while(!head->transaction.flags.ready_preempt)
14931539
{
14941540
leader_await_preempt_status= true;
1495-
mysql_cond_wait(&m_cond_preempt, &m_lock_done);
1541+
mysql_cond_wait(&m_cond_preempt, &m_lock_preempt);
14961542
}
14971543
leader_await_preempt_status= false;
1498-
mysql_mutex_unlock(&m_lock_done);
1544+
mysql_mutex_unlock(&m_lock_preempt);
14991545
}
15001546
#endif
15011547

@@ -7114,6 +7160,9 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
71147160
thd->transaction.flags.xid_written= false;
71157161
thd->transaction.flags.commit_low= !skip_commit;
71167162
thd->transaction.flags.run_hooks= !skip_commit;
7163+
thd->stage_leader= false;
7164+
thd->stage_cond_id= UNDEF_COND_SLOT;
7165+
thd->prev_to_commit= NULL;
71177166
#ifndef DBUG_OFF
71187167
/*
71197168
The group commit Leader may have to wait for follower whose transaction

sql/binlog.h

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ class Master_info;
2525

2626
class Format_description_log_event;
2727

28+
#define barrier() __asm__ __volatile__("" ::: "memory")
29+
#define MAX_STAGE_COND 128
30+
#define UNDEF_COND_SLOT -1
31+
2832
/**
2933
Class for maintaining the commit stages for binary log group commit.
3034
*/
@@ -34,7 +38,7 @@ class Stage_manager {
3438
friend class Stage_manager;
3539
public:
3640
Mutex_queue()
37-
: m_first(NULL), m_last(&m_first), group_prepared_engine(NULL)
41+
: m_first(NULL), m_last(&m_first), m_last_thd(NULL), group_prepared_engine(NULL)
3842
{
3943
}
4044

@@ -59,7 +63,7 @@ class Stage_manager {
5963
}
6064

6165
/** Append a linked list of threads to the queue */
62-
bool append(THD *first);
66+
bool append(THD *first, int *slot);
6367

6468
/**
6569
Fetch the entire queue for a stage.
@@ -78,6 +82,8 @@ class Stage_manager {
7882
*/
7983
THD *m_first;
8084

85+
/* The last thd object of the queue */
86+
THD *m_last_thd;
8187
/**
8288
Pointer to the location holding the end of the queue.
8389
@@ -92,6 +98,12 @@ class Stage_manager {
9298
*/
9399
engine_lsn_map* group_prepared_engine;
94100

101+
/**
102+
Slot index for allocating m_lock_done/m_cond_done; only changed
103+
at the first stage.
104+
*/
105+
int cond_index;
106+
95107
/** Lock for protecting the queue. */
96108
mysql_mutex_t m_lock;
97109
} MY_ATTRIBUTE((aligned(CPU_LEVEL1_DCACHE_LINESIZE)));
@@ -125,9 +137,13 @@ class Stage_manager {
125137
#endif
126138
)
127139
{
128-
mysql_mutex_init(key_LOCK_done, &m_lock_done, MY_MUTEX_INIT_FAST);
129-
mysql_cond_init(key_COND_done, &m_cond_done, NULL);
140+
for(int i= 0; i< MAX_STAGE_COND; i++)
141+
{
142+
mysql_mutex_init(key_LOCK_done, &m_lock_done[i], MY_MUTEX_INIT_FAST);
143+
mysql_cond_init(key_COND_done, &m_cond_done[i], NULL);
144+
}
130145
#ifndef DBUG_OFF
146+
mysql_mutex_init(key_LOCK_done, &m_lock_preempt, MY_MUTEX_INIT_FAST);
131147
/* reuse key_COND_done 'cos a new PSI object would be wasteful in DBUG_ON */
132148
mysql_cond_init(key_COND_done, &m_cond_preempt, NULL);
133149
#endif
@@ -152,8 +168,16 @@ class Stage_manager {
152168
{
153169
for (size_t i = 0 ; i < STAGE_COUNTER ; ++i)
154170
m_queue[i].deinit();
155-
mysql_cond_destroy(&m_cond_done);
156-
mysql_mutex_destroy(&m_lock_done);
171+
172+
for(int i= 0; i< MAX_STAGE_COND; i++)
173+
{
174+
mysql_cond_destroy(&m_cond_done[i]);
175+
mysql_mutex_destroy(&m_lock_done[i]);
176+
}
177+
#ifndef DBUG_OFF
178+
mysql_cond_destroy(&m_cond_preempt);
179+
mysql_mutex_destroy(&m_lock_preempt);
180+
#endif
157181
}
158182

159183
/**
@@ -203,11 +227,47 @@ class Stage_manager {
203227
}
204228

205229
void signal_done(THD *queue) {
206-
mysql_mutex_lock(&m_lock_done);
207-
for (THD *thd= queue ; thd ; thd = thd->next_to_commit)
208-
thd->transaction.flags.pending= false;
209-
mysql_mutex_unlock(&m_lock_done);
210-
mysql_cond_broadcast(&m_cond_done);
230+
THD* node= queue->prev_to_commit;
231+
THD* prev_node= NULL;
232+
233+
while(node)
234+
{
235+
prev_node= node->prev_to_commit;
236+
barrier();
237+
node->transaction.flags.pending= false;
238+
239+
if (node == queue)
240+
break;
241+
242+
node= prev_node;
243+
}
244+
245+
mutex_enter_slot(queue->stage_cond_id);
246+
cond_signal_slot(queue->stage_cond_id);
247+
mutex_exit_slot(queue->stage_cond_id);
248+
}
249+
250+
void mutex_enter_slot(int slot)
251+
{
252+
mysql_mutex_lock(&(m_lock_done[slot]));
253+
}
254+
255+
void mutex_exit_slot(int slot)
256+
{
257+
mysql_mutex_unlock(&(m_lock_done[slot]));
258+
}
259+
260+
void enter_cond_slot(int slot)
261+
{
262+
struct timespec abstime;
263+
set_timespec(abstime, 1);
264+
265+
mysql_cond_timedwait(&(m_cond_done[slot]), &(m_lock_done[slot]), &abstime);
266+
}
267+
268+
void cond_signal_slot(int slot)
269+
{
270+
mysql_cond_broadcast(&(m_cond_done[slot]));
211271
}
212272

213273
private:
@@ -221,14 +281,15 @@ class Stage_manager {
221281
Mutex_queue m_queue[STAGE_COUNTER];
222282

223283
/** Condition variable to indicate that the commit was processed */
224-
mysql_cond_t m_cond_done;
225-
284+
mysql_cond_t m_cond_done[MAX_STAGE_COND];
226285
/** Mutex used for the condition variable above */
227-
mysql_mutex_t m_lock_done;
286+
mysql_mutex_t m_lock_done[MAX_STAGE_COND];
287+
228288
#ifndef DBUG_OFF
229289
/** Flag is set by Leader when it starts waiting for follower's all-clear */
230290
bool leader_await_preempt_status;
231291

292+
mysql_mutex_t m_lock_preempt;
232293
/** Condition variable to indicate a follower started waiting for commit */
233294
mysql_cond_t m_cond_preempt;
234295
#endif

sql/sql_class.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2944,6 +2944,14 @@ class THD :public MDL_context_owner,
29442944
*/
29452945
THD *next_to_commit;
29462946

2947+
/* The previous node of commit queue for binary log group commit */
2948+
THD *prev_to_commit;
2949+
2950+
/* True if this thread is the stage leader of a group commit */
2951+
bool stage_leader;
2952+
/* Slot index of the done mutex/condition assigned when this thread becomes a stage leader; UNDEF_COND_SLOT until then */
2953+
int stage_cond_id;
2954+
29472955
/**
29482956
Functions to set and get transaction position.
29492957

0 commit comments

Comments
 (0)