Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 26 additions & 44 deletions src/dbms/inmemory/replication_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,12 @@ void MoveDurabilityFiles(std::vector<storage::durability::SnapshotDurabilityInfo
RemoveDirIfEmpty(backup_wal_dir);
}

// Read durability files into optional arguments
auto ReadDurabilityFiles(
std::optional<std::vector<storage::durability::SnapshotDurabilityInfo>> &maybe_old_snapshot_files,
std::filesystem::path const &current_snapshot_dir,
std::optional<std::vector<storage::durability::WalDurabilityInfo>> &maybe_old_wal_files,
std::filesystem::path const &current_wal_dir) -> bool {
auto maybe_wal_files = storage::durability::GetWalFiles(current_wal_dir);
// If there are 0 WAL files, replica will be recovered.
if (!maybe_wal_files.has_value()) {
spdlog::warn("Failed to read current WAL files. Replica won't be recovered.");
return false;
}
maybe_old_wal_files.emplace(std::move(*maybe_wal_files));
// Read all snapshot files
maybe_old_snapshot_files.emplace(storage::durability::GetSnapshotFiles(current_snapshot_dir));
return true;
void ReadDurabilityFiles(std::vector<storage::durability::SnapshotDurabilityInfo> &old_snapshot_files,
std::filesystem::path const &current_snapshot_dir,
std::vector<storage::durability::WalDurabilityInfo> &old_wal_files,
std::filesystem::path const &current_wal_dir) {
old_wal_files = storage::durability::GetWalFiles(current_wal_dir);
old_snapshot_files = storage::durability::GetSnapshotFiles(current_snapshot_dir);
}

struct BackupDirectories {
Expand Down Expand Up @@ -507,19 +497,11 @@ void InMemoryReplicationHandlers::SnapshotHandler(rpc::FileReplicationHandler co

// Read durability files
auto const curr_snapshot_files = storage::durability::GetSnapshotFiles(current_snapshot_dir);
auto const maybe_curr_wal_files = storage::durability::GetWalFiles(current_wal_directory);
// If there are 0 WAL files, replica will be recovered.
if (!maybe_curr_wal_files.has_value()) {
spdlog::error("Cannot read current WAL files. Replica won't be recovered.");
rpc::SendFinalResponse(storage::replication::SnapshotRes{std::nullopt, 0}, request_version, res_builder,
fmt::format("db: {}", storage->name()));
return;
}
auto const curr_wal_files = storage::durability::GetWalFiles(current_wal_directory);

auto const &curr_wal_files = *maybe_curr_wal_files;
auto const &active_files = file_replication_handler.GetActiveFileNames();
MG_ASSERT(active_files.size() == 1, "Received {} snapshot files but expecting only one!", active_files.size());
auto const src_snapshot_file = active_files[0];
auto const &src_snapshot_file = active_files[0];
auto const dst_snapshot_file = current_snapshot_dir / active_files[0].filename();

if (!utils::RenamePath(src_snapshot_file, dst_snapshot_file)) {
Expand Down Expand Up @@ -653,8 +635,8 @@ void InMemoryReplicationHandlers::WalFilesHandler(rpc::FileReplicationHandler co
}
auto const &[backup_snapshot_dir, backup_wal_dir] = *maybe_backup_dirs;

std::optional<std::vector<storage::durability::SnapshotDurabilityInfo>> maybe_old_snapshot_files;
std::optional<std::vector<storage::durability::WalDurabilityInfo>> maybe_old_wal_files;
std::vector<storage::durability::SnapshotDurabilityInfo> old_snapshot_files;
std::vector<storage::durability::WalDurabilityInfo> old_wal_files;

if (req.reset_needed) {
{
Expand All @@ -670,12 +652,7 @@ void InMemoryReplicationHandlers::WalFilesHandler(rpc::FileReplicationHandler co
storage->name());
storage->Clear();
}
if (!ReadDurabilityFiles(maybe_old_snapshot_files, current_snapshot_dir, maybe_old_wal_files,
current_wal_directory)) {
rpc::SendFinalResponse(storage::replication::WalFilesRes{std::nullopt, 0}, request_version, res_builder,
fmt::format("db: {}", storage->name()));
return;
}
ReadDurabilityFiles(old_snapshot_files, current_snapshot_dir, old_wal_files, current_wal_directory);
}

const auto wal_file_number = req.file_number;
Expand All @@ -687,6 +664,10 @@ void InMemoryReplicationHandlers::WalFilesHandler(rpc::FileReplicationHandler co
uint64_t num_committed_txns{0};
for (auto i = 0; i < wal_file_number; ++i) {
auto const load_wal_res = LoadWal(active_files[i], storage, res_builder, local_batch_counter);
// Failure to delete the received WAL file isn't fatal since it is saved in the tmp directory so it will eventually
// get deleted
utils::DeleteFile(active_files[i]);

if (!load_wal_res.success) {
spdlog::debug("Replication recovery from WAL files failed while loading one of WAL files for db {}.",
storage->name());
Expand All @@ -705,7 +686,7 @@ void InMemoryReplicationHandlers::WalFilesHandler(rpc::FileReplicationHandler co
rpc::SendFinalResponse(res, request_version, res_builder, fmt::format("db: {}", storage->name()));

if (req.reset_needed) {
MoveDurabilityFiles(*maybe_old_snapshot_files, backup_snapshot_dir, *maybe_old_wal_files, backup_wal_dir,
MoveDurabilityFiles(old_snapshot_files, backup_snapshot_dir, old_wal_files, backup_wal_dir,
&(storage->file_retainer_));
}
}
Expand Down Expand Up @@ -756,8 +737,8 @@ void InMemoryReplicationHandlers::CurrentWalHandler(rpc::FileReplicationHandler
}
auto const &[backup_snapshot_dir, backup_wal_dir] = *maybe_backup_dirs;

std::optional<std::vector<storage::durability::SnapshotDurabilityInfo>> maybe_old_snapshot_files;
std::optional<std::vector<storage::durability::WalDurabilityInfo>> maybe_old_wal_files;
std::vector<storage::durability::SnapshotDurabilityInfo> old_snapshot_files;
std::vector<storage::durability::WalDurabilityInfo> old_wal_files;

if (req.reset_needed) {
{
Expand All @@ -771,12 +752,7 @@ void InMemoryReplicationHandlers::CurrentWalHandler(rpc::FileReplicationHandler
storage->name());
storage->Clear();
}
if (!ReadDurabilityFiles(maybe_old_snapshot_files, current_snapshot_dir, maybe_old_wal_files,
current_wal_directory)) {
rpc::SendFinalResponse(storage::replication::CurrentWalRes{}, request_version, res_builder,
fmt::format("db: {}", storage->name()));
return;
}
ReadDurabilityFiles(old_snapshot_files, current_snapshot_dir, old_wal_files, current_wal_directory);
}

// Even if loading wal file failed, we return last_durable_timestamp to the main because it is not a fatal error
Expand All @@ -799,9 +775,13 @@ void InMemoryReplicationHandlers::CurrentWalHandler(rpc::FileReplicationHandler
rpc::SendFinalResponse(res, request_version, res_builder, fmt::format("db: {}", storage->name()));

if (req.reset_needed) {
MoveDurabilityFiles(*maybe_old_snapshot_files, backup_snapshot_dir, *maybe_old_wal_files, backup_wal_dir,
MoveDurabilityFiles(old_snapshot_files, backup_snapshot_dir, old_wal_files, backup_wal_dir,
&(storage->file_retainer_));
}

// Failure to delete the received WAL file isn't fatal since it is saved in the tmp directory so it will eventually
// get deleted
utils::DeleteFile(active_files[0]);
}

// The method will return false and hence signal the failure of completely loading the WAL file if:
Expand Down Expand Up @@ -920,6 +900,8 @@ std::optional<storage::SingleTxnDeltasProcessingResult> InMemoryReplicationHandl
return storage::StorageAccessType::READ;
case storage::durability::TransactionAccessType::READ_ONLY:
return storage::StorageAccessType::READ_ONLY;
default:
throw std::runtime_error("Unrecognized access type!");
}
};

Expand Down
91 changes: 50 additions & 41 deletions src/storage/v2/durability/durability.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "storage/v2/durability/metadata.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/indices/text_index.hpp"
#include "storage/v2/inmemory/edge_property_index.hpp"
#include "storage/v2/inmemory/edge_type_index.hpp"
#include "storage/v2/inmemory/edge_type_property_index.hpp"
Expand Down Expand Up @@ -110,55 +109,70 @@ void VerifyStorageDirectoryOwnerAndProcessUserOrDie(const std::filesystem::path
user_process, user_directory, user_directory);
}

bool ValidateDurabilityFile(std::filesystem::directory_entry const &dir_entry) {
auto const &path = dir_entry.path();
if (!dir_entry.is_regular_file()) {
spdlog::error("{} is not a regular file", path);
return false;
}

if (!utils::HasReadAccess(path)) {
spdlog::warn("Skipping durability file '{}' because it is not readable, check file ownership and read permissions!",
path);
return false;
}

return true;
}

std::vector<SnapshotDurabilityInfo> GetSnapshotFiles(const std::filesystem::path &snapshot_directory,
const std::string_view uuid) {
if (!utils::DirExists(snapshot_directory)) {
spdlog::error("Snapshot directory {} doesn't exist", snapshot_directory);
return {};
}

std::vector<SnapshotDurabilityInfo> snapshot_files;

std::error_code error_code;
if (utils::DirExists(snapshot_directory)) {
for (const auto &item : std::filesystem::directory_iterator(snapshot_directory, error_code)) {
if (!item.is_regular_file()) continue;
if (!utils::HasReadAccess(item.path())) {
spdlog::warn(
"Skipping snapshot file '{}' because it is not readable, check file ownership and read permissions!",
item.path());
continue;
}
try {
auto info = ReadSnapshotInfo(item.path());
if (uuid.empty() || info.uuid == uuid) {
snapshot_files.emplace_back(item.path(), std::move(info.uuid), info.start_timestamp);
} else {
spdlog::warn("Skipping snapshot file '{}' because UUID does not match!", item.path());
}
} catch (const RecoveryFailure &) {
continue;
for (const auto &item : std::filesystem::directory_iterator(snapshot_directory, error_code)) {
if (!ValidateDurabilityFile(item)) continue;

try {
auto info = ReadSnapshotInfo(item.path());
if (uuid.empty() || info.uuid == uuid) {
snapshot_files.emplace_back(item.path(), std::move(info.uuid), info.start_timestamp);
} else {
spdlog::warn("Skipping snapshot file '{}' because UUIDs does not match!", item.path());
}
} catch (const RecoveryFailure &e) {
spdlog::error("Couldn't read snapshot info in GetSnapshotFiles: {}", e.what());
}
MG_ASSERT(!error_code, "Couldn't recover data because an error occurred: {}!", error_code.message());
}
MG_ASSERT(!error_code, "Couldn't recover data because an error occurred: {}!", error_code.message());

std::sort(snapshot_files.begin(), snapshot_files.end());
std::ranges::sort(snapshot_files);
return snapshot_files;
}

std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(const std::filesystem::path &wal_directory,
const std::string_view uuid,
const std::optional<size_t> current_seq_num) {
if (!utils::DirExists(wal_directory)) return std::nullopt;

std::vector<WalDurabilityInfo> wal_files;
std::error_code error_code;
std::vector<WalDurabilityInfo> GetWalFiles(const std::filesystem::path &wal_directory, const std::string_view uuid,
const std::optional<size_t> current_seq_num) {
if (!utils::DirExists(wal_directory)) {
spdlog::error("WAL directory {} doesn't exist", wal_directory);
return {};
}

// There could be multiple "current" WAL files, the "_current" tag just means that the previous session didn't
// finalize. We cannot skip based on name, will be able to skip based on invalid data or sequence number, so the
// actual current wal will be skipped

std::vector<WalDurabilityInfo> wal_files;
std::error_code error_code;

// TODO: (andi) Inefficient to use I/O again, you already read infos.
for (const auto &item : std::filesystem::directory_iterator(wal_directory, error_code)) {
if (!item.is_regular_file()) {
spdlog::trace("Non-regular file {} found in the wal directory. Skipping it.", item.path());
continue;
}
if (!ValidateDurabilityFile(item)) continue;

try {
auto info = ReadWalInfo(item.path());
spdlog::trace(
Expand All @@ -179,11 +193,10 @@ std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(const std::filesystem:
spdlog::warn("Failed to read WAL file {}. Error: {}", item.path(), e.what());
}
}
MG_ASSERT(!error_code, "Couldn't recover data because an error occurred: {}!", error_code.message());

// Sort based on the sequence number, not the file name.
std::sort(wal_files.begin(), wal_files.end());
return std::move(wal_files);
MG_ASSERT(!error_code, "Couldn't recover data because an error occurred: {}!", error_code.message());
std::ranges::sort(wal_files);
return wal_files;
}

// Function used to recover all discovered indices and constraints. The
Expand Down Expand Up @@ -570,11 +583,7 @@ std::optional<RecoveryInfo> Recovery::RecoverData(
repl_storage_state.epoch_.id());
}

if (const auto maybe_wal_files = GetWalFiles(wal_directory_, std::string{uuid});
maybe_wal_files && !maybe_wal_files->empty()) {
// Array of all discovered WAL files, ordered by sequence number.
const auto &wal_files = *maybe_wal_files;

if (auto const wal_files = GetWalFiles(wal_directory_, std::string{uuid}); !wal_files.empty()) {
spdlog::info("Checking WAL files.");
r::for_each(wal_files,
[](auto &&wal_file) { spdlog::trace("Wal file: {}. Seq num: {}.", wal_file.path, wal_file.seq_num); });
Expand Down
7 changes: 4 additions & 3 deletions src/storage/v2/durability/durability.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ struct WalDurabilityInfo {
/// with seq_num < current_seq_num.
/// @return List of WAL files. Each WAL file is defined with its sequence
/// number, from timestamp, to timestamp and path.
std::optional<std::vector<WalDurabilityInfo>> GetWalFiles(const std::filesystem::path &wal_directory,
std::string_view uuid = "",
std::optional<size_t> current_seq_num = {});
std::vector<WalDurabilityInfo> GetWalFiles(const std::filesystem::path &wal_directory, std::string_view uuid = "",
std::optional<size_t> current_seq_num = {});

bool ValidateDurabilityFile(std::filesystem::directory_entry const &dir_entry);

// Helper function used to recover all discovered indices. The
// indices must be recovered after the data recovery is done
Expand Down
Loading
Loading