Non-static bounce buffer option (#454)
closes #451 
closes #452

Also includes some refactoring and cleanup.

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #454
madsbk authored Sep 2, 2024
1 parent 94edeee commit 8676e8d
Showing 9 changed files with 164 additions and 49 deletions.
16 changes: 15 additions & 1 deletion cpp/doxygen/main_page.md
@@ -85,14 +85,28 @@ Set the environment variable `KVIKIO_COMPAT_MODE` to enable/disable compatibilit
- when running in Windows Subsystem for Linux (WSL).
- when `/run/udev` isn't readable, which typically happens when running inside a docker image not launched with `--volume /run/udev:/run/udev:ro`.

This setting can also be controlled by `defaults::compat_mode()` and `defaults::compat_mode_reset()`.


#### Thread Pool (KVIKIO_NTHREADS)
KvikIO can use multiple threads for IO automatically. Set the environment variable `KVIKIO_NTHREADS` to the number of threads in the thread pool. If not set, the default value is 1.

This setting can also be controlled by `defaults::thread_pool_nthreads()` and `defaults::thread_pool_nthreads_reset()`.

#### Task Size (KVIKIO_TASK_SIZE)
KvikIO splits parallel IO operations into multiple tasks. Set the environment variable `KVIKIO_TASK_SIZE` to the maximum task size (in bytes). If not set, the default value is 4194304 (4 MiB).

This setting can also be controlled by `defaults::task_size()` and `defaults::task_size_reset()`.

#### GDS Threshold (KVIKIO_GDS_THRESHOLD)
In order to improve performance of small IO, `.pread()` and `.pwrite()` implement a shortcut that circumvent the threadpool and use the POSIX backend directly. Set the environment variable `KVIKIO_GDS_THRESHOLD` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB).
To improve performance of small IO requests, `.pread()` and `.pwrite()` implement a shortcut that circumvents the threadpool and uses the POSIX backend directly. Set the environment variable `KVIKIO_GDS_THRESHOLD` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB).

This setting can also be controlled by `defaults::gds_threshold()` and `defaults::gds_threshold_reset()`.

#### Size of the Bounce Buffer (KVIKIO_BOUNCE_BUFFER_SIZE)
KvikIO might have to use intermediate host buffers (one per thread) when copying between files and device memory. Set the environment variable `KVIKIO_BOUNCE_BUFFER_SIZE` to the size (in bytes) of these "bounce" buffers. If not set, the default value is 16777216 (16 MiB).

This setting can also be controlled by `defaults::bounce_buffer_size()` and `defaults::bounce_buffer_size_reset()`.
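
To make the interaction between the task size and the GDS threshold concrete, here is a minimal Python sketch. It is not KvikIO code: the function `plan_tasks` and the path labels `"posix-direct"` and `"thread-pool"` are hypothetical names chosen for illustration. It only assumes the documented defaults (4 MiB tasks, 1 MiB threshold) and that requests smaller than the threshold bypass the thread pool.

```python
MiB = 1024 * 1024


def plan_tasks(nbytes, task_size=4 * MiB, gds_threshold=1 * MiB):
    """Return (path, tasks) for an IO request of `nbytes` bytes.

    Illustrative model only; the names and return shape are assumptions.
    Each task is an (offset, size) pair.
    """
    if nbytes < gds_threshold:
        # Small request: a single direct operation, no thread pool involved.
        return "posix-direct", [(0, nbytes)]
    # Large request: split into chunks of at most `task_size` bytes.
    tasks = [
        (offset, min(task_size, nbytes - offset))
        for offset in range(0, nbytes, task_size)
    ]
    return "thread-pool", tasks


path, tasks = plan_tasks(10 * MiB)
print(path, len(tasks))  # prints: thread-pool 3
```

With the defaults, a 10 MiB request becomes two 4 MiB tasks and one 2 MiB task, while a 4 KiB request stays a single direct operation.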


## Example
70 changes: 49 additions & 21 deletions cpp/include/kvikio/bounce_buffer.hpp
@@ -36,22 +36,21 @@ class AllocRetain {
// The size of each allocation in `_free_allocs`
std::size_t _size{defaults::bounce_buffer_size()};

public:
/**
* @brief A host memory allocation
*/
class Alloc {
private:
AllocRetain* _manager;
void* _alloc;
const std::size_t _size;
std::size_t const _size;

public:
Alloc(AllocRetain* manager, void* alloc, std::size_t size)
: _manager(manager), _alloc{alloc}, _size{size}
{
}
Alloc(const Alloc&) = delete;
Alloc(Alloc const&) = delete;
Alloc& operator=(Alloc const&) = delete;
Alloc(Alloc&& o) = delete;
Alloc& operator=(Alloc&& o) = delete;
@@ -61,29 +60,49 @@ class AllocRetain {
};

AllocRetain() = default;
~AllocRetain() noexcept
{
try {
clear();
} catch (const CUfileException& e) {
std::cerr << "~AllocRetain(): " << e.what() << std::endl;
}
}

void clear()
// Note that we do not clear the allocations at destruction, so the allocations
// leak at exit. We do this because `AllocRetain::instance()` stores the allocations
// in a static stack that is destructed after `main` returns, when calling into CUDA
// is no longer allowed:
// <https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#initialization>
~AllocRetain() noexcept = default;

/**
* @brief Free all retained allocations
*
* NB: The `_mutex` must be taken prior to calling this function.
*
* @return The number of bytes cleared
*/
std::size_t _clear()
{
std::size_t ret = _free_allocs.size() * _size;
while (!_free_allocs.empty()) {
CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_free_allocs.top()));
_free_allocs.pop();
}
return ret;
}

[[nodiscard]] Alloc get()
/**
* @brief Ensure the sizes of the retained allocations match `defaults::bounce_buffer_size()`
*
* NB: `_mutex` must be taken prior to calling this function.
*/
void _ensure_alloc_size()
{
const std::lock_guard lock(_mutex);
if (_size != defaults::bounce_buffer_size()) {
clear(); // the desired allocation size has changed.
auto const bounce_buffer_size = defaults::bounce_buffer_size();
if (_size != bounce_buffer_size) {
_clear();
_size = bounce_buffer_size;
}
}

public:
[[nodiscard]] Alloc get()
{
std::lock_guard const lock(_mutex);
_ensure_alloc_size();

// Check if we have an allocation available
if (!_free_allocs.empty()) {
@@ -101,10 +120,8 @@ class AllocRetain {

void put(void* alloc, std::size_t size)
{
const std::lock_guard lock(_mutex);
if (_size != defaults::bounce_buffer_size()) {
clear(); // the desired allocation size has changed.
}
std::lock_guard const lock(_mutex);
_ensure_alloc_size();

// If the size of `alloc` matches the size of the retained allocations,
// it is added to the set of free allocations; otherwise it is freed.
@@ -115,13 +132,24 @@ class AllocRetain {
}
}

/**
* @brief Free all retained allocations
*
* @return The number of bytes cleared
*/
std::size_t clear()
{
std::lock_guard const lock(_mutex);
return _clear();
}

static AllocRetain& instance()
{
static AllocRetain _instance;
return _instance;
}

AllocRetain(const AllocRetain&) = delete;
AllocRetain(AllocRetain const&) = delete;
AllocRetain& operator=(AllocRetain const&) = delete;
AllocRetain(AllocRetain&& o) = delete;
AllocRetain& operator=(AllocRetain&& o) = delete;
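
The locking and resizing pattern in this hunk can be modelled in a few lines of Python. This is a toy sketch, not a translation of KvikIO: the class `RetainPool` and its `get_default_size` callback are hypothetical, `bytearray` stands in for pinned host memory, and "freeing" is just dropping the reference. It mirrors the split between the locked public methods and the `_clear()` / `_ensure_alloc_size()` helpers that expect the mutex to be held already.

```python
import threading


class RetainPool:
    """Toy model of a pool that retains fixed-size buffers for reuse."""

    def __init__(self, get_default_size):
        # Stand-in for `defaults::bounce_buffer_size()` (assumption).
        self._get_default_size = get_default_size
        self._size = get_default_size()
        self._free = []  # retained buffers, used as a stack
        self._mutex = threading.Lock()

    def _clear(self):
        # The caller must hold `self._mutex`.
        freed = len(self._free) * self._size
        self._free.clear()
        return freed

    def _ensure_alloc_size(self):
        # The caller must hold `self._mutex`.
        size = self._get_default_size()
        if self._size != size:
            self._clear()  # retained buffers have the old size
            self._size = size

    def get(self):
        with self._mutex:
            self._ensure_alloc_size()
            if self._free:
                return self._free.pop()
            return bytearray(self._size)

    def put(self, buf):
        with self._mutex:
            self._ensure_alloc_size()
            if len(buf) == self._size:
                self._free.append(buf)
            # Buffers of any other size are simply dropped.

    def clear(self):
        with self._mutex:
            return self._clear()


setting = {"size": 1024}  # hypothetical mutable default
pool = RetainPool(lambda: setting["size"])
pool.put(pool.get())      # one 1 KiB buffer is now retained
setting["size"] = 2048    # the configured size changes
new_buf = pool.get()      # the stale buffer is discarded first
print(len(new_buf), pool.clear())  # prints: 2048 0
```

Keeping the unlocked helpers separate from the locked entry points avoids taking a non-recursive mutex twice when `get()` or `put()` needs to clear the pool.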
12 changes: 12 additions & 0 deletions docs/source/runtime_settings.rst
@@ -10,17 +10,29 @@ Set the environment variable ``KVIKIO_COMPAT_MODE`` to enable/disable compatibil
* when running in Windows Subsystem for Linux (WSL).
* when ``/run/udev`` isn't readable, which typically happens when running inside a docker image not launched with ``--volume /run/udev:/run/udev:ro``.

This setting can also be controlled by :py:func:`kvikio.defaults.compat_mode`, :py:func:`kvikio.defaults.compat_mode_reset`, and :py:func:`kvikio.defaults.set_compat_mode`.


Thread Pool ``KVIKIO_NTHREADS``
-------------------------------
KvikIO can use multiple threads for IO automatically. Set the environment variable ``KVIKIO_NTHREADS`` to the number of threads in the thread pool. If not set, the default value is 1.

This setting can also be controlled by :py:func:`kvikio.defaults.get_num_threads`, :py:func:`kvikio.defaults.num_threads_reset`, and :py:func:`kvikio.defaults.set_num_threads`.

Task Size ``KVIKIO_TASK_SIZE``
------------------------------
KvikIO splits parallel IO operations into multiple tasks. Set the environment variable ``KVIKIO_TASK_SIZE`` to the maximum task size (in bytes). If not set, the default value is 4194304 (4 MiB).

This setting can also be controlled by :py:func:`kvikio.defaults.task_size`, :py:func:`kvikio.defaults.task_size_reset`, and :py:func:`kvikio.defaults.set_task_size`.

GDS Threshold ``KVIKIO_GDS_THRESHOLD``
--------------------------------------
To improve performance of small IO requests, ``.pread()`` and ``.pwrite()`` implement a shortcut that circumvents the threadpool and uses the POSIX backend directly. Set the environment variable ``KVIKIO_GDS_THRESHOLD`` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB).

This setting can also be controlled by :py:func:`kvikio.defaults.gds_threshold`, :py:func:`kvikio.defaults.gds_threshold_reset`, and :py:func:`kvikio.defaults.set_gds_threshold`.

Size of the Bounce Buffer ``KVIKIO_BOUNCE_BUFFER_SIZE``
-------------------------------------------------------
KvikIO might have to use intermediate host buffers (one per thread) when copying between files and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to the size (in bytes) of these "bounce" buffers. If not set, the default value is 16777216 (16 MiB).

This setting can also be controlled by :py:func:`kvikio.defaults.bounce_buffer_size`, :py:func:`kvikio.defaults.bounce_buffer_size_reset`, and :py:func:`kvikio.defaults.set_bounce_buffer_size`.
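
As a rough illustration of the "environment variable, else default" behaviour described for these settings, the sketch below resolves a value from a mapping. It is an assumption-laden model rather than KvikIO's implementation: `DEFAULTS` and `read_setting` are hypothetical names, and only the variable names and default values come from the text above.

```python
import os

# Documented defaults, in the units stated in the text (threads or bytes).
DEFAULTS = {
    "KVIKIO_NTHREADS": 1,
    "KVIKIO_TASK_SIZE": 4194304,            # 4 MiB
    "KVIKIO_GDS_THRESHOLD": 1048576,        # 1 MiB
    "KVIKIO_BOUNCE_BUFFER_SIZE": 16777216,  # 16 MiB
}


def read_setting(name, environ=os.environ):
    """Return the integer value of `name`, falling back to its default.

    Hypothetical helper for illustration; raises ValueError if the
    variable is set to something that is not an integer.
    """
    raw = environ.get(name)
    if raw is None:
        return DEFAULTS[name]
    return int(raw)


# An explicit mapping is passed here so the example does not depend on
# the caller's real environment.
print(read_setting("KVIKIO_BOUNCE_BUFFER_SIZE", {}))  # prints: 16777216
```

In a real program the values would normally be read and changed through the `kvikio.defaults` functions listed above rather than by parsing the environment by hand.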
11 changes: 1 addition & 10 deletions python/kvikio/kvikio/__init__.py
@@ -1,19 +1,10 @@
# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

from kvikio._lib import buffer, driver_properties # type: ignore
from kvikio._lib import driver_properties # type: ignore
from kvikio._version import __git_commit__, __version__
from kvikio.cufile import CuFile


def memory_register(buf) -> None:
return buffer.memory_register(buf)


def memory_deregister(buf) -> None:
buffer.memory_deregister(buf)


# TODO: Wrap nicely, maybe as a dataclass?
DriverProperties = driver_properties.DriverProperties

10 changes: 9 additions & 1 deletion python/kvikio/kvikio/_lib/buffer.pyx
@@ -8,7 +8,7 @@
from kvikio._lib.arr cimport Array


cdef extern from "<kvikio/buffer.hpp>" namespace "kvikio" nogil:
cdef extern from "<kvikio/buffer.hpp>" nogil:
void cpp_memory_register "kvikio::memory_register"(const void* devPtr) except +
void cpp_memory_deregister "kvikio::memory_deregister"(const void* devPtr) except +

@@ -25,3 +25,11 @@ def memory_deregister(buf) -> None:
buf = Array(buf)
cdef Array arr = buf
cpp_memory_deregister(<void*>arr.ptr)


cdef extern from "<kvikio/bounce_buffer.hpp>" nogil:
size_t cpp_alloc_retain_clear "kvikio::AllocRetain::instance().clear"() except +


def bounce_buffer_free() -> int:
return cpp_alloc_retain_clear()
17 changes: 9 additions & 8 deletions python/kvikio/kvikio/benchmarks/single_node_io.py
@@ -14,6 +14,7 @@
from dask.utils import format_bytes, parse_bytes

import kvikio
import kvikio.buffer
import kvikio.defaults
from kvikio.benchmarks.utils import parse_directory, pprint_sys_info

@@ -38,7 +39,7 @@ def run_cufile(args):
file_path = args.dir / "kvikio-single-file"
data = create_data(args.nbytes)
if args.pre_register_buffer:
kvikio.memory_register(data)
kvikio.buffer.memory_register(data)

# Write
f = kvikio.CuFile(file_path, flags="w")
@@ -57,7 +58,7 @@ def run_cufile(args):
assert res == args.nbytes, f"IO mismatch, expected {args.nbytes} got {res}"

if args.pre_register_buffer:
kvikio.memory_deregister(data)
kvikio.buffer.memory_deregister(data)

return read_time, write_time

@@ -73,7 +74,7 @@ def run_cufile_multiple_files_multiple_arrays(args):
arrays = [create_data(chunksize) for _ in range(args.nthreads)]
if args.pre_register_buffer:
for array in arrays:
kvikio.memory_register(array)
kvikio.buffer.memory_register(array)

# Write
files = [kvikio.CuFile(file_path % i, flags="w") for i in range(args.nthreads)]
@@ -95,7 +96,7 @@ def run_cufile_multiple_files_multiple_arrays(args):

if args.pre_register_buffer:
for array in arrays:
kvikio.memory_deregister(array)
kvikio.buffer.memory_deregister(array)

return read_time, write_time

@@ -108,7 +109,7 @@ def run_cufile_multiple_files(args):
file_path = str(args.dir / "cufile-p-%03d")
data = create_data(args.nbytes)
if args.pre_register_buffer:
kvikio.memory_register(data)
kvikio.buffer.memory_register(data)

# Write
files = [kvikio.CuFile(file_path % i, flags="w") for i in range(args.nthreads)]
@@ -133,7 +134,7 @@ def run_cufile_multiple_files(args):
assert res == args.nbytes, f"IO mismatch, expected {args.nbytes} got {res}"

if args.pre_register_buffer:
kvikio.memory_deregister(data)
kvikio.buffer.memory_deregister(data)

return read_time, write_time

@@ -149,7 +150,7 @@ def run_cufile_multiple_arrays(args):
arrays = [create_data(chunksize) for _ in range(args.nthreads)]
if args.pre_register_buffer:
for array in arrays:
kvikio.memory_register(array)
kvikio.buffer.memory_register(array)

# Write
f = kvikio.CuFile(file_path, flags="w")
@@ -174,7 +175,7 @@ def run_cufile_multiple_arrays(args):

if args.pre_register_buffer:
for array in arrays:
kvikio.memory_deregister(array)
kvikio.buffer.memory_deregister(array)

return read_time, write_time

41 changes: 41 additions & 0 deletions python/kvikio/kvikio/buffer.py
@@ -0,0 +1,41 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

from kvikio._lib import buffer # type: ignore


def memory_register(buf) -> None:
"""Register a device memory allocation with cuFile.
Warning
-------
This API is intended for use cases where the memory is used as a streaming
buffer that is reused across multiple cuFile IO operations.
Parameters
----------
buf: buffer-like or array-like
Device buffer to register.
"""
return buffer.memory_register(buf)


def memory_deregister(buf) -> None:
"""Deregister an already registered device memory allocation from cuFile.
Parameters
----------
buf: buffer-like or array-like
Device buffer to deregister.
"""
buffer.memory_deregister(buf)


def bounce_buffer_free() -> int:
"""Free the host allocations used as bounce buffers.
Returns
-------
Number of bytes freed.
"""
return buffer.bounce_buffer_free()
16 changes: 8 additions & 8 deletions python/kvikio/kvikio/defaults.py
@@ -23,8 +23,8 @@ def compat_mode() -> bool:
- when `/run/udev` isn't readable, which typically happens when running inside
a docker image not launched with `--volume /run/udev:/run/udev:ro`
Return
------
Returns
-------
bool
Whether KvikIO is running in compatibility mode or not.
"""
@@ -68,8 +68,8 @@ def get_num_threads() -> int:
Set the default value using `num_threads_reset()` or by setting the
`KVIKIO_NTHREADS` environment variable. If not set, the default value is 1.
Return
------
Returns
-------
nthreads: int
The number of threads in the current thread pool.
"""
@@ -119,8 +119,8 @@ def task_size() -> int:
the `KVIKIO_TASK_SIZE` environment variable. If not set,
the default value is 4 MiB.
Return
------
Returns
-------
nbytes: int
The default task size in bytes.
"""
@@ -166,8 +166,8 @@ def gds_threshold() -> int:
`KVIKIO_GDS_THRESHOLD` environment variable. If not set, the default
value is 1 MiB.
Return
------
Returns
-------
nbytes : int
The default GDS threshold size in bytes.
"""