#ifndef BINLOG_H_INCLUDED
/* Copyright (c) 2010, 2016, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software Foundation,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
#define BINLOG_H_INCLUDED
#include "mysqld.h" /* opt_relay_logname */
#include "log_event.h"
#include "log.h"
class Relay_log_info;
class Master_info;
class Format_description_log_event;
/*
  Compiler-level memory barrier: an empty asm statement with a "memory"
  clobber. It stops the compiler from moving memory accesses across this
  point; it emits no CPU fence instruction. GCC/Clang extended-asm syntax.
*/
#define barrier() __asm__ __volatile__("" ::: "memory")
/*
  Number of entries in Stage_manager's m_cond_done / m_lock_done arrays,
  i.e. the number of distinct "done" signalling slots.
*/
#define MAX_STAGE_COND 128
/*
  Sentinel slot index that is not a valid array position.
  NOTE(review): presumably marks "no slot assigned yet"; its users are
  outside this header chunk - confirm.
  Parenthesized so the negative literal always expands as a single operand.
*/
#define UNDEF_COND_SLOT (-1)
/**
Class for maintaining the commit stages for binary log group commit.
*/
class Stage_manager {
public:
  /**
    Mutex-protected queue of sessions for a single commit stage.

    Only the data members and the inline helpers are defined here;
    append() and fetch_and_empty() are implemented elsewhere.
  */
  class Mutex_queue {
    friend class Stage_manager;
  public:
    /**
      Create an empty queue.

      cond_index is zero-initialized so that it never starts out with an
      indeterminate value when the owning object does not live in static
      storage.
    */
    Mutex_queue()
      : m_first(NULL), m_last_thd(NULL), m_last(&m_first),
        group_prepared_engine(NULL), cond_index(0)
    {
    }

    /**
      Initialize the queue mutex.

      @param key_LOCK_queue Instrumentation key for the mutex; the
             parameter only exists when HAVE_PSI_INTERFACE is defined.
    */
    void init(
#ifdef HAVE_PSI_INTERFACE
              PSI_mutex_key key_LOCK_queue
#endif
              ) {
      mysql_mutex_init(key_LOCK_queue, &m_lock, MY_MUTEX_INIT_FAST);
    }

    /**
      Destroy the queue mutex and release group_prepared_engine.

      The pointer is reset after the delete so that a repeated deinit()
      cannot free the same map twice; deleting a NULL pointer is a no-op,
      so no explicit check is needed.
    */
    void deinit() {
      mysql_mutex_destroy(&m_lock);
      delete group_prepared_engine;
      group_prepared_engine= NULL;
    }

    /** @return true if no session is currently queued. */
    bool is_empty() const {
      return m_first == NULL;
    }

    /** Append a linked list of threads to the queue */
    bool append(THD *first, int *slot);

    /**
      Fetch the entire queue for a stage.

      This will fetch the entire queue in one go.
    */
    THD *fetch_and_empty();

  private:
    void lock() { mysql_mutex_lock(&m_lock); }
    void unlock() { mysql_mutex_unlock(&m_lock); }

    /**
      Pointer to the first thread in the queue, or NULL if the queue is
      empty.
    */
    THD *m_first;

    /* The last thd object of the queue */
    THD *m_last_thd;

    /**
      Pointer to the location holding the end of the queue.

      This is either @c &m_first, or a pointer to the @c next_to_commit of
      the last thread that is enqueued.
    */
    THD **m_last;

    /**
      Store the max prepared log for each engine that supports ha_flush_logs.
      We have to init group_prepared_engine after all plugins are inited.
      Owned by this queue: released in deinit().
    */
    engine_lsn_map* group_prepared_engine;

    /**
      Slot index for allocating m_lock_done/m_cond_done, only changed
      at first stage.
    */
    int cond_index;

    /** Lock for protecting the queue. */
    mysql_mutex_t m_lock;
  } MY_ATTRIBUTE((aligned(CPU_LEVEL1_DCACHE_LINESIZE)));

public:
  /**
    Construct the manager. Mutexes and condition variables are set up
    later by init(); only the debug-only preempt flag is given a defined
    starting value here.
  */
  Stage_manager()
#ifndef DBUG_OFF
    : leader_await_preempt_status(false)
#endif
  {
  }

  /** Nothing is released here; call deinit() to destroy the primitives. */
  ~Stage_manager()
  {
  }

  /**
    Constants for queues for different stages.
    STAGE_COUNTER is the number of stages and sizes m_queue.
  */
  enum StageID {
    FLUSH_STAGE,
    SYNC_STAGE,
    COMMIT_STAGE,
    STAGE_COUNTER
  };

  /**
    Initialize every "done" mutex/condition slot, the debug-only preempt
    primitives, and the three per-stage queues.

    All parameters are instrumentation keys and only exist when
    HAVE_PSI_INTERFACE is defined.
  */
  void init(
#ifdef HAVE_PSI_INTERFACE
            PSI_mutex_key key_LOCK_flush_queue,
            PSI_mutex_key key_LOCK_sync_queue,
            PSI_mutex_key key_LOCK_commit_queue,
            PSI_mutex_key key_LOCK_done,
            PSI_cond_key key_COND_done
#endif
            )
  {
    for (int i= 0; i < MAX_STAGE_COND; i++)
    {
      mysql_mutex_init(key_LOCK_done, &m_lock_done[i], MY_MUTEX_INIT_FAST);
      mysql_cond_init(key_COND_done, &m_cond_done[i], NULL);
    }
#ifndef DBUG_OFF
    mysql_mutex_init(key_LOCK_done, &m_lock_preempt, MY_MUTEX_INIT_FAST);
    /* reuse key_COND_done 'cos a new PSI object would be wasteful in DBUG_ON */
    mysql_cond_init(key_COND_done, &m_cond_preempt, NULL);
#endif
    m_queue[FLUSH_STAGE].init(
#ifdef HAVE_PSI_INTERFACE
                              key_LOCK_flush_queue
#endif
                              );
    m_queue[SYNC_STAGE].init(
#ifdef HAVE_PSI_INTERFACE
                             key_LOCK_sync_queue
#endif
                             );
    m_queue[COMMIT_STAGE].init(
#ifdef HAVE_PSI_INTERFACE
                               key_LOCK_commit_queue
#endif
                               );
  }

  /**
    Tear down everything init() set up: the stage queues first, then each
    "done" condition/mutex slot, then the debug-only preempt primitives.
  */
  void deinit()
  {
    for (size_t i= 0; i < STAGE_COUNTER; ++i)
      m_queue[i].deinit();
    for (int i= 0; i < MAX_STAGE_COND; i++)
    {
      mysql_cond_destroy(&m_cond_done[i]);
      mysql_mutex_destroy(&m_lock_done[i]);
    }
#ifndef DBUG_OFF
    mysql_cond_destroy(&m_cond_preempt);
    mysql_mutex_destroy(&m_lock_preempt);
#endif
  }

  /**
    Enroll a set of sessions for a stage.

    This will queue the session thread for writing and flushing.

    If the thread being queued is assigned as stage leader, it will
    return immediately.

    If wait_if_follower is true and the thread is not the stage leader,
    the thread will wait for the queue to be processed by the
    leader before it returns.
    In DBUG-ON version the follower marks its preempt status as ready.

    @param stage Stage identifier for the queue to append to.
    @param first Queue to append.
    @param stage_mutex
                 Pointer to the currently held stage mutex, or NULL if
                 we're not in a stage.

    @retval true  Thread is stage leader.
    @retval false Thread was not stage leader and processing has been done.
  */
  bool enroll_for(StageID stage, THD *first, mysql_mutex_t *stage_mutex);

#ifndef DBUG_OFF
  /**
    The method ensures the follower's execution path can be preempted
    by the leader's thread.
    Preempt status of @c head follower is checked to engage the leader
    into waiting when set.

    @param head  THD* of a follower thread
  */
  void clear_preempt_status(THD *head);
#endif

  /**
    Fetch the entire queue and empty it.

    @param stage Stage whose queue is fetched.
    @return Pointer to the first session of the queue.
  */
  THD *fetch_queue_for(StageID stage) {
    DBUG_PRINT("debug", ("Fetching queue for stage %d", stage));
    return m_queue[stage].fetch_and_empty();
  }

  /**
    Mark the sessions of a processed queue as no longer pending and wake
    the threads waiting on the queue's "done" slot.

    The walk starts at queue->prev_to_commit and follows prev_to_commit
    links, clearing transaction.flags.pending on each session, until it
    has handled @c queue itself or runs into a NULL link. Afterwards the
    slot named by queue->stage_cond_id is broadcast under its mutex.

    @param queue Head session of the queue; must not be NULL, since it is
                 dereferenced unconditionally.
  */
  void signal_done(THD *queue) {
    THD* node= queue->prev_to_commit;
    THD* prev_node= NULL;
    while (node)
    {
      /*
        Read the link before clearing the flag and keep the compiler from
        reordering the two. NOTE(review): presumably the session may be
        reused by its own thread once pending is false - confirm.
      */
      prev_node= node->prev_to_commit;
      barrier();
      node->transaction.flags.pending= false;
      if (node == queue)
        break;
      node= prev_node;
    }
    mutex_enter_slot(queue->stage_cond_id);
    cond_signal_slot(queue->stage_cond_id);
    mutex_exit_slot(queue->stage_cond_id);
  }

  /**
    Lock the "done" mutex of a slot.
    @param slot Index into m_lock_done; not range-checked here.
  */
  void mutex_enter_slot(int slot)
  {
    mysql_mutex_lock(&(m_lock_done[slot]));
  }

  /**
    Unlock the "done" mutex of a slot.
    @param slot Index into m_lock_done; not range-checked here.
  */
  void mutex_exit_slot(int slot)
  {
    mysql_mutex_unlock(&(m_lock_done[slot]));
  }

  /**
    Timed wait on the "done" condition of a slot, using the slot's mutex.

    The result of the wait is discarded, so the caller cannot tell a
    wake-up from a timeout and has to re-check its own condition.
    NOTE(review): set_timespec(abstime, 1) presumably means "now + 1
    second", and the caller is expected to hold m_lock_done[slot] - confirm.

    @param slot Index into m_cond_done / m_lock_done; not range-checked.
  */
  void enter_cond_slot(int slot)
  {
    struct timespec abstime;
    set_timespec(abstime, 1);
    mysql_cond_timedwait(&(m_cond_done[slot]), &(m_lock_done[slot]), &abstime);
  }

  /**
    Wake every thread waiting on the "done" condition of a slot.
    @param slot Index into m_cond_done; not range-checked here.
  */
  void cond_signal_slot(int slot)
  {
    mysql_cond_broadcast(&(m_cond_done[slot]));
  }

private:
  /**
    Queues for sessions, one per stage (see StageID): flush, sync and
    commit.
  */
  Mutex_queue m_queue[STAGE_COUNTER];

  /** Condition variables to indicate that the commit was processed */
  mysql_cond_t m_cond_done[MAX_STAGE_COND];

  /** Mutexes used for the condition variables above, one per slot */
  mysql_mutex_t m_lock_done[MAX_STAGE_COND];
#ifndef DBUG_OFF
  /** Flag is set by Leader when it starts waiting for follower's all-clear */
  bool leader_await_preempt_status;

  /** Mutex initialized alongside m_cond_preempt (debug builds only) */
  mysql_mutex_t m_lock_preempt;
  /** Condition variable to indicate a follower started waiting for commit */
  mysql_cond_t m_cond_preempt;
#endif
};
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
{
private:
#ifdef HAVE_PSI_INTERFACE
  /** The instrumentation key to use for @c LOCK_index. */
  PSI_mutex_key m_key_LOCK_index;

  /*
    Keys stored by set_psi_keys() for the group-commit primitives.
    NOTE(review): presumably forwarded to Stage_manager::init() - the call
    site is outside this header chunk.
    m_key_COND_done holds a condition-variable key (it is assigned from a
    PSI_cond_key), so it is declared with the matching key type.
  */
  PSI_cond_key m_key_COND_done;
  PSI_mutex_key m_key_LOCK_commit_queue;
  PSI_mutex_key m_key_LOCK_done;
  PSI_mutex_key m_key_LOCK_flush_queue;
  PSI_mutex_key m_key_LOCK_sync_queue;

  /** The instrumentation key to use for @c LOCK_commit. */
  PSI_mutex_key m_key_LOCK_commit;
  /** The instrumentation key to use for @c LOCK_sync. */
  PSI_mutex_key m_key_LOCK_sync;
  /** The instrumentation key to use for @c LOCK_xids. */
  PSI_mutex_key m_key_LOCK_xids;
  /** The instrumentation key to use for @c update_cond. */
  PSI_cond_key m_key_update_cond;
  /** The instrumentation key to use for @c m_prep_xids_cond. */
  PSI_cond_key m_key_prep_xids_cond;
  /** The instrumentation key to use for opening the log file. */
  PSI_file_key m_key_file_log;
  /** The instrumentation key to use for opening the log index file. */
  PSI_file_key m_key_file_log_index;
#endif
/* POSIX thread objects are inited by init_pthread_objects() */
mysql_mutex_t LOCK_index;
mysql_mutex_t LOCK_commit;
mysql_mutex_t LOCK_sync;
/* Held by dec_prep_xids() while it signals m_prep_xids_cond. */
mysql_mutex_t LOCK_xids;
mysql_cond_t update_cond;
ulonglong bytes_written;
IO_CACHE index_file;
char index_file_name[FN_REFLEN];
/*
crash_safe_index_file is a temp file used to guarantee that the
index file stays crash safe when the master server restarts.
*/
IO_CACHE crash_safe_index_file;
char crash_safe_index_file_name[FN_REFLEN];
/*
purge_file is a temp file used in purge_logs so that the index file
can be updated before deleting files from disk, yielding better crash
recovery. It is created on demand the first time purge_logs is called
and then reused for subsequent calls. It is cleaned up in cleanup().
*/
IO_CACHE purge_index_file;
char purge_index_file_name[FN_REFLEN];
/*
The max size before rotation (usable only if log_type == LOG_BIN: binary
logs and relay logs).
For a binlog, max_size should be max_binlog_size.
For a relay log, it should be max_relay_log_size if this is non-zero,
max_binlog_size otherwise.
max_size is set in init(), and dynamically changed (when one does SET
GLOBAL MAX_BINLOG_SIZE|MAX_RELAY_LOG_SIZE) by fix_max_binlog_size and
fix_max_relay_log_size.
*/
ulong max_size;
// current file sequence number for load data infile binary logging
uint file_id;
uint open_count; // For replication
int readers_count;
/*
Pointer to the sync period variable: for the binlog this will be
sync_binlog_period, for the relay log this will be
sync_relay_log_period. Dereferenced by get_sync_period().
*/
uint *sync_period_ptr;
uint sync_counter;
/* Wraps every atomic access to m_prep_xids (see inc/dec/get_prep_xids). */
my_atomic_rwlock_t m_prep_xids_lock;
/* Signalled by dec_prep_xids() when m_prep_xids drops to zero. */
mysql_cond_t m_prep_xids_cond;
/* Prepared-XID counter; incremented by inc_prep_xids(), decremented by dec_prep_xids(). */
volatile int32 m_prep_xids;
/**
Increment the prepared XID counter.
*/
void inc_prep_xids(THD *thd) {
  DBUG_ENTER("MYSQL_BIN_LOG::inc_prep_xids");
  my_atomic_rwlock_wrlock(&m_prep_xids_lock);
  /*
    The pre-increment value is only needed for the trace line below, so it
    is kept in a variable in debug builds only; DBUG_PRINT compiles to
    nothing when DBUG_OFF is defined.
  */
#ifdef DBUG_OFF
  (void) my_atomic_add32(&m_prep_xids, 1);
#else
  int previous= my_atomic_add32(&m_prep_xids, 1);
#endif
  DBUG_PRINT("debug", ("m_prep_xids: %d", previous + 1));
  my_atomic_rwlock_wrunlock(&m_prep_xids_lock);

  /* Record on the session that it has been counted. */
  thd->transaction.flags.xid_written= true;
  DBUG_VOID_RETURN;
}
/**
Decrement the prepared XID counter.
Signal m_prep_xids_cond if the counter reaches zero.
*/
void dec_prep_xids(THD *thd) {
  DBUG_ENTER("MYSQL_BIN_LOG::dec_prep_xids");
  my_atomic_rwlock_wrlock(&m_prep_xids_lock);
  /* The add returns the value the counter held before the decrement. */
  int32 before= my_atomic_add32(&m_prep_xids, -1);
  DBUG_PRINT("debug", ("m_prep_xids: %d", before - 1));
  my_atomic_rwlock_wrunlock(&m_prep_xids_lock);

  thd->transaction.flags.xid_written= false;

  /* A previous value of 1 means the counter has just reached zero. */
  const bool reached_zero= (before == 1);
  if (reached_zero)
  {
    /* Wake a waiter on m_prep_xids_cond while holding LOCK_xids. */
    mysql_mutex_lock(&LOCK_xids);
    mysql_cond_signal(&m_prep_xids_cond);
    mysql_mutex_unlock(&LOCK_xids);
  }
  DBUG_VOID_RETURN;
}
int32 get_prep_xids() {
  /* Atomic read of the prepared-XID counter, bracketed by the read lock. */
  int32 snapshot;
  my_atomic_rwlock_rdlock(&m_prep_xids_lock);
  snapshot= my_atomic_load32(&m_prep_xids);
  my_atomic_rwlock_rdunlock(&m_prep_xids_lock);
  return snapshot;
}
inline uint get_sync_period()
{
  /* Read the current value through the pointer on every call. */
  const uint current_period= *sync_period_ptr;
  return current_period;
}
int write_to_file(IO_CACHE *cache);
/*
This is used to start writing to a new log file. The difference from
new_file() is locking. new_file_without_locking() does not acquire
LOCK_log.
*/
int new_file_without_locking(Format_description_log_event *extra_description_event);
int new_file_impl(bool need_lock, Format_description_log_event *extra_description_event);
/** Manage the stages in ordered_commit. */
Stage_manager stage_manager;
void do_flush(THD *thd);
public:
using MYSQL_LOG::generate_name;
using MYSQL_LOG::is_open;
/* This is relay log */
bool is_relay_log;
ulong signal_cnt; // update of the counter is checked by heartbeat
uint8 checksum_alg_reset; // to contain a new value when binlog is rotated
/*
Holds the last seen in Relay-Log FD's checksum alg value.
The initial value comes from the slave's local FD that heads
the very first Relay-Log file. In the following the value may change
with each received master's FD_m.
Besides to be used in verification events that IO thread receives
(except the 1st fake Rotate, see @c Master_info:: checksum_alg_before_fd),
the value specifies if/how to compute checksum for slave's local events
and the first fake Rotate (R_f^1) coming from the master.
R_f^1 needs logging checksum-compatibly with the RL's heading FD_s.
Legends for the checksum related comments:
FD - Format-Description event,
R - Rotate event
R_f - the fake Rotate event
E - an arbitrary event
The underscore indexes for any event
`_s' indicates the event is generated by Slave
`_m' - by Master
Two special underscore indexes of FD:
FD_q - Format Description event for queuing (relay-logging)
FD_e - Format Description event for executing (relay-logging)
Upper indexes:
E^n - n:th event in a sequence
RL - Relay Log
(A) - checksum algorithm descriptor value
FD.(A) - the value of (A) in FD
*/
uint8 relay_log_checksum_alg;
MYSQL_BIN_LOG(uint *sync_period);
/*
note that there's no destructor ~MYSQL_BIN_LOG() !
The reason is that we don't want it to be automatically called
on exit() - but only during the correct shutdown process
*/
#ifdef HAVE_PSI_INTERFACE
void set_psi_keys(PSI_mutex_key key_LOCK_index,
                  PSI_mutex_key key_LOCK_commit,
                  PSI_mutex_key key_LOCK_commit_queue,
                  PSI_mutex_key key_LOCK_done,
                  PSI_mutex_key key_LOCK_flush_queue,
                  PSI_mutex_key key_LOCK_log,
                  PSI_mutex_key key_LOCK_sync,
                  PSI_mutex_key key_LOCK_sync_queue,
                  PSI_mutex_key key_LOCK_xids,
                  PSI_cond_key key_COND_done,
                  PSI_cond_key key_update_cond,
                  PSI_cond_key key_prep_xids_cond,
                  PSI_file_key key_file_log,
                  PSI_file_key key_file_log_index)
{
  /*
    Store each instrumentation key in its m_key_* member. The stores are
    independent of one another and are listed in parameter order.
  */

  /* Mutex keys. */
  m_key_LOCK_index=        key_LOCK_index;
  m_key_LOCK_commit=       key_LOCK_commit;
  m_key_LOCK_commit_queue= key_LOCK_commit_queue;
  m_key_LOCK_done=         key_LOCK_done;
  m_key_LOCK_flush_queue=  key_LOCK_flush_queue;
  m_key_LOCK_log=          key_LOCK_log;
  m_key_LOCK_sync=         key_LOCK_sync;
  m_key_LOCK_sync_queue=   key_LOCK_sync_queue;
  m_key_LOCK_xids=         key_LOCK_xids;

  /* Condition-variable keys. */
  m_key_COND_done=         key_COND_done;
  m_key_update_cond=       key_update_cond;
  m_key_prep_xids_cond=    key_prep_xids_cond;

  /* File keys. */
  m_key_file_log=          key_file_log;
  m_key_file_log_index=    key_file_log_index;
}
#endif
/**
Find the oldest binary log that contains any GTID that
is not in the given gtid set.
@param[out] binlog_file_name, the file name of oldest binary log found
@param[in] gtid_set, the given gtid set
@param[out] first_gtid, the first GTID information from the binary log
file returned at binlog_file_name
@param[out] errmsg, the error message outputted, which is left untouched
if the function returns false
@return false on success, true on error.
*/
bool find_first_log_not_in_gtid_set(char *binlog_file_name,
const Gtid_set *gtid_set,
Gtid *first_gtid,
const char **errmsg);
/**
Reads the set of all GTIDs in the binary log, and the set of all
lost GTIDs in the binary log, and stores each set in respective
argument.
@param gtid_set Will be filled with all GTIDs in this binary log.
@param lost_groups Will be filled with all GTIDs in the
Previous_gtids_log_event of the first binary log that has a
Previous_gtids_log_event.
@param last_gtid Will be filled with the last available GTID information
in the binary/relay log files.
@param verify_checksum If true, checksums will be checked.
@param need_lock If true, LOCK_log, LOCK_index, and
global_sid_lock->wrlock are acquired; otherwise they are asserted
to be taken already.
@param is_server_starting True if the server is starting.
@return false on success, true on error.
*/
bool init_gtid_sets(Gtid_set *gtid_set, Gtid_set *lost_groups,
Gtid *last_gtid, bool verify_checksum,
bool need_lock, bool is_server_starting= false);
void set_previous_gtid_set(Gtid_set *previous_gtid_set_param)
{
  /* Only the pointer is stored; the set itself is not copied here. */
  this->previous_gtid_set= previous_gtid_set_param;
}
private:
Gtid_set* previous_gtid_set;
int open(const char *opt_name)
{
  /* Delegate to open_binlog() and hand its result back unchanged. */
  return open_binlog(opt_name);
}
bool change_stage(THD *thd, Stage_manager::StageID stage,
THD* queue, mysql_mutex_t *leave,
mysql_mutex_t *enter);
std::pair