/* Copyright (c) 2009, 2016, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software Foundation,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
#include "my_global.h"
#include "log.h"
#include "binlog.h"
#include "log_event.h"
#include "rpl_filter.h"
#include "rpl_rli.h"
#include "sql_plugin.h"
#include "rpl_handler.h"
#include "rpl_info_factory.h"
#include "rpl_utility.h"
#include "debug_sync.h"
#include "global_threads.h"
#include "sql_show.h"
#include "sql_parse.h"
#include "rpl_mi.h"
#include
#include
#include
using std::max;
using std::min;
using std::string;
using std::list;
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
/**
@defgroup Binary_Log Binary Log
@{
*/
#define MY_OFF_T_UNDEF (~(my_off_t)0UL)
/*
Constants required for the limit unsafe warnings suppression
*/
//seconds after which the limit unsafe warnings suppression will be activated
#define LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT 50
//number of limit unsafe warnings after which the suppression will be activated
#define LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT 50
#define MAX_SESSION_ATTACH_TRIES 10
static ulonglong limit_unsafe_suppression_start_time= 0;
static bool unsafe_warning_suppression_is_activated= false;
static int limit_unsafe_warning_count= 0;
static handlerton *binlog_hton;
bool opt_binlog_order_commits= true;
bool opt_gtid_precommit= false;
const char *log_bin_index= 0;
const char *log_bin_basename= 0;
MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period);
static int binlog_init(void *p);
static int binlog_start_trans_and_stmt(THD *thd, Log_event *start_event);
static int binlog_close_connection(handlerton *hton, THD *thd);
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv);
static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv);
static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton,
THD *thd);
static int binlog_commit(handlerton *hton, THD *thd, bool all);
static int binlog_rollback(handlerton *hton, THD *thd, bool all);
static int binlog_prepare(handlerton *hton, THD *thd, bool all);
/**
Helper class to hold a mutex for the duration of the
block.
Eliminates the need for explicit unlocking of mutexes on, e.g.,
error returns. On passing a null pointer, the sentry will not do
anything.
*/
class Mutex_sentry
{
public:
Mutex_sentry(mysql_mutex_t *mutex)
: m_mutex(mutex)
{
if (m_mutex)
mysql_mutex_lock(mutex);
}
~Mutex_sentry()
{
if (m_mutex)
mysql_mutex_unlock(m_mutex);
#ifndef DBUG_OFF
m_mutex= 0;
#endif
}
private:
mysql_mutex_t *m_mutex;
// It's not allowed to copy this object in any way
Mutex_sentry(Mutex_sentry const&);
void operator=(Mutex_sentry const&);
};
/**
Print system time.
*/
static void print_system_time()
{
#ifdef __WIN__
SYSTEMTIME utc_time;
GetSystemTime(&utc_time);
const long hrs= utc_time.wHour;
const long mins= utc_time.wMinute;
const long secs= utc_time.wSecond;
#else
/* Using time() instead of my_time() to avoid looping */
const time_t curr_time= time(NULL);
/* Calculate time of day */
const long tmins = curr_time / 60;
const long thrs = tmins / 60;
const long hrs = thrs % 24;
const long mins = tmins % 60;
const long secs = curr_time % 60;
#endif
char hrs_buf[3]= "00";
char mins_buf[3]= "00";
char secs_buf[3]= "00";
int base= 10;
my_safe_itoa(base, hrs, &hrs_buf[2]);
my_safe_itoa(base, mins, &mins_buf[2]);
my_safe_itoa(base, secs, &secs_buf[2]);
my_safe_printf_stderr("---------- %s:%s:%s UTC - ",
hrs_buf, mins_buf, secs_buf);
}
/**
Helper class to perform a thread excursion.
This class is used to temporarily switch to another session (THD
structure). It will set up thread specific "globals" correctly
so that the POSIX thread looks exactly like the session attached to.
However, PSI_thread info is not touched as it is required to show
the actual physial view in PFS instrumentation i.e., it should
depict as the real thread doing the work instead of thread it switched
to.
On destruction, the original session (which is supplied to the
constructor) will be re-attached automatically. For example, with
this code, the value of @c current_thd will be the same before and
after execution of the code.
@code
{
Thread_excursion excursion(current_thd);
for (int i = 0 ; i < count ; ++i)
excursion.attach_to(other_thd[i]);
}
@endcode
@warning The class is not designed to be inherited from.
*/
class Thread_excursion
{
public:
Thread_excursion(THD *thd)
: m_original_thd(thd)
{
}
~Thread_excursion() {
#ifndef EMBEDDED_LIBRARY
if (unlikely(setup_thread_globals(m_original_thd)))
DBUG_ASSERT(0); // Out of memory?!
#endif
}
/**
Try to attach the POSIX thread to a session.
- This function attaches the POSIX thread to a session
in MAX_SESSION_ATTACH_TRIES tries when encountering
'out of memory' error, and terminates the server after
failed in MAX_SESSION_ATTACH_TRIES tries.
@param[in] thd The thd of a session
*/
void try_to_attach_to(THD *thd)
{
int i= 0;
/*
Attach the POSIX thread to a session in MAX_SESSION_ATTACH_TRIES
tries when encountering 'out of memory' error.
*/
while (i < MAX_SESSION_ATTACH_TRIES)
{
/*
Currently attach_to(...) returns ER_OUTOFMEMORY or 0. So
we continue to attach the POSIX thread when encountering
the ER_OUTOFMEMORY error. Please take care other error
returned from attach_to(...) in future.
*/
if (!attach_to(thd))
{
if (i > 0)
sql_print_warning("Server overcomes the temporary 'out of memory' "
"in '%d' tries while attaching to session thread "
"during the group commit phase.\n", i + 1);
break;
}
i++;
}
/*
Terminate the server after failed to attach the POSIX thread
to a session in MAX_SESSION_ATTACH_TRIES tries.
*/
if (MAX_SESSION_ATTACH_TRIES == i)
{
print_system_time();
my_safe_printf_stderr("%s", "[Fatal] Out of memory while attaching to "
"session thread during the group commit phase. "
"Data consistency between master and slave can "
"be guaranteed after server restarts.\n");
_exit(EXIT_FAILURE);
}
}
private:
/**
Attach the POSIX thread to a session.
*/
int attach_to(THD *thd)
{
#ifndef EMBEDDED_LIBRARY
if (DBUG_EVALUATE_IF("simulate_session_attach_error", 1, 0)
|| unlikely(setup_thread_globals(thd)))
{
/*
Indirectly uses pthread_setspecific, which can only return
ENOMEM or EINVAL. Since store_globals are using correct keys,
the only alternative is out of memory.
*/
return ER_OUTOFMEMORY;
}
#endif /* EMBEDDED_LIBRARY */
return 0;
}
int setup_thread_globals(THD *thd) const {
int error= 0;
THD *original_thd= my_pthread_getspecific(THD*, THR_THD);
MEM_ROOT* original_mem_root= my_pthread_getspecific(MEM_ROOT*, THR_MALLOC);
if ((error= my_pthread_setspecific_ptr(THR_THD, thd)))
goto exit0;
if ((error= my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root)))
goto exit1;
if ((error= set_mysys_var(thd->mysys_var)))
goto exit2;
goto exit0;
exit2:
error= my_pthread_setspecific_ptr(THR_MALLOC, original_mem_root);
exit1:
error= my_pthread_setspecific_ptr(THR_THD, original_thd);
exit0:
return error;
}
THD *m_original_thd;
};
/**
Caches for non-transactional and transactional data before writing
it to the binary log.
@todo All the access functions for the flags suggest that the
encapsuling is not done correctly, so try to move any logic that
requires access to the flags into the cache.
*/
class binlog_cache_data
{
public:
binlog_cache_data(bool trx_cache_arg,
my_off_t max_binlog_cache_size_arg,
ulong *ptr_binlog_cache_use_arg,
ulong *ptr_binlog_cache_disk_use_arg)
: m_pending(0), saved_max_binlog_cache_size(max_binlog_cache_size_arg),
ptr_binlog_cache_use(ptr_binlog_cache_use_arg),
ptr_binlog_cache_disk_use(ptr_binlog_cache_disk_use_arg)
{
reset();
flags.transactional= trx_cache_arg;
cache_log.end_of_file= saved_max_binlog_cache_size;
}
int finalize(THD *thd, Log_event *end_event);
int flush(THD *thd, my_off_t *bytes, bool *wrote_xid);
int write_event(THD *thd, Log_event *event);
virtual ~binlog_cache_data()
{
DBUG_ASSERT(is_binlog_empty());
close_cached_file(&cache_log);
}
bool is_binlog_empty() const
{
my_off_t pos= my_b_tell(&cache_log);
DBUG_PRINT("debug", ("%s_cache - pending: 0x%llx, bytes: %llu",
(flags.transactional ? "trx" : "stmt"),
(ulonglong) pending(), (ulonglong) pos));
return pending() == NULL && pos == 0;
}
bool is_group_cache_empty() const
{
return group_cache.is_empty();
}
bool is_finalized() const {
return flags.finalized;
}
Rows_log_event *pending() const
{
return m_pending;
}
void set_pending(Rows_log_event *const pending)
{
m_pending= pending;
}
void set_incident(void)
{
flags.incident= true;
}
bool has_incident(void) const
{
return flags.incident;
}
bool has_xid() const {
// There should only be an XID event if we are transactional
DBUG_ASSERT((flags.transactional && flags.with_xid) || !flags.with_xid);
return flags.with_xid;
}
bool is_trx_cache() const
{
return flags.transactional;
}
my_off_t get_byte_position() const
{
return my_b_tell(&cache_log);
}
virtual void reset()
{
compute_statistics();
truncate(0);
/*
If IOCACHE has a file associated, change its size to 0.
It is safer to do it here, since we are certain that one
asked the cache to go to position 0 with truncate.
*/
if(cache_log.file != -1)
{
int error= 0;
if((error= my_chsize(cache_log.file, 0, 0, MYF(MY_WME))))
sql_print_warning("Unable to resize binlog IOCACHE auxilary file");
DBUG_EXECUTE_IF("show_io_cache_size",
{
ulong file_size= my_seek(cache_log.file,
0L,MY_SEEK_END,MYF(MY_WME+MY_FAE));
sql_print_error("New size:%ld", file_size);
});
}
flags.incident= false;
flags.with_xid= false;
flags.immediate= false;
flags.finalized= false;
/*
The truncate function calls reinit_io_cache that calls my_b_flush_io_cache
which may increase disk_writes. This breaks the disk_writes use by the
binary log which aims to compute the ratio between in-memory cache usage
and disk cache usage. To avoid this undesirable behavior, we reset the
variable after truncating the cache.
*/
cache_log.disk_writes= 0;
group_cache.clear();
DBUG_ASSERT(is_binlog_empty());
}
/*
Sets the write position to point at the position given. If the
cache has swapped to a file, it reinitializes it, so that the
proper data is added to the IO_CACHE buffer. Otherwise, it just
does a my_b_seek.
my_b_seek will not work if the cache has swapped, that's why
we do this workaround.
@param[IN] pos the new write position.
@param[IN] use_reinit if the position should be reset resorting
to reset_io_cache (which may issue a flush_io_cache
inside)
@return The previous write position.
*/
my_off_t reset_write_pos(my_off_t pos, bool use_reinit)
{
DBUG_ENTER("reset_write_pos");
DBUG_ASSERT(cache_log.type == WRITE_CACHE);
my_off_t oldpos= get_byte_position();
if (use_reinit)
reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, 0);
else
my_b_seek(&cache_log, pos);
DBUG_RETURN(oldpos);
}
/*
Cache to store data before copying it to the binary log.
*/
IO_CACHE cache_log;
/**
The group cache for this cache.
*/
Group_cache group_cache;
protected:
/*
It truncates the cache to a certain position. This includes deleting the
pending event.
*/
void truncate(my_off_t pos)
{
DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos));
remove_pending_event();
reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, 0);
cache_log.end_of_file= saved_max_binlog_cache_size;
}
/**
Flush pending event to the cache buffer.
*/
int flush_pending_event(THD *thd) {
if (m_pending)
{
m_pending->set_flags(Rows_log_event::STMT_END_F);
if (int error= write_event(thd, m_pending))
return error;
thd->clear_binlog_table_maps();
}
return 0;
}
/**
Remove the pending event.
*/
int remove_pending_event() {
delete m_pending;
m_pending= NULL;
return 0;
}
struct Flags {
/*
Defines if this is either a trx-cache or stmt-cache, respectively, a
transactional or non-transactional cache.
*/
bool transactional:1;
/*
This indicates that some events did not get into the cache and most likely
it is corrupted.
*/
bool incident:1;
/*
This indicates that the cache should be written without BEGIN/END.
*/
bool immediate:1;
/*
This flag indicates that the buffer was finalized and has to be
flushed to disk.
*/
bool finalized:1;
/*
This indicates that the cache contain an XID event.
*/
bool with_xid:1;
} flags;
private:
/*
Pending binrows event. This event is the event where the rows are currently
written.
*/
Rows_log_event *m_pending;
/**
This function computes binlog cache and disk usage.
*/
void compute_statistics()
{
if (!is_binlog_empty())
{
statistic_increment(*ptr_binlog_cache_use, &LOCK_status);
if (cache_log.disk_writes != 0)
statistic_increment(*ptr_binlog_cache_disk_use, &LOCK_status);
}
}
/*
Stores the values of maximum size of the cache allowed when this cache
is configured. This corresponds to either
. max_binlog_cache_size or max_binlog_stmt_cache_size.
*/
my_off_t saved_max_binlog_cache_size;
/*
Stores a pointer to the status variable that keeps track of the in-memory
cache usage. This corresponds to either
. binlog_cache_use or binlog_stmt_cache_use.
*/
ulong *ptr_binlog_cache_use;
/*
Stores a pointer to the status variable that keeps track of the disk
cache usage. This corresponds to either
. binlog_cache_disk_use or binlog_stmt_cache_disk_use.
*/
ulong *ptr_binlog_cache_disk_use;
binlog_cache_data& operator=(const binlog_cache_data& info);
binlog_cache_data(const binlog_cache_data& info);
};
class binlog_stmt_cache_data
: public binlog_cache_data
{
public:
binlog_stmt_cache_data(bool trx_cache_arg,
my_off_t max_binlog_cache_size_arg,
ulong *ptr_binlog_cache_use_arg,
ulong *ptr_binlog_cache_disk_use_arg)
: binlog_cache_data(trx_cache_arg,
max_binlog_cache_size_arg,
ptr_binlog_cache_use_arg,
ptr_binlog_cache_disk_use_arg)
{
}
using binlog_cache_data::finalize;
int finalize(THD *thd);
};
int
binlog_stmt_cache_data::finalize(THD *thd)
{
if (flags.immediate)
{
if (int error= finalize(thd, NULL))
return error;
}
else
{
Query_log_event
end_evt(thd, STRING_WITH_LEN("COMMIT"), false, false, true, 0, true);
if (int error= finalize(thd, &end_evt))
return error;
}
return 0;
}
class binlog_trx_cache_data : public binlog_cache_data
{
public:
binlog_trx_cache_data(bool trx_cache_arg,
my_off_t max_binlog_cache_size_arg,
ulong *ptr_binlog_cache_use_arg,
ulong *ptr_binlog_cache_disk_use_arg)
: binlog_cache_data(trx_cache_arg,
max_binlog_cache_size_arg,
ptr_binlog_cache_use_arg,
ptr_binlog_cache_disk_use_arg),
m_cannot_rollback(FALSE), before_stmt_pos(MY_OFF_T_UNDEF)
{ }
void reset()
{
DBUG_ENTER("reset");
DBUG_PRINT("enter", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
m_cannot_rollback= FALSE;
before_stmt_pos= MY_OFF_T_UNDEF;
binlog_cache_data::reset();
DBUG_PRINT("return", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
DBUG_VOID_RETURN;
}
bool cannot_rollback() const
{
return m_cannot_rollback;
}
void set_cannot_rollback()
{
m_cannot_rollback= TRUE;
}
my_off_t get_prev_position() const
{
return before_stmt_pos;
}
void set_prev_position(my_off_t pos)
{
DBUG_ENTER("set_prev_position");
DBUG_PRINT("enter", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
before_stmt_pos= pos;
DBUG_PRINT("return", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
DBUG_VOID_RETURN;
}
void restore_prev_position()
{
DBUG_ENTER("restore_prev_position");
DBUG_PRINT("enter", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
binlog_cache_data::truncate(before_stmt_pos);
before_stmt_pos= MY_OFF_T_UNDEF;
DBUG_PRINT("return", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
DBUG_VOID_RETURN;
}
void restore_savepoint(my_off_t pos)
{
DBUG_ENTER("restore_savepoint");
DBUG_PRINT("enter", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
binlog_cache_data::truncate(pos);
if (pos <= before_stmt_pos)
before_stmt_pos= MY_OFF_T_UNDEF;
DBUG_PRINT("return", ("before_stmt_pos: %llu", (ulonglong) before_stmt_pos));
DBUG_VOID_RETURN;
}
using binlog_cache_data::truncate;
int truncate(THD *thd, bool all);
private:
/*
It will be set TRUE if any statement which cannot be rolled back safely
is put in trx_cache.
*/
bool m_cannot_rollback;
/*
Binlog position before the start of the current statement.
*/
my_off_t before_stmt_pos;
binlog_trx_cache_data& operator=(const binlog_trx_cache_data& info);
binlog_trx_cache_data(const binlog_trx_cache_data& info);
};
class binlog_cache_mngr {
public:
binlog_cache_mngr(my_off_t max_binlog_stmt_cache_size_arg,
ulong *ptr_binlog_stmt_cache_use_arg,
ulong *ptr_binlog_stmt_cache_disk_use_arg,
my_off_t max_binlog_cache_size_arg,
ulong *ptr_binlog_cache_use_arg,
ulong *ptr_binlog_cache_disk_use_arg)
: stmt_cache(FALSE, max_binlog_stmt_cache_size_arg,
ptr_binlog_stmt_cache_use_arg,
ptr_binlog_stmt_cache_disk_use_arg),
trx_cache(TRUE, max_binlog_cache_size_arg,
ptr_binlog_cache_use_arg,
ptr_binlog_cache_disk_use_arg)
{ }
binlog_cache_data* get_binlog_cache_data(bool is_transactional)
{
if (is_transactional)
return &trx_cache;
else
return &stmt_cache;
}
IO_CACHE* get_binlog_cache_log(bool is_transactional)
{
return (is_transactional ? &trx_cache.cache_log : &stmt_cache.cache_log);
}
/**
Convenience method to check if both caches are empty.
*/
bool is_binlog_empty() const {
return stmt_cache.is_binlog_empty() && trx_cache.is_binlog_empty();
}
/*
clear stmt_cache and trx_cache if they are not empty
*/
void reset()
{
if (!stmt_cache.is_binlog_empty())
stmt_cache.reset();
if (!trx_cache.is_binlog_empty())
trx_cache.reset();
}
#ifndef DBUG_OFF
bool dbug_any_finalized() const {
return stmt_cache.is_finalized() || trx_cache.is_finalized();
}
#endif
bool all_finalized() const {
return (stmt_cache.is_finalized() && trx_cache.is_finalized());
}
/*
Convenience method to flush both caches to the binary log.
@param bytes_written Pointer to variable that will be set to the
number of bytes written for the flush.
@param wrote_xid Pointer to variable that will be set to @c
true if any XID event was written to the
binary log. Otherwise, the variable will not
be touched.
@return Error code on error, zero if no error.
*/
int flush(THD *thd, my_off_t *bytes_written, bool *wrote_xid)
{
my_off_t stmt_bytes= 0;
my_off_t trx_bytes= 0;
bool clear_gtid= false;
DBUG_ASSERT(stmt_cache.has_xid() == 0);
if (gtid_mode && all_finalized())
{
DBUG_ASSERT(thd->gtid_precommit);
DBUG_ASSERT(thd->enable_unsafe_stmt);
/**
stmt_cache and trx_cache are both not empty. They will generate
Gtid seperately. So we need to clear the owned gtid after flushing
stmt_cache to avoid some ASSERT later.
*/
clear_gtid= true;
}
if (int error= stmt_cache.flush(thd, &stmt_bytes, wrote_xid))
return error;
if (clear_gtid)
{
thd->clear_owned_gtids();
thd->variables.gtid_next.set_undefined();
}
if (int error= trx_cache.flush(thd, &trx_bytes, wrote_xid))
return error;
*bytes_written= stmt_bytes + trx_bytes;
return 0;
}
binlog_stmt_cache_data stmt_cache;
binlog_trx_cache_data trx_cache;
private:
binlog_cache_mngr& operator=(const binlog_cache_mngr& info);
binlog_cache_mngr(const binlog_cache_mngr& info);
};
static binlog_cache_mngr *thd_get_cache_mngr(const THD *thd)
{
/*
If opt_bin_log is not set, binlog_hton->slot == -1 and hence
thd_get_ha_data(thd, hton) segfaults.
*/
DBUG_ASSERT(opt_bin_log);
return (binlog_cache_mngr *)thd_get_ha_data(thd, binlog_hton);
}
/**
Checks if the BINLOG_CACHE_SIZE's value is greater than MAX_BINLOG_CACHE_SIZE.
If this happens, the BINLOG_CACHE_SIZE is set to MAX_BINLOG_CACHE_SIZE.
*/
void check_binlog_cache_size(THD *thd)
{
if (binlog_cache_size > max_binlog_cache_size)
{
if (thd)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_CACHE_SIZE_GREATER_THAN_MAX,
ER(ER_BINLOG_CACHE_SIZE_GREATER_THAN_MAX),
(ulong) binlog_cache_size,
(ulong) max_binlog_cache_size);
}
else
{
sql_print_warning(ER_DEFAULT(ER_BINLOG_CACHE_SIZE_GREATER_THAN_MAX),
(ulong) binlog_cache_size,
(ulong) max_binlog_cache_size);
}
binlog_cache_size= max_binlog_cache_size;
}
}
/**
Checks if the BINLOG_STMT_CACHE_SIZE's value is greater than MAX_BINLOG_STMT_CACHE_SIZE.
If this happens, the BINLOG_STMT_CACHE_SIZE is set to MAX_BINLOG_STMT_CACHE_SIZE.
*/
void check_binlog_stmt_cache_size(THD *thd)
{
if (binlog_stmt_cache_size > max_binlog_stmt_cache_size)
{
if (thd)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX,
ER(ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX),
(ulong) binlog_stmt_cache_size,
(ulong) max_binlog_stmt_cache_size);
}
else
{
sql_print_warning(ER_DEFAULT(ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX),
(ulong) binlog_stmt_cache_size,
(ulong) max_binlog_stmt_cache_size);
}
binlog_stmt_cache_size= max_binlog_stmt_cache_size;
}
}
/**
Check whether binlog_hton has valid slot and enabled
*/
bool binlog_enabled()
{
return(binlog_hton && binlog_hton->slot != HA_SLOT_UNDEF);
}
/*
Save position of binary log transaction cache.
SYNPOSIS
binlog_trans_log_savepos()
thd The thread to take the binlog data from
pos Pointer to variable where the position will be stored
DESCRIPTION
Save the current position in the binary log transaction cache into
the variable pointed to by 'pos'
*/
static void
binlog_trans_log_savepos(THD *thd, my_off_t *pos)
{
DBUG_ENTER("binlog_trans_log_savepos");
DBUG_ASSERT(pos != NULL);
binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
DBUG_ASSERT(mysql_bin_log.is_open());
*pos= cache_mngr->trx_cache.get_byte_position();
DBUG_PRINT("return", ("position: %lu", (ulong) *pos));
DBUG_VOID_RETURN;
}
/*
this function is mostly a placeholder.
conceptually, binlog initialization (now mostly done in MYSQL_BIN_LOG::open)
should be moved here.
*/
static int binlog_init(void *p)
{
binlog_hton= (handlerton *)p;
binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO;
binlog_hton->db_type=DB_TYPE_BINLOG;
binlog_hton->savepoint_offset= sizeof(my_off_t);
binlog_hton->close_connection= binlog_close_connection;
binlog_hton->savepoint_set= binlog_savepoint_set;
binlog_hton->savepoint_rollback= binlog_savepoint_rollback;
binlog_hton->savepoint_rollback_can_release_mdl=
binlog_savepoint_rollback_can_release_mdl;
binlog_hton->commit= binlog_commit;
binlog_hton->rollback= binlog_rollback;
binlog_hton->prepare= binlog_prepare;
binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN;
return 0;
}
static int binlog_close_connection(handlerton *hton, THD *thd)
{
DBUG_ENTER("binlog_close_connection");
binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
DBUG_ASSERT(cache_mngr->is_binlog_empty());
DBUG_ASSERT(cache_mngr->trx_cache.is_group_cache_empty() &&
cache_mngr->stmt_cache.is_group_cache_empty());
DBUG_PRINT("debug", ("Set ha_data slot %d to 0x%llx", binlog_hton->slot, (ulonglong) NULL));
thd_set_ha_data(thd, binlog_hton, NULL);
cache_mngr->~binlog_cache_mngr();
my_free(cache_mngr);
DBUG_RETURN(0);
}
int binlog_cache_data::write_event(THD *thd, Log_event *ev)
{
DBUG_ENTER("binlog_cache_data::write_event");
if (gtid_mode > 0)
{
Group_cache::enum_add_group_status status=
group_cache.add_logged_group(thd, get_byte_position());
if (status == Group_cache::ERROR)
DBUG_RETURN(1);
else if (status == Group_cache::APPEND_NEW_GROUP)
{
Gtid_log_event gtid_ev(thd, is_trx_cache());
if (gtid_ev.write(&cache_log) != 0)
DBUG_RETURN(1);
}
}
if (ev != NULL)
{
DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending",
{DBUG_SET("+d,simulate_file_write_error");});
if (ev->write(&cache_log) != 0)
{
DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending",
{
DBUG_SET("-d,simulate_file_write_error");
DBUG_SET("-d,simulate_disk_full_at_flush_pending");
/*
after +d,simulate_file_write_error the local cache
is in unsane state. Since -d,simulate_file_write_error
revokes the first simulation do_write_cache()
can't be run without facing an assert.
So it's blocked with the following 2nd simulation:
*/
DBUG_SET("+d,simulate_do_write_cache_failure");
});
DBUG_RETURN(1);
}
if (ev->get_type_code() == XID_EVENT)
flags.with_xid= true;
if (ev->is_using_immediate_logging())
flags.immediate= true;
}
DBUG_RETURN(0);
}
/**
Checks if the given GTID exists in the Group_cache. If not, add it
as an empty group.
@todo Move this function into the cache class?
@param thd THD object that owns the Group_cache
@param cache_data binlog_cache_data object for the cache
@param gtid GTID to check
*/
static int write_one_empty_group_to_cache(THD *thd,
binlog_cache_data *cache_data,
Gtid gtid)
{
DBUG_ENTER("write_one_empty_group_to_cache");
Group_cache *group_cache= &cache_data->group_cache;
if (group_cache->contains_gtid(gtid))
DBUG_RETURN(0);
/*
Apparently this code is not being called. We need to
investigate if this is a bug or this code is not
necessary. /Alfranio
Empty groups are currently being handled in the function
gtid_empty_group_log_and_cleanup().
*/
DBUG_ASSERT(0); /*NOTREACHED*/
#ifdef NON_ERROR_GTID
IO_CACHE *cache= &cache_data->cache_log;
Group_cache::enum_add_group_status status= group_cache->add_empty_group(gtid);
if (status == Group_cache::ERROR)
DBUG_RETURN(1);
DBUG_ASSERT(status == Group_cache::APPEND_NEW_GROUP);
Gtid_specification spec= { GTID_GROUP, gtid };
Gtid_log_event gtid_ev(thd, cache_data->is_trx_cache(), &spec);
if (gtid_ev.write(cache) != 0)
DBUG_RETURN(1);
#endif
DBUG_RETURN(0);
}
/**
Writes all GTIDs that the thread owns to the stmt/trx cache, if the
GTID is not already in the cache.
@todo Move this function into the cache class?
@param thd THD object for the thread that owns the cache.
@param cache_data The cache.
*/
static int write_empty_groups_to_cache(THD *thd, binlog_cache_data *cache_data)
{
DBUG_ENTER("write_empty_groups_to_cache");
if (thd->owned_gtid.sidno == -1)
{
#ifdef HAVE_GTID_NEXT_LIST
Gtid_set::Gtid_iterator git(&thd->owned_gtid_set);
Gtid gtid= git.get();
while (gtid.sidno != 0)
{
if (write_one_empty_group_to_cache(thd, cache_data, gtid) != 0)
DBUG_RETURN(1);
git.next();
gtid= git.get();
}
#else
DBUG_ASSERT(0);
#endif
}
else if (thd->owned_gtid.sidno > 0)
if (write_one_empty_group_to_cache(thd, cache_data, thd->owned_gtid) != 0)
DBUG_RETURN(1);
DBUG_RETURN(0);
}
/**
@todo Move this function into the cache class?
*/
static int
gtid_before_write_cache(THD* thd, binlog_cache_data* cache_data)
{
DBUG_ENTER("gtid_before_write_cache");
int error= 0;
DBUG_ASSERT(thd->variables.gtid_next.type != UNDEFINED_GROUP);
if (gtid_mode == 0)
DBUG_RETURN(0);
Group_cache* group_cache= &cache_data->group_cache;
global_sid_lock->rdlock();
if (thd->variables.gtid_next.type == AUTOMATIC_GROUP)
{
if (group_cache->generate_automatic_gno(thd) !=
RETURN_STATUS_OK)
{
global_sid_lock->unlock();
DBUG_RETURN(1);
}
}
if (write_empty_groups_to_cache(thd, cache_data) != 0)
{
global_sid_lock->unlock();
DBUG_RETURN(1);
}
global_sid_lock->unlock();
/*
If an automatic group number was generated, change the first event
into a "real" one.
*/
if (thd->variables.gtid_next.type == AUTOMATIC_GROUP)
{
DBUG_ASSERT(group_cache->get_n_groups() == 1);
Cached_group *cached_group= group_cache->get_unsafe_pointer(0);
DBUG_ASSERT(cached_group->spec.type != AUTOMATIC_GROUP);
Gtid_log_event gtid_ev(thd, cache_data->is_trx_cache(),
&cached_group->spec);
bool using_file= cache_data->cache_log.pos_in_file > 0;
my_off_t saved_position= cache_data->reset_write_pos(0, using_file);
error= gtid_ev.write(&cache_data->cache_log);
cache_data->reset_write_pos(saved_position, using_file);
}
DBUG_RETURN(error);
}
/**
The function logs an empty group with GTID and performs cleanup.
Its logic wrt GTID is equivalent to one of binlog_commit().
It's called at the end of statement execution in case binlog_commit()
was skipped.
Such cases are due ineffective binlogging incl an empty group
re-execution.
@param thd The thread handle
@return
nonzero if an error pops up.
*/
int gtid_empty_group_log_and_cleanup(THD *thd)
{
int ret= 1;
binlog_cache_data* cache_data= NULL;
DBUG_ENTER("gtid_empty_group_log_and_cleanup");
Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE,
FALSE, TRUE, 0, TRUE);
DBUG_ASSERT(!qinfo.is_using_immediate_logging());
/*
thd->cache_mngr is uninitialized on the first empty transaction.
*/
if (thd->binlog_setup_trx_data())
DBUG_RETURN(1);
cache_data= &thd_get_cache_mngr(thd)->trx_cache;
DBUG_PRINT("debug", ("Writing to trx_cache"));
if (cache_data->write_event(thd, &qinfo) ||
gtid_before_write_cache(thd, cache_data))
goto err;
ret= mysql_bin_log.commit(thd, true);
err:
DBUG_RETURN(ret);
}
/**
This function finalizes the cache preparing for commit or rollback.
The function just writes all the necessary events to the cache but
does not flush the data to the binary log file. That is the role of
the binlog_cache_data::flush function.
@see binlog_cache_data::flush
@param thd The thread whose transaction should be flushed
@param cache_data Pointer to the cache
@param end_ev The end event either commit/rollback
@return
nonzero if an error pops up when flushing the cache.
*/
int
binlog_cache_data::finalize(THD *thd, Log_event *end_event)
{
DBUG_ENTER("binlog_cache_data::finalize");
if (!is_binlog_empty())
{
DBUG_ASSERT(!flags.finalized);
if (int error= flush_pending_event(thd))
DBUG_RETURN(error);
if (int error= write_event(thd, end_event))
DBUG_RETURN(error);
flags.finalized= true;
DBUG_PRINT("debug", ("flags.finalized: %s", YESNO(flags.finalized)));
}
DBUG_RETURN(0);
}
/**
Flush caches to the binary log.
If the cache is finalized, the cache will be flushed to the binary
log file. If the cache is not finalized, nothing will be done.
If flushing fails for any reason, an error will be reported and the
cache will be reset. Flushing can fail in two circumstances:
- It was not possible to write the cache to the file. In this case,
it does not make sense to keep the cache.
- The cache was successfully written to disk but post-flush actions
(such as binary log rotation) failed. In this case, the cache is
already written to disk and there is no reason to keep it.
@see binlog_cache_data::finalize
*/
int
binlog_cache_data::flush(THD *thd, my_off_t *bytes_written, bool *wrote_xid)
{
/*
Doing a commit or a rollback including non-transactional tables,
i.e., ending a transaction where we might write the transaction
cache to the binary log.
We can always end the statement when ending a transaction since
transactions are not allowed inside stored functions. If they
were, we would have to ensure that we're not ending a statement
inside a stored function.
*/
DBUG_ENTER("binlog_cache_data::flush");
DBUG_PRINT("debug", ("flags.finalized: %s", YESNO(flags.finalized)));
int error= 0;
if (flags.finalized)
{
my_off_t bytes_in_cache= my_b_tell(&cache_log);
DBUG_PRINT("debug", ("bytes_in_cache: %llu", bytes_in_cache));
/*
The cache is always reset since subsequent rollbacks of the
transactions might trigger attempts to write to the binary log
if the cache is not reset.
*/
if (!(error= gtid_before_write_cache(thd, this)))
error= mysql_bin_log.write_cache(thd, this);
else
thd->commit_error= THD::CE_FLUSH_ERROR;
if (flags.with_xid && error == 0)
*wrote_xid= true;
/*
Reset have to be after the if above, since it clears the
with_xid flag
*/
reset();
if (bytes_written)
*bytes_written= bytes_in_cache;
}
DBUG_ASSERT(!flags.finalized);
DBUG_RETURN(error);
}
/**
This function truncates the transactional cache upon committing or rolling
back either a transaction or a statement.
@param thd The thread whose transaction should be flushed
@param cache_mngr Pointer to the cache data to be flushed
@param all @c true means truncate the transaction, otherwise the
statement must be truncated.
@return
nonzero if an error pops up when truncating the transactional cache.
*/
int
binlog_trx_cache_data::truncate(THD *thd, bool all)
{
DBUG_ENTER("binlog_trx_cache_data::truncate");
int error=0;
DBUG_PRINT("info", ("thd->options={ %s %s}, transaction: %s",
FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->variables.option_bits, OPTION_BEGIN),
all ? "all" : "stmt"));
remove_pending_event();
/*
If rolling back an entire transaction or a single statement not
inside a transaction, we reset the transaction cache.
*/
if (ending_trans(thd, all))
{
if (has_incident())
error= mysql_bin_log.write_incident(thd, true/*need_lock_log=true*/);
reset();
}
/*
If rolling back a statement in a transaction, we truncate the
transaction cache to remove the statement.
*/
else if (get_prev_position() != MY_OFF_T_UNDEF)
{
restore_prev_position();
if (is_binlog_empty())
{
/*
After restoring the previous position, we need to check if
the cache is empty. In such case, the group cache needs to
be cleaned up too because the GTID is removed too from the
cache.
So if any change happens again, the GTID must be rewritten
and this will not happen if the group cache is not cleaned
up.
After integrating this with NDB, we need to check if the
current approach is enough or the group cache needs to
explicitly support rollback to savepoints.
*/
group_cache.clear();
}
}
thd->clear_binlog_table_maps();
DBUG_RETURN(error);
}
static int binlog_prepare(handlerton *hton, THD *thd, bool all)
{
/*
do nothing.
just pretend we can do 2pc, so that MySQL won't
switch to 1pc.
real work will be done in MYSQL_BIN_LOG::commit()
*/
return 0;
}
/**
This function is called once after each statement.
@todo This function is currently not used any more and will
eventually be eliminated. The real commit job is done in the
MYSQL_BIN_LOG::commit function.
@see MYSQL_BIN_LOG::commit
@param hton The binlog handlerton.
@param thd The client thread that executes the transaction.
@param all This is @c true if this is a real transaction commit, and
@false otherwise.
@see handlerton::commit
*/
static int binlog_commit(handlerton *hton, THD *thd, bool all)
{
DBUG_ENTER("binlog_commit");
/*
Nothing to do (any more) on commit.
*/
DBUG_RETURN(0);
}
/**
This function is called when a transaction or a statement is rolled back.
@internal It is necessary to execute a rollback here if the
transaction was rolled back because of executing a ROLLBACK TO
SAVEPOINT command, but it is not used for normal rollback since
MYSQL_BIN_LOG::rollback is called in that case.
@todo Refactor code to introduce a MYSQL_BIN_LOG::rollback(THD
*thd, SAVEPOINT *sv) function in @c TC_LOG and have that
function execute the necessary work to rollback to a savepoint.
@param hton The binlog handlerton.
@param thd The client thread that executes the transaction.
@param all This is @c true if this is a real transaction rollback, and
@false otherwise.
@see handlerton::rollback
*/
static int binlog_rollback(handlerton *hton, THD *thd, bool all)
{
DBUG_ENTER("binlog_rollback");
int error= 0;
if (thd->lex->sql_command == SQLCOM_ROLLBACK_TO_SAVEPOINT)
error= mysql_bin_log.rollback(thd, all);
DBUG_RETURN(error);
}
bool
Stage_manager::Mutex_queue::append(THD *first, int *slot)
{
DBUG_ENTER("Stage_manager::Mutex_queue::append");
lock();
DBUG_PRINT("enter", ("first: 0x%llx", (ulonglong) first));
DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
(ulonglong) m_first, (ulonglong) &m_first,
(ulonglong) m_last));
DBUG_ASSERT(first->prepared_engine != NULL);
if (unlikely(!group_prepared_engine))
group_prepared_engine= new engine_lsn_map();
if (!first->prepared_engine->is_empty())
group_prepared_engine->compare_and_update(
first->prepared_engine->get_maps());
bool empty= (m_first == NULL);
*m_last= first;
if (empty)
{
DBUG_ASSERT(m_first == first);
if (first->stage_cond_id == UNDEF_COND_SLOT)
{
if (unlikely(cond_index < 0))
{
/* adjust to zero */
cond_index= 0;
}
first->stage_cond_id= ((cond_index++)%MAX_STAGE_COND);
}
}
else
{
first->prev_to_commit= m_last_thd;
}
DBUG_ASSERT(m_first &&
(m_first->stage_cond_id != UNDEF_COND_SLOT));
/* The follower thread will always wait for the leader of
current stage. */
*slot= m_first->stage_cond_id;
DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
(ulonglong) m_first, (ulonglong) &m_first,
(ulonglong) m_last));
/*
Go to the last THD instance of the list. We expect lists to be
moderately short. If they are not, we need to track the end of
the queue as well.
*/
while (first->next_to_commit)
first= first->next_to_commit;
m_last= &first->next_to_commit;
m_last_thd= first;
DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
(ulonglong) m_first, (ulonglong) &m_first,
(ulonglong) m_last));
DBUG_ASSERT(m_first || m_last == &m_first);
DBUG_PRINT("return", ("empty: %s", YESNO(empty)));
unlock();
DBUG_RETURN(empty);
}
bool
Stage_manager::enroll_for(StageID stage, THD *thd, mysql_mutex_t *stage_mutex)
{
// If the queue was empty: we're the leader for this batch
DBUG_PRINT("debug", ("Enqueue 0x%llx to queue for stage %d",
(ulonglong) thd, stage));
int slot= UNDEF_COND_SLOT;
bool leader= m_queue[stage].append(thd, &slot);
DBUG_ASSERT(slot != UNDEF_COND_SLOT);
/*
The stage mutex can be NULL if we are enrolling for the first
stage.
*/
if (stage_mutex)
mysql_mutex_unlock(stage_mutex);
if (leader)
thd->stage_leader= true;
/*
If the queue was not empty, we're a follower and wait for the
leader to process the queue. If we were holding a mutex, we have
to release it before going to sleep.
*/
if (!leader)
{
DEBUG_SYNC(thd, "wait_as_follower");
#ifndef DBUG_OFF
mysql_mutex_lock(&m_lock_preempt);
/*
Leader can be awaiting all-clear to preempt follower's execution.
With setting the status the follower ensures it won't execute anything
including thread-specific code.
*/
thd->transaction.flags.ready_preempt= 1;
if (leader_await_preempt_status)
mysql_cond_signal(&m_cond_preempt);
mysql_mutex_unlock(&m_lock_preempt);
#endif
mutex_enter_slot(slot);
while (thd->transaction.flags.pending)
enter_cond_slot(slot);
mutex_exit_slot(slot);
if (thd->stage_leader)
{
mutex_enter_slot(thd->stage_cond_id);
cond_signal_slot(thd->stage_cond_id);
mutex_exit_slot(thd->stage_cond_id);
}
}
return leader;
}
THD *Stage_manager::Mutex_queue::fetch_and_empty()
{
DBUG_ENTER("Stage_manager::Mutex_queue::fetch_and_empty");
lock();
DBUG_PRINT("enter", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
(ulonglong) m_first, (ulonglong) &m_first,
(ulonglong) m_last));
THD *result= m_first;
result->prev_to_commit= m_last_thd;
m_first= NULL;
m_last_thd= NULL;
m_last= &m_first;
DBUG_PRINT("info", ("m_first: 0x%llx, &m_first: 0x%llx, m_last: 0x%llx",
(ulonglong) m_first, (ulonglong) &m_first,
(ulonglong) m_last));
DBUG_ASSERT(m_first || m_last == &m_first);
DBUG_PRINT("return", ("result: 0x%llx", (ulonglong) result));
/* Restore group_prepared_engine so the caller can extract it from result. */
if (!group_prepared_engine->is_empty())
{
result->prepared_engine->compare_and_update(group_prepared_engine->get_maps());
/* Reset group_prepared_engine when the queue is empty. */
group_prepared_engine->clear();
}
unlock();
DBUG_RETURN(result);
}
#ifndef DBUG_OFF
void Stage_manager::clear_preempt_status(THD *head)
{
DBUG_ASSERT(head);
mysql_mutex_lock(&m_lock_preempt);
while(!head->transaction.flags.ready_preempt)
{
leader_await_preempt_status= true;
mysql_cond_wait(&m_cond_preempt, &m_lock_preempt);
}
leader_await_preempt_status= false;
mysql_mutex_unlock(&m_lock_preempt);
}
#endif
/**
Write a rollback record of the transaction to the binary log.
For binary log group commit, the rollback is separated into three
parts:
1. First part consists of filling the necessary caches and
finalizing them (if they need to be finalized). After a cache is
finalized, nothing can be added to the cache.
2. Second part execute an ordered flush and commit. This will be
done using the group commit functionality in @c ordered_commit.
Since we roll back the transaction early, we call @c
ordered_commit with the @c skip_commit flag set. The @c
ha_commit_low call inside @c ordered_commit will then not be
called.
3. Third part checks any errors resulting from the flush and handles
them appropriately.
@see MYSQL_BIN_LOG::ordered_commit
@see ha_commit_low
@see ha_rollback_low
@param thd Session to commit
@param all This is @c true if this is a real transaction rollback, and
@false otherwise.
@return Error code, or zero if there were no error.
*/
int MYSQL_BIN_LOG::rollback(THD *thd, bool all)
{
int error= 0;
bool stuff_logged= false;
binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
DBUG_ENTER("MYSQL_BIN_LOG::rollback(THD *thd, bool all)");
DBUG_PRINT("enter", ("all: %s, cache_mngr: 0x%llx, thd->is_error: %s",
YESNO(all), (ulonglong) cache_mngr, YESNO(thd->is_error())));
/*
We roll back the transaction in the engines early since this will
release locks and allow other transactions to start executing.
If we are executing a ROLLBACK TO SAVEPOINT, we should only clear
the caches since this function is called as part of the engine
rollback.
*/
if (thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT)
if ((error= ha_rollback_low(thd, all)))
goto end;
/*
If there is no cache manager, or if there is nothing in the
caches, there are no caches to roll back, so we're trivially done.
*/
if (cache_mngr == NULL || cache_mngr->is_binlog_empty())
goto end;
DBUG_PRINT("debug",
("all.cannot_safely_rollback(): %s, trx_cache_empty: %s",
YESNO(thd->transaction.all.cannot_safely_rollback()),
YESNO(cache_mngr->trx_cache.is_binlog_empty())));
DBUG_PRINT("debug",
("stmt.cannot_safely_rollback(): %s, stmt_cache_empty: %s",
YESNO(thd->transaction.stmt.cannot_safely_rollback()),
YESNO(cache_mngr->stmt_cache.is_binlog_empty())));
/*
If an incident event is set we do not flush the content of the statement
cache because it may be corrupted.
*/
if (cache_mngr->stmt_cache.has_incident())
{
error= write_incident(thd, true/*need_lock_log=true*/);
cache_mngr->stmt_cache.reset();
}
else if (!cache_mngr->stmt_cache.is_binlog_empty())
{
if ((error= cache_mngr->stmt_cache.finalize(thd)))
goto end;
stuff_logged= true;
}
if (ending_trans(thd, all))
{
if (trans_cannot_safely_rollback(thd))
{
/*
If the transaction is being rolled back and contains changes that
cannot be rolled back, the trx-cache's content is flushed.
*/
Query_log_event
end_evt(thd, STRING_WITH_LEN("ROLLBACK"), true, false, true, 0, true);
error= cache_mngr->trx_cache.finalize(thd, &end_evt);
stuff_logged= true;
}
else
{
/*
If the transaction is being rolled back and its changes can be
rolled back, the trx-cache's content is truncated.
*/
error= cache_mngr->trx_cache.truncate(thd, all);
/*
If this is a CREATE TABLE ..SEELCT statement, we also need
to truncate and reset the stmt cache.
*/
if (!cache_mngr->stmt_cache.is_binlog_empty() && // stmt_cache is not empty
thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
!(thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) &&
thd->lex->select_lex.item_list.elements)
{
cache_mngr->stmt_cache.reset();
stuff_logged= false;
}
}
}
else
{
/*
If a statement is being rolled back, it is necessary to know
exactly why a statement may not be safely rolled back as in
some specific situations the trx-cache can be truncated.
If a temporary table is created or dropped, the trx-cache is not
truncated. Note that if the stmt-cache is used, there is nothing
to truncate in the trx-cache.
If a non-transactional table is updated and the binlog format is
statement, the trx-cache is not truncated. The trx-cache is used
when the direct option is off and a transactional table has been
updated before the current statement in the context of the
current transaction. Note that if the stmt-cache is used there is
nothing to truncate in the trx-cache.
If other binlog formats are used, updates to non-transactional
tables are written to the stmt-cache and trx-cache can be safely
truncated, if necessary.
*/
if (thd->transaction.stmt.has_dropped_temp_table() ||
thd->transaction.stmt.has_created_temp_table() ||
(thd->transaction.stmt.has_modified_non_trans_table() &&
thd->variables.binlog_format == BINLOG_FORMAT_STMT))
{
/*
If the statement is being rolled back and dropped or created a
temporary table or modified a non-transactional table and the
statement-based replication is in use, the statement's changes
in the trx-cache are preserved.
*/
cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
}
else
{
/*
Otherwise, the statement's changes in the trx-cache are
truncated.
*/
error= cache_mngr->trx_cache.truncate(thd, all);
}
}
DBUG_PRINT("debug", ("error: %d", error));
if (error == 0 && stuff_logged)
error= ordered_commit(thd, all, /* skip_commit */ true);
if (check_write_error(thd))
{
/*
"all == true" means that a "rollback statement" triggered the error and
this function was called. However, this must not happen as a rollback
is written directly to the binary log. And in auto-commit mode, a single
statement that is rolled back has the flag all == false.
*/
DBUG_ASSERT(!all);
/*
We reach this point if the effect of a statement did not properly get into
a cache and need to be rolled back.
*/
error |= cache_mngr->trx_cache.truncate(thd, all);
}
end:
/*
When a statement errors out on auto-commit mode it is rollback
implicitly, so the same should happen to its GTID.
*/
if (!thd->in_active_multi_stmt_transaction())
gtid_rollback(thd);
DBUG_PRINT("return", ("error: %d", error));
DBUG_RETURN(error);
}
/**
@note
How do we handle this (unlikely but legal) case:
@verbatim
[transaction] + [update to non-trans table] + [rollback to savepoint] ?
@endverbatim
The problem occurs when a savepoint is before the update to the
non-transactional table. Then when there's a rollback to the savepoint, if we
simply truncate the binlog cache, we lose the part of the binlog cache where
the update is. If we want to not lose it, we need to write the SAVEPOINT
command and the ROLLBACK TO SAVEPOINT command to the binlog cache. The latter
is easy: it's just write at the end of the binlog cache, but the former
should be *inserted* to the place where the user called SAVEPOINT. The
solution is that when the user calls SAVEPOINT, we write it to the binlog
cache (so no need to later insert it). As transactions are never intermixed
in the binary log (i.e. they are serialized), we won't have conflicts with
savepoint names when using mysqlbinlog or in the slave SQL thread.
Then when ROLLBACK TO SAVEPOINT is called, if we updated some
non-transactional table, we don't truncate the binlog cache but instead write
ROLLBACK TO SAVEPOINT to it; otherwise we truncate the binlog cache (which
will chop the SAVEPOINT command from the binlog cache, which is good as in
that case there is no need to have it in the binlog).
*/
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
{
DBUG_ENTER("binlog_savepoint_set");
int error= 1;
String log_query;
if (log_query.append(STRING_WITH_LEN("SAVEPOINT ")))
DBUG_RETURN(error);
else
append_identifier(thd, &log_query, thd->lex->ident.str,
thd->lex->ident.length);
int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(),
TRUE, FALSE, TRUE, errcode);
/*
We cannot record the position before writing the statement
because a rollback to a savepoint (.e.g. consider it "S") would
prevent the savepoint statement (i.e. "SAVEPOINT S") from being
written to the binary log despite the fact that the server could
still issue other rollback statements to the same savepoint (i.e.
"S").
Given that the savepoint is valid until the server releases it,
ie, until the transaction commits or it is released explicitly,
we need to log it anyway so that we don't have "ROLLBACK TO S"
or "RELEASE S" without the preceding "SAVEPOINT S" in the binary
log.
*/
if (!(error= mysql_bin_log.write_event(&qinfo)))
binlog_trans_log_savepos(thd, (my_off_t*) sv);
DBUG_RETURN(error);
}
static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
{
DBUG_ENTER("binlog_savepoint_rollback");
binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
my_off_t pos= *(my_off_t*) sv;
DBUG_ASSERT(pos != ~(my_off_t) 0);
/*
Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some
non-transactional table. Otherwise, truncate the binlog cache starting
from the SAVEPOINT command.
*/
if (trans_cannot_safely_rollback(thd))
{
String log_query;
if (log_query.append(STRING_WITH_LEN("ROLLBACK TO ")) ||
log_query.append("`") ||
log_query.append(thd->lex->ident.str, thd->lex->ident.length) ||
log_query.append("`"))
DBUG_RETURN(1);
int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(),
TRUE, FALSE, TRUE, errcode);
DBUG_RETURN(mysql_bin_log.write_event(&qinfo));
}
// Otherwise, we truncate the cache
cache_mngr->trx_cache.restore_savepoint(pos);
/*
When a SAVEPOINT is executed inside a stored function/trigger we force the
pending event to be flushed with a STMT_END_F flag and clear the table maps
as well to ensure that following DMLs will have a clean state to start
with. ROLLBACK inside a stored routine has to finalize possibly existing
current row-based pending event with cleaning up table maps. That ensures
that following DMLs will have a clean state to start with.
*/
if (thd->in_sub_stmt)
thd->clear_binlog_table_maps();
if (cache_mngr->trx_cache.is_binlog_empty())
cache_mngr->trx_cache.group_cache.clear();
DBUG_RETURN(0);
}
/**
Check whether binlog state allows to safely release MDL locks after
rollback to savepoint.
@param hton The binlog handlerton.
@param thd The client thread that executes the transaction.
@return true - It is safe to release MDL locks.
false - If it is not.
*/
static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton,
THD *thd)
{
DBUG_ENTER("binlog_savepoint_rollback_can_release_mdl");
/*
If we have not updated any non-transactional tables rollback
to savepoint will simply truncate binlog cache starting from
SAVEPOINT command. So it should be safe to release MDL acquired
after SAVEPOINT command in this case.
*/
DBUG_RETURN(!trans_cannot_safely_rollback(thd));
}
#ifdef HAVE_REPLICATION
/*
Adjust the position pointer in the binary log file for all running slaves
SYNOPSIS
adjust_linfo_offsets()
purge_offset Number of bytes removed from start of log index file
NOTES
- This is called when doing a PURGE when we delete lines from the
index log file
REQUIREMENTS
- Before calling this function, we have to ensure that no threads are
using any binary log file before purge_offset.a
TODO
- Inform the slave threads that they should sync the position
in the binary log file with flush_relay_log_info.
Now they sync is done for next read.
*/
static void adjust_linfo_offsets(my_off_t purge_offset)
{
mysql_mutex_lock(&LOCK_thread_count);
Thread_iterator it= global_thread_list_begin();
Thread_iterator end= global_thread_list_end();
for (; it != end; ++it)
{
LOG_INFO* linfo;
if ((linfo = (*it)->current_linfo))
{
mysql_mutex_lock(&linfo->lock);
/*
Index file offset can be less that purge offset only if
we just started reading the index file. In that case
we have nothing to adjust
*/
if (linfo->index_file_offset < purge_offset)
linfo->fatal = (linfo->index_file_offset != 0);
else
linfo->index_file_offset -= purge_offset;
mysql_mutex_unlock(&linfo->lock);
}
}
mysql_mutex_unlock(&LOCK_thread_count);
}
static int log_in_use(const char* log_name)
{
size_t log_name_len = strlen(log_name) + 1;
int thread_count=0;
#ifndef DBUG_OFF
if (current_thd)
DEBUG_SYNC(current_thd,"purge_logs_after_lock_index_before_thread_count");
#endif
mysql_mutex_lock(&LOCK_thread_count);
Thread_iterator it= global_thread_list_begin();
Thread_iterator end= global_thread_list_end();
for (; it != end; ++it)
{
LOG_INFO* linfo;
if ((linfo = (*it)->current_linfo))
{
mysql_mutex_lock(&linfo->lock);
if(!memcmp(log_name, linfo->log_file_name, log_name_len))
{
thread_count++;
sql_print_warning("file %s was not purged because it was being read"
"by thread number %llu", log_name,
(ulonglong)(*it)->thread_id);
}
mysql_mutex_unlock(&linfo->lock);
}
}
mysql_mutex_unlock(&LOCK_thread_count);
return thread_count;
}
static bool purge_error_message(THD* thd, int res)
{
uint errcode;
if ((errcode= purge_log_get_error_code(res)) != 0)
{
my_message(errcode, ER(errcode), MYF(0));
return TRUE;
}
my_ok(thd);
return FALSE;
}
#endif /* HAVE_REPLICATION */
int check_binlog_magic(IO_CACHE* log, const char** errmsg)
{
char magic[4];
DBUG_ASSERT(my_b_tell(log) == 0);
if (my_b_read(log, (uchar*) magic, sizeof(magic)))
{
*errmsg = "I/O error reading the header from the binary log";
sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
log->error);
return 1;
}
if (memcmp(magic, BINLOG_MAGIC, sizeof(magic)))
{
*errmsg = "Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL";
return 1;
}
return 0;
}
File open_binlog_file(IO_CACHE *log, const char *log_file_name, const char **errmsg)
{
File file;
DBUG_ENTER("open_binlog_file");
if ((file= mysql_file_open(key_file_binlog,
log_file_name, O_RDONLY | O_BINARY | O_SHARE,
MYF(MY_WME))) < 0)
{
sql_print_error("Failed to open log (file '%s', errno %d)",
log_file_name, my_errno);
*errmsg = "Could not open log file";
goto err;
}
if (init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
MYF(MY_WME|MY_DONT_CHECK_FILESIZE)))
{
sql_print_error("Failed to create a cache on log (file '%s')",
log_file_name);
*errmsg = "Could not open log file";
goto err;
}
if (check_binlog_magic(log,errmsg))
goto err;
DBUG_RETURN(file);
err:
if (file >= 0)
{
mysql_file_close(file, MYF(0));
end_io_cache(log);
}
DBUG_RETURN(-1);
}
/**
This function checks if a transactional table was updated by the
current transaction.
@param thd The client thread that executed the current statement.
@return
@c true if a transactional table was updated, @c false otherwise.
*/
bool
trans_has_updated_trans_table(const THD* thd)
{
binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
return (cache_mngr ? !cache_mngr->trx_cache.is_binlog_empty() : 0);
}
/**
This function checks if a transactional table was updated by the
current statement.
@param ha_list Registered storage engine handler list.
@return
@c true if a transactional table was updated, @c false otherwise.
*/
bool
stmt_has_updated_trans_table(Ha_trx_info* ha_list)
{
Ha_trx_info *ha_info;
for (ha_info= ha_list; ha_info; ha_info= ha_info->next())
{
if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton)
return (TRUE);
}
return (FALSE);
}
/**
This function checks if a transaction, either a multi-statement
or a single statement transaction is about to commit or not.
@param thd The client thread that executed the current statement.
@param all Committing a transaction (i.e. TRUE) or a statement
(i.e. FALSE).
@return
@c true if committing a transaction, otherwise @c false.
*/
bool ending_trans(THD* thd, const bool all)
{
return (all || ending_single_stmt_trans(thd, all));
}
/**
This function checks if a single statement transaction is about
to commit or not.
@param thd The client thread that executed the current statement.
@param all Committing a transaction (i.e. TRUE) or a statement
(i.e. FALSE).
@return
@c true if committing a single statement transaction, otherwise
@c false.
*/
bool ending_single_stmt_trans(THD* thd, const bool all)
{
return (!all && !thd->in_multi_stmt_transaction_mode());
}
/**
This function checks if a transaction cannot be rolled back safely.
@param thd The client thread that executed the current statement.
@return
@c true if cannot be safely rolled back, @c false otherwise.
*/
bool trans_cannot_safely_rollback(const THD* thd)
{
binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
return cache_mngr->trx_cache.cannot_rollback();
}
/**
This function checks if current statement cannot be rollded back safely.
@param thd The client thread that executed the current statement.
@return
@c true if cannot be safely rolled back, @c false otherwise.
*/
bool stmt_cannot_safely_rollback(const THD* thd)
{
return thd->transaction.stmt.cannot_safely_rollback();
}
#ifndef EMBEDDED_LIBRARY
/**
Execute a PURGE BINARY LOGS TO command.
@param thd Pointer to THD object for the client thread executing the
statement.
@param to_log Name of the last log to purge.
@retval FALSE success
@retval TRUE failure
*/
bool purge_master_logs(THD* thd, const char* to_log)
{
char search_file_name[FN_REFLEN];
if (!mysql_bin_log.is_open())
{
my_ok(thd);
return FALSE;
}
mysql_bin_log.make_log_name(search_file_name, to_log);
return purge_error_message(thd,
mysql_bin_log.purge_logs(search_file_name, false,
true/*need_lock_index=true*/,
true/*need_update_threads=true*/,
NULL, false));
}
/**
Execute a PURGE BINARY LOGS BEFORE command.
@param thd Pointer to THD object for the client thread executing the
statement.
@param purge_time Date before which logs should be purged.
@retval FALSE success
@retval TRUE failure
*/
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
{
if (!mysql_bin_log.is_open())
{
my_ok(thd);
return 0;
}
return purge_error_message(thd,
mysql_bin_log.purge_logs_before_date(purge_time,
false));
}
#endif /* EMBEDDED_LIBRARY */
/*
Helper function to get the error code of the query to be binlogged.
*/
int query_error_code(THD *thd, bool not_killed)
{
int error;
if (not_killed || (thd->killed == THD::KILL_BAD_DATA))
{
error= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0;
/* thd->get_stmt_da()->sql_errno() might be ER_SERVER_SHUTDOWN or
ER_QUERY_INTERRUPTED, So here we need to make sure that error
is not set to these errors when specified not_killed by the
caller.
*/
if (error == ER_SERVER_SHUTDOWN || error == ER_QUERY_INTERRUPTED)
error= 0;
}
else
{
/* killed status for DELAYED INSERT thread should never be used */
DBUG_ASSERT(!(thd->system_thread & SYSTEM_THREAD_DELAYED_INSERT));
error= thd->killed_errno();
}
return error;
}
/**
Copy content of 'from' file from offset to 'to' file.
- We do the copy outside of the IO_CACHE as the cache
buffers would just make things slower and more complicated.
In most cases the copy loop should only do one read.
@param from File to copy.
@param to File to copy to.
@param offset Offset in 'from' file.
@retval
0 ok
@retval
-1 error
*/
static bool copy_file(IO_CACHE *from, IO_CACHE *to, my_off_t offset)
{
int bytes_read;
uchar io_buf[IO_SIZE*2];
DBUG_ENTER("copy_file");
mysql_file_seek(from->file, offset, MY_SEEK_SET, MYF(0));
while(TRUE)
{
if ((bytes_read= (int) mysql_file_read(from->file, io_buf, sizeof(io_buf),
MYF(MY_WME)))
< 0)
goto err;
if (DBUG_EVALUATE_IF("fault_injection_copy_part_file", 1, 0))
bytes_read= bytes_read/2;
if (!bytes_read)
break; // end of file
if (mysql_file_write(to->file, io_buf, bytes_read, MYF(MY_WME | MY_NABP)))
goto err;
}
DBUG_RETURN(0);
err:
DBUG_RETURN(1);
}
#ifdef HAVE_REPLICATION
/**
Load data's io cache specific hook to be executed
before a chunk of data is being read into the cache's buffer
The fuction instantianates and writes into the binlog
replication events along LOAD DATA processing.
@param file pointer to io-cache
@retval 0 success
@retval 1 failure
*/
int log_loaded_block(IO_CACHE* file)
{
DBUG_ENTER("log_loaded_block");
LOAD_FILE_INFO *lf_info;
uint block_len;
/* buffer contains position where we started last read */
uchar* buffer= (uchar*) my_b_get_buffer_start(file);
uint max_event_size= current_thd->variables.max_allowed_packet;
lf_info= (LOAD_FILE_INFO*) file->arg;
if (lf_info->thd->is_current_stmt_binlog_format_row())
DBUG_RETURN(0);
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
DBUG_RETURN(0);
for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0;
buffer += min(block_len, max_event_size),
block_len -= min(block_len, max_event_size))
{
lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
if (lf_info->wrote_create_file)
{
Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
min(block_len, max_event_size),
lf_info->log_delayed);
if (mysql_bin_log.write_event(&a))
DBUG_RETURN(1);
}
else
{
Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
buffer,
min(block_len, max_event_size),
lf_info->log_delayed);
if (mysql_bin_log.write_event(&b))
DBUG_RETURN(1);
lf_info->wrote_create_file= 1;
}
}
DBUG_RETURN(0);
}
/* Helper function for SHOW BINLOG/RELAYLOG EVENTS */
bool show_binlog_events(THD *thd, MYSQL_BIN_LOG *binary_log)
{
Protocol *protocol= thd->protocol;
List- field_list;
const char *errmsg = 0;
bool ret = TRUE;
IO_CACHE log;
File file = -1;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
LOG_INFO linfo;
DBUG_ENTER("show_binlog_events");
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ||
thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
Format_description_log_event *description_event= new
Format_description_log_event(3); /* MySQL 4.0 by default */
if (binary_log->is_open())
{
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
SELECT_LEX_UNIT *unit= &thd->lex->unit;
ha_rows event_count, limit_start, limit_end;
my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
char search_file_name[FN_REFLEN], *name;
const char *log_file_name = lex_mi->log_file_name;
mysql_mutex_t *log_lock = binary_log->get_log_lock();
Log_event* ev;
unit->set_limit(thd->lex->current_select);
limit_start= unit->offset_limit_cnt;
limit_end= unit->select_limit_cnt;
name= search_file_name;
if (log_file_name)
binary_log->make_log_name(search_file_name, log_file_name);
else
name=0; // Find first log
linfo.index_file_offset = 0;
if (binary_log->find_log_pos(&linfo, name, true/*need_lock_index=true*/))
{
errmsg = "Could not find target log";
goto err;
}
mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = &linfo;
mysql_mutex_unlock(&LOCK_thread_count);
if ((file=open_binlog_file(&log, linfo.log_file_name, &errmsg)) < 0)
goto err;
my_off_t end_pos;
/*
Acquire LOCK_log only for the duration to calculate the
log's end position. LOCK_log should be acquired even while
we are checking whether the log is active log or not.
*/
mysql_mutex_lock(log_lock);
if (binary_log->is_active(linfo.log_file_name))
{
LOG_INFO li;
binary_log->get_current_log(&li, false /*LOCK_log is already acquired*/);
end_pos= li.pos;
}
else
{
end_pos= my_b_filelength(&log);
}
mysql_mutex_unlock(log_lock);
/*
to account binlog event header size
*/
thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
DEBUG_SYNC(thd, "after_show_binlog_event_found_file");
/*
open_binlog_file() sought to position 4.
Read the first event in case it's a Format_description_log_event, to
know the format. If there's no such event, we are 3.23 or 4.x. This
code, like before, can't read 3.23 binlogs.
This code will fail on a mixed relay log (one which has Format_desc then
Rotate then Format_desc).
*/
ev= Log_event::read_log_event(&log, (mysql_mutex_t*)0, description_event,
opt_master_verify_checksum);
if (ev)
{
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
{
delete description_event;
description_event= (Format_description_log_event*) ev;
}
else
delete ev;
}
my_b_seek(&log, pos);
if (!description_event->is_valid())
{
errmsg="Invalid Format_description event; could be out of memory";
goto err;
}
for (event_count = 0;
(ev = Log_event::read_log_event(&log, (mysql_mutex_t*) 0,
description_event,
opt_master_verify_checksum)); )
{
DEBUG_SYNC(thd, "wait_in_show_binlog_events_loop");
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
description_event->checksum_alg= ev->checksum_alg;
if (event_count >= limit_start &&
ev->net_send(protocol, linfo.log_file_name, pos))
{
errmsg = "Net error";
delete ev;
goto err;
}
pos = my_b_tell(&log);
delete ev;
if (++event_count >= limit_end || pos >= end_pos)
break;
}
if (event_count < limit_end && log.error)
{
errmsg = "Wrong offset or I/O error";
goto err;
}
}
// Check that linfo is still on the function scope.
DEBUG_SYNC(thd, "after_show_binlog_events");
ret= FALSE;
err:
delete description_event;
if (file >= 0)
{
end_io_cache(&log);
mysql_file_close(file, MYF(MY_WME));
}
if (errmsg)
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
"SHOW BINLOG EVENTS", errmsg);
else
my_eof(thd);
mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet;
DBUG_RETURN(ret);
}
/**
Execute a SHOW BINLOG EVENTS statement.
@param thd Pointer to THD object for the client thread executing the
statement.
@retval FALSE success
@retval TRUE failure
*/
bool mysql_show_binlog_events(THD* thd)
{
Protocol *protocol= thd->protocol;
List
- field_list;
DBUG_ENTER("mysql_show_binlog_events");
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS);
Log_event::init_show_field_list(&field_list);
if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
/*
Wait for handlers to insert any pending information
into the binlog. For e.g. ndb which updates the binlog asynchronously
this is needed so that the uses sees all its own commands in the binlog
*/
ha_binlog_wait(thd);
DBUG_RETURN(show_binlog_events(thd, &mysql_bin_log));
}
#endif /* HAVE_REPLICATION */
MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
:bytes_written(0), file_id(1), open_count(1),
sync_period_ptr(sync_period), sync_counter(0),
m_prep_xids(0),
is_relay_log(0), signal_cnt(0),
checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF),
relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
previous_gtid_set(0)
{
/*
We don't want to initialize locks here as such initialization depends on
safe_mutex (when using safe_mutex) which depends on MY_INIT(), which is
called only in main(). Doing initialization here would make it happen
before main().
*/
index_file_name[0] = 0;
memset(&index_file, 0, sizeof(index_file));
memset(&purge_index_file, 0, sizeof(purge_index_file));
memset(&crash_safe_index_file, 0, sizeof(crash_safe_index_file));
}
/* this is called only once */
void MYSQL_BIN_LOG::cleanup()
{
DBUG_ENTER("cleanup");
if (inited)
{
inited= 0;
close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT);
mysql_mutex_destroy(&LOCK_log);
mysql_mutex_destroy(&LOCK_index);
mysql_mutex_destroy(&LOCK_commit);
mysql_mutex_destroy(&LOCK_sync);
mysql_mutex_destroy(&LOCK_xids);
mysql_cond_destroy(&update_cond);
my_atomic_rwlock_destroy(&m_prep_xids_lock);
mysql_cond_destroy(&m_prep_xids_cond);
stage_manager.deinit();
}
DBUG_VOID_RETURN;
}
void MYSQL_BIN_LOG::init_pthread_objects()
{
MYSQL_LOG::init_pthread_objects();
mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW);
mysql_mutex_init(m_key_LOCK_commit, &LOCK_commit, MY_MUTEX_INIT_FAST);
mysql_mutex_init(m_key_LOCK_sync, &LOCK_sync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(m_key_LOCK_xids, &LOCK_xids, MY_MUTEX_INIT_FAST);
mysql_cond_init(m_key_update_cond, &update_cond, 0);
my_atomic_rwlock_init(&m_prep_xids_lock);
mysql_cond_init(m_key_prep_xids_cond, &m_prep_xids_cond, NULL);
stage_manager.init(
#ifdef HAVE_PSI_INTERFACE
m_key_LOCK_flush_queue,
m_key_LOCK_sync_queue,
m_key_LOCK_commit_queue,
m_key_LOCK_done, m_key_COND_done
#endif
);
}
bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg,
const char *log_name, bool need_lock_index)
{
File index_file_nr= -1;
DBUG_ASSERT(!my_b_inited(&index_file));
/*
First open of this class instance
Create an index file that will hold all file names uses for logging.
Add new entries to the end of it.
*/
myf opt= MY_UNPACK_FILENAME;
if (!index_file_name_arg)
{
index_file_name_arg= log_name; // Use same basename for index file
opt= MY_UNPACK_FILENAME | MY_REPLACE_EXT;
}
fn_format(index_file_name, index_file_name_arg, mysql_data_home,
".index", opt);
if (set_crash_safe_index_file_name(index_file_name_arg))
{
sql_print_error("MYSQL_BIN_LOG::set_crash_safe_index_file_name failed.");
return TRUE;
}
/*
We need move crash_safe_index_file to index_file if the index_file
does not exist and crash_safe_index_file exists when mysqld server
restarts.
*/
if (my_access(index_file_name, F_OK) &&
!my_access(crash_safe_index_file_name, F_OK) &&
my_rename(crash_safe_index_file_name, index_file_name, MYF(MY_WME)))
{
sql_print_error("MYSQL_BIN_LOG::open_index_file failed to "
"move crash_safe_index_file to index file.");
return TRUE;
}
if ((index_file_nr= mysql_file_open(m_key_file_log_index,
index_file_name,
O_RDWR | O_CREAT | O_BINARY,
MYF(MY_WME))) < 0 ||
mysql_file_sync(index_file_nr, MYF(MY_WME)) ||
init_io_cache(&index_file, index_file_nr,
IO_SIZE, READ_CACHE,
mysql_file_seek(index_file_nr, 0L, MY_SEEK_END, MYF(0)),
0, MYF(MY_WME | MY_WAIT_IF_FULL)) ||
DBUG_EVALUATE_IF("fault_injection_openning_index", 1, 0))
{
/*
TODO: all operations creating/deleting the index file or a log, should
call my_sync_dir() or my_sync_dir_by_file() to be durable.
TODO: file creation should be done with mysql_file_create()
not mysql_file_open().
*/
if (index_file_nr >= 0)
mysql_file_close(index_file_nr, MYF(0));
return TRUE;
}
#ifdef HAVE_REPLICATION
/*
Sync the index by purging any binary log file that is not registered.
In other words, either purge binary log files that were removed from
the index but not purged from the file system due to a crash or purge
any binary log file that was created but not register in the index
due to a crash.
*/
if (set_purge_index_file_name(index_file_name_arg) ||
open_purge_index_file(FALSE) ||
purge_index_entry(NULL, NULL, need_lock_index) ||
close_purge_index_file() ||
DBUG_EVALUATE_IF("fault_injection_recovering_index", 1, 0))
{
sql_print_error("MYSQL_BIN_LOG::open_index_file failed to sync the index "
"file.");
return TRUE;
}
#endif
return FALSE;
}
/**
Reads GTIDs from the given binlog file.
@param filename File to read from.
@param all_gtids If not NULL, then the GTIDs from the
Previous_gtids_log_event and from all Gtid_log_events are stored in
this object.
@param prev_gtids If not NULL, then the GTIDs from the
Previous_gtids_log_events are stored in this object.
@param first_gtid If not NULL, then the first GTID information from the
file will be stored in this object.
@param last_gtid If not NULL, then the last GTID information from the
file will be stored in this object.
@param sid_map The sid_map object to use in the rpl_sidno generation
of the Gtid_log_event. If lock is needed in the sid_map, the caller
must hold it.
@param verify_checksum Set to true to verify event checksums.
@retval GOT_GTIDS The file was successfully read and it contains
both Gtid_log_events and Previous_gtids_log_events.
@retval GOT_PREVIOUS_GTIDS The file was successfully read and it
contains Previous_gtids_log_events but no Gtid_log_events.
@retval NO_GTIDS The file was successfully read and it does not
contain GTID events.
@retval ERROR Out of memory, or the file contains GTID events
when GTID_MODE = OFF, or the file is malformed (e.g., contains
Gtid_log_events but no Previous_gtids_log_event).
@retval TRUNCATED The file was truncated before the end of the
first Previous_gtids_log_event.
*/
enum enum_read_gtids_from_binlog_status
{ GOT_GTIDS, GOT_PREVIOUS_GTIDS, NO_GTIDS, ERROR, TRUNCATED };
static enum_read_gtids_from_binlog_status
read_gtids_from_binlog(const char *filename, Gtid_set *all_gtids,
Gtid_set *prev_gtids, Gtid *first_gtid,
Gtid *last_gtid,
Sid_map* sid_map,
bool verify_checksum)
{
DBUG_ENTER("read_gtids_from_binlog");
DBUG_PRINT("info", ("Opening file %s", filename));
/*
Create a Format_description_log_event that is used to read the
first event of the log.
*/
Format_description_log_event fd_ev(BINLOG_VERSION), *fd_ev_p= &fd_ev;
if (!fd_ev.is_valid())
DBUG_RETURN(ERROR);
File file;
IO_CACHE log;
/*
We assert here that both all_gtids and prev_gtids, if specified,
uses the same sid_map as the one passed as a parameter. This is just
to ensure that, if the sid_map needed some lock and was locked by
the caller, the lock applies to all the GTID sets this function is
dealing with.
*/
#ifndef DBUG_OFF
if (all_gtids)
DBUG_ASSERT(all_gtids->get_sid_map() == sid_map);
if (prev_gtids)
DBUG_ASSERT(prev_gtids->get_sid_map() == sid_map);
#endif
const char *errmsg= NULL;
if ((file= open_binlog_file(&log, filename, &errmsg)) < 0)
{
sql_print_error("%s", errmsg);
/*
We need to revisit the recovery procedure for relay log
files. Currently, it is called after this routine.
/Alfranio
*/
DBUG_RETURN(TRUNCATED);
}
/*
Seek for Previous_gtids_log_event and Gtid_log_event events to
gather information what has been processed so far.
*/
my_b_seek(&log, BIN_LOG_HEADER_SIZE);
Log_event *ev= NULL;
enum_read_gtids_from_binlog_status ret= NO_GTIDS;
bool done= false;
bool seen_first_gtid= false;
while (!done &&
(ev= Log_event::read_log_event(&log, 0, fd_ev_p, verify_checksum)) !=
NULL)
{
DBUG_PRINT("info", ("Read event of type %s", ev->get_type_str()));
switch (ev->get_type_code())
{
case FORMAT_DESCRIPTION_EVENT:
if (fd_ev_p != &fd_ev)
delete fd_ev_p;
fd_ev_p= (Format_description_log_event *)ev;
break;
case ROTATE_EVENT:
// do nothing; just accept this event and go to next
break;
case PREVIOUS_GTIDS_LOG_EVENT:
{
if (gtid_mode == 0)
{
my_error(ER_FOUND_GTID_EVENT_WHEN_GTID_MODE_IS_OFF, MYF(0));
ret= ERROR;
}
ret= GOT_PREVIOUS_GTIDS;
// add events to sets
Previous_gtids_log_event *prev_gtids_ev=
(Previous_gtids_log_event *)ev;
if (all_gtids != NULL && prev_gtids_ev->add_to_set(all_gtids) != 0)
ret= ERROR, done= true;
else if (prev_gtids != NULL && prev_gtids_ev->add_to_set(prev_gtids) != 0)
ret= ERROR, done= true;
#ifndef DBUG_OFF
char* prev_buffer= prev_gtids_ev->get_str(NULL, NULL);
DBUG_PRINT("info", ("Got Previous_gtids from file '%s': Gtid_set='%s'.",
filename, prev_buffer));
my_free(prev_buffer);
#endif
break;
}
case GTID_LOG_EVENT:
{
DBUG_EXECUTE_IF("inject_fault_bug16502579", {
DBUG_PRINT("debug", ("GTID_LOG_EVENT found. Injected ret=NO_GTIDS."));
ret=NO_GTIDS;
});
if (ret != GOT_GTIDS)
{
if (ret != GOT_PREVIOUS_GTIDS)
{
/*
Since this routine is run on startup, there may not be a
THD instance. Therefore, ER(X) cannot be used.
*/
const char* msg_fmt= (current_thd != NULL) ?
ER(ER_BINLOG_LOGICAL_CORRUPTION) :
ER_DEFAULT(ER_BINLOG_LOGICAL_CORRUPTION);
my_printf_error(ER_BINLOG_LOGICAL_CORRUPTION,
msg_fmt, MYF(0),
filename,
"The first global transaction identifier was read, but "
"no other information regarding identifiers existing "
"on the previous log files was found.");
ret= ERROR, done= true;
break;
}
else
ret= GOT_GTIDS;
}
/*
When all_gtids, first_gtid and last_gtid are all NULL,
we just check if the binary log contains at least one Gtid_log_event,
so that we can distinguish the return values GOT_GTID and
GOT_PREVIOUS_GTIDS. We don't need to read anything else from the
binary log.
If all_gtids or last_gtid is requested (i.e., NOT NULL), we should
continue to read all gtids.
If just first_gtid was requested, we will be done after storing this
Gtid_log_event info on it.
*/
if (all_gtids == NULL && first_gtid == NULL && last_gtid == NULL)
{
ret= GOT_GTIDS, done= true;
}
else
{
Gtid_log_event *gtid_ev= (Gtid_log_event *)ev;
rpl_sidno sidno= gtid_ev->get_sidno(sid_map);
if (sidno < 0)
ret= ERROR, done= true;
else
{
if (all_gtids)
{
if (all_gtids->ensure_sidno(sidno) != RETURN_STATUS_OK)
ret= ERROR, done= true;
else if (all_gtids->_add_gtid(sidno, gtid_ev->get_gno()) !=
RETURN_STATUS_OK)
ret= ERROR, done= true;
DBUG_PRINT("info", ("Got Gtid from file '%s': Gtid(%d, %lld).",
filename, sidno, gtid_ev->get_gno()));
}
/* If the first GTID was requested, stores it */
if (first_gtid && !seen_first_gtid)
{
first_gtid->set(sidno, gtid_ev->get_gno());
seen_first_gtid= true;
/* If the first_gtid was the only thing requested, we are done */
if (all_gtids == NULL && last_gtid == NULL)
ret= GOT_GTIDS, done= true;
}
if (last_gtid)
last_gtid->set(sidno, gtid_ev->get_gno());
}
}
break;
}
case ANONYMOUS_GTID_LOG_EVENT:
default:
// if we found any other event type without finding a
// previous_gtids_log_event, then the rest of this binlog
// cannot contain gtids
if (ret != GOT_GTIDS && ret != GOT_PREVIOUS_GTIDS)
done= true;
break;
}
if (ev != fd_ev_p)
delete ev;
DBUG_PRINT("info", ("done=%d", done));
}
if (log.error < 0)
{
// This is not a fatal error; the log may just be truncated.
// @todo but what other errors could happen? IO error?
sql_print_warning("Error reading GTIDs from binary log: %d", log.error);
}
if (fd_ev_p != &fd_ev)
{
delete fd_ev_p;
fd_ev_p= &fd_ev;
}
mysql_file_close(file, MYF(MY_WME));
end_io_cache(&log);
DBUG_PRINT("info", ("returning %d", ret));
DBUG_RETURN(ret);
}
bool MYSQL_BIN_LOG::find_first_log_not_in_gtid_set(char *binlog_file_name,
const Gtid_set *gtid_set,
Gtid *first_gtid,
const char **errmsg)
{
DBUG_ENTER("MYSQL_BIN_LOG::gtid_read_start_binlog");
/*
Gather the set of files to be accessed.
*/
list filename_list;
LOG_INFO linfo;
int error;
list::reverse_iterator rit;
Gtid_set previous_gtid_set(gtid_set->get_sid_map());
mysql_mutex_lock(&LOCK_index);
for (error= find_log_pos(&linfo, NULL, false/*need_lock_index=false*/);
!error; error= find_next_log(&linfo, false/*need_lock_index=false*/))
{
DBUG_PRINT("info", ("read log filename '%s'", linfo.log_file_name));
filename_list.push_back(string(linfo.log_file_name));
}
mysql_mutex_unlock(&LOCK_index);
if (error != LOG_INFO_EOF)
{
*errmsg= "Failed to read the binary log index file while "
"looking for the oldest binary log that contains any GTID "
"that is not in the given gtid set";
error= -1;
goto end;
}
if (filename_list.empty())
{
*errmsg= "Could not find first log file name in binary log index file "
"while looking for the oldest binary log that contains any GTID "
"that is not in the given gtid set";
error= -2;
goto end;
}
/*
Iterate over all the binary logs in reverse order, and read only
the Previous_gtids_log_event, to find the first one, that is the
subset of the given gtid set. Since every binary log begins with
a Previous_gtids_log_event, that contains all GTIDs in all
previous binary logs.
We also ask for the first GTID in the binary log to know if we
should send the FD event with the "created" field cleared or not.
*/
DBUG_PRINT("info", ("Iterating backwards through binary logs, and reading "
"only the Previous_gtids_log_event, to find the first "
"one, that is the subset of the given gtid set."));
rit= filename_list.rbegin();
error= 0;
while (rit != filename_list.rend())
{
const char *filename= rit->c_str();
DBUG_PRINT("info", ("Read Previous_gtids_log_event from filename='%s'",
filename));
switch (read_gtids_from_binlog(filename, NULL, &previous_gtid_set,
first_gtid, NULL/* last_gtid */,
previous_gtid_set.get_sid_map(),
opt_master_verify_checksum))
{
case ERROR:
*errmsg= "Error reading header of binary log while looking for "
"the oldest binary log that contains any GTID that is not in "
"the given gtid set";
error= -3;
goto end;
case NO_GTIDS:
*errmsg= "Found old binary log without GTIDs while looking for "
"the oldest binary log that contains any GTID that is not in "
"the given gtid set";
error= -4;
goto end;
case GOT_GTIDS:
case GOT_PREVIOUS_GTIDS:
if (previous_gtid_set.is_subset(gtid_set))
{
strcpy(binlog_file_name, filename);
/*
Verify that the selected binlog is not the first binlog,
*/
DBUG_EXECUTE_IF("slave_reconnect_with_gtid_set_executed",
DBUG_ASSERT(strcmp(filename_list.begin()->c_str(),
binlog_file_name) != 0););
goto end;
}
case TRUNCATED:
break;
}
previous_gtid_set.clear();
rit++;
}
if (rit == filename_list.rend())
{
*errmsg= ER(ER_MASTER_HAS_PURGED_REQUIRED_GTIDS);
error= -5;
}
end:
if (error)
DBUG_PRINT("error", ("'%s'", *errmsg));
filename_list.clear();
DBUG_PRINT("info", ("returning %d", error));
DBUG_RETURN(error != 0 ? true : false);
}
bool MYSQL_BIN_LOG::init_gtid_sets(Gtid_set *all_gtids, Gtid_set *lost_gtids,
Gtid *last_gtid, bool verify_checksum,
bool need_lock, bool is_server_starting)
{
DBUG_ENTER("MYSQL_BIN_LOG::init_gtid_sets");
DBUG_PRINT("info", ("lost_gtids=%p; so we are recovering a %s log",
lost_gtids, lost_gtids == NULL ? "relay" : "binary"));
/*
Acquires the necessary locks to ensure that logs are not either
removed or updated when we are reading from it.
*/
if (need_lock)
{
// We don't need LOCK_log if we are only going to read the initial
// Prevoius_gtids_log_event and ignore the Gtid_log_events.
if (all_gtids != NULL)
mysql_mutex_lock(&LOCK_log);
mysql_mutex_lock(&LOCK_index);
global_sid_lock->wrlock();
}
else
{
if (all_gtids != NULL)
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_owner(&LOCK_index);
global_sid_lock->assert_some_wrlock();
}
// Gather the set of files to be accessed.
list filename_list;
LOG_INFO linfo;
int error;
list::iterator it;
list::reverse_iterator rit;
bool reached_first_file= false;
/* Initialize the sid_map to be used in read_gtids_from_binlog */
Sid_map *sid_map= NULL;
if (all_gtids)
sid_map= all_gtids->get_sid_map();
else if (lost_gtids)
sid_map= lost_gtids->get_sid_map();
for (error= find_log_pos(&linfo, NULL, false/*need_lock_index=false*/); !error;
error= find_next_log(&linfo, false/*need_lock_index=false*/))
{
DBUG_PRINT("info", ("read log filename '%s'", linfo.log_file_name));
filename_list.push_back(string(linfo.log_file_name));
}
if (error != LOG_INFO_EOF)
{
DBUG_PRINT("error", ("Error reading binlog index"));
goto end;
}
/*
On server starting, one new empty binlog file is created and
its file name is put into index file before initializing
GLOBAL.GTID_EXECUTED AND GLOBAL.GTID_PURGED, it is not the
last binlog file before the server restarts, so we remove
its file name from filename_list.
*/
if (is_server_starting && !is_relay_log && !filename_list.empty())
filename_list.pop_back();
error= 0;
if (all_gtids != NULL)
{
DBUG_PRINT("info", ("Iterating backwards through binary logs, looking for the last binary log that contains a Previous_gtids_log_event."));
// Iterate over all files in reverse order until we find one that
// contains a Previous_gtids_log_event.
rit= filename_list.rbegin();
bool got_gtids= false;
reached_first_file= (rit == filename_list.rend());
DBUG_PRINT("info", ("filename='%s' reached_first_file=%d",
rit->c_str(), reached_first_file));
while ((!got_gtids || (last_gtid && last_gtid->empty()))
&& !reached_first_file)
{
const char *filename= rit->c_str();
rit++;
reached_first_file= (rit == filename_list.rend());
DBUG_PRINT("info", ("filename='%s' got_gtids=%d reached_first_file=%d",
filename, got_gtids, reached_first_file));
switch (read_gtids_from_binlog(filename, got_gtids ? NULL : all_gtids,
reached_first_file ? lost_gtids : NULL,
NULL/* first_gtid */, last_gtid,
sid_map, verify_checksum))
{
case ERROR:
{
error= 1;
goto end;
}
case GOT_GTIDS:
case GOT_PREVIOUS_GTIDS:
{
got_gtids= true;
break;
}
case NO_GTIDS:
{
/*
If the binlog_gtid_simple_recovery is enabled, and the
last binary log does not contain any GTID event, do not
read any more binary logs, GLOBAL.GTID_EXECUTED and
GLOBAL.GTID_PURGED should be empty in the case. Otherwise,
initialize GTID_EXECUTED as usual.
*/
if (binlog_gtid_simple_recovery && !is_relay_log)
{
DBUG_ASSERT(all_gtids->is_empty() && lost_gtids->is_empty());
goto end;
}
/*FALLTHROUGH*/
}
case TRUNCATED:
{
break;
}
}
}
}
if (lost_gtids != NULL && !reached_first_file)
{
DBUG_PRINT("info", ("Iterating forwards through binary logs, looking for the first binary log that contains a Previous_gtids_log_event."));
for (it= filename_list.begin(); it != filename_list.end(); it++)
{
const char *filename= it->c_str();
DBUG_PRINT("info", ("filename='%s'", filename));
switch (read_gtids_from_binlog(filename, NULL, lost_gtids,
NULL/* first_gtid */, NULL/* last_gtid */,
sid_map, verify_checksum))
{
case ERROR:
{
error= 1;
/*FALLTHROUGH*/
}
case GOT_GTIDS:
{
goto end;
}
case NO_GTIDS:
{
/*
If the binlog_gtid_simple_recovery is enabled, and the
first binary log does not contain any GTID event, do not
read any more binary logs, GLOBAL.GTID_PURGED should be
empty in the case.
*/
if (binlog_gtid_simple_recovery && !is_relay_log)
{
DBUG_ASSERT(lost_gtids->is_empty());
goto end;
}
/*FALLTHROUGH*/
}
case GOT_PREVIOUS_GTIDS:
case TRUNCATED:
{
break;
}
}
}
}
end:
if (all_gtids)
all_gtids->dbug_print("all_gtids");
if (lost_gtids)
lost_gtids->dbug_print("lost_gtids");
if (need_lock)
{
global_sid_lock->unlock();
mysql_mutex_unlock(&LOCK_index);
if (all_gtids != NULL)
mysql_mutex_unlock(&LOCK_log);
}
filename_list.clear();
DBUG_PRINT("info", ("returning %d", error));
DBUG_RETURN(error != 0 ? true : false);
}
/**
Open a (new) binlog file.
- Open the log file and the index file. Register the new
file name in it
- When calling this when the file is in use, you must have a locks
on LOCK_log and LOCK_index.
@retval
0 ok
@retval
1 error
*/
bool MYSQL_BIN_LOG::open_binlog(const char *log_name,
const char *new_name,
enum cache_type io_cache_type_arg,
ulong max_size_arg,
bool null_created_arg,
bool need_lock_index,
bool need_sid_lock,
Format_description_log_event *extra_description_event)
{
File file= -1;
// lock_index must be acquired *before* sid_lock.
DBUG_ASSERT(need_sid_lock || !need_lock_index);
DBUG_ENTER("MYSQL_BIN_LOG::open_binlog(const char *, ...)");
DBUG_PRINT("enter",("name: %s", log_name));
if (init_and_set_log_file_name(log_name, new_name, LOG_BIN,
io_cache_type_arg))
{
sql_print_error("MYSQL_BIN_LOG::open failed to generate new file name.");
DBUG_RETURN(1);
}
#ifdef HAVE_REPLICATION
if (open_purge_index_file(TRUE) ||
register_create_index_entry(log_file_name) ||
sync_purge_index_file() ||
DBUG_EVALUATE_IF("fault_injection_registering_index", 1, 0))
{
/**
@todo: although this was introduced to appease valgrind
when injecting emulated faults using fault_injection_registering_index
it may be good to consider what actually happens when
open_purge_index_file succeeds but register or sync fails.
Perhaps we might need the code below in MYSQL_LOG_BIN::cleanup
for "real life" purposes as well?
*/
DBUG_EXECUTE_IF("fault_injection_registering_index", {
if (my_b_inited(&purge_index_file))
{
end_io_cache(&purge_index_file);
my_close(purge_index_file.file, MYF(0));
}
});
sql_print_error("MYSQL_BIN_LOG::open failed to sync the index file.");
DBUG_RETURN(1);
}
DBUG_EXECUTE_IF("crash_create_non_critical_before_update_index", DBUG_SUICIDE(););
#endif
write_error= 0;
/* open the main log file */
if (MYSQL_LOG::open(
#ifdef HAVE_PSI_INTERFACE
m_key_file_log,
#endif
log_name, LOG_BIN, new_name, io_cache_type_arg))
{
#ifdef HAVE_REPLICATION
close_purge_index_file();
#endif
DBUG_RETURN(1); /* all warnings issued */
}
max_size= max_size_arg;
open_count++;
bool write_file_name_to_index_file=0;
/* This must be before goto err. */
Format_description_log_event s(BINLOG_VERSION);
if (!my_b_filelength(&log_file))
{
/*
The binary log file was empty (probably newly created)
This is the normal case and happens when the user doesn't specify
an extension for the binary log files.
In this case we write a standard header to it.
*/
if (my_b_safe_write(&log_file, (uchar*) BINLOG_MAGIC,
BIN_LOG_HEADER_SIZE))
goto err;
bytes_written+= BIN_LOG_HEADER_SIZE;
write_file_name_to_index_file= 1;
}
/*
don't set LOG_EVENT_BINLOG_IN_USE_F for SEQ_READ_APPEND io_cache
as we won't be able to reset it later
*/
if (io_cache_type == WRITE_CACHE)
s.flags |= LOG_EVENT_BINLOG_IN_USE_F;
s.checksum_alg= is_relay_log ?
/* relay-log */
/* inherit master's A descriptor if one has been received */
(relay_log_checksum_alg=
(relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
relay_log_checksum_alg :
/* otherwise use slave's local preference of RL events verification */
(opt_slave_sql_verify_checksum == 0) ?
(uint8) BINLOG_CHECKSUM_ALG_OFF : binlog_checksum_options):
/* binlog */
binlog_checksum_options;
DBUG_ASSERT(s.checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if (!s.is_valid())
goto err;
s.dont_set_created= null_created_arg;
/* Set LOG_EVENT_RELAY_LOG_F flag for relay log's FD */
if (is_relay_log)
s.set_relay_log_event();
if (s.write(&log_file))
goto err;
bytes_written+= s.data_written;
/*
We need to revisit this code and improve it.
See further comments in the mysqld.
/Alfranio
*/
if (current_thd && gtid_mode > 0)
{
if (need_sid_lock)
global_sid_lock->wrlock();
else
global_sid_lock->assert_some_wrlock();
Previous_gtids_log_event prev_gtids_ev(previous_gtid_set);
if (is_relay_log)
prev_gtids_ev.set_relay_log_event();
if (need_sid_lock)
global_sid_lock->unlock();
prev_gtids_ev.checksum_alg= s.checksum_alg;
if (prev_gtids_ev.write(&log_file))
goto err;
bytes_written+= prev_gtids_ev.data_written;
}
if (extra_description_event &&
extra_description_event->binlog_version>=4)
{
/*
This is a relay log written to by the I/O slave thread.
Write the event so that others can later know the format of this relay
log.
Note that this event is very close to the original event from the
master (it has binlog version of the master, event types of the
master), so this is suitable to parse the next relay log's event. It
has been produced by
Format_description_log_event::Format_description_log_event(char* buf,).
Why don't we want to write the mi_description_event if this
event is for format<4 (3.23 or 4.x): this is because in that case, the
mi_description_event describes the data received from the
master, but not the data written to the relay log (*conversion*),
which is in format 4 (slave's).
*/
/*
Set 'created' to 0, so that in next relay logs this event does not
trigger cleaning actions on the slave in
Format_description_log_event::apply_event_impl().
*/
extra_description_event->created= 0;
/* Don't set log_pos in event header */
extra_description_event->set_artificial_event();
if (extra_description_event->write(&log_file))
goto err;
bytes_written+= extra_description_event->data_written;
}
if (flush_io_cache(&log_file) ||
mysql_file_sync(log_file.file, MYF(MY_WME)))
goto err;
if (write_file_name_to_index_file)
{
#ifdef HAVE_REPLICATION
DBUG_EXECUTE_IF("crash_create_critical_before_update_index", DBUG_SUICIDE(););
#endif
DBUG_ASSERT(my_b_inited(&index_file) != 0);
/*
The new log file name is appended into crash safe index file after
all the content of index file is copyed into the crash safe index
file. Then move the crash safe index file to index file.
*/
DBUG_EXECUTE_IF("simulate_disk_full_on_open_binlog",
{DBUG_SET("+d,simulate_no_free_space_error");});
if (DBUG_EVALUATE_IF("fault_injection_updating_index", 1, 0) ||
add_log_to_index((uchar*) log_file_name, strlen(log_file_name),
need_lock_index))
{
DBUG_EXECUTE_IF("simulate_disk_full_on_open_binlog",
{
DBUG_SET("-d,simulate_file_write_error");
DBUG_SET("-d,simulate_no_free_space_error");
DBUG_SET("-d,simulate_disk_full_on_open_binlog");
});
goto err;
}
#ifdef HAVE_REPLICATION
DBUG_EXECUTE_IF("crash_create_after_update_index", DBUG_SUICIDE(););
#endif
}
log_state= LOG_OPENED;
#ifdef HAVE_REPLICATION
close_purge_index_file();
#endif
DBUG_RETURN(0);
err:
#ifdef HAVE_REPLICATION
if (is_inited_purge_index_file())
purge_index_entry(NULL, NULL, need_lock_index);
close_purge_index_file();
#endif
if (file >= 0)
mysql_file_close(file, MYF(0));
end_io_cache(&log_file);
end_io_cache(&index_file);
my_free(name);
name= NULL;
log_state= LOG_CLOSED;
if (binlog_error_action == ABORT_SERVER)
{
exec_binlog_error_action_abort("Either disk is full or file system is read "
"only while opening the binlog. Aborting the"
" server.");
}
else
sql_print_error("Could not use %s for logging (error %d). "
"Turning logging off for the whole duration of the MySQL "
"server process. To turn it on again: fix the cause, "
"shutdown the MySQL server and restart it.", name, errno);
DBUG_RETURN(1);
}
/**
Move crash safe index file to index file.
@param need_lock_index If true, LOCK_index will be acquired;
otherwise it should already be held.
@retval 0 ok
@retval -1 error
*/
int MYSQL_BIN_LOG::move_crash_safe_index_file_to_index_file(bool need_lock_index)
{
int error= 0;
File fd= -1;
DBUG_ENTER("MYSQL_BIN_LOG::move_crash_safe_index_file_to_index_file");
if (need_lock_index)
mysql_mutex_lock(&LOCK_index);
else
mysql_mutex_assert_owner(&LOCK_index);
if (my_b_inited(&index_file))
{
end_io_cache(&index_file);
if (mysql_file_close(index_file.file, MYF(0)) < 0)
{
error= -1;
sql_print_error("While rebuilding index file %s: "
"Failed to close the index file.", index_file_name);
/*
Delete Crash safe index file here and recover the binlog.index
state(index_file io_cache) from old binlog.index content.
*/
mysql_file_delete(key_file_binlog_index, crash_safe_index_file_name,
MYF(0));
goto recoverable_err;
}
if (DBUG_EVALUATE_IF("force_index_file_delete_failure", 1, 0) ||
mysql_file_delete(key_file_binlog_index, index_file_name, MYF(MY_WME)))
{
error= -1;
sql_print_error("While rebuilding index file %s: "
"Failed to delete the existing index file. It could be "
"that file is being used by some other process.",
index_file_name);
/*
Delete Crash safe file index file here and recover the binlog.index
state(index_file io_cache) from old binlog.index content.
*/
mysql_file_delete(key_file_binlog_index, crash_safe_index_file_name,
MYF(0));
goto recoverable_err;
}
}
DBUG_EXECUTE_IF("crash_create_before_rename_index_file", DBUG_SUICIDE(););
if (my_rename(crash_safe_index_file_name, index_file_name, MYF(MY_WME)))
{
error= -1;
sql_print_error("While rebuilding index file %s: "
"Failed to rename the new index file to the existing "
"index file.", index_file_name);
goto fatal_err;
}
DBUG_EXECUTE_IF("crash_create_after_rename_index_file", DBUG_SUICIDE(););
recoverable_err:
if ((fd= mysql_file_open(key_file_binlog_index,
index_file_name,
O_RDWR | O_CREAT | O_BINARY,
MYF(MY_WME))) < 0 ||
mysql_file_sync(fd, MYF(MY_WME)) ||
init_io_cache(&index_file, fd, IO_SIZE, READ_CACHE,
mysql_file_seek(fd, 0L, MY_SEEK_END, MYF(0)),
0, MYF(MY_WME | MY_WAIT_IF_FULL)))
{
sql_print_error("After rebuilding the index file %s: "
"Failed to open the index file.", index_file_name);
goto fatal_err;
}
if (need_lock_index)
mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
fatal_err:
/*
This situation is very very rare to happen (unless there is some serious
memory related issues like OOM) and should be treated as fatal error.
Hence it is better to bring down the server without respecting
'binlog_error_action' value here.
*/
exec_binlog_error_action_abort("MySQL server failed to update the "
"binlog.index file's content properly. "
"It might not be in sync with available "
"binlogs and the binlog.index file state is in "
"unrecoverable state. Aborting the server.");
/*
Server is aborted in the above function.
This is dead code to make compiler happy.
*/
DBUG_RETURN(error);
}
/**
Append log file name to index file.
- To make crash safe, we copy all the content of index file
to crash safe index file firstly and then append the log
file name to the crash safe index file. Finally move the
crash safe index file to index file.
@retval
0 ok
@retval
-1 error
*/
int MYSQL_BIN_LOG::add_log_to_index(uchar* log_name,
int log_name_len, bool need_lock_index)
{
DBUG_ENTER("MYSQL_BIN_LOG::add_log_to_index");
if (open_crash_safe_index_file())
{
sql_print_error("MYSQL_BIN_LOG::add_log_to_index failed to "
"open the crash safe index file.");
goto err;
}
if (copy_file(&index_file, &crash_safe_index_file, 0))
{
sql_print_error("MYSQL_BIN_LOG::add_log_to_index failed to "
"copy index file to crash safe index file.");
goto err;
}
if (my_b_write(&crash_safe_index_file, log_name, log_name_len) ||
my_b_write(&crash_safe_index_file, (uchar*) "\n", 1) ||
flush_io_cache(&crash_safe_index_file) ||
mysql_file_sync(crash_safe_index_file.file, MYF(MY_WME)))
{
sql_print_error("MYSQL_BIN_LOG::add_log_to_index failed to "
"append log file name: %s, to crash "
"safe index file.", log_name);
goto err;
}
if (close_crash_safe_index_file())
{
sql_print_error("MYSQL_BIN_LOG::add_log_to_index failed to "
"close the crash safe index file.");
goto err;
}
if (move_crash_safe_index_file_to_index_file(need_lock_index))
{
sql_print_error("MYSQL_BIN_LOG::add_log_to_index failed to "
"move crash safe index file to index file.");
goto err;
}
DBUG_RETURN(0);
err:
DBUG_RETURN(-1);
}
int MYSQL_BIN_LOG::get_current_log(LOG_INFO* linfo, bool need_lock_log/*true*/)
{
if (need_lock_log)
mysql_mutex_lock(&LOCK_log);
int ret = raw_get_current_log(linfo);
if (need_lock_log)
mysql_mutex_unlock(&LOCK_log);
return ret;
}
int MYSQL_BIN_LOG::raw_get_current_log(LOG_INFO* linfo)
{
strmake(linfo->log_file_name, log_file_name, sizeof(linfo->log_file_name)-1);
linfo->pos = my_b_safe_tell(&log_file);
return 0;
}
bool MYSQL_BIN_LOG::check_write_error(THD *thd)
{
DBUG_ENTER("MYSQL_BIN_LOG::check_write_error");
bool checked= FALSE;
if (!thd->is_error())
DBUG_RETURN(checked);
switch (thd->get_stmt_da()->sql_errno())
{
case ER_TRANS_CACHE_FULL:
case ER_STMT_CACHE_FULL:
case ER_ERROR_ON_WRITE:
case ER_BINLOG_LOGGING_IMPOSSIBLE:
checked= TRUE;
break;
}
DBUG_PRINT("return", ("checked: %s", YESNO(checked)));
DBUG_RETURN(checked);
}
void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::set_write_error");
write_error= 1;
if (check_write_error(thd))
DBUG_VOID_RETURN;
if (my_errno == EFBIG)
{
if (is_transactional)
{
my_message(ER_TRANS_CACHE_FULL, ER(ER_TRANS_CACHE_FULL), MYF(MY_WME));
}
else
{
my_message(ER_STMT_CACHE_FULL, ER(ER_STMT_CACHE_FULL), MYF(MY_WME));
}
}
else
{
char errbuf[MYSYS_STRERROR_SIZE];
my_error(ER_ERROR_ON_WRITE, MYF(MY_WME), name,
errno, my_strerror(errbuf, sizeof(errbuf), errno));
}
DBUG_VOID_RETURN;
}
/**
Find the position in the log-index-file for the given log name.
@param[out] linfo The found log file name will be stored here, along
with the byte offset of the next log file name in the index file.
@param log_name Filename to find in the index file, or NULL if we
want to read the first entry.
@param need_lock_index If false, this function acquires LOCK_index;
otherwise the lock should already be held by the caller.
@note
On systems without the truncate function the file will end with one or
more empty lines. These will be ignored when reading the file.
@retval
0 ok
@retval
LOG_INFO_EOF End of log-index-file found
@retval
LOG_INFO_IO Got IO error while reading file
*/
int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name,
bool need_lock_index)
{
int error= 0;
char *full_fname= linfo->log_file_name;
char full_log_name[FN_REFLEN], fname[FN_REFLEN];
uint log_name_len= 0, fname_len= 0;
DBUG_ENTER("find_log_pos");
full_log_name[0]= full_fname[0]= 0;
/*
Mutex needed because we need to make sure the file pointer does not
move from under our feet
*/
if (need_lock_index)
mysql_mutex_lock(&LOCK_index);
else
mysql_mutex_assert_owner(&LOCK_index);
// extend relative paths for log_name to be searched
if (log_name)
{
if(normalize_binlog_name(full_log_name, log_name, is_relay_log))
{
error= LOG_INFO_EOF;
goto end;
}
}
log_name_len= log_name ? (uint) strlen(full_log_name) : 0;
DBUG_PRINT("enter", ("log_name: %s, full_log_name: %s",
log_name ? log_name : "NULL", full_log_name));
/* As the file is flushed, we can't get an error here */
my_b_seek(&index_file, (my_off_t) 0);
for (;;)
{
uint length;
my_off_t offset= my_b_tell(&index_file);
DBUG_EXECUTE_IF("simulate_find_log_pos_error",
error= LOG_INFO_EOF; break;);
/* If we get 0 or 1 characters, this is the end of the file */
if ((length= my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
{
/* Did not find the given entry; Return not found or error */
error= !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
break;
}
// extend relative paths and match against full path
if (normalize_binlog_name(full_fname, fname, is_relay_log))
{
error= LOG_INFO_EOF;
break;
}
fname_len= (uint) strlen(full_fname);
// if the log entry matches, null string matching anything
if (!log_name ||
(log_name_len == fname_len-1 && full_fname[log_name_len] == '\n' &&
!memcmp(full_fname, full_log_name, log_name_len)))
{
DBUG_PRINT("info", ("Found log file entry"));
full_fname[fname_len-1]= 0; // remove last \n
linfo->index_file_start_offset= offset;
linfo->index_file_offset = my_b_tell(&index_file);
break;
}
linfo->entry_index++;
}
end:
if (need_lock_index)
mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
/**
Find the position in the log-index-file for the given log name.
@param[out] linfo The filename will be stored here, along with the
byte offset of the next filename in the index file.
@param need_lock_index If true, LOCK_index will be acquired;
otherwise it should already be held by the caller.
@note
- Before calling this function, one has to call find_log_pos()
to set up 'linfo'
- Mutex needed because we need to make sure the file pointer does not move
from under our feet
@retval 0 ok
@retval LOG_INFO_EOF End of log-index-file found
@retval LOG_INFO_IO Got IO error while reading file
*/
int MYSQL_BIN_LOG::find_next_log(LOG_INFO* linfo, bool need_lock_index)
{
int error= 0;
uint length;
char fname[FN_REFLEN];
char *full_fname= linfo->log_file_name;
if (need_lock_index)
mysql_mutex_lock(&LOCK_index);
else
mysql_mutex_assert_owner(&LOCK_index);
/* As the file is flushed, we can't get an error here */
my_b_seek(&index_file, linfo->index_file_offset);
linfo->index_file_start_offset= linfo->index_file_offset;
if ((length=my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
{
error = !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
goto err;
}
if (fname[0] != 0)
{
if(normalize_binlog_name(full_fname, fname, is_relay_log))
{
error= LOG_INFO_EOF;
goto err;
}
length= strlen(full_fname);
}
full_fname[length-1]= 0; // kill \n
linfo->index_file_offset= my_b_tell(&index_file);
err:
if (need_lock_index)
mysql_mutex_unlock(&LOCK_index);
return error;
}
/**
Removes files, as part of a RESET MASTER or RESET SLAVE statement,
by deleting all logs refered to in the index file. Then, it starts
writing to a new log file.
The new index file will only contain this file.
@param thd Thread
@note
If not called from slave thread, write start event to new log
@retval
0 ok
@retval
1 error
*/
bool MYSQL_BIN_LOG::reset_logs(THD* thd)
{
LOG_INFO linfo;
bool error=0;
int err;
const char* save_name;
DBUG_ENTER("reset_logs");
/*
Flush logs for storage engines, so that the last transaction
is fsynced inside storage engines.
*/
if (ha_flush_logs(NULL))
DBUG_RETURN(1);
ha_reset_logs(thd);
/*
We need to get both locks to be sure that no one is trying to
write to the index log file.
*/
mysql_mutex_lock(&LOCK_log);
mysql_mutex_lock(&LOCK_index);
/*
The following mutex is needed to ensure that no threads call
'delete thd' as we would then risk missing a 'rollback' from this
thread. If the transaction involved MyISAM tables, it should go
into binlog even on rollback.
*/
mysql_mutex_lock(&LOCK_thread_count);
global_sid_lock->wrlock();
/* Save variables so that we can reopen the log */
save_name=name;
name=0; // Protect against free
close(LOG_CLOSE_TO_BE_OPENED);
/*
First delete all old log files and then update the index file.
As we first delete the log files and do not use sort of logging,
a crash may lead to an inconsistent state where the index has
references to non-existent files.
We need to invert the steps and use the purge_index_file methods
in order to make the operation safe.
*/
if ((err= find_log_pos(&linfo, NullS, false/*need_lock_index=false*/)) != 0)
{
uint errcode= purge_log_get_error_code(err);
sql_print_error("Failed to locate old binlog or relay log files");
my_message(errcode, ER(errcode), MYF(0));
error= 1;
goto err;
}
for (;;)
{
if ((error= my_delete_allow_opened(linfo.log_file_name, MYF(0))) != 0)
{
if (my_errno == ENOENT)
{
push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
linfo.log_file_name);
sql_print_information("Failed to delete file '%s'",
linfo.log_file_name);
my_errno= 0;
error= 0;
}
else
{
push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_PURGE_FATAL_ERR,
"a problem with deleting %s; "
"consider examining correspondence "
"of your binlog index file "
"to the actual binlog files",
linfo.log_file_name);
error= 1;
goto err;
}
}
if (find_next_log(&linfo, false/*need_lock_index=false*/))
break;
}
/* Start logging with a new file */
close(LOG_CLOSE_INDEX | LOG_CLOSE_TO_BE_OPENED);
if ((error= my_delete_allow_opened(index_file_name, MYF(0)))) // Reset (open will update)
{
if (my_errno == ENOENT)
{
push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
index_file_name);
sql_print_information("Failed to delete file '%s'",
index_file_name);
my_errno= 0;
error= 0;
}
else
{
push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_PURGE_FATAL_ERR,
"a problem with deleting %s; "
"consider examining correspondence "
"of your binlog index file "
"to the actual binlog files",
index_file_name);
error= 1;
goto err;
}
}
#ifdef HAVE_REPLICATION
if (is_relay_log)
{
DBUG_ASSERT(active_mi != NULL);
DBUG_ASSERT(active_mi->rli != NULL);
(const_cast(active_mi->rli->get_gtid_set()))->clear();
}
else
{
gtid_state->clear();
// don't clear global_sid_map because it's used by the relay log too
if (gtid_state->init() != 0)
goto err;
}
#endif
if (!open_index_file(index_file_name, 0, false/*need_lock_index=false*/))
if ((error= open_binlog(save_name, 0, io_cache_type,
max_size, false,
false/*need_lock_index=false*/,
false/*need_sid_lock=false*/,
NULL)))
goto err;
my_free((void *) save_name);
err:
if (error == 1)
name= const_cast(save_name);
global_sid_lock->unlock();
mysql_mutex_unlock(&LOCK_thread_count);
mysql_mutex_unlock(&LOCK_index);
mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
/**
Set the name of crash safe index file.
@retval
0 ok
@retval
1 error
*/
int MYSQL_BIN_LOG::set_crash_safe_index_file_name(const char *base_file_name)
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::set_crash_safe_index_file_name");
if (fn_format(crash_safe_index_file_name, base_file_name, mysql_data_home,
".index_crash_safe", MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH |
MY_REPLACE_EXT)) == NULL)
{
error= 1;
sql_print_error("MYSQL_BIN_LOG::set_crash_safe_index_file_name failed "
"to set file name.");
}
DBUG_RETURN(error);
}
/**
Open a (new) crash safe index file.
@note
The crash safe index file is a special file
used for guaranteeing index file crash safe.
@retval
0 ok
@retval
1 error
*/
int MYSQL_BIN_LOG::open_crash_safe_index_file()
{
int error= 0;
File file= -1;
DBUG_ENTER("MYSQL_BIN_LOG::open_crash_safe_index_file");
if (!my_b_inited(&crash_safe_index_file))
{
if ((file= my_open(crash_safe_index_file_name, O_RDWR | O_CREAT | O_BINARY,
MYF(MY_WME | ME_WAITTANG))) < 0 ||
init_io_cache(&crash_safe_index_file, file, IO_SIZE, WRITE_CACHE,
0, 0, MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL)))
{
error= 1;
sql_print_error("MYSQL_BIN_LOG::open_crash_safe_index_file failed "
"to open temporary index file.");
}
}
DBUG_RETURN(error);
}
/**
Close the crash safe index file.
@note
The crash safe file is just closed, is not deleted.
Because it is moved to index file later on.
@retval
0 ok
@retval
1 error
*/
int MYSQL_BIN_LOG::close_crash_safe_index_file()
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::close_crash_safe_index_file");
if (my_b_inited(&crash_safe_index_file))
{
end_io_cache(&crash_safe_index_file);
error= my_close(crash_safe_index_file.file, MYF(0));
}
memset(&crash_safe_index_file, 0, sizeof(crash_safe_index_file));
DBUG_RETURN(error);
}
/**
Delete relay log files prior to rli->group_relay_log_name
(i.e. all logs which are not involved in a non-finished group
(transaction)), remove them from the index file and start on next
relay log.
IMPLEMENTATION
- You must hold rli->data_lock before calling this function, since
it writes group_relay_log_pos and similar fields of
Relay_log_info.
- Protects index file with LOCK_index
- Delete relevant relay log files
- Copy all file names after these ones to the front of the index file
- If the OS has truncate, truncate the file, else fill it with \n'
- Read the next file name from the index file and store in rli->linfo
@param rli Relay log information
@param included If false, all relay logs that are strictly before
rli->group_relay_log_name are deleted ; if true, the
latter is deleted too (i.e. all relay logs
read by the SQL slave thread are deleted).
@note
- This is only called from the slave SQL thread when it has read
all commands from a relay log and want to switch to a new relay log.
- When this happens, we can be in an active transaction as
a transaction can span over two relay logs
(although it is always written as a single block to the master's binary
log, hence cannot span over two master's binary logs).
@retval
0 ok
@retval
LOG_INFO_EOF End of log-index-file found
@retval
LOG_INFO_SEEK Could not allocate IO cache
@retval
LOG_INFO_IO Got IO error while reading file
*/
#ifdef HAVE_REPLICATION
int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
{
int error;
char *to_purge_if_included= NULL;
DBUG_ENTER("purge_first_log");
DBUG_ASSERT(current_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);
DBUG_ASSERT(is_relay_log);
DBUG_ASSERT(is_open());
DBUG_ASSERT(rli->slave_running == 1);
DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->get_event_relay_log_name()));
mysql_mutex_assert_owner(&rli->data_lock);
mysql_mutex_lock(&LOCK_index);
to_purge_if_included= my_strdup(rli->get_group_relay_log_name(), MYF(0));
/*
Read the next log file name from the index file and pass it back to
the caller.
*/
if((error=find_log_pos(&rli->linfo, rli->get_event_relay_log_name(),
false/*need_lock_index=false*/)) ||
(error=find_next_log(&rli->linfo, false/*need_lock_index=false*/)))
{
char buff[22];
sql_print_error("next log error: %d offset: %s log: %s included: %d",
error,
llstr(rli->linfo.index_file_offset,buff),
rli->get_event_relay_log_name(),
included);
goto err;
}
/*
Reset rli's coordinates to the current log.
*/
rli->set_event_relay_log_pos(BIN_LOG_HEADER_SIZE);
rli->set_event_relay_log_name(rli->linfo.log_file_name);
/*
If we removed the rli->group_relay_log_name file,
we must update the rli->group* coordinates, otherwise do not touch it as the
group's execution is not finished (e.g. COMMIT not executed)
*/
if (included)
{
rli->set_group_relay_log_pos(BIN_LOG_HEADER_SIZE);
rli->set_group_relay_log_name(rli->linfo.log_file_name);
rli->notify_group_relay_log_name_update();
}
/*
Store where we are in the new file for the execution thread.
If we are in the middle of a group), then we should not store
the position in the repository, instead in that case set a flag
to true which indicates that a 'forced flush' is postponed due
to transaction split across the relaylogs.
*/
if (!rli->is_in_group())
rli->flush_info(TRUE);
else
rli->force_flush_postponed_due_to_split_trans= true;
DBUG_EXECUTE_IF("crash_before_purge_logs", DBUG_SUICIDE(););
mysql_mutex_lock(&rli->log_space_lock);
rli->relay_log.purge_logs(to_purge_if_included, included,
false/*need_lock_index=false*/,
false/*need_update_threads=false*/,
&rli->log_space_total, true);
// Tell the I/O thread to take the relay_log_space_limit into account
rli->ignore_log_space_limit= 0;
mysql_mutex_unlock(&rli->log_space_lock);
/*
Ok to broadcast after the critical region as there is no risk of
the mutex being destroyed by this thread later - this helps save
context switches
*/
mysql_cond_broadcast(&rli->log_space_cond);
/*
* Need to update the log pos because purge logs has been called
* after fetching initially the log pos at the begining of the method.
*/
if((error=find_log_pos(&rli->linfo, rli->get_event_relay_log_name(),
false/*need_lock_index=false*/)))
{
char buff[22];
sql_print_error("next log error: %d offset: %s log: %s included: %d",
error,
llstr(rli->linfo.index_file_offset,buff),
rli->get_group_relay_log_name(),
included);
goto err;
}
/* If included was passed, rli->linfo should be the first entry. */
DBUG_ASSERT(!included || rli->linfo.index_file_start_offset == 0);
err:
my_free(to_purge_if_included);
mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
/**
Remove logs from index file.
- To make crash safe, we copy the content of index file
from index_file_start_offset recored in log_info to
crash safe index file firstly and then move the crash
safe index file to index file.
@param linfo Store here the found log file name and
position to the NEXT log file name in
the index file.
@param need_update_threads If we want to update the log coordinates
of all threads. False for relay logs,
true otherwise.
@retval
0 ok
@retval
LOG_INFO_IO Got IO error while reading/writing file
*/
int MYSQL_BIN_LOG::remove_logs_from_index(LOG_INFO* log_info, bool need_update_threads)
{
if (open_crash_safe_index_file())
{
sql_print_error("MYSQL_BIN_LOG::remove_logs_from_index failed to "
"open the crash safe index file.");
goto err;
}
if (copy_file(&index_file, &crash_safe_index_file,
log_info->index_file_start_offset))
{
sql_print_error("MYSQL_BIN_LOG::remove_logs_from_index failed to "
"copy index file to crash safe index file.");
goto err;
}
if (close_crash_safe_index_file())
{
sql_print_error("MYSQL_BIN_LOG::remove_logs_from_index failed to "
"close the crash safe index file.");
goto err;
}
DBUG_EXECUTE_IF("fault_injection_copy_part_file", DBUG_SUICIDE(););
if (move_crash_safe_index_file_to_index_file(false/*need_lock_index=false*/))
{
sql_print_error("MYSQL_BIN_LOG::remove_logs_from_index failed to "
"move crash safe index file to index file.");
goto err;
}
// now update offsets in index file for running threads
if (need_update_threads)
adjust_linfo_offsets(log_info->index_file_start_offset);
return 0;
err:
return LOG_INFO_IO;
}
/**
Remove all logs before the given log from disk and from the index file.
@param to_log Delete all log file name before this file.
@param included If true, to_log is deleted too.
@param need_lock_index
@param need_update_threads If we want to update the log coordinates of
all threads. False for relay logs, true otherwise.
@param freed_log_space If not null, decrement this variable of
the amount of log space freed
@param auto_purge True if this is an automatic purge.
@note
If any of the logs before the deleted one is in use,
only purge logs up to this one.
@retval
0 ok
@retval
LOG_INFO_EOF to_log not found
LOG_INFO_EMFILE too many files opened
LOG_INFO_FATAL if any other than ENOENT error from
mysql_file_stat() or mysql_file_delete()
*/
int MYSQL_BIN_LOG::purge_logs(const char *to_log,
bool included,
bool need_lock_index,
bool need_update_threads,
ulonglong *decrease_log_space,
bool auto_purge)
{
int error= 0, no_of_log_files_to_purge= 0, no_of_log_files_purged= 0;
int no_of_threads_locking_log= 0;
bool exit_loop= 0;
LOG_INFO log_info;
THD *thd= current_thd;
DBUG_ENTER("purge_logs");
DBUG_PRINT("info",("to_log= %s",to_log));
if (need_lock_index)
mysql_mutex_lock(&LOCK_index);
else
mysql_mutex_assert_owner(&LOCK_index);
if ((error=find_log_pos(&log_info, to_log, false/*need_lock_index=false*/)))
{
sql_print_error("MYSQL_BIN_LOG::purge_logs was called with file %s not "
"listed in the index.", to_log);
goto err;
}
no_of_log_files_to_purge= log_info.entry_index;
if ((error= open_purge_index_file(TRUE)))
{
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to sync the index file.");
goto err;
}
/*
File name exists in index file; delete until we find this file
or a file that is used.
*/
if ((error=find_log_pos(&log_info, NullS, false/*need_lock_index=false*/)))
goto err;
while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)))
{
if(is_active(log_info.log_file_name))
{
if(!auto_purge)
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARN_PURGE_LOG_IS_ACTIVE,
ER(ER_WARN_PURGE_LOG_IS_ACTIVE),
log_info.log_file_name);
break;
}
if ((no_of_threads_locking_log= log_in_use(log_info.log_file_name)))
{
if(!auto_purge)
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARN_PURGE_LOG_IN_USE,
ER(ER_WARN_PURGE_LOG_IN_USE),
log_info.log_file_name, no_of_threads_locking_log,
no_of_log_files_purged, no_of_log_files_to_purge);
break;
}
no_of_log_files_purged++;
if ((error= register_purge_index_entry(log_info.log_file_name)))
{
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to copy %s to register file.",
log_info.log_file_name);
goto err;
}
if (find_next_log(&log_info, false/*need_lock_index=false*/) || exit_loop)
break;
}
DBUG_EXECUTE_IF("crash_purge_before_update_index", DBUG_SUICIDE(););
if ((error= sync_purge_index_file()))
{
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to flush register file.");
goto err;
}
/* We know how many files to delete. Update index file. */
if ((error=remove_logs_from_index(&log_info, need_update_threads)))
{
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to update the index file");
goto err;
}
// Update gtid_state->lost_gtids
if (gtid_mode > 0 && !is_relay_log)
{
global_sid_lock->wrlock();
error= init_gtid_sets(NULL,
const_cast(gtid_state->get_lost_gtids()),
NULL,
opt_master_verify_checksum,
false/*false=don't need lock*/);
global_sid_lock->unlock();
if (error)
goto err;
}
DBUG_EXECUTE_IF("crash_purge_critical_after_update_index", DBUG_SUICIDE(););
err:
int error_index= 0, close_error_index= 0;
/* Read each entry from purge_index_file and delete the file. */
if (!error && is_inited_purge_index_file() &&
(error_index= purge_index_entry(thd, decrease_log_space, false/*need_lock_index=false*/)))
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to process registered files"
" that would be purged.");
close_error_index= close_purge_index_file();
DBUG_EXECUTE_IF("crash_purge_non_critical_after_update_index", DBUG_SUICIDE(););
if (need_lock_index)
mysql_mutex_unlock(&LOCK_index);
/*
Error codes from purge logs take precedence.
Then error codes from purging the index entry.
Finally, error codes from closing the purge index file.
*/
error= error ? error : (error_index ? error_index :
close_error_index);
DBUG_RETURN(error);
}
int MYSQL_BIN_LOG::set_purge_index_file_name(const char *base_file_name)
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::set_purge_index_file_name");
if (fn_format(purge_index_file_name, base_file_name, mysql_data_home,
".~rec~", MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH |
MY_REPLACE_EXT)) == NULL)
{
error= 1;
sql_print_error("MYSQL_BIN_LOG::set_purge_index_file_name failed to set "
"file name.");
}
DBUG_RETURN(error);
}
int MYSQL_BIN_LOG::open_purge_index_file(bool destroy)
{
int error= 0;
File file= -1;
DBUG_ENTER("MYSQL_BIN_LOG::open_purge_index_file");
if (destroy)
close_purge_index_file();
if (!my_b_inited(&purge_index_file))
{
if ((file= my_open(purge_index_file_name, O_RDWR | O_CREAT | O_BINARY,
MYF(MY_WME | ME_WAITTANG))) < 0 ||
init_io_cache(&purge_index_file, file, IO_SIZE,
(destroy ? WRITE_CACHE : READ_CACHE),
0, 0, MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL)))
{
error= 1;
sql_print_error("MYSQL_BIN_LOG::open_purge_index_file failed to open register "
" file.");
}
}
DBUG_RETURN(error);
}
int MYSQL_BIN_LOG::close_purge_index_file()
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::close_purge_index_file");
if (my_b_inited(&purge_index_file))
{
end_io_cache(&purge_index_file);
error= my_close(purge_index_file.file, MYF(0));
}
my_delete(purge_index_file_name, MYF(0));
memset(&purge_index_file, 0, sizeof(purge_index_file));
DBUG_RETURN(error);
}
bool MYSQL_BIN_LOG::is_inited_purge_index_file()
{
DBUG_ENTER("MYSQL_BIN_LOG::is_inited_purge_index_file");
DBUG_RETURN (my_b_inited(&purge_index_file));
}
int MYSQL_BIN_LOG::sync_purge_index_file()
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::sync_purge_index_file");
if ((error= flush_io_cache(&purge_index_file)) ||
(error= my_sync(purge_index_file.file, MYF(MY_WME))))
DBUG_RETURN(error);
DBUG_RETURN(error);
}
int MYSQL_BIN_LOG::register_purge_index_entry(const char *entry)
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::register_purge_index_entry");
if ((error=my_b_write(&purge_index_file, (const uchar*)entry, strlen(entry))) ||
(error=my_b_write(&purge_index_file, (const uchar*)"\n", 1)))
DBUG_RETURN (error);
DBUG_RETURN(error);
}
int MYSQL_BIN_LOG::register_create_index_entry(const char *entry)
{
DBUG_ENTER("MYSQL_BIN_LOG::register_create_index_entry");
DBUG_RETURN(register_purge_index_entry(entry));
}
int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *decrease_log_space,
bool need_lock_index)
{
MY_STAT s;
int error= 0;
LOG_INFO log_info;
LOG_INFO check_log_info;
DBUG_ENTER("MYSQL_BIN_LOG:purge_index_entry");
DBUG_ASSERT(my_b_inited(&purge_index_file));
if ((error=reinit_io_cache(&purge_index_file, READ_CACHE, 0, 0, 0)))
{
sql_print_error("MYSQL_BIN_LOG::purge_index_entry failed to reinit register file "
"for read");
goto err;
}
for (;;)
{
uint length;
if ((length=my_b_gets(&purge_index_file, log_info.log_file_name,
FN_REFLEN)) <= 1)
{
if (purge_index_file.error)
{
error= purge_index_file.error;
sql_print_error("MYSQL_BIN_LOG::purge_index_entry error %d reading from "
"register file.", error);
goto err;
}
/* Reached EOF */
break;
}
/* Get rid of the trailing '\n' */
log_info.log_file_name[length-1]= 0;
if (!mysql_file_stat(m_key_file_log, log_info.log_file_name, &s, MYF(0)))
{
if (my_errno == ENOENT)
{
/*
It's not fatal if we can't stat a log file that does not exist;
If we could not stat, we won't delete.
*/
if (thd)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
log_info.log_file_name);
}
sql_print_information("Failed to execute mysql_file_stat on file '%s'",
log_info.log_file_name);
my_errno= 0;
}
else
{
/*
Other than ENOENT are fatal
*/
if (thd)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_PURGE_FATAL_ERR,
"a problem with getting info on being purged %s; "
"consider examining correspondence "
"of your binlog index file "
"to the actual binlog files",
log_info.log_file_name);
}
else
{
sql_print_information("Failed to delete log file '%s'; "
"consider examining correspondence "
"of your binlog index file "
"to the actual binlog files",
log_info.log_file_name);
}
error= LOG_INFO_FATAL;
goto err;
}
}
else
{
if ((error= find_log_pos(&check_log_info, log_info.log_file_name,
need_lock_index)))
{
if (error != LOG_INFO_EOF)
{
if (thd)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_PURGE_FATAL_ERR,
"a problem with deleting %s and "
"reading the binlog index file",
log_info.log_file_name);
}
else
{
sql_print_information("Failed to delete file '%s' and "
"read the binlog index file",
log_info.log_file_name);
}
goto err;
}
error= 0;
if (!need_lock_index)
{
/*
This is to avoid triggering an error in NDB.
@todo: This is weird, what does NDB errors have to do with
need_lock_index? Explain better or refactor /Sven
*/
ha_binlog_index_purge_file(current_thd, log_info.log_file_name);
}
DBUG_PRINT("info",("purging %s",log_info.log_file_name));
if (!mysql_file_delete(key_file_binlog, log_info.log_file_name, MYF(0)))
{
if (decrease_log_space)
*decrease_log_space-= s.st_size;
}
else
{
if (my_errno == ENOENT)
{
if (thd)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
log_info.log_file_name);
}
sql_print_information("Failed to delete file '%s'",
log_info.log_file_name);
my_errno= 0;
}
else
{
if (thd)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_PURGE_FATAL_ERR,
"a problem with deleting %s; "
"consider examining correspondence "
"of your binlog index file "
"to the actual binlog files",
log_info.log_file_name);
}
else
{
sql_print_information("Failed to delete file '%s'; "
"consider examining correspondence "
"of your binlog index file "
"to the actual binlog files",
log_info.log_file_name);
}
if (my_errno == EMFILE)
{
DBUG_PRINT("info",
("my_errno: %d, set ret = LOG_INFO_EMFILE", my_errno));
error= LOG_INFO_EMFILE;
goto err;
}
error= LOG_INFO_FATAL;
goto err;
}
}
}
}
}
err:
DBUG_RETURN(error);
}
/**
Remove all logs before the given file date from disk and from the
index file.
@param thd Thread pointer
@param purge_time Delete all log files before given date.
@param auto_purge True if this is an automatic purge.
@note
If any of the logs before the deleted one is in use,
only purge logs up to this one.
@retval
0 ok
@retval
LOG_INFO_PURGE_NO_ROTATE Binary file that can't be rotated
LOG_INFO_FATAL if any other than ENOENT error from
mysql_file_stat() or mysql_file_delete()
*/
int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time, bool auto_purge)
{
int error;
int no_of_threads_locking_log= 0, no_of_log_files_purged= 0;
bool log_is_active= false, log_is_in_use= false;
char to_log[FN_REFLEN], copy_log_in_use[FN_REFLEN];
LOG_INFO log_info;
MY_STAT stat_area;
THD *thd= current_thd;
DBUG_ENTER("purge_logs_before_date");
mysql_mutex_lock(&LOCK_index);
to_log[0]= 0;
if ((error=find_log_pos(&log_info, NullS, false/*need_lock_index=false*/)))
goto err;
while (!(log_is_active= is_active(log_info.log_file_name)))
{
if ((no_of_threads_locking_log= log_in_use(log_info.log_file_name)))
{
if (!auto_purge)
{
log_is_in_use= true;
strcpy(copy_log_in_use, log_info.log_file_name);
}
break;
}
no_of_log_files_purged++;
if (!mysql_file_stat(m_key_file_log,
log_info.log_file_name, &stat_area, MYF(0)))
{
if (my_errno == ENOENT)
{
/*
It's not fatal if we can't stat a log file that does not exist.
*/
my_errno= 0;
}
else
{
/*
Other than ENOENT are fatal
*/
if (thd)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_PURGE_FATAL_ERR,
"a problem with getting info on being purged %s; "
"consider examining correspondence "
"of your binlog index file "
"to the actual binlog files",
log_info.log_file_name);
}
else
{
sql_print_information("Failed to delete log file '%s'",
log_info.log_file_name);
}
error= LOG_INFO_FATAL;
goto err;
}
}
else
{
if (stat_area.st_mtime < purge_time)
strmake(to_log,
log_info.log_file_name,
sizeof(log_info.log_file_name) - 1);
else
break;
}
if (find_next_log(&log_info, false/*need_lock_index=false*/))
break;
}
if (log_is_active)
{
if(!auto_purge)
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARN_PURGE_LOG_IS_ACTIVE,
ER(ER_WARN_PURGE_LOG_IS_ACTIVE),
log_info.log_file_name);
}
if (log_is_in_use)
{
int no_of_log_files_to_purge= no_of_log_files_purged+1;
while (strcmp(log_file_name, log_info.log_file_name))
{
if (mysql_file_stat(m_key_file_log, log_info.log_file_name,
&stat_area, MYF(0)))
{
if (stat_area.st_mtime < purge_time)
no_of_log_files_to_purge++;
else
break;
}
if (find_next_log(&log_info, false/*need_lock_index=false*/))
{
no_of_log_files_to_purge++;
break;
}
}
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARN_PURGE_LOG_IN_USE,
ER(ER_WARN_PURGE_LOG_IN_USE),
copy_log_in_use, no_of_threads_locking_log,
no_of_log_files_purged, no_of_log_files_to_purge);
}
error= (to_log[0] ? purge_logs(to_log, true,
false/*need_lock_index=false*/,
true/*need_update_threads=true*/,
(ulonglong *) 0, auto_purge) : 0);
err:
mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
#endif /* HAVE_REPLICATION */
/**
Create a new log file name.
@param buf buf of at least FN_REFLEN where new name is stored
@note
If file name will be longer then FN_REFLEN it will be truncated
*/
void MYSQL_BIN_LOG::make_log_name(char* buf, const char* log_ident)
{
uint dir_len = dirname_length(log_file_name);
if (dir_len >= FN_REFLEN)
dir_len=FN_REFLEN-1;
strnmov(buf, log_file_name, dir_len);
strmake(buf+dir_len, log_ident, FN_REFLEN - dir_len -1);
}
/**
Check if we are writing/reading to the given log file.
*/
bool MYSQL_BIN_LOG::is_active(const char *log_file_name_arg)
{
return !strcmp(log_file_name, log_file_name_arg);
}
/*
Wrappers around new_file_impl to avoid using argument
to control locking. The argument 1) less readable 2) breaks
incapsulation 3) allows external access to the class without
a lock (which is not possible with private new_file_without_locking
method).
@retval
nonzero - error
*/
int MYSQL_BIN_LOG::new_file(Format_description_log_event *extra_description_event)
{
return new_file_impl(true/*need_lock_log=true*/, extra_description_event);
}
/*
@retval
nonzero - error
*/
int MYSQL_BIN_LOG::new_file_without_locking(Format_description_log_event *extra_description_event)
{
return new_file_impl(false/*need_lock_log=false*/, extra_description_event);
}
/**
Start writing to a new log file or reopen the old file.
@param need_lock_log If true, this function acquires LOCK_log;
otherwise the caller should already have acquired it.
@retval 0 success
@retval nonzero - error
@note The new file name is stored last in the index file
*/
int MYSQL_BIN_LOG::new_file_impl(bool need_lock_log, Format_description_log_event *extra_description_event)
{
int error= 0, close_on_error= FALSE;
char new_name[FN_REFLEN], *new_name_ptr, *old_name, *file_to_open;
DBUG_ENTER("MYSQL_BIN_LOG::new_file_impl");
if (!is_open())
{
DBUG_PRINT("info",("log is closed"));
DBUG_RETURN(error);
}
if (need_lock_log)
mysql_mutex_lock(&LOCK_log);
else
mysql_mutex_assert_owner(&LOCK_log);
DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
DEBUG_SYNC(current_thd, "before_rotate_binlog"););
mysql_mutex_lock(&LOCK_xids);
/*
We need to ensure that the number of prepared XIDs are 0.
If m_prep_xids is not zero:
- We wait for storage engine commit, hence decrease m_prep_xids
- We keep the LOCK_log to block new transactions from being
written to the binary log.
*/
while (get_prep_xids() > 0)
{
DEBUG_SYNC(current_thd, "before_rotate_binlog_file");
mysql_cond_wait(&m_prep_xids_cond, &LOCK_xids);
}
mysql_mutex_unlock(&LOCK_xids);
mysql_mutex_lock(&LOCK_index);
if (DBUG_EVALUATE_IF("expire_logs_always", 0, 1)
&& (error= ha_flush_logs(NULL)))
goto end;
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_owner(&LOCK_index);
/*
If user hasn't specified an extension, generate a new log name
We have to do this here and not in open as we want to store the
new file name in the current binary log file.
*/
new_name_ptr= new_name;
if ((error= generate_new_name(new_name, name)))
{
// Use the old name if generation of new name fails.
strcpy(new_name, name);
close_on_error= TRUE;
goto end;
}
else
{
/*
We log the whole file name for log file as the user may decide
to change base names at some point.
*/
Rotate_log_event r(new_name+dirname_length(new_name), 0, LOG_EVENT_OFFSET,
is_relay_log ? Rotate_log_event::RELAY_LOG : 0);
/*
The current relay-log's closing Rotate event must have checksum
value computed with an algorithm of the last relay-logged FD event.
*/
if (is_relay_log)
r.checksum_alg= relay_log_checksum_alg;
DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) ||
(error= r.write(&log_file)))
{
char errbuf[MYSYS_STRERROR_SIZE];
DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno=2;);
close_on_error= TRUE;
my_printf_error(ER_ERROR_ON_WRITE, ER(ER_CANT_OPEN_FILE),
MYF(ME_FATALERROR), name,
errno, my_strerror(errbuf, sizeof(errbuf), errno));
goto end;
}
bytes_written += r.data_written;
}
/*
Update needs to be signalled even if there is no rotate event
log rotation should give the waiting thread a signal to
discover EOF and move on to the next log.
*/
signal_update();
old_name=name;
name=0; // Don't free name
close(LOG_CLOSE_TO_BE_OPENED | LOG_CLOSE_INDEX);
if (checksum_alg_reset != BINLOG_CHECKSUM_ALG_UNDEF)
{
DBUG_ASSERT(!is_relay_log);
DBUG_ASSERT(binlog_checksum_options != checksum_alg_reset);
binlog_checksum_options= checksum_alg_reset;
}
/*
Note that at this point, log_state != LOG_CLOSED (important for is_open()).
*/
DEBUG_SYNC(current_thd, "before_rotate_binlog_file");
/*
new_file() is only used for rotation (in FLUSH LOGS or because size >
max_binlog_size or max_relay_log_size).
If this is a binary log, the Format_description_log_event at the beginning of
the new file should have created=0 (to distinguish with the
Format_description_log_event written at server startup, which should
trigger temp tables deletion on slaves.
*/
/* reopen index binlog file, BUG#34582 */
file_to_open= index_file_name;
error= open_index_file(index_file_name, 0, false/*need_lock_index=false*/);
if (!error)
{
/* reopen the binary log file. */
file_to_open= new_name_ptr;
error= open_binlog(old_name, new_name_ptr, io_cache_type,
max_size, true/*null_created_arg=true*/,
false/*need_lock_index=false*/,
true/*need_sid_lock=true*/,
extra_description_event);
}
/* handle reopening errors */
if (error)
{
char errbuf[MYSYS_STRERROR_SIZE];
my_printf_error(ER_CANT_OPEN_FILE, ER(ER_CANT_OPEN_FILE),
MYF(ME_FATALERROR), file_to_open,
error, my_strerror(errbuf, sizeof(errbuf), error));
close_on_error= TRUE;
}
my_free(old_name);
end:
if (error && close_on_error /* rotate or reopen failed */)
{
/*
Close whatever was left opened.
We are keeping the behavior as it exists today, ie,
we disable logging and move on (see: BUG#51014).
TODO: as part of WL#1790 consider other approaches:
- kill mysql (safety);
- try multiple locations for opening a log file;
- switch server to protected/readonly mode
- ...
*/
close(LOG_CLOSE_INDEX);
if (binlog_error_action == ABORT_SERVER)
{
exec_binlog_error_action_abort("Either disk is full or file system is"
" read only while rotating the binlog."
" Aborting the server.");
}
else
sql_print_error("Could not open %s for logging (error %d). "
"Turning logging off for the whole duration "
"of the MySQL server process. To turn it on "
"again: fix the cause, shutdown the MySQL "
"server and restart it.",
new_name_ptr, errno);
}
mysql_mutex_unlock(&LOCK_index);
if (need_lock_log)
mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
#ifdef HAVE_REPLICATION
/**
Called after an event has been written to the relay log by the IO
thread. This flushes and possibly syncs the file (according to the
sync options), rotates the file if it has grown over the limit, and
finally calls signal_update().
@note The caller must hold LOCK_log before invoking this function.
@param mi Master_info for the IO thread.
@param need_data_lock If true, mi->data_lock will be acquired if a
rotation is needed. Otherwise, mi->data_lock must be held by the
caller.
@retval false success
@retval true error
*/
bool MYSQL_BIN_LOG::after_append_to_relay_log(Master_info *mi)
{
DBUG_ENTER("MYSQL_BIN_LOG::after_append_to_relay_log");
DBUG_PRINT("info",("max_size: %lu",max_size));
// Check pre-conditions
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_owner(&mi->data_lock);
DBUG_ASSERT(is_relay_log);
DBUG_ASSERT(current_thd->system_thread == SYSTEM_THREAD_SLAVE_IO);
// Flush and sync
bool error= false;
if (flush_and_sync(0) == 0)
{
DBUG_EXECUTE_IF ("set_max_size_zero",
{max_size=0;});
// If relay log is too big, rotate
if ((uint) my_b_append_tell(&log_file) >
DBUG_EVALUATE_IF("rotate_slave_debug_group", 500, max_size))
{
error= new_file_without_locking(mi->get_mi_description_event());
DBUG_EXECUTE_IF ("set_max_size_zero",
{
max_size=1073741824;
DBUG_SET("-d,set_max_size_zero");
DBUG_SET("-d,flush_after_reading_gtid_event");
});
}
}
signal_update();
DBUG_RETURN(error);
}
bool MYSQL_BIN_LOG::append_event(Log_event* ev, Master_info *mi)
{
DBUG_ENTER("MYSQL_BIN_LOG::append");
// check preconditions
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
DBUG_ASSERT(is_relay_log);
// acquire locks
mysql_mutex_lock(&LOCK_log);
// write data
bool error = false;
if (ev->write(&log_file) == 0)
{
bytes_written+= ev->data_written;
error= after_append_to_relay_log(mi);
}
else
error= true;
mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
bool MYSQL_BIN_LOG::append_buffer(const char* buf, uint len, Master_info *mi)
{
DBUG_ENTER("MYSQL_BIN_LOG::append_buffer");
// check preconditions
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
DBUG_ASSERT(is_relay_log);
mysql_mutex_assert_owner(&LOCK_log);
// write data
bool error= false;
if (my_b_append(&log_file,(uchar*) buf,len) == 0)
{
bytes_written += len;
error= after_append_to_relay_log(mi);
}
else
error= true;
DBUG_RETURN(error);
}
#endif // ifdef HAVE_REPLICATION
bool MYSQL_BIN_LOG::flush_and_sync(const bool force)
{
mysql_mutex_assert_owner(&LOCK_log);
if (flush_io_cache(&log_file))
return 1;
std::pair result= sync_binlog_file(force);
return result.first;
}
void MYSQL_BIN_LOG::start_union_events(THD *thd, query_id_t query_id_param)
{
DBUG_ASSERT(!thd->binlog_evt_union.do_union);
thd->binlog_evt_union.do_union= TRUE;
thd->binlog_evt_union.unioned_events= FALSE;
thd->binlog_evt_union.unioned_events_trans= FALSE;
thd->binlog_evt_union.first_query_id= query_id_param;
}
void MYSQL_BIN_LOG::stop_union_events(THD *thd)
{
DBUG_ASSERT(thd->binlog_evt_union.do_union);
thd->binlog_evt_union.do_union= FALSE;
}
bool MYSQL_BIN_LOG::is_query_in_union(THD *thd, query_id_t query_id_param)
{
return (thd->binlog_evt_union.do_union &&
query_id_param >= thd->binlog_evt_union.first_query_id);
}
/*
Updates thd's position-of-next-event variables
after a *real* write a file.
*/
void MYSQL_BIN_LOG::update_thd_next_event_pos(THD* thd)
{
if (likely(thd != NULL))
{
thd->set_next_event_pos(log_file_name,
my_b_tell(&log_file));
}
}
/*
Moves the last bunch of rows from the pending Rows event to a cache (either
transactional cache if is_transaction is @c true, or the non-transactional
cache otherwise. Sets a new pending event.
@param thd a pointer to the user thread.
@param evt a pointer to the row event.
@param is_transactional @c true indicates a transactional cache,
otherwise @c false a non-transactional.
*/
int
MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
Rows_log_event* event,
bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
DBUG_ASSERT(mysql_bin_log.is_open());
DBUG_PRINT("enter", ("event: 0x%lx", (long) event));
int error= 0;
binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(thd);
DBUG_ASSERT(cache_mngr);
binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(is_transactional);
DBUG_PRINT("info", ("cache_mngr->pending(): 0x%lx", (long) cache_data->pending()));
if (Rows_log_event* pending= cache_data->pending())
{
/*
Write pending event to the cache.
*/
if (cache_data->write_event(thd, pending))
{
set_write_error(thd, is_transactional);
if (check_write_error(thd) && cache_data &&
stmt_cannot_safely_rollback(thd))
cache_data->set_incident();
delete pending;
cache_data->set_pending(NULL);
DBUG_RETURN(1);
}
delete pending;
}
cache_data->set_pending(event);
DBUG_RETURN(error);
}
/**
Write an event to the binary log.
*/
bool MYSQL_BIN_LOG::write_event(Log_event *event_info)
{
THD *thd= event_info->thd;
bool error= 1;
DBUG_ENTER("MYSQL_BIN_LOG::write_event(Log_event *)");
if (thd->binlog_evt_union.do_union)
{
/*
In Stored function; Remember that function call caused an update.
We will log the function call to the binary log on function exit
*/
thd->binlog_evt_union.unioned_events= TRUE;
thd->binlog_evt_union.unioned_events_trans |=
event_info->is_using_trans_cache();
DBUG_RETURN(0);
}
/*
We only end the statement if we are in a top-level statement. If
we are inside a stored function, we do not end the statement since
this will close all tables on the slave. But there can be a special case
where we are inside a stored function/trigger and a SAVEPOINT is being
set in side the stored function/trigger. This SAVEPOINT execution will
force the pending event to be flushed without an STMT_END_F flag. This
will result in a case where following DMLs will be considered as part of
same statement and result in data loss on slave. Hence in this case we
force the end_stmt to be true.
*/
bool const end_stmt= (thd->in_sub_stmt && thd->lex->sql_command ==
SQLCOM_SAVEPOINT)? true:
(thd->locked_tables_mode && thd->lex->requires_prelocking());
if (thd->binlog_flush_pending_rows_event(end_stmt,
event_info->is_using_trans_cache()))
DBUG_RETURN(error);
/*
In most cases this is only called if 'is_open()' is true; in fact this is
mostly called if is_open() *was* true a few instructions before, but it
could have changed since.
*/
if (likely(is_open()))
{
#ifdef HAVE_REPLICATION
/*
In the future we need to add to the following if tests like
"do the involved tables match (to be implemented)
binlog_[wild_]{do|ignore}_table?" (WL#1049)"
*/
const char *local_db= event_info->get_db();
if ((thd && !(thd->variables.option_bits & OPTION_BIN_LOG)) ||
(thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT &&
thd->lex->sql_command != SQLCOM_SAVEPOINT &&
(!event_info->is_no_filter_event() &&
!binlog_filter->db_ok(local_db))))
DBUG_RETURN(0);
#endif /* HAVE_REPLICATION */
DBUG_ASSERT(event_info->is_using_trans_cache() || event_info->is_using_stmt_cache());
if (binlog_start_trans_and_stmt(thd, event_info))
DBUG_RETURN(error);
bool is_trans_cache= event_info->is_using_trans_cache();
binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd);
binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(is_trans_cache);
DBUG_PRINT("info",("event type: %d",event_info->get_type_code()));
/*
No check for auto events flag here - this write method should
never be called if auto-events are enabled.
Write first log events which describe the 'run environment'
of the SQL command. If row-based binlogging, Insert_id, Rand
and other kind of "setting context" events are not needed.
*/
if (thd)
{
if (!thd->is_current_stmt_binlog_format_row())
{
if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt)
{
Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
thd->first_successful_insert_id_in_prev_stmt_for_binlog,
event_info->event_cache_type, event_info->event_logging_type);
if (cache_data->write_event(thd, &e))
goto err;
}
if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0)
{
DBUG_PRINT("info",("number of auto_inc intervals: %u",
thd->auto_inc_intervals_in_cur_stmt_for_binlog.
nb_elements()));
Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT,
thd->auto_inc_intervals_in_cur_stmt_for_binlog.
minimum(), event_info->event_cache_type,
event_info->event_logging_type);
if (cache_data->write_event(thd, &e))
goto err;
}
if (thd->rand_used)
{
Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2,
event_info->event_cache_type,
event_info->event_logging_type);
if (cache_data->write_event(thd, &e))
goto err;
}
if (thd->user_var_events.elements)
{
for (uint i= 0; i < thd->user_var_events.elements; i++)
{
BINLOG_USER_VAR_EVENT *user_var_event;
get_dynamic(&thd->user_var_events,(uchar*) &user_var_event, i);
/* setting flags for user var log event */
uchar flags= User_var_log_event::UNDEF_F;
if (user_var_event->unsigned_flag)
flags|= User_var_log_event::UNSIGNED_F;
User_var_log_event e(thd,
user_var_event->user_var_event->entry_name.ptr(),
user_var_event->user_var_event->entry_name.length(),
user_var_event->value,
user_var_event->length,
user_var_event->type,
user_var_event->charset_number, flags,
event_info->event_cache_type,
event_info->event_logging_type);
if (cache_data->write_event(thd, &e))
goto err;
}
}
}
}
/*
Write the event.
*/
if (cache_data->write_event(thd, event_info) ||
DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0))
goto err;
/*
After writing the event, if the trx-cache was used and any unsafe
change was written into it, the cache is marked as cannot safely
roll back.
*/
if (is_trans_cache && stmt_cannot_safely_rollback(thd))
cache_mngr->trx_cache.set_cannot_rollback();
error= 0;
err:
if (error)
{
set_write_error(thd, is_trans_cache);
if (check_write_error(thd) && cache_data &&
stmt_cannot_safely_rollback(thd))
cache_data->set_incident();
}
}
DBUG_RETURN(error);
}
/**
The method executes rotation when LOCK_log is already acquired
by the caller.
@param force_rotate caller can request the log rotation
@param check_purge is set to true if rotation took place
@note
If rotation fails, for instance the server was unable
to create a new log file, we still try to write an
incident event to the current log.
@note The caller must hold LOCK_log when invoking this function.
@retval
nonzero - error in rotating routine.
*/
int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge)
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::rotate");
DBUG_ASSERT(!is_relay_log);
mysql_mutex_assert_owner(&LOCK_log);
*check_purge= false;
if (DBUG_EVALUATE_IF("force_rotate", 1, 0) || force_rotate ||
(my_b_tell(&log_file) >= (my_off_t) max_size))
{
error= new_file_without_locking(NULL);
*check_purge= true;
}
DBUG_RETURN(error);
}
/**
The method executes logs purging routine.
@retval
nonzero - error in rotating routine.
*/
void MYSQL_BIN_LOG::purge()
{
#ifdef HAVE_REPLICATION
if (expire_logs_days)
{
DEBUG_SYNC(current_thd, "at_purge_logs_before_date");
time_t purge_time= my_time(0) - expire_logs_days*24*60*60;
DBUG_EXECUTE_IF("expire_logs_always",
{ purge_time= my_time(0);});
if (purge_time >= 0)
{
/*
Flush logs for storage engines, so that the last transaction
is fsynced inside storage engines.
*/
ha_flush_logs(NULL);
purge_logs_before_date(purge_time, true);
}
}
#endif
}
/**
The method is a shortcut of @c rotate() and @c purge().
LOCK_log is acquired prior to rotate and is released after it.
@param force_rotate caller can request the log rotation
@retval
nonzero - error in rotating routine.
*/
int MYSQL_BIN_LOG::rotate_and_purge(THD* thd, bool force_rotate)
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::rotate_and_purge");
bool check_purge= false;
/*
Wait for handlerton to insert any pending information into the binlog.
For e.g. ha_ndbcluster which updates the binlog asynchronously this is
needed so that the user see its own commands in the binlog.
*/
ha_binlog_wait(thd);
DBUG_ASSERT(!is_relay_log);
mysql_mutex_lock(&LOCK_log);
error= rotate(force_rotate, &check_purge);
/*
NOTE: Run purge_logs wo/ holding LOCK_log because it does not need
the mutex. Otherwise causes various deadlocks.
*/
mysql_mutex_unlock(&LOCK_log);
if (!error && check_purge)
purge();
DBUG_RETURN(error);
}
uint MYSQL_BIN_LOG::next_file_id()
{
uint res;
mysql_mutex_lock(&LOCK_log);
res = file_id++;
mysql_mutex_unlock(&LOCK_log);
return res;
}
/**
Calculate checksum of possibly a part of an event containing at least
the whole common header.
@param buf the pointer to trans cache's buffer
@param off the offset of the beginning of the event in the buffer
@param event_len no-checksum length of the event
@param length the current size of the buffer
@param crc [in-out] the checksum
Event size in incremented by @c BINLOG_CHECKSUM_LEN.
@return 0 or number of unprocessed yet bytes of the event excluding
the checksum part.
*/
static ulong fix_log_event_crc(uchar *buf, uint off, uint event_len,
uint length, ha_checksum *crc)
{
ulong ret;
uchar *event_begin= buf + off;
uint16 flags= uint2korr(event_begin + FLAGS_OFFSET);
DBUG_ASSERT(length >= off + LOG_EVENT_HEADER_LEN); //at least common header in
int2store(event_begin + FLAGS_OFFSET, flags);
ret= length >= off + event_len ? 0 : off + event_len - length;
*crc= my_checksum(*crc, event_begin, event_len - ret);
return ret;
}
/*
Write the contents of a cache to the binary log.
SYNOPSIS
do_write_cache()
cache Cache to write to the binary log
lock_log True if the LOCK_log mutex should be aquired, false otherwise
DESCRIPTION
Write the contents of the cache to the binary log. The cache will
be reset as a READ_CACHE to be able to read the contents from it.
Reading from the trans cache with possible (per @c binlog_checksum_options)
adding checksum value and then fixing the length and the end_log_pos of
events prior to fill in the binlog cache.
*/
int MYSQL_BIN_LOG::do_write_cache(IO_CACHE *cache)
{
DBUG_ENTER("MYSQL_BIN_LOG::do_write_cache(IO_CACHE *)");
DBUG_EXECUTE_IF("simulate_do_write_cache_failure",
{
/*
see binlog_cache_data::write_event() that reacts on
@c simulate_disk_full_at_flush_pending.
*/
DBUG_SET("-d,simulate_do_write_cache_failure");
DBUG_RETURN(ER_ERROR_ON_WRITE);
});
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
DBUG_RETURN(ER_ERROR_ON_WRITE);
uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
ulong remains= 0; // part of unprocessed yet netto length of the event
long val;
ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
uchar header[LOG_EVENT_HEADER_LEN];
ha_checksum crc= 0, crc_0= 0; // assignments to keep compiler happy
my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF);
uchar buf[BINLOG_CHECKSUM_LEN];
// while there is just one alg the following must hold:
DBUG_ASSERT(!do_checksum ||
binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32);
/*
The events in the buffer have incorrect end_log_pos data
(relative to beginning of group rather than absolute),
so we'll recalculate them in situ so the binlog is always
correct, even in the middle of a group. This is possible
because we now know the start position of the group (the
offset of this cache in the log, if you will); all we need
to do is to find all event-headers, and add the position of
the group to the end_log_pos of each event. This is pretty
straight forward, except that we read the cache in segments,
so an event-header might end up on the cache-border and get
split.
*/
group= (uint)my_b_tell(&log_file);
DBUG_PRINT("debug", ("length: %llu, group: %llu",
(ulonglong) length, (ulonglong) group));
hdr_offs= carry= 0;
if (do_checksum)
crc= crc_0= my_checksum(0L, NULL, 0);
if (DBUG_EVALUATE_IF("fault_injection_crc_value", 1, 0))
crc= crc - 1;
do
{
/*
if we only got a partial header in the last iteration,
get the other half now and process a full header.
*/
if (unlikely(carry > 0))
{
DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN);
/* assemble both halves */
memcpy(&header[carry], (char *)cache->read_pos,
LOG_EVENT_HEADER_LEN - carry);
/* fix end_log_pos */
val=uint4korr(header + LOG_POS_OFFSET);
val+= group +
(end_log_pos_inc+= (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
int4store(&header[LOG_POS_OFFSET], val);
if (do_checksum)
{
ulong len= uint4korr(header + EVENT_LEN_OFFSET);
/* fix len */
int4store(&header[EVENT_LEN_OFFSET], len + BINLOG_CHECKSUM_LEN);
}
/* write the first half of the split header */
if (my_b_write(&log_file, header, carry))
DBUG_RETURN(ER_ERROR_ON_WRITE);
/*
copy fixed second half of header to cache so the correct
version will be written later.
*/
memcpy((char *)cache->read_pos, &header[carry],
LOG_EVENT_HEADER_LEN - carry);
/* next event header at ... */
hdr_offs= uint4korr(header + EVENT_LEN_OFFSET) - carry -
(do_checksum ? BINLOG_CHECKSUM_LEN : 0);
if (do_checksum)
{
DBUG_ASSERT(crc == crc_0 && remains == 0);
crc= my_checksum(crc, header, carry);
remains= uint4korr(header + EVENT_LEN_OFFSET) - carry -
BINLOG_CHECKSUM_LEN;
}
carry= 0;
}
/* if there is anything to write, process it. */
if (likely(length > 0))
{
/*
process all event-headers in this (partial) cache.
if next header is beyond current read-buffer,
we'll get it later (though not necessarily in the
very next iteration, just "eventually").
*/
/* crc-calc the whole buffer */
if (do_checksum && hdr_offs >= length)
{
DBUG_ASSERT(remains != 0 && crc != crc_0);
crc= my_checksum(crc, cache->read_pos, length);
remains -= length;
if (my_b_write(&log_file, cache->read_pos, length))
DBUG_RETURN(ER_ERROR_ON_WRITE);
if (remains == 0)
{
int4store(buf, crc);
if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
DBUG_RETURN(ER_ERROR_ON_WRITE);
crc= crc_0;
}
}
while (hdr_offs < length)
{
/*
partial header only? save what we can get, process once
we get the rest.
*/
if (do_checksum)
{
if (remains != 0)
{
/*
finish off with remains of the last event that crawls
from previous into the current buffer
*/
DBUG_ASSERT(crc != crc_0);
crc= my_checksum(crc, cache->read_pos, hdr_offs);
int4store(buf, crc);
remains -= hdr_offs;
DBUG_ASSERT(remains == 0);
if (my_b_write(&log_file, cache->read_pos, hdr_offs) ||
my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
DBUG_RETURN(ER_ERROR_ON_WRITE);
crc= crc_0;
}
}
if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
{
carry= length - hdr_offs;
memcpy(header, (char *)cache->read_pos + hdr_offs, carry);
length= hdr_offs;
}
else
{
/* we've got a full event-header, and it came in one piece */
uchar *ev= (uchar *)cache->read_pos + hdr_offs;
uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
uchar *log_pos= ev + LOG_POS_OFFSET;
/* fix end_log_pos */
val= uint4korr(log_pos) + group +
(end_log_pos_inc += (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
int4store(log_pos, val);
/* fix CRC */
if (do_checksum)
{
/* fix length */
int4store(ev + EVENT_LEN_OFFSET, event_len + BINLOG_CHECKSUM_LEN);
remains= fix_log_event_crc(cache->read_pos, hdr_offs, event_len,
length, &crc);
if (my_b_write(&log_file, ev,
remains == 0 ? event_len : length - hdr_offs))
DBUG_RETURN(ER_ERROR_ON_WRITE);
if (remains == 0)
{
int4store(buf, crc);
if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
DBUG_RETURN(ER_ERROR_ON_WRITE);
crc= crc_0; // crc is complete
}
}
/* next event header at ... */
hdr_offs += event_len; // incr by the netto len
DBUG_ASSERT(!do_checksum || remains == 0 || hdr_offs >= length);
}
}
/*
Adjust hdr_offs. Note that it may still point beyond the segment
read in the next iteration; if the current event is very long,
it may take a couple of read-iterations (and subsequent adjustments
of hdr_offs) for it to point into the then-current segment.
If we have a split header (!carry), hdr_offs will be set at the
beginning of the next iteration, overwriting the value we set here:
*/
hdr_offs -= length;
}
/* Write the entire buf to the binary log file */
if (!do_checksum)
if (my_b_write(&log_file, cache->read_pos, length))
DBUG_RETURN(ER_ERROR_ON_WRITE);
cache->read_pos=cache->read_end; // Mark buffer used up
} while ((length= my_b_fill(cache)));
DBUG_ASSERT(carry == 0);
DBUG_ASSERT(!do_checksum || remains == 0);
DBUG_ASSERT(!do_checksum || crc == crc_0);
DBUG_RETURN(0); // All OK
}
/**
Writes an incident event to the binary log.
@param ev Incident event to be written
@param need_lock_log If true, will acquire LOCK_log; otherwise the
caller should already have acquired LOCK_log.
@do_flush_and_sync If true, will call flush_and_sync(), rotate() and
purge().
@retval false error
@retval true success
*/
bool MYSQL_BIN_LOG::write_incident(Incident_log_event *ev, bool need_lock_log,
bool do_flush_and_sync)
{
uint error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
if (!is_open())
DBUG_RETURN(error);
if (need_lock_log)
mysql_mutex_lock(&LOCK_log);
else
mysql_mutex_assert_owner(&LOCK_log);
// @todo make this work with the group log. /sven
error= ev->write(&log_file);
if (do_flush_and_sync)
{
if (!error && !(error= flush_and_sync()))
{
bool check_purge= false;
signal_update();
error= rotate(true, &check_purge);
if (!error && check_purge)
purge();
}
}
if (need_lock_log)
mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
/**
Creates an incident event and writes it to the binary log.
@param thd Thread variable
@param ev Incident event to be written
@param lock If the binary lock should be locked or not
@retval
0 error
@retval
1 success
*/
bool MYSQL_BIN_LOG::write_incident(THD *thd, bool need_lock_log,
bool do_flush_and_sync)
{
DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
if (!is_open())
DBUG_RETURN(0);
LEX_STRING const write_error_msg=
{ C_STRING_WITH_LEN("error writing to the binary log") };
Incident incident= INCIDENT_LOST_EVENTS;
Incident_log_event ev(thd, incident, write_error_msg);
DBUG_RETURN(write_incident(&ev, need_lock_log, do_flush_and_sync));
}
/**
Write a cached log entry to the binary log.
@param thd Thread variable
@param cache The cache to copy to the binlog
@param incident Defines if an incident event should be created to
notify that some non-transactional changes did
not get into the binlog.
@param prepared Defines if a transaction is part of a 2-PC.
@note
We only come here if there is something in the cache.
@note
The thing in the cache is always a complete transaction.
@note
'cache' needs to be reinitialized after this functions returns.
*/
bool MYSQL_BIN_LOG::write_cache(THD *thd, binlog_cache_data *cache_data)
{
DBUG_ENTER("MYSQL_BIN_LOG::write_cache(THD *, binlog_cache_data *, bool)");
IO_CACHE *cache= &cache_data->cache_log;
bool incident= cache_data->has_incident();
DBUG_EXECUTE_IF("simulate_binlog_flush_error",
{
if (rand() % 3 == 0)
{
write_error=1;
thd->commit_error= THD::CE_FLUSH_ERROR;
DBUG_RETURN(0);
}
};);
mysql_mutex_assert_owner(&LOCK_log);
DBUG_ASSERT(is_open());
if (likely(is_open())) // Should always be true
{
/*
We only bother to write to the binary log if there is anything
to write.
*/
if (my_b_tell(cache) > 0)
{
DBUG_EXECUTE_IF("crash_before_writing_xid",
{
if ((write_error= do_write_cache(cache)))
DBUG_PRINT("info", ("error writing binlog cache: %d",
write_error));
flush_and_sync(true);
DBUG_PRINT("info", ("crashing before writing xid"));
DBUG_SUICIDE();
});
if ((write_error= do_write_cache(cache)))
goto err;
if (incident && write_incident(thd, false/*need_lock_log=false*/,
false/*do_flush_and_sync==false*/))
goto err;
DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_SUICIDE(););
if (cache->error) // Error on read
{
char errbuf[MYSYS_STRERROR_SIZE];
sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name,
errno, my_strerror(errbuf, sizeof(errbuf), errno));
write_error=1; // Don't give more errors
goto err;
}
if (!thd->gtid_precommit)
{
global_sid_lock->rdlock();
if (gtid_state->update_on_flush(thd) != RETURN_STATUS_OK)
{
global_sid_lock->unlock();
goto err;
}
global_sid_lock->unlock();
}
}
update_thd_next_event_pos(thd);
}
DBUG_RETURN(0);
err:
if (!write_error)
{
char errbuf[MYSYS_STRERROR_SIZE];
write_error= 1;
sql_print_error(ER(ER_ERROR_ON_WRITE), name,
errno, my_strerror(errbuf, sizeof(errbuf), errno));
}
thd->commit_error= THD::CE_FLUSH_ERROR;
/* Remove gtid from logged_gtid set if failed. */
if (write_error && thd->gtid_precommit)
{
global_sid_lock->rdlock();
gtid_state->remove_gtid_on_failure(thd);
global_sid_lock->unlock();
}
DBUG_RETURN(1);
}
/**
Wait until we get a signal that the relay log has been updated.
@param[in] thd Thread variable
@param[in] timeout a pointer to a timespec;
NULL means to wait w/o timeout.
@retval 0 if got signalled on update
@retval non-0 if wait timeout elapsed
@note
One must have a lock on LOCK_log before calling this function.
*/
int MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd, const struct timespec *timeout)
{
int ret= 0;
PSI_stage_info old_stage;
DBUG_ENTER("wait_for_update_relay_log");
thd->ENTER_COND(&update_cond, &LOCK_log,
&stage_slave_has_read_all_relay_log,
&old_stage);
if (!timeout)
mysql_cond_wait(&update_cond, &LOCK_log);
else
ret= mysql_cond_timedwait(&update_cond, &LOCK_log,
const_cast(timeout));
thd->EXIT_COND(&old_stage);
DBUG_RETURN(ret);
}
/**
Wait until we get a signal that the binary log has been updated.
Applies to master only.
NOTES
@param[in] thd a THD struct
@param[in] timeout a pointer to a timespec;
NULL means to wait w/o timeout.
@retval 0 if got signalled on update
@retval non-0 if wait timeout elapsed
@note
LOCK_log must be taken before calling this function.
LOCK_log is being released while the thread is waiting.
LOCK_log is released by the caller.
*/
int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
const struct timespec *timeout)
{
int ret= 0;
DBUG_ENTER("wait_for_update_bin_log");
if (!timeout)
mysql_cond_wait(&update_cond, &LOCK_log);
else
ret= mysql_cond_timedwait(&update_cond, &LOCK_log,
const_cast(timeout));
DBUG_RETURN(ret);
}
/**
Close the log file.
@param exiting Bitmask for one or more of the following bits:
- LOG_CLOSE_INDEX : if we should close the index file
- LOG_CLOSE_TO_BE_OPENED : if we intend to call open
at once after close.
- LOG_CLOSE_STOP_EVENT : write a 'stop' event to the log
@note
One can do an open on the object at once after doing a close.
The internal structures are not freed until cleanup() is called
*/
void MYSQL_BIN_LOG::close(uint exiting)
{ // One can't set log_type here!
DBUG_ENTER("MYSQL_BIN_LOG::close");
DBUG_PRINT("enter",("exiting: %d", (int) exiting));
if (log_state == LOG_OPENED)
{
#ifdef HAVE_REPLICATION
if ((exiting & LOG_CLOSE_STOP_EVENT) != 0)
{
Stop_log_event s;
// the checksumming rule for relay-log case is similar to Rotate
s.checksum_alg= is_relay_log ?
relay_log_checksum_alg : binlog_checksum_options;
DBUG_ASSERT(!is_relay_log ||
relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
s.write(&log_file);
bytes_written+= s.data_written;
signal_update();
}
#endif /* HAVE_REPLICATION */
/* don't pwrite in a file opened with O_APPEND - it doesn't work */
if (log_file.type == WRITE_CACHE)
{
my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET;
my_off_t org_position= mysql_file_tell(log_file.file, MYF(0));
uchar flags= 0; // clearing LOG_EVENT_BINLOG_IN_USE_F
mysql_file_pwrite(log_file.file, &flags, 1, offset, MYF(0));
/*
Restore position so that anything we have in the IO_cache is written
to the correct position.
We need the seek here, as mysql_file_pwrite() is not guaranteed to keep the
original position on system that doesn't support pwrite().
*/
mysql_file_seek(log_file.file, org_position, MY_SEEK_SET, MYF(0));
}
/* this will cleanup IO_CACHE, sync and close the file */
MYSQL_LOG::close(exiting);
}
/*
The following test is needed even if is_open() is not set, as we may have
called a not complete close earlier and the index file is still open.
*/
if ((exiting & LOG_CLOSE_INDEX) && my_b_inited(&index_file))
{
end_io_cache(&index_file);
if (mysql_file_close(index_file.file, MYF(0)) < 0 && ! write_error)
{
char errbuf[MYSYS_STRERROR_SIZE];
write_error= 1;
sql_print_error(ER(ER_ERROR_ON_WRITE), index_file_name,
errno, my_strerror(errbuf, sizeof(errbuf), errno));
}
}
log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED;
my_free(name);
name= NULL;
DBUG_VOID_RETURN;
}
void MYSQL_BIN_LOG::set_max_size(ulong max_size_arg)
{
/*
We need to take locks, otherwise this may happen:
new_file() is called, calls open(old_max_size), then before open() starts,
set_max_size() sets max_size to max_size_arg, then open() starts and
uses the old_max_size argument, so max_size_arg has been overwritten and
it's like if the SET command was never run.
*/
DBUG_ENTER("MYSQL_BIN_LOG::set_max_size");
mysql_mutex_lock(&LOCK_log);
if (is_open())
max_size= max_size_arg;
mysql_mutex_unlock(&LOCK_log);
DBUG_VOID_RETURN;
}
void MYSQL_BIN_LOG::signal_update()
{
DBUG_ENTER("MYSQL_BIN_LOG::signal_update");
signal_cnt++;
mysql_cond_broadcast(&update_cond);
DBUG_VOID_RETURN;
}
/****** transaction coordinator log for 2pc - binlog() based solution ******/
/**
@todo
keep in-memory list of prepared transactions
(add to list in log(), remove on unlog())
and copy it to the new binlog if rotated
but let's check the behaviour of tc_log_page_waits first!
*/
int MYSQL_BIN_LOG::open_binlog(const char *opt_name)
{
LOG_INFO log_info;
int error= 1;
/*
This function is used for 2pc transaction coordination. Hence, it
is never used for relay logs.
*/
DBUG_ASSERT(!is_relay_log);
DBUG_ASSERT(total_ha_2pc > 1 || (1 == total_ha_2pc && opt_bin_log));
DBUG_ASSERT(opt_name && opt_name[0]);
if (!my_b_inited(&index_file))
{
/* There was a failure to open the index file, can't open the binlog */
cleanup();
return 1;
}
if (using_heuristic_recover())
{
/* generate a new binlog to mask a corrupted one */
open_binlog(opt_name, 0, WRITE_CACHE, max_binlog_size, false,
true/*need_lock_index=true*/,
true/*need_sid_lock=true*/,
NULL);
cleanup();
return 1;
}
if ((error= find_log_pos(&log_info, NullS, true/*need_lock_index=true*/)))
{
if (error != LOG_INFO_EOF)
sql_print_error("find_log_pos() failed (error: %d)", error);
else
error= 0;
goto err;
}
{
const char *errmsg;
IO_CACHE log;
File file;
Log_event *ev=0;
Format_description_log_event fdle(BINLOG_VERSION);
char log_name[FN_REFLEN];
my_off_t valid_pos= 0;
my_off_t binlog_size;
MY_STAT s;
if (! fdle.is_valid())
goto err;
do
{
strmake(log_name, log_info.log_file_name, sizeof(log_name)-1);
} while (!(error= find_next_log(&log_info, true/*need_lock_index=true*/)));
if (error != LOG_INFO_EOF)
{
sql_print_error("find_log_pos() failed (error: %d)", error);
goto err;
}
if ((file= open_binlog_file(&log, log_name, &errmsg)) < 0)
{
sql_print_error("%s", errmsg);
goto err;
}
my_stat(log_name, &s, MYF(0));
binlog_size= s.st_size;
if ((ev= Log_event::read_log_event(&log, 0, &fdle,
opt_master_verify_checksum)) &&
ev->get_type_code() == FORMAT_DESCRIPTION_EVENT &&
ev->flags & LOG_EVENT_BINLOG_IN_USE_F)
{
sql_print_information("Recovering after a crash using %s", opt_name);
valid_pos= my_b_tell(&log);
error= recover(&log, (Format_description_log_event *)ev, &valid_pos);
}
else
error=0;
delete ev;
end_io_cache(&log);
mysql_file_close(file, MYF(MY_WME));
if (error)
goto err;
/* Trim the crashed binlog file to last valid transaction
or event (non-transaction) base on valid_pos. */
if (valid_pos > 0)
{
if ((file= mysql_file_open(key_file_binlog, log_name,
O_RDWR | O_BINARY, MYF(MY_WME))) < 0)
{
sql_print_error("Failed to open the crashed binlog file "
"when master server is recovering it.");
return -1;
}
/* Change binlog file size to valid_pos */
if (valid_pos < binlog_size)
{
if (my_chsize(file, valid_pos, 0, MYF(MY_WME)))
{
sql_print_error("Failed to trim the crashed binlog file "
"when master server is recovering it.");
mysql_file_close(file, MYF(MY_WME));
return -1;
}
else
{
sql_print_information("Crashed binlog file %s size is %llu, "
"but recovered up to %llu. Binlog trimmed to %llu bytes.",
log_name, binlog_size, valid_pos, valid_pos);
}
}
/* Clear LOG_EVENT_BINLOG_IN_USE_F */
my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET;
uchar flags= 0;
if (mysql_file_pwrite(file, &flags, 1, offset, MYF(0)) != 1)
{
sql_print_error("Failed to clear LOG_EVENT_BINLOG_IN_USE_F "
"for the crashed binlog file when master "
"server is recovering it.");
mysql_file_close(file, MYF(MY_WME));
return -1;
}
mysql_file_close(file, MYF(MY_WME));
} //end if
}
err:
return error;
}
/** This is called on shutdown, after ha_panic. */
void MYSQL_BIN_LOG::close()
{
}
/*
Prepare the transaction in the transaction coordinator.
This function will prepare the transaction in the storage engines
(by calling @c ha_prepare_low) what will write a prepare record
to the log buffers.
@retval 0 success
@retval 1 error
*/
int MYSQL_BIN_LOG::prepare(THD *thd, bool all)
{
DBUG_ENTER("MYSQL_BIN_LOG::prepare");
/*
Set HA_IGNORE_DURABILITY to not flush the prepared record of the
transaction to the log of storage engine (for example, InnoDB
redo log) during the prepare phase. So that we can flush prepared
records of transactions to the log of storage engine in a group
right before flushing them to binary log during binlog group
commit flush stage. Reset to HA_REGULAR_DURABILITY at the
beginning of parsing next command.
*/
thd->durability_property= HA_IGNORE_DURABILITY;
int error= ha_prepare_low(thd, all);
DBUG_RETURN(error);
}
/**
Commit the transaction in the transaction coordinator.
This function will commit the sessions transaction in the binary log
and in the storage engines (by calling @c ha_commit_low). If the
transaction was successfully logged (or not successfully unlogged)
but the commit in the engines did not succed, there is a risk of
inconsistency between the engines and the binary log.
For binary log group commit, the commit is separated into three
parts:
1. First part consists of filling the necessary caches and
finalizing them (if they need to be finalized). After this,
nothing is added to any of the caches.
2. Second part execute an ordered flush and commit. This will be
done using the group commit functionality in ordered_commit.
3. Third part checks any errors resulting from the ordered commit
and handles them appropriately.
@retval 0 success
@retval 1 error, transaction was neither logged nor committed
@retval 2 error, transaction was logged but not committed
*/
TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all)
{
DBUG_ENTER("MYSQL_BIN_LOG::commit");
binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd);
my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
int error= RESULT_SUCCESS;
bool stuff_logged= false;
DBUG_PRINT("enter", ("thd: 0x%llx, all: %s, xid: %llu, cache_mngr: 0x%llx",
(ulonglong) thd, YESNO(all), (ulonglong) xid,
(ulonglong) cache_mngr));
/*
No cache manager means nothing to log, but we still have to commit
the transaction.
*/
if (cache_mngr == NULL)
{
if (ha_commit_low(thd, all))
DBUG_RETURN(RESULT_ABORTED);
DBUG_RETURN(RESULT_SUCCESS);
}
THD_TRANS *trans= all ? &thd->transaction.all : &thd->transaction.stmt;
DBUG_PRINT("debug", ("in_transaction: %s, no_2pc: %s, rw_ha_count: %d",
YESNO(thd->in_multi_stmt_transaction_mode()),
YESNO(trans->no_2pc),
trans->rw_ha_count));
DBUG_PRINT("debug",
("all.cannot_safely_rollback(): %s, trx_cache_empty: %s",
YESNO(thd->transaction.all.cannot_safely_rollback()),
YESNO(cache_mngr->trx_cache.is_binlog_empty())));
DBUG_PRINT("debug",
("stmt.cannot_safely_rollback(): %s, stmt_cache_empty: %s",
YESNO(thd->transaction.stmt.cannot_safely_rollback()),
YESNO(cache_mngr->stmt_cache.is_binlog_empty())));
/*
If there are no handlertons registered, there is nothing to
commit. Note that DDLs are written earlier in this case (inside
binlog_query).
TODO: This can be a problem in those cases that there are no
handlertons registered. DDLs are one example, but the other case
is MyISAM. In this case, we could register a dummy handlerton to
trigger the commit.
Any statement that requires logging will call binlog_query before
trans_commit_stmt, so an alternative is to use the condition
"binlog_query called or stmt.ha_list != 0".
*/
if (!all && trans->ha_list == 0 &&
cache_mngr->stmt_cache.is_binlog_empty())
DBUG_RETURN(RESULT_SUCCESS);
/*
If there is anything in the stmt cache, and GTIDs are enabled,
then this is a single statement outside a transaction and it is
impossible that there is anything in the trx cache. Hence, we
write any empty group(s) to the stmt cache.
Otherwise, we write any empty group(s) to the trx cache at the end
of the transaction.
*/
if (!cache_mngr->stmt_cache.is_binlog_empty())
{
error= write_empty_groups_to_cache(thd, &cache_mngr->stmt_cache);
if (error == 0)
{
if (cache_mngr->stmt_cache.finalize(thd))
DBUG_RETURN(RESULT_ABORTED);
stuff_logged= true;
}
}
/*
We commit the transaction if:
- We are not in a transaction and committing a statement, or
- We are in a transaction and a full transaction is committed.
Otherwise, we accumulate the changes.
*/
if (!error && !cache_mngr->trx_cache.is_binlog_empty() &&
ending_trans(thd, all))
{
const bool real_trans= (all || thd->transaction.all.ha_list == 0);
/*
We are committing an XA transaction if it is a "real" transaction
and have an XID assigned (because some handlerton registered). A
transaction is "real" if either 'all' is true or the 'all.ha_list'
is empty.
Note: This is kind of strange since registering the binlog
handlerton will then make the transaction XA, which is not really
true. This occurs for example if a MyISAM statement is executed
with row-based replication on.
*/
if (real_trans && xid && trans->rw_ha_count > 1 && !trans->no_2pc)
{
Xid_log_event end_evt(thd, xid);
if (cache_mngr->trx_cache.finalize(thd, &end_evt))
DBUG_RETURN(RESULT_ABORTED);
}
else
{
Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
true, FALSE, TRUE, 0, TRUE);
if (cache_mngr->trx_cache.finalize(thd, &end_evt))
DBUG_RETURN(RESULT_ABORTED);
}
stuff_logged= true;
}
/*
This is part of the stmt rollback.
*/
if (!all)
cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
DBUG_PRINT("debug", ("error: %d", error));
if (error)
DBUG_RETURN(RESULT_ABORTED);
/*
Now all the events are written to the caches, so we will commit
the transaction in the engines. This is done using the group
commit logic in ordered_commit, which will return when the
transaction is committed.
If the commit in the engines fail, we still have something logged
to the binary log so we have to report this as a "bad" failure
(failed to commit, but logged something).
*/
if (stuff_logged)
{
if (ordered_commit(thd, all))
DBUG_RETURN(RESULT_INCONSISTENT);
}
else
{
if (ha_commit_low(thd, all))
DBUG_RETURN(RESULT_INCONSISTENT);
}
DBUG_RETURN(error ? RESULT_INCONSISTENT : RESULT_SUCCESS);
}
/**
Flush caches for session.
@note @c set_trans_pos is called with a pointer to the file name
that the binary log currently use and a rotation will change the
contents of the variable.
The position is used when calling the after_flush, after_commit,
and after_rollback hooks, but these have been placed so that they
occur before a rotation is executed.
It is the responsibility of any plugin that use this position to
copy it if they need it after the hook has returned.
*/
std::pair
MYSQL_BIN_LOG::flush_thread_caches(THD *thd)
{
binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd);
my_off_t bytes= 0;
bool wrote_xid= false;
int error= cache_mngr->flush(thd, &bytes, &wrote_xid);
if (!error && bytes > 0)
{
/*
Note that set_trans_pos does not copy the file name. See
this function documentation for more info.
*/
thd->set_trans_pos(log_file_name, my_b_tell(&log_file));
if (wrote_xid)
inc_prep_xids(thd);
}
DBUG_PRINT("debug", ("bytes: %llu", bytes));
return std::make_pair(error, bytes);
}
/**
Execute the flush stage.
@param total_bytes_var Pointer to variable that will be set to total
number of bytes flushed, or NULL.
@param rotate_var Pointer to variable that will be set to true if
binlog rotation should be performed after releasing locks. If rotate
is not necessary, the variable will not be touched.
@param curr is the THD passed from the caller.
@return Error code on error, zero on success
*/
int
MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
bool *rotate_var,
THD **out_queue_var)
{
DBUG_ASSERT(total_bytes_var && rotate_var && out_queue_var);
my_off_t total_bytes= 0;
int flush_error= 1;
mysql_mutex_assert_owner(&LOCK_log);
DEBUG_SYNC(current_thd, "process_as_leader");
/*
Fetch the entire flush queue and empty it, so that the next batch
has a leader. We must do this before invoking ha_flush_logs(...)
for guaranteeing to flush prepared records of transactions before
flushing them to binary log, which is required by crash recovery.
*/
THD *first_seen= stage_manager.fetch_queue_for(Stage_manager::FLUSH_STAGE);
DBUG_ASSERT(first_seen != NULL);
/* Do an explicit transaction log group write before flushing binary log
* cache to file*/
if (!first_seen->prepared_engine->is_empty())
ha_flush_logs(NULL, first_seen->prepared_engine);
#ifndef DBUG_OFF
for (THD *head= first_seen ; head ; head = head->next_to_commit)
{
DBUG_ASSERT(head->prepared_engine->compare_lt(
first_seen->prepared_engine->get_maps()));
}
#endif
DBUG_EXECUTE_IF("crash_after_flush_engine_log", DBUG_SUICIDE(););
/* Flush thread caches to binary log. */
for (THD *head= first_seen ; head ; head = head->next_to_commit)
{
std::pair result= flush_thread_caches(head);
total_bytes+= result.second;
if (flush_error == 1)
flush_error= result.first;
/* Reset prepared_engine for every thd in the queue. */
head->prepared_engine->clear();
}
*out_queue_var= first_seen;
*total_bytes_var= total_bytes;
if (total_bytes > 0 && my_b_tell(&log_file) >= (my_off_t) max_size)
*rotate_var= true;
return flush_error;
}
/**
Commit a sequence of sessions.
This function commit an entire queue of sessions starting with the
session in @c first. If there were an error in the flushing part of
the ordered commit, the error code is passed in and all the threads
are marked accordingly (but not committed).
@see MYSQL_BIN_LOG::ordered_commit
@param thd The "master" thread
@param first First thread in the queue of threads to commit
*/
void
MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first)
{
mysql_mutex_assert_owner(&LOCK_commit);
Thread_excursion excursion(thd);
#ifndef DBUG_OFF
thd->transaction.flags.ready_preempt= 1; // formality by the leader
#endif
for (THD *head= first ; head ; head = head->next_to_commit)
{
DBUG_PRINT("debug", ("Thread ID: %lu, commit_error: %d, flags.pending: %s",
head->thread_id, head->commit_error,
YESNO(head->transaction.flags.pending)));
/*
If flushing failed, set commit_error for the session, skip the
transaction and proceed with the next transaction instead. This
will mark all threads as failed, since the flush failed.
If flush succeeded, attach to the session and commit it in the
engines.
*/
#ifndef DBUG_OFF
stage_manager.clear_preempt_status(head);
#endif
/*
Flush/Sync error should be ignored and continue
to commit phase. And thd->commit_error cannot be
COMMIT_ERROR at this moment.
*/
DBUG_ASSERT(head->commit_error != THD::CE_COMMIT_ERROR);
excursion.try_to_attach_to(head);
bool all= head->transaction.flags.real_commit;
if (head->transaction.flags.commit_low)
{
/* head is parked to have exited append() */
DBUG_ASSERT(head->transaction.flags.ready_preempt);
/*
storage engine commit
*/
if (ha_commit_low(head, all, false))
head->commit_error= THD::CE_COMMIT_ERROR;
}
DBUG_PRINT("debug", ("commit_error: %d, flags.pending: %s",
head->commit_error,
YESNO(head->transaction.flags.pending)));
/*
Decrement the prepared XID counter after storage engine commit.
We also need decrement the prepared XID when encountering a
flush error or session attach error for avoiding 3-way deadlock
among user thread, rotate thread and dump thread.
*/
if (head->transaction.flags.xid_written)
dec_prep_xids(head);
}
}
/**
Process after commit for a sequence of sessions.
@param thd The "master" thread
@param first First thread in the queue of threads to commit
*/
void
MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first)
{
Thread_excursion excursion(thd);
for (THD *head= first; head; head= head->next_to_commit)
{
if (head->transaction.flags.run_hooks &&
head->commit_error != THD::CE_COMMIT_ERROR)
{
/*
TODO: This hook here should probably move outside/below this
if and be the only after_commit invocation left in the
code.
*/
excursion.try_to_attach_to(head);
bool all= head->transaction.flags.real_commit;
(void) RUN_HOOK(transaction, after_commit, (head, all));
/*
When after_commit finished for the transaction, clear the run_hooks flag.
This allow other parts of the system to check if after_commit was called.
*/
head->transaction.flags.run_hooks= false;
}
}
}
#ifndef DBUG_OFF
/** Names for the stages. */
static const char* g_stage_name[] = {
"FLUSH",
"SYNC",
"COMMIT",
};
#endif
/**
Enter a stage of the ordered commit procedure.
Entering is stage is done by:
- Atomically enqueueing a queue of processes (which is just one for
the first phase).
- If the queue was empty, the thread is the leader for that stage
and it should process the entire queue for that stage.
- If the queue was not empty, the thread is a follower and can go
waiting for the commit to finish.
The function will lock the stage mutex if it was designated the
leader for the phase.
@param thd Session structure
@param stage The stage to enter
@param queue Queue of threads to enqueue for the stage
@param stage_mutex Mutex for the stage
@retval true The thread should "bail out" and go waiting for the
commit to finish
@retval false The thread is the leader for the stage and should do
the processing.
*/
bool
MYSQL_BIN_LOG::change_stage(THD *thd,
Stage_manager::StageID stage, THD *queue,
mysql_mutex_t *leave_mutex,
mysql_mutex_t *enter_mutex)
{
DBUG_ENTER("MYSQL_BIN_LOG::change_stage");
DBUG_PRINT("enter", ("thd: 0x%llx, stage: %s, queue: 0x%llx",
(ulonglong) thd, g_stage_name[stage], (ulonglong) queue));
DBUG_ASSERT(0 <= stage && stage < Stage_manager::STAGE_COUNTER);
DBUG_ASSERT(enter_mutex);
DBUG_ASSERT(queue);
/*
enroll_for will release the leave_mutex once the sessions are
queued.
*/
if (!stage_manager.enroll_for(stage, queue, leave_mutex))
{
DBUG_ASSERT(!thd_get_cache_mngr(thd)->dbug_any_finalized());
DBUG_RETURN(true);
}
mysql_mutex_lock(enter_mutex);
DBUG_RETURN(false);
}
/**
Flush the I/O cache to file.
Flush the binary log to the binlog file if any byte where written
and signal that the binary log file has been updated if the flush
succeeds.
*/
int
MYSQL_BIN_LOG::flush_cache_to_file(my_off_t *end_pos_var)
{
if (flush_io_cache(&log_file))
{
THD *thd= current_thd;
thd->commit_error= THD::CE_FLUSH_ERROR;
return ER_ERROR_ON_WRITE;
}
*end_pos_var= my_b_tell(&log_file);
return 0;
}
/**
Call fsync() to sync the file to disk.
*/
std::pair
MYSQL_BIN_LOG::sync_binlog_file(bool force)
{
bool synced= false;
unsigned int sync_period= get_sync_period();
if (force || (sync_period && ++sync_counter >= sync_period))
{
sync_counter= 0;
/**
On *pure non-transactional* workloads there is a small window
in time where a concurrent rotate might be able to close
the file before the sync is actually done. In that case,
ignore the bad file descriptor errors.
Transactional workloads (InnoDB) are not affected since the
the rotation will not happen until all transactions have
committed to the storage engine, thence decreased the XID
counters.
TODO: fix this properly even for non-transactional storage
engines.
*/
if (DBUG_EVALUATE_IF("simulate_error_during_sync_binlog_file", 1,
mysql_file_sync(log_file.file,
MYF(MY_WME | MY_IGNORE_BADFD))))
{
THD *thd= current_thd;
thd->commit_error= THD::CE_SYNC_ERROR;
return std::make_pair(true, synced);
}
synced= true;
}
return std::make_pair(false, synced);
}
/**
Helper function executed when leaving @c ordered_commit.
This function contain the necessary code for fetching the error
code, doing post-commit checks, and wrapping up the commit if
necessary.
It is typically called when enter_stage indicates that the thread
should bail out, and also when the ultimate leader thread finishes
executing @c ordered_commit.
It is typically used in this manner:
@code
if (enter_stage(thd, Thread_queue::FLUSH_STAGE, thd, &LOCK_log))
return finish_commit(thd);
@endcode
@return Error code if the session commit failed, or zero on
success.
*/
int
MYSQL_BIN_LOG::finish_commit(THD *thd)
{
/*
In some unlikely situations, it can happen that binary
log is closed before the thread flushes it's cache.
In that case, clear the caches before doing commit.
*/
if (unlikely(!is_open()))
{
binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd);
if (cache_mngr)
cache_mngr->reset();
}
if (thd->transaction.flags.commit_low)
{
const bool all= thd->transaction.flags.real_commit;
/*
storage engine commit
*/
DBUG_ASSERT(thd->commit_error != THD::CE_COMMIT_ERROR);
if (ha_commit_low(thd, all, false))
thd->commit_error= THD::CE_COMMIT_ERROR;
/*
Decrement the prepared XID counter after storage engine commit
*/
if (thd->transaction.flags.xid_written)
dec_prep_xids(thd);
/*
If commit succeeded, we call the after_commit hook
TODO: This hook here should probably move outside/below this
if and be the only after_commit invocation left in the
code.
*/
if ((thd->commit_error != THD::CE_COMMIT_ERROR ) && thd->transaction.flags.run_hooks)
{
(void) RUN_HOOK(transaction, after_commit, (thd, all));
thd->transaction.flags.run_hooks= false;
}
}
else if (thd->transaction.flags.xid_written)
dec_prep_xids(thd);
if (!thd->gtid_precommit)
{
/*
Remove committed GTID from owned_gtids, it was already logged on
MYSQL_BIN_LOG::write_cache().
*/
global_sid_lock->rdlock();
gtid_state->update_on_commit(thd);
global_sid_lock->unlock();
}
else
{
/* Reset gtid_precommit. */
thd->gtid_precommit= false;
/* Clear gtid owned by current THD. */
thd->clear_owned_gtids();
thd->variables.gtid_next.set_undefined();
}
DBUG_ASSERT(thd->commit_error || !thd->transaction.flags.run_hooks);
DBUG_ASSERT(!thd_get_cache_mngr(thd)->dbug_any_finalized());
DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d",
thd->thread_id, thd->commit_error));
/*
flush or sync errors are handled by the leader of the group
(using binlog_error_action). Hence treat only COMMIT_ERRORs as errors.
*/
return (thd->commit_error == THD::CE_COMMIT_ERROR);
}
/**
Helper function to handle flush or sync stage errors.
If binlog_error_action= ABORT_SERVER, server will be aborted
after reporting the error to the client.
If binlog_error_action= IGNORE_ERROR, binlog will be closed
for the life time of the server. close() call is protected
with LOCK_log to avoid any parallel operations on binary log.
@param thd Thread object that faced flush/sync error
@param need_lock_log
> Indicates true if LOCk_log is needed before closing
binlog (happens when we are handling sync error)
> Indicates false if LOCK_log is already acquired
by the thread (happens when we are handling flush
error)
@return void
*/
void MYSQL_BIN_LOG::handle_binlog_flush_or_sync_error(THD *thd,
bool need_lock_log)
{
char errmsg[MYSQL_ERRMSG_SIZE];
sprintf(errmsg, "An error occurred during %s stage of the commit. "
"'binlog_error_action' is set to '%s'.",
thd->commit_error== THD::CE_FLUSH_ERROR ? "flush" : "sync",
binlog_error_action == ABORT_SERVER ? "ABORT_SERVER" : "IGNORE_ERROR");
if (binlog_error_action == ABORT_SERVER)
{
char err_buff[MYSQL_ERRMSG_SIZE];
sprintf(err_buff, "%s Hence aborting the server.", errmsg);
exec_binlog_error_action_abort(err_buff);
}
else
{
DEBUG_SYNC(thd, "before_binlog_closed_due_to_error");
if (need_lock_log)
mysql_mutex_lock(&LOCK_log);
else
mysql_mutex_assert_owner(&LOCK_log);
/*
It can happen that other group leader encountered
error and already closed the binary log. So print
error only if it is in open state. But we should
call close() always just in case if the previous
close did not close index file.
*/
if (is_open())
{
sql_print_error("%s Hence turning logging off for the whole duration "
"of the MySQL server process. To turn it on again: fix "
"the cause, shutdown the MySQL server and restart it.",
errmsg);
}
close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT);
if (need_lock_log)
mysql_mutex_unlock(&LOCK_log);
DEBUG_SYNC(thd, "after_binlog_closed_due_to_error");
}
}
/**
Flush and commit the transaction.
This will execute an ordered flush and commit of all outstanding
transactions and is the main function for the binary log group
commit logic. The function performs the ordered commit in two
phases.
The first phase flushes the caches to the binary log and under
LOCK_log and marks all threads that were flushed as not pending.
The second phase executes under LOCK_commit and commits all
transactions in order.
The procedure is:
1. Queue ourselves for flushing.
2. Grab the log lock, which might result is blocking if the mutex is
already held by another thread.
3. If we were not committed while waiting for the lock
1. Fetch the queue
2. For each thread in the queue:
a. Attach to it
b. Flush the caches, saving any error code
3. Flush and sync (depending on the value of sync_binlog).
4. Signal that the binary log was updated
4. Release the log lock
5. Grab the commit lock
1. For each thread in the queue:
a. If there were no error when flushing and the transaction shall be committed:
- Commit the transaction, saving the result of executing the commit.
6. Release the commit lock
7. Call purge, if any of the committed thread requested a purge.
8. Return with the saved error code
@todo The use of @c skip_commit is a hack that we use since the @c
TC_LOG Interface does not contain functions to handle
savepoints. Once the binary log is eliminated as a handlerton and
the @c TC_LOG interface is extended with savepoint handling, this
parameter can be removed.
@param thd Session to commit transaction for
@param all This is @c true if this is a real transaction commit, and
@c false otherwise.
@param skip_commit
This is @c true if the call to @c ha_commit_low should
be skipped (it is handled by the caller somehow) and @c
false otherwise (the normal case).
*/
int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
{
DBUG_ENTER("MYSQL_BIN_LOG::ordered_commit");
int flush_error= 0, sync_error= 0;
my_off_t total_bytes= 0;
bool do_rotate= false;
/*
These values are used while flushing a transaction, so clear
everything.
Notes:
- It would be good if we could keep transaction coordinator
log-specific data out of the THD structure, but that is not the
case right now.
- Everything in the transaction structure is reset when calling
ha_commit_low since that calls st_transaction::cleanup.
*/
thd->transaction.flags.pending= true;
thd->commit_error= THD::CE_NONE;
thd->next_to_commit= NULL;
thd->durability_property= HA_IGNORE_DURABILITY;
thd->transaction.flags.real_commit= all;
thd->transaction.flags.xid_written= false;
thd->transaction.flags.commit_low= !skip_commit;
thd->transaction.flags.run_hooks= !skip_commit;
thd->stage_leader= false;
thd->stage_cond_id= UNDEF_COND_SLOT;
thd->prev_to_commit= NULL;
DBUG_ASSERT(!thd_get_cache_mngr(thd)->all_finalized() ||
(thd->variables.gtid_next.type == AUTOMATIC_GROUP));
thd->gtid_precommit= (gtid_mode &&
((opt_gtid_precommit &&
thd->variables.gtid_next.type == AUTOMATIC_GROUP) ||
thd_get_cache_mngr(thd)->all_finalized()));
#ifndef DBUG_OFF
/*
The group commit Leader may have to wait for follower whose transaction
is not ready to be preempted. Initially the status is pessimistic.
Preemption guarding logics is necessary only when DBUG_ON is set.
It won't be required for the dbug-off case as long as the follower won't
execute any thread-specific write access code in this method, which is
the case as of current.
*/
thd->transaction.flags.ready_preempt= 0;
#endif
DBUG_PRINT("enter", ("flags.pending: %s, commit_error: %d, thread_id: %lu",
YESNO(thd->transaction.flags.pending),
thd->commit_error, thd->thread_id));
/*
Stage #1: flushing transactions to binary log
While flushing, we allow new threads to enter and will process
them in due time. Once the queue was empty, we cannot reap
anything more since it is possible that a thread entered and
appointed itself leader for the flush phase.
*/
DEBUG_SYNC(thd, "waiting_to_enter_flush_stage");
if (change_stage(thd, Stage_manager::FLUSH_STAGE, thd, NULL, &LOCK_log))
{
DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d",
thd->thread_id, thd->commit_error));
DBUG_RETURN(finish_commit(thd));
}
THD *wait_queue= NULL, *final_queue= NULL;
mysql_mutex_t *leave_mutex_before_commit_stage= NULL;
my_off_t flush_end_pos= 0;
bool need_LOCK_log;
if (unlikely(!is_open()))
{
final_queue= stage_manager.fetch_queue_for(Stage_manager::FLUSH_STAGE);
leave_mutex_before_commit_stage= &LOCK_log;
/*
binary log is closed, flush stage and sync stage should be
ignored. Binlog cache should be cleared, but instead of doing
it here, do that work in 'finish_commit' function so that
leader and followers thread caches will be cleared.
*/
goto commit_stage;
}
DEBUG_SYNC(thd, "waiting_in_the_middle_of_flush_stage");
flush_error= process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue);
if (flush_error == 0 && total_bytes > 0)
flush_error= flush_cache_to_file(&flush_end_pos);
DBUG_EXECUTE_IF("crash_after_flush_binlog", DBUG_SUICIDE(););
/*
If the flush finished successfully, we can call the after_flush
hook. Being invoked here, we have the guarantee that the hook is
executed before the before/after_send_hooks on the dump thread
preventing race conditions among these plug-ins.
*/
if (flush_error == 0)
{
const char *file_name_ptr= log_file_name + dirname_length(log_file_name);
DBUG_ASSERT(flush_end_pos != 0);
if (RUN_HOOK(binlog_storage, after_flush,
(thd, file_name_ptr, flush_end_pos)))
{
sql_print_error("Failed to run 'after_flush' hooks");
flush_error= ER_ERROR_ON_WRITE;
}
signal_update();
DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
}
if (flush_error)
{
/*
Handle flush error (if any) after leader finishes it's flush stage.
*/
handle_binlog_flush_or_sync_error(thd, false /* need_lock_log */);
}
/*
Stage #2: Syncing binary log file to disk
*/
need_LOCK_log= (get_sync_period() == 1);
/*
LOCK_log is not released when sync_binlog is 1. It guarantees that the
events are not be replicated by dump threads before they are synced to disk.
*/
if (change_stage(thd, Stage_manager::SYNC_STAGE, wait_queue,
need_LOCK_log ? NULL : &LOCK_log, &LOCK_sync))
{
DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d",
thd->thread_id, thd->commit_error));
DBUG_RETURN(finish_commit(thd));
}
final_queue= stage_manager.fetch_queue_for(Stage_manager::SYNC_STAGE);
if (flush_error == 0 && total_bytes > 0)
{
DEBUG_SYNC(thd, "before_sync_binlog_file");
std::pair result= sync_binlog_file(false);
sync_error= result.first;
}
if (need_LOCK_log)
mysql_mutex_unlock(&LOCK_log);
leave_mutex_before_commit_stage= &LOCK_sync;
/*
Stage #3: Commit all transactions in order.
This stage is skipped if we do not need to order the commits and
each thread have to execute the handlerton commit instead.
Howver, since we are keeping the lock from the previous stage, we
need to unlock it if we skip the stage.
*/
commit_stage:
/*
We are delaying the handling of sync error until
all locks are released but we should not enter into
commit stage if binlog_error_action is ABORT_SERVER.
*/
if (opt_binlog_order_commits &&
(sync_error == 0 || binlog_error_action != ABORT_SERVER))
{
if (change_stage(thd, Stage_manager::COMMIT_STAGE,
final_queue, leave_mutex_before_commit_stage,
&LOCK_commit))
{
DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d",
thd->thread_id, thd->commit_error));
DBUG_RETURN(finish_commit(thd));
}
THD *commit_queue= stage_manager.fetch_queue_for(Stage_manager::COMMIT_STAGE);
DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
DEBUG_SYNC(thd, "before_process_commit_stage_queue"););
process_commit_stage_queue(thd, commit_queue);
mysql_mutex_unlock(&LOCK_commit);
/*
Process after_commit after LOCK_commit is released for avoiding
3-way deadlock among user thread, rotate thread and dump thread.
*/
process_after_commit_stage_queue(thd, commit_queue);
final_queue= commit_queue;
}
else if (leave_mutex_before_commit_stage)
mysql_mutex_unlock(leave_mutex_before_commit_stage);
/*
Handle sync error after we release all locks in order to avoid deadlocks
*/
if (sync_error)
handle_binlog_flush_or_sync_error(thd, true /* need_lock_log */);
/* Commit done so signal all waiting threads */
stage_manager.signal_done(final_queue);
/*
Finish the commit before executing a rotate, or run the risk of a
deadlock. We don't need the return value here since it is in
thd->commit_error, which is returned below.
*/
(void) finish_commit(thd);
/*
If we need to rotate, we do it without commit error.
Otherwise the thd->commit_error will be possibly reset.
*/
if (DBUG_EVALUATE_IF("force_rotate", 1, 0) ||
(do_rotate && thd->commit_error == THD::CE_NONE))
{
/*
Do not force the rotate as several consecutive groups may
request unnecessary rotations.
NOTE: Run purge_logs wo/ holding LOCK_log because it does not
need the mutex. Otherwise causes various deadlocks.
*/
DEBUG_SYNC(thd, "ready_to_do_rotation");
bool check_purge= false;
mysql_mutex_lock(&LOCK_log);
/*
If rotate fails then depends on binlog_error_action variable
appropriate action will be taken inside rotate call.
*/
int error= rotate(false, &check_purge);
mysql_mutex_unlock(&LOCK_log);
if (error)
thd->commit_error= THD::CE_COMMIT_ERROR;
else if (check_purge)
purge();
}
/*
flush or sync errors are handled above (using binlog_error_action).
Hence treat only COMMIT_ERRORs as errors.
*/
DBUG_RETURN(thd->commit_error == THD::CE_COMMIT_ERROR);
}
/**
MYSQLD server recovers from last crashed binlog.
@param log IO_CACHE of the crashed binlog.
@param fdle Format_description_log_event of the crashed binlog.
@param valid_pos The position of the last valid transaction or
event(non-transaction) of the crashed binlog.
@retval
0 ok
@retval
1 error
*/
int MYSQL_BIN_LOG::recover(IO_CACHE *log, Format_description_log_event *fdle,
my_off_t *valid_pos)
{
Log_event *ev;
HASH xids;
MEM_ROOT mem_root;
/*
The flag is used for handling the case that a transaction
is partially written to the binlog.
*/
bool in_transaction= FALSE;
if (! fdle->is_valid() ||
my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
sizeof(my_xid), 0, 0, MYF(0)))
goto err1;
init_alloc_root(&mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE);
while ((ev= Log_event::read_log_event(log, 0, fdle, TRUE))
&& ev->is_valid())
{
if (ev->get_type_code() == QUERY_EVENT &&
!strcmp(((Query_log_event*)ev)->query, "BEGIN"))
in_transaction= TRUE;
if (ev->get_type_code() == QUERY_EVENT &&
!strcmp(((Query_log_event*)ev)->query, "COMMIT"))
{
DBUG_ASSERT(in_transaction == TRUE);
in_transaction= FALSE;
}
else if (ev->get_type_code() == XID_EVENT)
{
DBUG_ASSERT(in_transaction == TRUE);
in_transaction= FALSE;
Xid_log_event *xev=(Xid_log_event *)ev;
uchar *x= (uchar *) memdup_root(&mem_root, (uchar*) &xev->xid,
sizeof(xev->xid));
if (!x || my_hash_insert(&xids, x))
goto err2;
}
/*
Recorded valid position for the crashed binlog file
which did not contain incorrect events. The following
positions increase the variable valid_pos:
1 -
...
<---> HERE IS VALID <--->
GTID
BEGIN
...
COMMIT
...
2 -
...
<---> HERE IS VALID <--->
GTID
DDL/UTILITY
...
In other words, the following positions do not increase
the variable valid_pos:
1 -
GTID
<---> HERE IS VALID <--->
...
2 -
GTID
BEGIN
<---> HERE IS VALID <--->
...
*/
if (!log->error && !in_transaction &&
!is_gtid_event(ev))
*valid_pos= my_b_tell(log);
delete ev;
}
if (ha_recover(&xids))
goto err2;
free_root(&mem_root, MYF(0));
my_hash_free(&xids);
return 0;
err2:
free_root(&mem_root, MYF(0));
my_hash_free(&xids);
err1:
sql_print_error("Crash recovery failed. Either correct the problem "
"(if it's, for example, out of memory error) and restart, "
"or delete (or rename) binary log and start mysqld with "
"--tc-heuristic-recover={commit|rollback}");
return 1;
}
Group_cache *THD::get_group_cache(bool is_transactional)
{
DBUG_ENTER("THD::get_group_cache(bool)");
// If opt_bin_log==0, it is not safe to call thd_get_cache_mngr
// because binlog_hton has not been completely set up.
DBUG_ASSERT(opt_bin_log);
binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(this);
// cache_mngr is NULL until we call thd->binlog_setup_trx_data, so
// we assert that this has been done.
DBUG_ASSERT(cache_mngr != NULL);
binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(is_transactional);
DBUG_ASSERT(cache_data != NULL);
DBUG_RETURN(&cache_data->group_cache);
}
/*
These functions are placed in this file since they need access to
binlog_hton, which has internal linkage.
*/
int THD::binlog_setup_trx_data()
{
DBUG_ENTER("THD::binlog_setup_trx_data");
binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(this);
if (cache_mngr)
DBUG_RETURN(0); // Already set up
cache_mngr= (binlog_cache_mngr*) my_malloc(sizeof(binlog_cache_mngr), MYF(MY_ZEROFILL));
if (!cache_mngr ||
open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir,
LOG_PREFIX, binlog_stmt_cache_size, MYF(MY_WME)) ||
open_cached_file(&cache_mngr->trx_cache.cache_log, mysql_tmpdir,
LOG_PREFIX, binlog_cache_size, MYF(MY_WME)))
{
my_free(cache_mngr);
DBUG_RETURN(1); // Didn't manage to set it up
}
DBUG_PRINT("debug", ("Set ha_data slot %d to 0x%llx", binlog_hton->slot, (ulonglong) cache_mngr));
thd_set_ha_data(this, binlog_hton, cache_mngr);
cache_mngr= new (thd_get_cache_mngr(this))
binlog_cache_mngr(max_binlog_stmt_cache_size,
&binlog_stmt_cache_use,
&binlog_stmt_cache_disk_use,
max_binlog_cache_size,
&binlog_cache_use,
&binlog_cache_disk_use);
DBUG_RETURN(0);
}
/**
*/
void register_binlog_handler(THD *thd, bool trx)
{
DBUG_ENTER("register_binlog_handler");
/*
If this is the first call to this function while processing a statement,
the transactional cache does not have a savepoint defined. So, in what
follows:
. an implicit savepoint is defined;
. callbacks are registered;
. binary log is set as read/write.
The savepoint allows for truncating the trx-cache transactional changes
fail. Callbacks are necessary to flush caches upon committing or rolling
back a statement or a transaction. However, notifications do not happen
if the binary log is set as read/write.
*/
binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd);
if (cache_mngr->trx_cache.get_prev_position() == MY_OFF_T_UNDEF)
{
/*
Set an implicit savepoint in order to be able to truncate a trx-cache.
*/
my_off_t pos= 0;
binlog_trans_log_savepos(thd, &pos);
cache_mngr->trx_cache.set_prev_position(pos);
/*
Set callbacks in order to be able to call commmit or rollback.
*/
if (trx)
trans_register_ha(thd, TRUE, binlog_hton);
trans_register_ha(thd, FALSE, binlog_hton);
/*
Set the binary log as read/write otherwise callbacks are not called.
*/
thd->ha_data[binlog_hton->slot].ha_info[0].set_trx_read_write();
}
DBUG_VOID_RETURN;
}
/**
Function to start a statement and optionally a transaction for the
binary log.
This function does three things:
- Starts a transaction if not in autocommit mode or if a BEGIN
statement has been seen.
- Start a statement transaction to allow us to truncate the cache.
- Save the currrent binlog position so that we can roll back the
statement by truncating the cache.
We only update the saved position if the old one was undefined,
the reason is that there are some cases (e.g., for CREATE-SELECT)
where the position is saved twice (e.g., both in
select_create::prepare() and THD::binlog_write_table_map()) , but
we should use the first. This means that calls to this function
can be used to start the statement before the first table map
event, to include some extra events.
Note however that IMMEDIATE_LOGGING implies that the statement is
written without BEGIN/COMMIT.
@param thd Thread variable
@param start_event The first event requested to be written into the
binary log
*/
static int binlog_start_trans_and_stmt(THD *thd, Log_event *start_event)
{
DBUG_ENTER("binlog_start_trans_and_stmt");
/*
Initialize the cache manager if this was not done yet.
*/
if (thd->binlog_setup_trx_data())
DBUG_RETURN(1);
/*
Retrieve the appropriated cache.
*/
bool is_transactional= start_event->is_using_trans_cache();
binlog_cache_mngr *cache_mngr= thd_get_cache_mngr(thd);
binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(is_transactional);
/*
If the event is requesting immediatly logging, there is no need to go
further down and set savepoint and register callbacks.
*/
if (start_event->is_using_immediate_logging())
DBUG_RETURN(0);
register_binlog_handler(thd, thd->in_multi_stmt_transaction_mode());
/*
If the cache is empty log "BEGIN" at the beginning of every transaction.
Here, a transaction is either a BEGIN..COMMIT/ROLLBACK block or a single
statement in autocommit mode.
*/
if (cache_data->is_binlog_empty())
{
Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"),
is_transactional, FALSE, TRUE, 0, TRUE);
if (cache_data->write_event(thd, &qinfo))
DBUG_RETURN(1);
}
DBUG_RETURN(0);
}
/**
This function writes a table map to the binary log.
Note that in order to keep the signature uniform with related methods,
we use a redundant parameter to indicate whether a transactional table
was changed or not.
Sometimes it will write a Rows_query_log_event into binary log before
the table map too.
@param table a pointer to the table.
@param is_transactional @c true indicates a transactional table,
otherwise @c false a non-transactional.
@param binlog_rows_query @c true indicates a Rows_query log event
will be binlogged before table map,
otherwise @c false indicates it will not
be binlogged.
@return
nonzero if an error pops up when writing the table map event
or the Rows_query log event.
*/
int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
bool binlog_rows_query)
{
int error;
DBUG_ENTER("THD::binlog_write_table_map");
DBUG_PRINT("enter", ("table: 0x%lx (%s: #%llu)",
(long) table, table->s->table_name.str,
table->s->table_map_id.id()));
/* Pre-conditions */
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
DBUG_ASSERT(table->s->table_map_id.is_valid());
Table_map_log_event
the_event(this, table, table->s->table_map_id, is_transactional);
binlog_start_trans_and_stmt(this, &the_event);
binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(this);
binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(is_transactional);
if (binlog_rows_query && this->query())
{
/* Write the Rows_query_log_event into binlog before the table map */
Rows_query_log_event
rows_query_ev(this, this->query(), this->query_length());
if ((error= cache_data->write_event(this, &rows_query_ev)))
DBUG_RETURN(error);
}
if ((error= cache_data->write_event(this, &the_event)))
DBUG_RETURN(error);
binlog_table_maps++;
DBUG_RETURN(0);
}
/**
This function retrieves a pending row event from a cache which is
specified through the parameter @c is_transactional. Respectively, when it
is @c true, the pending event is returned from the transactional cache.
Otherwise from the non-transactional cache.
@param is_transactional @c true indicates a transactional cache,
otherwise @c false a non-transactional.
@return
The row event if any.
*/
Rows_log_event*
THD::binlog_get_pending_rows_event(bool is_transactional) const
{
Rows_log_event* rows= NULL;
binlog_cache_mngr *const cache_mngr= thd_get_cache_mngr(this);
/*
This is less than ideal, but here's the story: If there is no cache_mngr,
prepare_pending_rows_event() has never been called (since the cache_mngr
is set up there). In that case, we just return NULL.
*/
if (cache_mngr)
{
binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(is_transactional);
rows= cache_data->pending();
}
return (rows);
}
/**
@param db db name c-string to be inserted into alphabetically sorted
THD::binlog_accessed_db_names list.
Note, that space for both the data and the node
struct are allocated in THD::main_mem_root.
The list lasts for the top-level query time and is reset
in @c THD::cleanup_after_query().
*/
void
THD::add_to_binlog_accessed_dbs(const char *db_param)
{
char *after_db;
/*
binlog_accessed_db_names list is to maintain the database
names which are referenced in a given command.
Prior to bug 17806014 fix, 'main_mem_root' memory root used
to store this list. The 'main_mem_root' scope is till the end
of the query. Hence it caused increasing memory consumption
problem in big procedures like the ones mentioned below.
Eg: CALL p1() where p1 is having 1,00,000 create and drop tables.
'main_mem_root' is freed only at the end of the command CALL p1()'s
execution. But binlog_accessed_db_names list scope is only till the
individual statements specified the procedure(create/drop statements).
Hence the memory allocated in 'main_mem_root' was left uncleared
until the p1's completion, even though it is not required after
completion of individual statements.
Instead of using 'main_mem_root' whose scope is complete query execution,
now the memroot is changed to use 'thd->mem_root' whose scope is until the
individual statement in CALL p1(). 'thd->mem_root' is set to 'execute_mem_root'
in the context of procedure and it's scope is till the individual statement
in CALL p1() and thd->memroot is equal to 'main_mem_root' in the context
of a normal 'top level query'.
Eg: a) create table t1(i int); => If this function is called while
processing this statement, thd->memroot is equal to &main_mem_root
which will be freed immediately after executing this statement.
b) CALL p1() -> p1 contains create table t1(i int); => If this function
is called while processing create table statement which is inside
a stored procedure, then thd->memroot is equal to 'execute_mem_root'
which will be freed immediately after executing this statement.
In both a and b case, thd->memroot will be freed immediately and will not
increase memory consumption.
A special case(stored functions/triggers):
Consider the following example:
create function f1(i int) returns int
begin
insert into db1.t1 values (1);
insert into db2.t1 values (2);
end;
When we are processing SELECT f1(), the list should contain db1, db2 names.
Since thd->mem_root contains 'execute_mem_root' in the context of
stored function, the mem root will be freed after adding db1 in
the list and when we are processing the second statement and when we try
to add 'db2' in the db1's list, it will lead to crash as db1's memory
is already freed. To handle this special case, if in_sub_stmt is set
(which is true incase of stored functions/triggers), we use &main_mem_root,
if not set we will use thd->memroot which changes it's value to
'execute_mem_root' or '&main_mem_root' depends on the context.
*/
MEM_ROOT *db_mem_root= in_sub_stmt ? &main_mem_root : mem_root;
if (!binlog_accessed_db_names)
binlog_accessed_db_names= new (db_mem_root) List;
if (binlog_accessed_db_names->elements > MAX_DBS_IN_EVENT_MTS)
{
push_warning_printf(this, Sql_condition::WARN_LEVEL_WARN,
ER_MTS_UPDATED_DBS_GREATER_MAX,
ER(ER_MTS_UPDATED_DBS_GREATER_MAX),
MAX_DBS_IN_EVENT_MTS);
return;
}
after_db= strdup_root(db_mem_root, db_param);
/*
sorted insertion is implemented with first rearranging data
(pointer to char*) of the links and final appending of the least
ordered data to create a new link in the list.
*/
if (binlog_accessed_db_names->elements != 0)
{
List_iterator it(*get_binlog_accessed_db_names());
while (it++)
{
char *swap= NULL;
char **ref_cur_db= it.ref();
int cmp= strcmp(after_db, *ref_cur_db);
DBUG_ASSERT(!swap || cmp < 0);
if (cmp == 0)
{
after_db= NULL; /* dup to ignore */
break;
}
else if (swap || cmp > 0)
{
swap= *ref_cur_db;
*ref_cur_db= after_db;
after_db= swap;
}
}
}
if (after_db)
binlog_accessed_db_names->push_back(after_db, db_mem_root);
}
/*
Tells if two (or more) tables have auto_increment columns and we want to
lock those tables with a write lock.
SYNOPSIS
has_two_write_locked_tables_with_auto_increment
tables Table list
NOTES:
Call this function only when you have established the list of all tables
which you'll want to update (including stored functions, triggers, views
inside your statement).
*/
static bool
has_write_table_with_auto_increment(TABLE_LIST *tables)
{
for (TABLE_LIST *table= tables; table; table= table->next_global)
{
/* we must do preliminary checks as table->table may be NULL */
if (!table->placeholder() &&
table->table->found_next_number_field &&
(table->lock_type >= TL_WRITE_ALLOW_WRITE))
return 1;
}
return 0;
}
/*
checks if we have select tables in the table list and write tables
with auto-increment column.
SYNOPSIS
has_two_write_locked_tables_with_auto_increment_and_select
tables Table list
RETURN VALUES
-true if the table list has atleast one table with auto-increment column
and atleast one table to select from.
-false otherwise
*/
static bool
has_write_table_with_auto_increment_and_select(TABLE_LIST *tables)
{
bool has_select= false;
bool has_auto_increment_tables = has_write_table_with_auto_increment(tables);
for(TABLE_LIST *table= tables; table; table= table->next_global)
{
if (!table->placeholder() &&
(table->lock_type <= TL_READ_NO_INSERT))
{
has_select= true;
break;
}
}
return(has_select && has_auto_increment_tables);
}
/*
Tells if there is a table whose auto_increment column is a part
of a compound primary key while is not the first column in
the table definition.
@param tables Table list
@return true if the table exists, fais if does not.
*/
static bool
has_write_table_auto_increment_not_first_in_pk(TABLE_LIST *tables)
{
for (TABLE_LIST *table= tables; table; table= table->next_global)
{
/* we must do preliminary checks as table->table may be NULL */
if (!table->placeholder() &&
table->table->found_next_number_field &&
(table->lock_type >= TL_WRITE_ALLOW_WRITE)
&& table->table->s->next_number_keypart != 0)
return 1;
}
return 0;
}
/**
Decide on logging format to use for the statement and issue errors
or warnings as needed. The decision depends on the following
parameters:
- The logging mode, i.e., the value of binlog_format. Can be
statement, mixed, or row.
- The type of statement. There are three types of statements:
"normal" safe statements; unsafe statements; and row injections.
An unsafe statement is one that, if logged in statement format,
might produce different results when replayed on the slave (e.g.,
INSERT DELAYED). A row injection is either a BINLOG statement, or
a row event executed by the slave's SQL thread.
- The capabilities of tables modified by the statement. The
*capabilities vector* for a table is a set of flags associated
with the table. Currently, it only includes two flags: *row
capability flag* and *statement capability flag*.
The row capability flag is set if and only if the engine can
handle row-based logging. The statement capability flag is set if
and only if the table can handle statement-based logging.
Decision table for logging format
---------------------------------
The following table summarizes how the format and generated
warning/error depends on the tables' capabilities, the statement
type, and the current binlog_format.
Row capable N NNNNNNNNN YYYYYYYYY YYYYYYYYY
Statement capable N YYYYYYYYY NNNNNNNNN YYYYYYYYY
Statement type * SSSUUUIII SSSUUUIII SSSUUUIII
binlog_format * SMRSMRSMR SMRSMRSMR SMRSMRSMR
Logged format - SS-S----- -RR-RR-RR SRRSRR-RR
Warning/Error 1 --2732444 5--5--6-- ---7--6--
Legend
------
Row capable: N - Some table not row-capable, Y - All tables row-capable
Stmt capable: N - Some table not stmt-capable, Y - All tables stmt-capable
Statement type: (S)afe, (U)nsafe, or Row (I)njection
binlog_format: (S)TATEMENT, (M)IXED, or (R)OW
Logged format: (S)tatement or (R)ow
Warning/Error: Warnings and error messages are as follows:
1. Error: Cannot execute statement: binlogging impossible since both
row-incapable engines and statement-incapable engines are
involved.
2. Error: Cannot execute statement: binlogging impossible since
BINLOG_FORMAT = ROW and at least one table uses a storage engine
limited to statement-logging.
3. Error: Cannot execute statement: binlogging of unsafe statement
is impossible when storage engine is limited to statement-logging
and BINLOG_FORMAT = MIXED.
4. Error: Cannot execute row injection: binlogging impossible since
at least one table uses a storage engine limited to
statement-logging.
5. Error: Cannot execute statement: binlogging impossible since
BINLOG_FORMAT = STATEMENT and at least one table uses a storage
engine limited to row-logging.
6. Error: Cannot execute row injection: binlogging impossible since
BINLOG_FORMAT = STATEMENT.
7. Warning: Unsafe statement binlogged in statement format since
BINLOG_FORMAT = STATEMENT.
In addition, we can produce the following error (not depending on
the variables of the decision diagram):
8. Error: Cannot execute statement: binlogging impossible since more
than one engine is involved and at least one engine is
self-logging.
For each error case above, the statement is prevented from being
logged, we report an error, and roll back the statement. For
warnings, we set the thd->binlog_flags variable: the warning will be
printed only if the statement is successfully logged.
@see THD::binlog_query
@param[in] thd Client thread
@param[in] tables Tables involved in the query
@retval 0 No error; statement can be logged.
@retval -1 One of the error conditions above applies (1, 2, 4, 5, or 6).
*/
int THD::decide_logging_format(TABLE_LIST *tables)
{
DBUG_ENTER("THD::decide_logging_format");
DBUG_PRINT("info", ("query: %s", query()));
DBUG_PRINT("info", ("variables.binlog_format: %lu",
variables.binlog_format));
DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x",
lex->get_stmt_unsafe_flags()));
reset_binlog_local_stmt_filter();
/*
We should not decide logging format if the binlog is closed or
binlogging is off, or if the statement is filtered out from the
binlog by filtering rules.
*/
if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) &&
!(variables.binlog_format == BINLOG_FORMAT_STMT &&
!binlog_filter->db_ok(db)))
{
/*
Compute one bit field with the union of all the engine
capabilities, and one with the intersection of all the engine
capabilities.
*/
handler::Table_flags flags_write_some_set= 0;
handler::Table_flags flags_access_some_set= 0;
handler::Table_flags flags_write_all_set=
HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE;
/*
If different types of engines are about to be updated.
For example: Innodb and Falcon; Innodb and MyIsam.
*/
my_bool multi_write_engine= FALSE;
/*
If different types of engines are about to be accessed
and any of them is about to be updated. For example:
Innodb and Falcon; Innodb and MyIsam.
*/
my_bool multi_access_engine= FALSE;
/*
Identifies if a table is changed.
*/
my_bool is_write= FALSE;
/*
A pointer to a previous table that was changed.
*/
TABLE* prev_write_table= NULL;
/*
A pointer to a previous table that was accessed.
*/
TABLE* prev_access_table= NULL;
/*
True if at least one table is transactional.
*/
bool write_to_some_transactional_table= false;
/*
True if at least one table is non-transactional.
*/
bool write_to_some_non_transactional_table= false;
/*
True if all non-transactional tables that has been updated
are temporary.
*/
bool write_all_non_transactional_are_tmp_tables= true;
/**
The number of tables used in the current statement,
that should be replicated.
*/
uint replicated_tables_count= 0;
/**
The number of tables written to in the current statement,
that should not be replicated.
A table should not be replicated when it is considered
'local' to a MySQL instance.
Currently, these tables are:
- mysql.slow_log
- mysql.general_log
- mysql.slave_relay_log_info
- mysql.slave_master_info
- mysql.slave_worker_info
- performance_schema.*
- TODO: information_schema.*
In practice, from this list, only performance_schema.* tables
are written to by user queries.
*/
uint non_replicated_tables_count= 0;
#ifndef DBUG_OFF
{
static const char *prelocked_mode_name[] = {
"NON_PRELOCKED",
"PRELOCKED",
"PRELOCKED_UNDER_LOCK_TABLES",
};
DBUG_PRINT("debug", ("prelocked_mode: %s",
prelocked_mode_name[locked_tables_mode]));
}
#endif
if (variables.binlog_format != BINLOG_FORMAT_ROW && tables)
{
/*
DML statements that modify a table with an auto_increment column based on
rows selected from a table are unsafe as the order in which the rows are
fetched fron the select tables cannot be determined and may differ on
master and slave.
*/
if (has_write_table_with_auto_increment_and_select(tables))
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_WRITE_AUTOINC_SELECT);
if (has_write_table_auto_increment_not_first_in_pk(tables))
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_AUTOINC_NOT_FIRST);
/*
A query that modifies autoinc column in sub-statement can make the
master and slave inconsistent.
We can solve these problems in mixed mode by switching to binlogging
if at least one updated table is used by sub-statement
*/
if (lex->requires_prelocking() &&
has_write_table_with_auto_increment(lex->first_not_own_table()))
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_AUTOINC_COLUMNS);
}
/*
Get the capabilities vector for all involved storage engines and
mask out the flags for the binary log.
*/
for (TABLE_LIST *table= tables; table; table= table->next_global)
{
if (table->placeholder())
continue;
handler::Table_flags const flags= table->table->file->ha_table_flags();
DBUG_PRINT("info", ("table: %s; ha_table_flags: 0x%llx",
table->table_name, flags));
if (table->table->no_replicate)
{
/*
The statement uses a table that is not replicated.
The following properties about the table:
- persistent / transient
- transactional / non transactional
- temporary / permanent
- read or write
- multiple engines involved because of this table
are not relevant, as this table is completely ignored.
Because the statement uses a non replicated table,
using STATEMENT format in the binlog is impossible.
Either this statement will be discarded entirely,
or it will be logged (possibly partially) in ROW format.
*/
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_TABLE);
if (table->lock_type >= TL_WRITE_ALLOW_WRITE)
{
non_replicated_tables_count++;
continue;
}
}
replicated_tables_count++;
my_bool trans= table->table->file->has_transactions();
if (table->lock_type >= TL_WRITE_ALLOW_WRITE)
{
write_to_some_transactional_table=
write_to_some_transactional_table || trans;
write_to_some_non_transactional_table=
write_to_some_non_transactional_table || !trans;
if (prev_write_table && prev_write_table->file->ht !=
table->table->file->ht)
multi_write_engine= TRUE;
if (table->table->s->tmp_table)
lex->set_stmt_accessed_table(trans ? LEX::STMT_WRITES_TEMP_TRANS_TABLE :
LEX::STMT_WRITES_TEMP_NON_TRANS_TABLE);
else
lex->set_stmt_accessed_table(trans ? LEX::STMT_WRITES_TRANS_TABLE :
LEX::STMT_WRITES_NON_TRANS_TABLE);
/*
Non-transactional updates are allowed when row binlog format is
used and all non-transactional tables are temporary.
Binlog format is checked on THD::is_dml_gtid_compatible() method.
*/
if (!trans)
write_all_non_transactional_are_tmp_tables=
write_all_non_transactional_are_tmp_tables &&
table->table->s->tmp_table;
flags_write_all_set &= flags;
flags_write_some_set |= flags;
is_write= TRUE;
prev_write_table= table->table;
/*
INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys
can be unsafe. Check for it if the flag is already not marked for the
given statement.
*/
if (!lex->is_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS) &&
lex->sql_command == SQLCOM_INSERT &&
/* Duplicate key update is not supported by INSERT DELAYED */
get_command() != COM_DELAYED_INSERT && lex->duplicates == DUP_UPDATE)
{
uint keys= table->table->s->keys, i= 0, unique_keys= 0;
for (KEY* keyinfo= table->table->s->key_info;
i < keys && unique_keys <= 1; i++, keyinfo++)
{
if (keyinfo->flags & HA_NOSAME)
unique_keys++;
}
if (unique_keys > 1 )
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS);
}
}
flags_access_some_set |= flags;
if (lex->sql_command != SQLCOM_CREATE_TABLE ||
(lex->sql_command == SQLCOM_CREATE_TABLE &&
(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)))
{
if (table->table->s->tmp_table)
lex->set_stmt_accessed_table(trans ? LEX::STMT_READS_TEMP_TRANS_TABLE :
LEX::STMT_READS_TEMP_NON_TRANS_TABLE);
else
lex->set_stmt_accessed_table(trans ? LEX::STMT_READS_TRANS_TABLE :
LEX::STMT_READS_NON_TRANS_TABLE);
}
if (prev_access_table && prev_access_table->file->ht !=
table->table->file->ht)
multi_access_engine= TRUE;
prev_access_table= table->table;
}
DBUG_ASSERT(!is_write ||
write_to_some_transactional_table ||
write_to_some_non_transactional_table);
/*
write_all_non_transactional_are_tmp_tables may be true if any
non-transactional table was not updated, so we fix its value here.
*/
write_all_non_transactional_are_tmp_tables=
write_all_non_transactional_are_tmp_tables &&
write_to_some_non_transactional_table;
DBUG_PRINT("info", ("flags_write_all_set: 0x%llx", flags_write_all_set));
DBUG_PRINT("info", ("flags_write_some_set: 0x%llx", flags_write_some_set));
DBUG_PRINT("info", ("flags_access_some_set: 0x%llx", flags_access_some_set));
DBUG_PRINT("info", ("multi_write_engine: %d", multi_write_engine));
DBUG_PRINT("info", ("multi_access_engine: %d", multi_access_engine));
int error= 0;
int unsafe_flags;
bool multi_stmt_trans= in_multi_stmt_transaction_mode();
bool trans_table= trans_has_updated_trans_table(this);
bool binlog_direct= variables.binlog_direct_non_trans_update;
if (lex->is_mixed_stmt_unsafe(multi_stmt_trans, binlog_direct,
trans_table, tx_isolation))
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_MIXED_STATEMENT);
else if (multi_stmt_trans && trans_table && !binlog_direct &&
lex->stmt_accessed_table(LEX::STMT_WRITES_NON_TRANS_TABLE))
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_NONTRANS_AFTER_TRANS);
/*
If more than one engine is involved in the statement and at
least one is doing it's own logging (is *self-logging*), the
statement cannot be logged atomically, so we generate an error
rather than allowing the binlog to become corrupt.
*/
if (multi_write_engine &&
(flags_write_some_set & HA_HAS_OWN_BINLOGGING))
my_error((error= ER_BINLOG_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE),
MYF(0));
else if (multi_access_engine && flags_access_some_set & HA_HAS_OWN_BINLOGGING)
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_MULTIPLE_ENGINES_AND_SELF_LOGGING_ENGINE);
/* both statement-only and row-only engines involved */
if ((flags_write_all_set & (HA_BINLOG_STMT_CAPABLE | HA_BINLOG_ROW_CAPABLE)) == 0)
{
/*
1. Error: Binary logging impossible since both row-incapable
engines and statement-incapable engines are involved
*/
my_error((error= ER_BINLOG_ROW_ENGINE_AND_STMT_ENGINE), MYF(0));
}
/* statement-only engines involved */
else if ((flags_write_all_set & HA_BINLOG_ROW_CAPABLE) == 0)
{
if (lex->is_stmt_row_injection())
{
/*
4. Error: Cannot execute row injection since table uses
storage engine limited to statement-logging
*/
my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0));
}
else if (variables.binlog_format == BINLOG_FORMAT_ROW &&
sqlcom_can_generate_row_events(this->lex->sql_command))
{
/*
2. Error: Cannot modify table that uses a storage engine
limited to statement-logging when BINLOG_FORMAT = ROW
*/
my_error((error= ER_BINLOG_ROW_MODE_AND_STMT_ENGINE), MYF(0));
}
else if ((unsafe_flags= lex->get_stmt_unsafe_flags()) != 0)
{
/*
3. Error: Cannot execute statement: binlogging of unsafe
statement is impossible when storage engine is limited to
statement-logging and BINLOG_FORMAT = MIXED.
*/
for (int unsafe_type= 0;
unsafe_type < LEX::BINLOG_STMT_UNSAFE_COUNT;
unsafe_type++)
if (unsafe_flags & (1 << unsafe_type))
my_error((error= ER_BINLOG_UNSAFE_AND_STMT_ENGINE), MYF(0),
ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type]));
}
/* log in statement format! */
}
/* no statement-only engines */
else
{
/* binlog_format = STATEMENT */
if (variables.binlog_format == BINLOG_FORMAT_STMT)
{
if (lex->is_stmt_row_injection())
{
/*
6. Error: Cannot execute row injection since
BINLOG_FORMAT = STATEMENT
*/
my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_MODE), MYF(0));
}
else if ((flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0 &&
sqlcom_can_generate_row_events(this->lex->sql_command))
{
/*
5. Error: Cannot modify table that uses a storage engine
limited to row-logging when binlog_format = STATEMENT
*/
my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), "");
}
else if (is_write && (unsafe_flags= lex->get_stmt_unsafe_flags()) != 0)
{
/*
7. Warning: Unsafe statement logged as statement due to
binlog_format = STATEMENT
*/
binlog_unsafe_warning_flags|= unsafe_flags;
DBUG_PRINT("info", ("Scheduling warning to be issued by "
"binlog_query: '%s'",
ER(ER_BINLOG_UNSAFE_STATEMENT)));
DBUG_PRINT("info", ("binlog_unsafe_warning_flags: 0x%x",
binlog_unsafe_warning_flags));
}
/* log in statement format! */
}
/* No statement-only engines and binlog_format != STATEMENT.
I.e., nothing prevents us from row logging if needed. */
else
{
if (lex->is_stmt_unsafe() || lex->is_stmt_row_injection()
|| (flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0)
{
/* log in row format! */
set_current_stmt_binlog_format_row_if_mixed();
}
}
}
if (non_replicated_tables_count > 0)
{
if ((replicated_tables_count == 0) || ! is_write)
{
DBUG_PRINT("info", ("decision: no logging, no replicated table affected"));
set_binlog_local_stmt_filter();
}
else
{
if (! is_current_stmt_binlog_format_row())
{
my_error((error= ER_BINLOG_STMT_MODE_AND_NO_REPL_TABLES), MYF(0));
}
else
{
clear_binlog_local_stmt_filter();
}
}
}
else
{
clear_binlog_local_stmt_filter();
}
if (!error && enforce_gtid_consistency &&
!is_dml_gtid_compatible(write_to_some_transactional_table,
write_to_some_non_transactional_table,
write_all_non_transactional_are_tmp_tables))
error= 1;
if (error) {
DBUG_PRINT("info", ("decision: no logging since an error was generated"));
DBUG_RETURN(-1);
}
if (is_write &&
lex->sql_command != SQLCOM_END /* rows-event applying by slave */)
{
/*
Master side of DML in the STMT format events parallelization.
All involving table db:s are stored in a abc-ordered name list.
In case the number of databases exceeds MAX_DBS_IN_EVENT_MTS maximum
the list gathering breaks since it won't be sent to the slave.
*/
for (TABLE_LIST *table= tables; table; table= table->next_global)
{
if (table->placeholder())
continue;
DBUG_ASSERT(table->table);
if (table->table->file->referenced_by_foreign_key())
{
/*
FK-referenced dbs can't be gathered currently. The following
event will be marked for sequential execution on slave.
*/
binlog_accessed_db_names= NULL;
add_to_binlog_accessed_dbs("");
break;
}
if (!is_current_stmt_binlog_format_row())
add_to_binlog_accessed_dbs(table->db);
}
}
DBUG_PRINT("info", ("decision: logging in %s format",
is_current_stmt_binlog_format_row() ?
"ROW" : "STATEMENT"));
if (variables.binlog_format == BINLOG_FORMAT_ROW &&
(lex->sql_command == SQLCOM_UPDATE ||
lex->sql_command == SQLCOM_UPDATE_MULTI ||
lex->sql_command == SQLCOM_DELETE ||
lex->sql_command == SQLCOM_DELETE_MULTI))
{
String table_names;
/*
Generate a warning for UPDATE/DELETE statements that modify a
BLACKHOLE table, as row events are not logged in row format.
*/
for (TABLE_LIST *table= tables; table; table= table->next_global)
{
if (table->placeholder())
continue;
if (table->table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB &&
table->lock_type >= TL_WRITE_ALLOW_WRITE)
{
table_names.append(table->table_name);
table_names.append(",");
}
}
if (!table_names.is_empty())
{
bool is_update= (lex->sql_command == SQLCOM_UPDATE ||
lex->sql_command == SQLCOM_UPDATE_MULTI);
/*
Replace the last ',' with '.' for table_names
*/
table_names.replace(table_names.length()-1, 1, ".", 1);
push_warning_printf(this, Sql_condition::WARN_LEVEL_WARN,
WARN_ON_BLOCKHOLE_IN_RBR,
ER(WARN_ON_BLOCKHOLE_IN_RBR),
is_update ? "UPDATE" : "DELETE",
table_names.c_ptr());
}
}
}
#ifndef DBUG_OFF
else
DBUG_PRINT("info", ("decision: no logging since "
"mysql_bin_log.is_open() = %d "
"and (options & OPTION_BIN_LOG) = 0x%llx "
"and binlog_format = %lu "
"and binlog_filter->db_ok(db) = %d",
mysql_bin_log.is_open(),
(variables.option_bits & OPTION_BIN_LOG),
variables.binlog_format,
binlog_filter->db_ok(db)));
#endif
DBUG_RETURN(0);
}
bool THD::is_ddl_gtid_compatible() const
{
DBUG_ENTER("THD::is_ddl_gtid_compatible");
// If @@session.sql_log_bin has been manually turned off (only
// doable by SUPER), then no problem, we can execute any statement.
if ((variables.option_bits & OPTION_BIN_LOG) == 0)
DBUG_RETURN(true);
if (lex->sql_command == SQLCOM_CREATE_TABLE &&
!(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) &&
lex->select_lex.item_list.elements)
{
if (opt_rds_allow_unsafe_stmt_with_gtid &&
(this->variables.gtid_next.type == AUTOMATIC_GROUP) &&
is_current_stmt_binlog_format_row())
{
#ifndef DBUG_OFF
THD *tmp_thd= (THD *) this;
tmp_thd->enable_unsafe_stmt= true;
#endif
DBUG_RETURN(true);
}
/*
CREATE ... SELECT (without TEMPORARY) is unsafe because if
binlog_format=row it will be logged as a CREATE TABLE followed
by row events, re-executed non-atomically as two transactions,
and then written to the slave's binary log as two separate
transactions with the same GTID.
*/
my_error(ER_GTID_UNSAFE_CREATE_SELECT, MYF(0));
DBUG_RETURN(false);
}
if ((lex->sql_command == SQLCOM_CREATE_TABLE &&
(lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) != 0) ||
(lex->sql_command == SQLCOM_DROP_TABLE && lex->drop_temporary))
{
/*
[CREATE|DROP] TEMPORARY TABLE is unsafe to execute
inside a transaction because the table will stay and the
transaction will be written to the slave's binary log with the
GTID even if the transaction is rolled back.
This includes the execution inside Functions and Triggers.
*/
if (in_multi_stmt_transaction_mode() || in_sub_stmt)
{
my_error(ER_GTID_UNSAFE_CREATE_DROP_TEMPORARY_TABLE_IN_TRANSACTION,
MYF(0));
DBUG_RETURN(false);
}
}
DBUG_RETURN(true);
}
bool
THD::is_dml_gtid_compatible(bool transactional_table,
bool non_transactional_table,
bool non_transactional_tmp_tables) const
{
DBUG_ENTER("THD::is_dml_gtid_compatible(bool, bool, bool)");
// If @@session.sql_log_bin has been manually turned off (only
// doable by SUPER), then no problem, we can execute any statement.
if ((variables.option_bits & OPTION_BIN_LOG) == 0)
DBUG_RETURN(true);
/*
Single non-transactional updates are allowed when not mixed
together with transactional statements within a transaction.
Furthermore, writing to transactional and non-transactional
engines in a single statement is also disallowed.
Multi-statement transactions on non-transactional tables are
split into single-statement transactions when
GTID_NEXT = "AUTOMATIC".
Non-transactional updates are allowed when row binlog format is
used and all non-transactional tables are temporary.
The debug symbol "allow_gtid_unsafe_non_transactional_updates"
disables the error. This is useful because it allows us to run
old tests that were not written with the restrictions of GTIDs in
mind.
*/
if (non_transactional_table &&
(transactional_table || trans_has_updated_trans_table(this)) &&
!(non_transactional_tmp_tables && is_current_stmt_binlog_format_row()) &&
!DBUG_EVALUATE_IF("allow_gtid_unsafe_non_transactional_updates", 1, 0))
{
if (opt_rds_allow_unsafe_stmt_with_gtid &&
this->variables.gtid_next.type == AUTOMATIC_GROUP && // Gtid is auto-generated
((lex->sql_command != SQLCOM_UPDATE_MULTI &&
lex->sql_command != SQLCOM_DELETE_MULTI) || // Allow multi update only when binlog format is row
is_current_stmt_binlog_format_row()))
{
#ifndef DBUG_OFF
THD *tmp_thd= (THD *) this;
tmp_thd->enable_unsafe_stmt= true;
#endif
DBUG_RETURN(true);
}
my_error(ER_GTID_UNSAFE_NON_TRANSACTIONAL_TABLE, MYF(0));
DBUG_RETURN(false);
}
DBUG_RETURN(true);
}
/*
Implementation of interface to write rows to the binary log through the
thread. The thread is responsible for writing the rows it has
inserted/updated/deleted.
*/
#ifndef MYSQL_CLIENT
/*
Template member function for ensuring that there is an rows log
event of the apropriate type before proceeding.
PRE CONDITION:
- Events of type 'RowEventT' have the type code 'type_code'.
POST CONDITION:
If a non-NULL pointer is returned, the pending event for thread 'thd' will
be an event of type 'RowEventT' (which have the type code 'type_code')
will either empty or have enough space to hold 'needed' bytes. In
addition, the columns bitmap will be correct for the row, meaning that
the pending event will be flushed if the columns in the event differ from
the columns suppled to the function.
RETURNS
If no error, a non-NULL pending event (either one which already existed or
the newly created one).
If error, NULL.
*/
template Rows_log_event*
THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
size_t needed,
bool is_transactional,
RowsEventT *hint MY_ATTRIBUTE((unused)),
const uchar* extra_row_info)
{
DBUG_ENTER("binlog_prepare_pending_rows_event");
/* Fetch the type code for the RowsEventT template parameter */
int const general_type_code= RowsEventT::TYPE_CODE;
Rows_log_event* pending= binlog_get_pending_rows_event(is_transactional);
if (unlikely(pending && !pending->is_valid()))
DBUG_RETURN(NULL);
/*
Check if the current event is non-NULL and a write-rows
event. Also check if the table provided is mapped: if it is not,
then we have switched to writing to a new table.
If there is no pending event, we need to create one. If there is a pending
event, but it's not about the same table id, or not of the same type
(between Write, Update and Delete), or not the same affected columns, or
going to be too big, flush this event to disk and create a new pending
event.
*/
if (!pending ||
pending->server_id != serv_id ||
pending->get_table_id() != table->s->table_map_id ||
pending->get_general_type_code() != general_type_code ||
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
pending->read_write_bitmaps_cmp(table) == FALSE ||
!binlog_row_event_extra_data_eq(pending->get_extra_row_data(),
extra_row_info))
{
/* Create a new RowsEventT... */
Rows_log_event* const
ev= new RowsEventT(this, table, table->s->table_map_id,
is_transactional, extra_row_info);
if (unlikely(!ev))
DBUG_RETURN(NULL);
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
/*
flush the pending event and replace it with the newly created
event...
*/
if (unlikely(
mysql_bin_log.flush_and_set_pending_rows_event(this, ev,
is_transactional)))
{
delete ev;
DBUG_RETURN(NULL);
}
DBUG_RETURN(ev); /* This is the new pending event */
}
DBUG_RETURN(pending); /* This is the current pending event */
}
/* Declare in unnamed namespace. */
CPP_UNNAMED_NS_START
/**
Class to handle temporary allocation of memory for row data.
The responsibilities of the class is to provide memory for
packing one or two rows of packed data (depending on what
constructor is called).
In order to make the allocation more efficient for "simple" rows,
i.e., rows that do not contain any blobs, a pointer to the
allocated memory is of memory is stored in the table structure
for simple rows. If memory for a table containing a blob field
is requested, only memory for that is allocated, and subsequently
released when the object is destroyed.
*/
class Row_data_memory {
public:
/**
Build an object to keep track of a block-local piece of memory
for storing a row of data.
@param table
Table where the pre-allocated memory is stored.
@param length
Length of data that is needed, if the record contain blobs.
*/
Row_data_memory(TABLE *table, size_t const len1)
: m_memory(0)
{
#ifndef DBUG_OFF
m_alloc_checked= FALSE;
#endif
allocate_memory(table, len1);
m_ptr[0]= has_memory() ? m_memory : 0;
m_ptr[1]= 0;
}
Row_data_memory(TABLE *table, size_t const len1, size_t const len2)
: m_memory(0)
{
#ifndef DBUG_OFF
m_alloc_checked= FALSE;
#endif
allocate_memory(table, len1 + len2);
m_ptr[0]= has_memory() ? m_memory : 0;
m_ptr[1]= has_memory() ? m_memory + len1 : 0;
}
~Row_data_memory()
{
if (m_memory != 0 && m_release_memory_on_destruction)
my_free(m_memory);
}
/**
Is there memory allocated?
@retval true There is memory allocated
@retval false Memory allocation failed
*/
bool has_memory() const {
#ifndef DBUG_OFF
m_alloc_checked= TRUE;
#endif
return m_memory != 0;
}
uchar *slot(uint s)
{
DBUG_ASSERT(s < sizeof(m_ptr)/sizeof(*m_ptr));
DBUG_ASSERT(m_ptr[s] != 0);
DBUG_ASSERT(m_alloc_checked == TRUE);
return m_ptr[s];
}
private:
void allocate_memory(TABLE *const table, size_t const total_length)
{
if (table->s->blob_fields == 0)
{
/*
The maximum length of a packed record is less than this
length. We use this value instead of the supplied length
when allocating memory for records, since we don't know how
the memory will be used in future allocations.
Since table->s->reclength is for unpacked records, we have
to add two bytes for each field, which can potentially be
added to hold the length of a packed field.
*/
size_t const maxlen= table->s->reclength + 2 * table->s->fields;
/*
Allocate memory for two records if memory hasn't been
allocated. We allocate memory for two records so that it can
be used when processing update rows as well.
*/
if (table->write_row_record == 0)
table->write_row_record=
(uchar *) alloc_root(&table->mem_root, 2 * maxlen);
m_memory= table->write_row_record;
m_release_memory_on_destruction= FALSE;
}
else
{
m_memory= (uchar *) my_malloc(total_length, MYF(MY_WME));
m_release_memory_on_destruction= TRUE;
}
}
#ifndef DBUG_OFF
mutable bool m_alloc_checked;
#endif
bool m_release_memory_on_destruction;
uchar *m_memory;
uchar *m_ptr[2];
};
CPP_UNNAMED_NS_END
int THD::binlog_write_row(TABLE* table, bool is_trans,
uchar const *record,
const uchar* extra_row_info)
{
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
/*
Pack records into format for transfer. We are allocating more
memory than needed, but that doesn't matter.
*/
Row_data_memory memory(table, max_row_length(table, record));
if (!memory.has_memory())
return HA_ERR_OUT_OF_MEM;
uchar *row_data= memory.slot(0);
size_t const len= pack_row(table, table->write_set, row_data, record);
Rows_log_event* const ev=
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
static_cast(0),
extra_row_info);
if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM;
return ev->add_row_data(row_data, len);
}
int THD::binlog_update_row(TABLE* table, bool is_trans,
const uchar *before_record,
const uchar *after_record,
const uchar* extra_row_info)
{
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
int error= 0;
/**
Save a reference to the original read and write set bitmaps.
We will need this to restore the bitmaps at the end.
*/
MY_BITMAP *old_read_set= table->read_set;
MY_BITMAP *old_write_set= table->write_set;
/**
This will remove spurious fields required during execution but
not needed for binlogging. This is done according to the:
binlog-row-image option.
*/
binlog_prepare_row_images(table);
size_t const before_maxlen = max_row_length(table, before_record);
size_t const after_maxlen = max_row_length(table, after_record);
Row_data_memory row_data(table, before_maxlen, after_maxlen);
if (!row_data.has_memory())
return HA_ERR_OUT_OF_MEM;
uchar *before_row= row_data.slot(0);
uchar *after_row= row_data.slot(1);
size_t const before_size= pack_row(table, table->read_set, before_row,
before_record);
size_t const after_size= pack_row(table, table->write_set, after_row,
after_record);
/*
Don't print debug messages when running valgrind since they can
trigger false warnings.
*/
#ifndef HAVE_purify
DBUG_DUMP("before_record", before_record, table->s->reclength);
DBUG_DUMP("after_record", after_record, table->s->reclength);
DBUG_DUMP("before_row", before_row, before_size);
DBUG_DUMP("after_row", after_row, after_size);
#endif
Rows_log_event* const ev=
binlog_prepare_pending_rows_event(table, server_id,
before_size + after_size, is_trans,
static_cast(0),
extra_row_info);
if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM;
error= ev->add_row_data(before_row, before_size) ||
ev->add_row_data(after_row, after_size);
/* restore read/write set for the rest of execution */
table->column_bitmaps_set_no_signal(old_read_set,
old_write_set);
return error;
}
int THD::binlog_delete_row(TABLE* table, bool is_trans,
uchar const *record,
const uchar* extra_row_info)
{
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
int error= 0;
/**
Save a reference to the original read and write set bitmaps.
We will need this to restore the bitmaps at the end.
*/
MY_BITMAP *old_read_set= table->read_set;
MY_BITMAP *old_write_set= table->write_set;
/**
This will remove spurious fields required during execution but
not needed for binlogging. This is done according to the:
binlog-row-image option.
*/
binlog_prepare_row_images(table);
/*
Pack records into format for transfer. We are allocating more
memory than needed, but that doesn't matter.
*/
Row_data_memory memory(table, max_row_length(table, record));
if (unlikely(!memory.has_memory()))
return HA_ERR_OUT_OF_MEM;
uchar *row_data= memory.slot(0);
DBUG_DUMP("table->read_set", (uchar*) table->read_set->bitmap, (table->s->fields + 7) / 8);
size_t const len= pack_row(table, table->read_set, row_data, record);
Rows_log_event* const ev=
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
static_cast(0),
extra_row_info);
if (unlikely(ev == 0))
return HA_ERR_OUT_OF_MEM;
error= ev->add_row_data(row_data, len);
/* restore read/write set for the rest of execution */
table->column_bitmaps_set_no_signal(old_read_set,
old_write_set);
return error;
}
void THD::binlog_prepare_row_images(TABLE *table)
{
DBUG_ENTER("THD::binlog_prepare_row_images");
/**
Remove from read_set spurious columns. The write_set has been
handled before in table->mark_columns_needed_for_update.
*/
DBUG_PRINT_BITSET("debug", "table->read_set (before preparing): %s", table->read_set);
THD *thd= table->in_use;
/**
if there is a primary key in the table (ie, user declared PK or a
non-null unique index) and we dont want to ship the entire image,
and the handler involved supports this.
*/
if (table->s->primary_key < MAX_KEY &&
(thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) &&
!ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW_OPT))
{
/**
Just to be sure that tmp_set is currently not in use as
the read_set already.
*/
DBUG_ASSERT(table->read_set != &table->tmp_set);
bitmap_clear_all(&table->tmp_set);
switch(thd->variables.binlog_row_image)
{
case BINLOG_ROW_IMAGE_MINIMAL:
/* MINIMAL: Mark only PK */
table->mark_columns_used_by_index_no_reset(table->s->primary_key,
&table->tmp_set);
break;
case BINLOG_ROW_IMAGE_NOBLOB:
/**
NOBLOB: Remove unnecessary BLOB fields from read_set
(the ones that are not part of PK).
*/
bitmap_union(&table->tmp_set, table->read_set);
for (Field **ptr=table->field ; *ptr ; ptr++)
{
Field *field= (*ptr);
if ((field->type() == MYSQL_TYPE_BLOB) &&
!(field->flags & PRI_KEY_FLAG))
bitmap_clear_bit(&table->tmp_set, field->field_index);
}
break;
default:
DBUG_ASSERT(0); // impossible.
}
/* set the temporary read_set */
table->column_bitmaps_set_no_signal(&table->tmp_set,
table->write_set);
}
DBUG_PRINT_BITSET("debug", "table->read_set (after preparing): %s", table->read_set);
DBUG_VOID_RETURN;
}
int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
{
DBUG_ENTER("THD::binlog_flush_pending_rows_event");
/*
We shall flush the pending event even if we are not in row-based
mode: it might be the case that we left row-based mode before
flushing anything (e.g., if we have explicitly locked tables).
*/
if (!mysql_bin_log.is_open())
DBUG_RETURN(0);
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
*/
int error= 0;
if (Rows_log_event *pending= binlog_get_pending_rows_event(is_transactional))
{
if (stmt_end)
{
pending->set_flags(Rows_log_event::STMT_END_F);
binlog_table_maps= 0;
}
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0,
is_transactional);
}
DBUG_RETURN(error);
}
/**
binlog_row_event_extra_data_eq
Comparator for two binlog row event extra data
pointers.
It compares their significant bytes.
Null pointers are acceptable
@param a
first pointer
@param b
first pointer
@return
true if the referenced structures are equal
*/
bool
THD::binlog_row_event_extra_data_eq(const uchar* a,
const uchar* b)
{
return ((a == b) ||
((a != NULL) &&
(b != NULL) &&
(a[EXTRA_ROW_INFO_LEN_OFFSET] ==
b[EXTRA_ROW_INFO_LEN_OFFSET]) &&
(memcmp(a, b,
a[EXTRA_ROW_INFO_LEN_OFFSET]) == 0)));
}
#if !defined(DBUG_OFF) && !defined(_lint)
static const char *
show_query_type(THD::enum_binlog_query_type qtype)
{
switch (qtype) {
case THD::ROW_QUERY_TYPE:
return "ROW";
case THD::STMT_QUERY_TYPE:
return "STMT";
case THD::QUERY_TYPE_COUNT:
default:
DBUG_ASSERT(0 <= qtype && qtype < THD::QUERY_TYPE_COUNT);
}
static char buf[64];
sprintf(buf, "UNKNOWN#%d", qtype);
return buf;
}
#endif
/**
Auxiliary function to reset the limit unsafety warning suppression.
*/
static void reset_binlog_unsafe_suppression()
{
DBUG_ENTER("reset_binlog_unsafe_suppression");
unsafe_warning_suppression_is_activated= false;
limit_unsafe_warning_count= 0;
limit_unsafe_suppression_start_time= my_getsystime()/10000000;
DBUG_VOID_RETURN;
}
/**
Auxiliary function to print warning in the error log.
*/
static void print_unsafe_warning_to_log(int unsafe_type, char* buf,
char* query)
{
DBUG_ENTER("print_unsafe_warning_in_log");
sprintf(buf, ER(ER_BINLOG_UNSAFE_STATEMENT),
ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type]));
sql_print_warning(ER(ER_MESSAGE_AND_STATEMENT), buf, query);
DBUG_VOID_RETURN;
}
/**
Auxiliary function to check if the warning for limit unsafety should be
thrown or suppressed. Details of the implementation can be found in the
comments inline.
SYNOPSIS:
@params
buf - buffer to hold the warning message text
unsafe_type - The type of unsafety.
query - The actual query statement.
TODO: Remove this function and implement a general service for all warnings
that would prevent flooding the error log.
*/
static void do_unsafe_limit_checkout(char* buf, int unsafe_type, char* query)
{
ulonglong now;
DBUG_ENTER("do_unsafe_limit_checkout");
DBUG_ASSERT(unsafe_type == LEX::BINLOG_STMT_UNSAFE_LIMIT);
limit_unsafe_warning_count++;
/*
INITIALIZING:
If this is the first time this function is called with log warning
enabled, the monitoring the unsafe warnings should start.
*/
if (limit_unsafe_suppression_start_time == 0)
{
limit_unsafe_suppression_start_time= my_getsystime()/10000000;
print_unsafe_warning_to_log(unsafe_type, buf, query);
}
else
{
if (!unsafe_warning_suppression_is_activated)
print_unsafe_warning_to_log(unsafe_type, buf, query);
if (limit_unsafe_warning_count >=
LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT)
{
now= my_getsystime()/10000000;
if (!unsafe_warning_suppression_is_activated)
{
/*
ACTIVATION:
We got LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT warnings in
less than LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT we activate the
suppression.
*/
if ((now-limit_unsafe_suppression_start_time) <=
LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT)
{
unsafe_warning_suppression_is_activated= true;
DBUG_PRINT("info",("A warning flood has been detected and the limit \
unsafety warning suppression has been activated."));
}
else
{
/*
there is no flooding till now, therefore we restart the monitoring
*/
limit_unsafe_suppression_start_time= my_getsystime()/10000000;
limit_unsafe_warning_count= 0;
}
}
else
{
/*
Print the suppression note and the unsafe warning.
*/
sql_print_information("The following warning was suppressed %d times \
during the last %d seconds in the error log",
limit_unsafe_warning_count,
(int)
(now-limit_unsafe_suppression_start_time));
print_unsafe_warning_to_log(unsafe_type, buf, query);
/*
DEACTIVATION: We got LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT
warnings in more than LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT, the
suppression should be deactivated.
*/
if ((now - limit_unsafe_suppression_start_time) >
LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT)
{
reset_binlog_unsafe_suppression();
DBUG_PRINT("info",("The limit unsafety warning supression has been \
deactivated"));
}
}
limit_unsafe_warning_count= 0;
}
}
DBUG_VOID_RETURN;
}
/**
Auxiliary method used by @c binlog_query() to raise warnings.
The type of warning and the type of unsafeness is stored in
THD::binlog_unsafe_warning_flags.
*/
void THD::issue_unsafe_warnings()
{
char buf[MYSQL_ERRMSG_SIZE * 2];
DBUG_ENTER("issue_unsafe_warnings");
/*
Ensure that binlog_unsafe_warning_flags is big enough to hold all
bits. This is actually a constant expression.
*/
DBUG_ASSERT(LEX::BINLOG_STMT_UNSAFE_COUNT <=
sizeof(binlog_unsafe_warning_flags) * CHAR_BIT);
uint32 unsafe_type_flags= binlog_unsafe_warning_flags;
/*
For each unsafe_type, check if the statement is unsafe in this way
and issue a warning.
*/
for (int unsafe_type=0;
unsafe_type < LEX::BINLOG_STMT_UNSAFE_COUNT;
unsafe_type++)
{
if ((unsafe_type_flags & (1 << unsafe_type)) != 0)
{
push_warning_printf(this, Sql_condition::WARN_LEVEL_NOTE,
ER_BINLOG_UNSAFE_STATEMENT,
ER(ER_BINLOG_UNSAFE_STATEMENT),
ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type]));
if (log_warnings)
{
if (unsafe_type == LEX::BINLOG_STMT_UNSAFE_LIMIT)
do_unsafe_limit_checkout( buf, unsafe_type, query());
else //cases other than LIMIT unsafety
print_unsafe_warning_to_log(unsafe_type, buf, query());
}
}
}
DBUG_VOID_RETURN;
}
/**
Log the current query.
The query will be logged in either row format or statement format
depending on the value of @c current_stmt_binlog_format_row field and
the value of the @c qtype parameter.
This function must be called:
- After the all calls to ha_*_row() functions have been issued.
- After any writes to system tables. Rationale: if system tables
were written after a call to this function, and the master crashes
after the call to this function and before writing the system
tables, then the master and slave get out of sync.
- Before tables are unlocked and closed.
@see decide_logging_format
@retval 0 Success
@retval nonzero If there is a failure when writing the query (e.g.,
write failure), then the error code is returned.
*/
int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
ulong query_len, bool is_trans, bool direct,
bool suppress_use, int errcode)
{
DBUG_ENTER("THD::binlog_query");
DBUG_PRINT("enter", ("qtype: %s query: '%s'",
show_query_type(qtype), query_arg));
DBUG_ASSERT(query_arg && mysql_bin_log.is_open());
if (get_binlog_local_stmt_filter() == BINLOG_FILTER_SET)
{
/*
The current statement is to be ignored, and not written to
the binlog. Do not call issue_unsafe_warnings().
*/
DBUG_RETURN(0);
}
/*
If we are not in prelocked mode, mysql_unlock_tables() will be
called after this binlog_query(), so we have to flush the pending
rows event with the STMT_END_F set to unlock all tables at the
slave side as well.
If we are in prelocked mode, the flushing will be done inside the
top-most close_thread_tables().
*/
if (this->locked_tables_mode <= LTM_LOCK_TABLES)
if (int error= binlog_flush_pending_rows_event(TRUE, is_trans))
DBUG_RETURN(error);
/*
Warnings for unsafe statements logged in statement format are
printed in three places instead of in decide_logging_format().
This is because the warnings should be printed only if the statement
is actually logged. When executing decide_logging_format(), we cannot
know for sure if the statement will be logged:
1 - sp_head::execute_procedure which prints out warnings for calls to
stored procedures.
2 - sp_head::execute_function which prints out warnings for calls
involving functions.
3 - THD::binlog_query (here) which prints warning for top level
statements not covered by the two cases above: i.e., if not insided a
procedure and a function.
Besides, we should not try to print these warnings if it is not
possible to write statements to the binary log as it happens when
the execution is inside a function, or generaly speaking, when
the variables.option_bits & OPTION_BIN_LOG is false.
*/
if ((variables.option_bits & OPTION_BIN_LOG) &&
sp_runtime_ctx == NULL && !binlog_evt_union.do_union)
issue_unsafe_warnings();
switch (qtype) {
/*
ROW_QUERY_TYPE means that the statement may be logged either in
row format or in statement format. If
current_stmt_binlog_format is row, it means that the
statement has already been logged in row format and hence shall
not be logged again.
*/
case THD::ROW_QUERY_TYPE:
DBUG_PRINT("debug",
("is_current_stmt_binlog_format_row: %d",
is_current_stmt_binlog_format_row()));
if (is_current_stmt_binlog_format_row())
DBUG_RETURN(0);
/* Fall through */
/*
STMT_QUERY_TYPE means that the query must be logged in statement
format; it cannot be logged in row format. This is typically
used by DDL statements. It is an error to use this query type
if current_stmt_binlog_format_row is row.
@todo Currently there are places that call this method with
STMT_QUERY_TYPE and current_stmt_binlog_format is row. Fix those
places and add assert to ensure correct behavior. /Sven
*/
case THD::STMT_QUERY_TYPE:
/*
The MYSQL_LOG::write() function will set the STMT_END_F flag and
flush the pending rows event if necessary.
*/
{
Query_log_event qinfo(this, query_arg, query_len, is_trans, direct,
suppress_use, errcode);
/*
Binlog table maps will be irrelevant after a Query_log_event
(they are just removed on the slave side) so after the query
log event is written to the binary log, we pretend that no
table maps were written.
*/
int error= mysql_bin_log.write_event(&qinfo);
binlog_table_maps= 0;
DBUG_RETURN(error);
}
break;
case THD::QUERY_TYPE_COUNT:
default:
DBUG_ASSERT(0 <= qtype && qtype < QUERY_TYPE_COUNT);
}
DBUG_RETURN(0);
}
#endif /* !defined(MYSQL_CLIENT) */
struct st_mysql_storage_engine binlog_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
/** @} */
mysql_declare_plugin(binlog)
{
MYSQL_STORAGE_ENGINE_PLUGIN,
&binlog_storage_engine,
"binlog",
"MySQL AB",
"This is a pseudo storage engine to represent the binlog in a transaction",
PLUGIN_LICENSE_GPL,
binlog_init, /* Plugin Init */
NULL, /* Plugin Deinit */
0x0100 /* 1.0 */,
NULL, /* status variables */
NULL, /* system variables */
NULL, /* config options */
0,
}
mysql_declare_plugin_end;