Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-static bounce buffer option #454

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion cpp/doxygen/main_page.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,28 @@ Set the environment variable `KVIKIO_COMPAT_MODE` to enable/disable compatibilit
- when running in Windows Subsystem for Linux (WSL).
- when `/run/udev` isn't readable, which typically happens when running inside a docker image not launched with `--volume /run/udev:/run/udev:ro`.

This setting can also be controlled by `defaults::compat_mode()` and `defaults::compat_mode_reset()`.


#### Thread Pool (KVIKIO_NTHREADS)
KvikIO can use multiple threads for IO automatically. Set the environment variable `KVIKIO_NTHREADS` to the number of threads in the thread pool. If not set, the default value is 1.

This setting can also be controlled by `defaults::thread_pool_nthreads()` and `defaults::thread_pool_nthreads_reset()`.

#### Task Size (KVIKIO_TASK_SIZE)
KvikIO splits parallel IO operations into multiple tasks. Set the environment variable `KVIKIO_TASK_SIZE` to the maximum task size (in bytes). If not set, the default value is 4194304 (4 MiB).

This setting can also be controlled by `defaults::task_size()` and `defaults::task_size_reset()`.

#### GDS Threshold (KVIKIO_GDS_THRESHOLD)
In order to improve performance of small IO, `.pread()` and `.pwrite()` implement a shortcut that circumvent the threadpool and use the POSIX backend directly. Set the environment variable `KVIKIO_GDS_THRESHOLD` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB).
To improve performance of small IO requests, `.pread()` and `.pwrite()` implement a shortcut that circumvents the threadpool and uses the POSIX backend directly. Set the environment variable `KVIKIO_GDS_THRESHOLD` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB).

This setting can also be controlled by `defaults::gds_threshold()` and `defaults::gds_threshold_reset()`.

#### Size of the Bounce Buffer (KVIKIO_GDS_THRESHOLD)
KvikIO might have to use an intermediate host buffer when copying between files and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to the size (in bytes) of this "bounce" buffer. If not set, the default value is 16777216 (16 MiB).

This setting can also be controlled by `defaults::bounce_buffer_size()` and `defaults::bounce_buffer_size_reset()`.


## Example
Expand Down
70 changes: 49 additions & 21 deletions cpp/include/kvikio/bounce_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,21 @@ class AllocRetain {
// The size of each allocation in `_free_allocs`
std::size_t _size{defaults::bounce_buffer_size()};

public:
/**
* @brief An host memory allocation
*/
class Alloc {
private:
AllocRetain* _manager;
void* _alloc;
const std::size_t _size;
std::size_t const _size;

public:
Alloc(AllocRetain* manager, void* alloc, std::size_t size)
: _manager(manager), _alloc{alloc}, _size{size}
{
}
Alloc(const Alloc&) = delete;
Alloc(Alloc const&) = delete;
Alloc& operator=(Alloc const&) = delete;
Alloc(Alloc&& o) = delete;
Alloc& operator=(Alloc&& o) = delete;
Expand All @@ -61,29 +60,49 @@ class AllocRetain {
};

AllocRetain() = default;
~AllocRetain() noexcept
{
try {
clear();
} catch (const CUfileException& e) {
std::cerr << "~AllocRetain(): " << e.what() << std::endl;
}
}

void clear()
// Notice, we do not clear the allocations at destruction thus the allocations leaks
// at exit. We do this because `AllocRetain::instance()` stores the allocations in a
// static stack that are destructed below main, which is not allowed in CUDA:
// <https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#initialization>
~AllocRetain() noexcept = default;

/**
* @brief Free all retained allocations
*
* NB: The `_mutex` must be taken prior to calling this function.
*
* @return The number of bytes cleared
*/
std::size_t _clear()
{
std::size_t ret = _free_allocs.size() * _size;
while (!_free_allocs.empty()) {
CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_free_allocs.top()));
_free_allocs.pop();
}
return ret;
}

[[nodiscard]] Alloc get()
/**
* @brief Ensure the sizes of the retained allocations match `defaults::bounce_buffer_size()`
*
* NB: `_mutex` must be taken prior to calling this function.
*/
void _ensure_alloc_size()
{
const std::lock_guard lock(_mutex);
if (_size != defaults::bounce_buffer_size()) {
clear(); // the desired allocation size has changed.
auto const bounce_buffer_size = defaults::bounce_buffer_size();
if (_size != bounce_buffer_size) {
_clear();
_size = bounce_buffer_size;
}
}

public:
[[nodiscard]] Alloc get()
{
std::lock_guard const lock(_mutex);
_ensure_alloc_size();

// Check if we have an allocation available
if (!_free_allocs.empty()) {
Expand All @@ -101,10 +120,8 @@ class AllocRetain {

void put(void* alloc, std::size_t size)
{
const std::lock_guard lock(_mutex);
if (_size != defaults::bounce_buffer_size()) {
clear(); // the desired allocation size has changed.
}
std::lock_guard const lock(_mutex);
_ensure_alloc_size();

// If the size of `alloc` matches the sizes of the retained allocations,
// it is added to the set of free allocation otherwise it is freed.
Expand All @@ -115,13 +132,24 @@ class AllocRetain {
}
}

/**
* @brief Free all retained allocations
*
* @return The number of bytes cleared
*/
std::size_t clear()
{
std::lock_guard const lock(_mutex);
return _clear();
}

static AllocRetain& instance()
{
static AllocRetain _instance;
return _instance;
}

AllocRetain(const AllocRetain&) = delete;
AllocRetain(AllocRetain const&) = delete;
AllocRetain& operator=(AllocRetain const&) = delete;
AllocRetain(AllocRetain&& o) = delete;
AllocRetain& operator=(AllocRetain&& o) = delete;
Expand Down
12 changes: 12 additions & 0 deletions docs/source/runtime_settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,29 @@ Set the environment variable ``KVIKIO_COMPAT_MODE`` to enable/disable compatibil
* when running in Windows Subsystem for Linux (WSL).
* when ``/run/udev`` isn't readable, which typically happens when running inside a docker image not launched with ``--volume /run/udev:/run/udev:ro``.

This setting can also be controlled by :py:func:`kvikio.defaults.compat_mode`, :py:func:`kvikio.defaults.compat_mode_reset`, and :py:func:`kvikio.defaults.set_compat_mode`.


Thread Pool ``KVIKIO_NTHREADS``
-------------------------------
KvikIO can use multiple threads for IO automatically. Set the environment variable ``KVIKIO_NTHREADS`` to the number of threads in the thread pool. If not set, the default value is 1.

This setting can also be controlled by :py:func:`kvikio.defaults.get_num_threads`, :py:func:`kvikio.defaults.num_threads_reset`, and :py:func:`kvikio.defaults.set_num_threads`.

Task Size ``KVIKIO_TASK_SIZE``
------------------------------
KvikIO splits parallel IO operations into multiple tasks. Set the environment variable ``KVIKIO_TASK_SIZE`` to the maximum task size (in bytes). If not set, the default value is 4194304 (4 MiB).

This setting can also be controlled by :py:func:`kvikio.defaults.task_size`, :py:func:`kvikio.defaults.task_size_reset`, and :py:func:`kvikio.defaults.set_task_size`.

GDS Threshold ``KVIKIO_GDS_THRESHOLD``
--------------------------------------
In order to improve performance of small IO, ``.pread()`` and ``.pwrite()`` implement a shortcut that circumvent the threadpool and use the POSIX backend directly. Set the environment variable ``KVIKIO_GDS_THRESHOLD`` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB).

This setting can also be controlled by :py:func:`kvikio.defaults.gds_threshold`, :py:func:`kvikio.defaults.gds_threshold_reset`, and :py:func:`kvikio.defaults.set_gds_threshold`.

Size of the Bounce Buffer ``KVIKIO_BOUNCE_BUFFER_SIZE``
-------------------------------------------------------
KvikIO might have to use an intermediate host buffer when copying between file and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to size (in bytes) of this "bounce" buffer. If not set, the default value is 16777216 (16 MiB).

This setting can also be controlled by :py:func:`kvikio.defaults.bounce_buffer_size`, :py:func:`kvikio.defaults.bounce_buffer_size_reset`, and :py:func:`kvikio.defaults.set_bounce_buffer_size`.
11 changes: 1 addition & 10 deletions python/kvikio/kvikio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

from kvikio._lib import buffer, driver_properties # type: ignore
from kvikio._lib import driver_properties # type: ignore
from kvikio._version import __git_commit__, __version__
from kvikio.cufile import CuFile


def memory_register(buf) -> None:
return buffer.memory_register(buf)


def memory_deregister(buf) -> None:
buffer.memory_deregister(buf)


# TODO: Wrap nicely, maybe as a dataclass?
DriverProperties = driver_properties.DriverProperties

Expand Down
10 changes: 9 additions & 1 deletion python/kvikio/kvikio/_lib/buffer.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from kvikio._lib.arr cimport Array


cdef extern from "<kvikio/buffer.hpp>" namespace "kvikio" nogil:
cdef extern from "<kvikio/buffer.hpp>" nogil:
void cpp_memory_register "kvikio::memory_register"(const void* devPtr) except +
void cpp_memory_deregister "kvikio::memory_deregister"(const void* devPtr) except +

Expand All @@ -25,3 +25,11 @@ def memory_deregister(buf) -> None:
buf = Array(buf)
cdef Array arr = buf
cpp_memory_deregister(<void*>arr.ptr)


cdef extern from "<kvikio/bounce_buffer.hpp>" nogil:
size_t cpp_alloc_retain_clear "kvikio::AllocRetain::instance().clear"() except +


def bounce_buffer_clear() -> int:
return cpp_alloc_retain_clear()
17 changes: 9 additions & 8 deletions python/kvikio/kvikio/benchmarks/single_node_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from dask.utils import format_bytes, parse_bytes

import kvikio
import kvikio.buffer
import kvikio.defaults
from kvikio.benchmarks.utils import parse_directory, pprint_sys_info

Expand All @@ -38,7 +39,7 @@ def run_cufile(args):
file_path = args.dir / "kvikio-single-file"
data = create_data(args.nbytes)
if args.pre_register_buffer:
kvikio.memory_register(data)
kvikio.buffer.memory_register(data)

# Write
f = kvikio.CuFile(file_path, flags="w")
Expand All @@ -57,7 +58,7 @@ def run_cufile(args):
assert res == args.nbytes, f"IO mismatch, expected {args.nbytes} got {res}"

if args.pre_register_buffer:
kvikio.memory_deregister(data)
kvikio.buffer.memory_deregister(data)

return read_time, write_time

Expand All @@ -73,7 +74,7 @@ def run_cufile_multiple_files_multiple_arrays(args):
arrays = [create_data(chunksize) for _ in range(args.nthreads)]
if args.pre_register_buffer:
for array in arrays:
kvikio.memory_register(array)
kvikio.buffer.memory_register(array)

# Write
files = [kvikio.CuFile(file_path % i, flags="w") for i in range(args.nthreads)]
Expand All @@ -95,7 +96,7 @@ def run_cufile_multiple_files_multiple_arrays(args):

if args.pre_register_buffer:
for array in arrays:
kvikio.memory_deregister(array)
kvikio.buffer.memory_deregister(array)

return read_time, write_time

Expand All @@ -108,7 +109,7 @@ def run_cufile_multiple_files(args):
file_path = str(args.dir / "cufile-p-%03d")
data = create_data(args.nbytes)
if args.pre_register_buffer:
kvikio.memory_register(data)
kvikio.buffer.memory_register(data)

# Write
files = [kvikio.CuFile(file_path % i, flags="w") for i in range(args.nthreads)]
Expand All @@ -133,7 +134,7 @@ def run_cufile_multiple_files(args):
assert res == args.nbytes, f"IO mismatch, expected {args.nbytes} got {res}"

if args.pre_register_buffer:
kvikio.memory_deregister(data)
kvikio.buffer.memory_deregister(data)

return read_time, write_time

Expand All @@ -149,7 +150,7 @@ def run_cufile_multiple_arrays(args):
arrays = [create_data(chunksize) for _ in range(args.nthreads)]
if args.pre_register_buffer:
for array in arrays:
kvikio.memory_register(array)
kvikio.buffer.memory_register(array)

# Write
f = kvikio.CuFile(file_path, flags="w")
Expand All @@ -174,7 +175,7 @@ def run_cufile_multiple_arrays(args):

if args.pre_register_buffer:
for array in arrays:
kvikio.memory_deregister(array)
kvikio.buffer.memory_deregister(array)

return read_time, write_time

Expand Down
35 changes: 35 additions & 0 deletions python/kvikio/kvikio/buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

from kvikio._lib import buffer # type: ignore


def memory_register(buf) -> None:
"""Register a device memory allocation with cuFile.

Warning
-------
This API is intended for usecases where the memory is used as a streaming
buffer that is reused across multiple cuFile IO operations.

Parameters
----------
buf: buffer-like or array-like
Device buffer to register .
"""
return buffer.memory_register(buf)


def memory_deregister(buf) -> None:
"""Deregister an already registered device memory from cuFile.

Parameters
----------
buf: buffer-like or array-like
Device buffer to deregister .
"""
buffer.memory_deregister(buf)


def bounce_buffer_clear() -> int:
madsbk marked this conversation as resolved.
Show resolved Hide resolved
return buffer.bounce_buffer_clear()
16 changes: 8 additions & 8 deletions python/kvikio/kvikio/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def compat_mode() -> bool:
- when `/run/udev` isn't readable, which typically happens when running inside
a docker image not launched with `--volume /run/udev:/run/udev:ro`
Return
------
Returns
-------
bool
Whether KvikIO is running in compatibility mode or not.
"""
Expand Down Expand Up @@ -68,8 +68,8 @@ def get_num_threads() -> int:
Set the default value using `num_threads_reset()` or by setting the
`KVIKIO_NTHREADS` environment variable. If not set, the default value is 1.
Return
------
Returns
-------
nthreads: int
The number of threads in the current thread pool.
"""
Expand Down Expand Up @@ -119,8 +119,8 @@ def task_size() -> int:
the `KVIKIO_TASK_SIZE` environment variable. If not set,
the default value is 4 MiB.
Return
------
Returns
-------
nbytes: int
The default task size in bytes.
"""
Expand Down Expand Up @@ -166,8 +166,8 @@ def gds_threshold() -> int:
`KVIKIO_GDS_THRESHOLD` environment variable. If not set, the default
value is 1 MiB.
Return
------
Returns
-------
nbytes : int
The default GDS threshold size in bytes.
"""
Expand Down
Loading