Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify batch and stream API check #271

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ else()
set(cuFile_BATCH_API_FOUND TRUE)
endif()
message(STATUS "Found cuFile's Batch API: ${cuFile_BATCH_API_FOUND}")
string(FIND "${CUFILE_H_STR}" "cuFileGetVersion" cuFileGetVersion_location)
if(cuFileGetVersion_location EQUAL "-1")
string(FIND "${CUFILE_H_STR}" "cuFileReadAsync" cuFileReadAsync_location)
if(cuFileReadAsync_location EQUAL "-1")
set(cuFile_STREAM_API_FOUND FALSE)
else()
set(cuFile_STREAM_API_FOUND TRUE)
Expand All @@ -94,7 +94,6 @@ target_link_libraries(kvikio INTERFACE CUDA::toolkit)
if(cuFile_FOUND)
target_link_libraries(kvikio INTERFACE cufile::cuFile_interface)
target_compile_definitions(kvikio INTERFACE KVIKIO_CUFILE_FOUND)

if(cuFile_BATCH_API_FOUND)
target_compile_definitions(kvikio INTERFACE KVIKIO_CUFILE_BATCH_API_FOUND)
endif()
Expand Down
52 changes: 27 additions & 25 deletions cpp/examples/basic_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ int main()
<< " threads): " << read << endl;
}

if (kvikio::is_batch_available() && !kvikio::defaults::compat_mode()) {
if (kvikio::is_batch_and_stream_available() && !kvikio::defaults::compat_mode()) {
// Here we use the batch API to read "/tmp/test-file" into `b_dev` by
// submitting 4 batch operations.
constexpr int num_ops_in_batch = 4;
Expand Down Expand Up @@ -195,39 +195,41 @@ int main()
check(statuses.empty());
cout << "Batch canceling of all 4 operations" << endl;
}
}

cout << "stream : " << kvikio::is_stream_available() << endl;
if (kvikio::is_stream_available()) {
{
cout << "Performing stream I/O using file handle" << endl;
off_t f_off = 0, d_off = 0;
ssize_t bytes_done;
CUstream stream;
cout << "Performing async I/O using file handle" << endl;
off_t f_off{0};
off_t d_off{0};
// Notice, we have to allocate the `bytes_done_p` argument on the heap and set it to 0.
ssize_t* bytes_done_p{};
check(cudaHostAlloc((void**)&bytes_done_p, SIZE, cudaHostAllocDefault) == cudaSuccess);
*bytes_done_p = 0;

// Let's create a new stream and submit a sync write
CUstream stream{};
check(cudaStreamCreate(&stream) == cudaSuccess);
kvikio::FileHandle f_handle("/data/test-file", "w+", kvikio::FileHandle::m644, false);
check(cudaMemcpy(a_dev, a, SIZE, cudaMemcpyHostToDevice) == cudaSuccess);

/*
* For stream based I/Os, buffer registration is not mandatory. However,
* it gives a better performance.
*/
check(cudaMemcpyAsync(a_dev, a, SIZE, cudaMemcpyHostToDevice, stream) == cudaSuccess);
f_handle.write_async(a_dev, &io_size, &f_off, &d_off, bytes_done_p, stream);

kvikio::buffer_register(a_dev, SIZE);
f_handle.write_async(a_dev, &io_size, &f_off, &d_off, &bytes_done, stream);
// After synchronizing `stream`, we can read the number of bytes written
check(cudaStreamSynchronize(stream) == cudaSuccess);
check(bytes_done == SIZE);
cout << "File stream Write : " << bytes_done << endl;
kvikio::buffer_deregister(a_dev);
// Note, `*bytes_done_p` might be negative, which indicate an IO error thus we
// use `CUFILE_CHECK_STREAM_IO` to check for errors.
CUFILE_CHECK_STREAM_IO(bytes_done_p);
check(*bytes_done_p == SIZE);
cout << "File async write : " << *bytes_done_p << endl;

/* Read */
bytes_done = 0;
kvikio::buffer_register(c_dev, SIZE);
f_handle.read_async(c_dev, &io_size, &f_off, &d_off, &bytes_done, stream);
*bytes_done_p = 0;
f_handle.read_async(c_dev, &io_size, &f_off, &d_off, bytes_done_p, stream);
check(cudaStreamSynchronize(stream) == cudaSuccess);
check(bytes_done == SIZE);
cout << "File stream Read : " << bytes_done << endl;
kvikio::buffer_deregister(c_dev);
CUFILE_CHECK_STREAM_IO(bytes_done_p);
check(*bytes_done_p == SIZE);
cout << "File async read : " << *bytes_done_p << endl;
check(cudaFreeHost((void*)bytes_done_p) == cudaSuccess);
}
} else {
cout << "The batch and stream API isn't available, requires CUDA 12.2+" << endl;
}
}
28 changes: 26 additions & 2 deletions cpp/include/kvikio/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ struct CUfileException : public std::runtime_error {
std::string(err_str) + ")"}; \
} \
} while (0)
#define CUDA_DRIVER_TRY_1(_call) CUDA_DRIVER_TRY_2(_call, CUfileException)
#define CUDA_DRIVER_TRY_1(_call) CUDA_DRIVER_TRY_2(_call, kvikio::CUfileException)
#endif

#ifdef KVIKIO_CUFILE_FOUND
Expand All @@ -75,8 +75,32 @@ struct CUfileException : public std::runtime_error {
cufileop_status_error(error.err)}; \
} \
} while (0)
#define CUFILE_TRY_1(_call) CUFILE_TRY_2(_call, CUfileException)
#define CUFILE_TRY_1(_call) CUFILE_TRY_2(_call, kvikio::CUfileException)
#endif
#endif

#ifndef CUFILE_CHECK_STREAM_IO
#define CUFILE_CHECK_STREAM_IO(...) \
GET_CUFILE_CHECK_STREAM_IO_MACRO( \
__VA_ARGS__, CUFILE_CHECK_STREAM_IO_2, CUFILE_CHECK_STREAM_IO_1) \
(__VA_ARGS__)
#define GET_CUFILE_CHECK_STREAM_IO_MACRO(_1, _2, NAME, ...) NAME
#ifdef KVIKIO_CUFILE_FOUND
#define CUFILE_CHECK_STREAM_IO_2(_nbytes_done, _exception_type) \
do { \
int const _nbytes = (*_nbytes_done); \
if (_nbytes < 0) { \
throw(_exception_type){std::string{"cuFile error at: "} + __FILE__ + ":" + \
KVIKIO_STRINGIFY(__LINE__) + ": " + std::to_string(_nbytes)}; \
} \
} while (0)
#else
// if cufile isn't available, we don't do anything in the body
#define CUFILE_CHECK_STREAM_IO_2(_nbytes_done, _exception_type) \
do { \
} while (0)
#endif
#define CUFILE_CHECK_STREAM_IO_1(_call) CUFILE_CHECK_STREAM_IO_2(_call, kvikio::CUfileException)
#endif

} // namespace kvikio
86 changes: 49 additions & 37 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,28 +497,35 @@ class FileHandle {
}

/**
* @brief Reads specified bytes from the file into the device memory.
* @brief Reads specified bytes from the file into the device memory asynchronously.
*
* This API reads size bytes asynchronously from the file into device memory writing
* to a specified offset using GDS functionality. The API works correctly for unaligned
* offset and data sizes, although the performance is not on-par with aligned read.
* This is an asynchronous call and will be executed in sequence for the specified stream.
* This is an asynchronous version of `.read()`, which will be executed in sequence
* for the specified stream.
*
* @note For the `devPtr_offset`, if data will be read starting exactly from the
* `devPtr_base` that is registered with `buffer_register`, `devPtr_offset` should
* be set to 0. To read starting from an offset in the registered buffer range,
* the relative offset should be specified in the `devPtr_offset`, and the
* `devPtr_base` must remain set to the base address that was used in the
* `buffer_register` call.
* The arguments have the same meaning as in `.read()` but some of them are deferred. That is,
* the values of `size`, `file_offset` and `devPtr_offset` will not be evaluated until execution
* time. Notice, this behavior can be changed using cuFile's cuFileStreamRegister API.
*
* @param devPtr_base Base address of buffer in device memory. For registered buffers,
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Size in bytes to read.
* @param file_offset Offset in the file to read from.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to read into.
* This parameter should be used only with registered buffers.
* @param bytes_read number of bytes that were successfully read.
* @param stream associated stream for this I/O.
* @param size Pointer to size in bytes to read. If the exact size is not known at the time of I/O
* submission, then you must set it to the maximum possible I/O size for that stream I/O. Later
* the actual size can be set prior to the stream I/O execution.
* @param file_offset Pointer to offset in the file from which to read. Unless otherwise set using
* cuFileStreamRegister API, this value will not be evaluated until execution time.
* @param devPtr_offset Pointer to the offset relative to the bufPtr_base pointer from which to
* write. Unless otherwise set using cuFileStreamRegister API, this value will not be evaluated
* until execution time.
* @param bytes_read Pointer to the bytes read from file. This pointer should be a non-NULL value
* and *bytes_read set to 0. The bytes_read memory should be allocated with cuMemHostAlloc/malloc/
* mmap or registered with cuMemHostRegister.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you have just copied the cufile docs here, but this sentence makes no sense to me. AIUI, just plain malloc and mmap are completely unknown to the driver. So does this mean use cuMemHostAlloc or cuMemAlloc or cuMemMap? Or am I fully even more confused than I thought?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it means that it has to be allocated on the heap (not stack)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct.

* After successful execution of the operation in the stream, the value *bytes_read will contain
* either:
* - The number of bytes successfully read.
* - -1 on IO errors.
* - All other errors return a negative integer value of the CUfileOpError enum value.
* @param stream CUDA stream in which to enqueue the operation. If NULL, make this operation
* synchronous.
*/
inline void read_async(void* devPtr_base,
std::size_t* size,
Expand All @@ -532,34 +539,39 @@ class FileHandle {
_handle, devPtr_base, size, file_offset, devPtr_offset, bytes_read, stream));
return;
#else
throw CUfileException("KvikIO not compiled with stream support.");
throw CUfileException("cuFile's stream API isn't available, please build with CUDA v12.2+.");
#endif
}

/**
* @brief Writes specified bytes from the device memory into the file.
* @brief Writes specified bytes from the device memory into the file asynchronously.
*
* This API writes asynchronously the data from the GPU memory to the file at a specified offset
* and size bytes by using GDS functionality. The API works correctly for unaligned
* offset and data sizes, although the performance is not on-par with aligned writes.
* This is an asynchronous call and will be executed in sequence for the specified stream.
* This is an asynchronous version of `.write()`, which will be executed in sequence
* for the specified stream.
*
* @note GDS functionality modified the standard file system metadata in SysMem.
* However, GDS functionality does not take any special responsibility for writing
* that metadata back to permanent storage. The data is not guaranteed to be present
* after a system crash unless the application uses an explicit `fsync(2)` call. If the
* file is opened with an `O_SYNC` flag, the metadata will be written to the disk before
* the call is complete.
* Refer to the note in read for more information about `devPtr_offset`.
* The arguments have the same meaning as in `.write()` but some of them are deferred. That is,
* the values of `size`, `file_offset` and `devPtr_offset` will not be evaluated until execution
* time. Notice, this behavior can be changed using cuFile's cuFileStreamRegister API.
*
* @param devPtr_base Base address of buffer in device memory. For registered buffers,
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Size in bytes to write.
* @param file_offset Offset in the file to write at.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to write from.
* This parameter should be used only with registered buffers.
* @param bytes_written number of bytes that were successfully written.
* @param stream associated stream for this I/O.
* @param size Pointer to size in bytes to read. If the exact size is not known at the time of I/O
* submission, then you must set it to the maximum possible I/O size for that stream I/O. Later
* the actual size can be set prior to the stream I/O execution.
* @param file_offset Pointer to offset in the file from which to read. Unless otherwise set
* using cuFileStreamRegister API, this value will not be evaluated until execution time.
* @param devPtr_offset Pointer to the offset relative to the bufPtr_base pointer from which to
* write.
* @param bytes_written Pointer to the bytes read from file. This pointer should be a non-NULL
* value and *bytes_written set to 0. The bytes_written memory should be allocated with
* cuMemHostAlloc/malloc/mmap or registered with cuMemHostRegister.
* After successful execution of the operation in the stream, the value *bytes_written will
* contain either:
* - The number of bytes successfully read.
* - -1 on IO errors.
* - All other errors return a negative integer value of the CUfileOpError enum value.
* @param stream CUDA stream in which to enqueue the operation. If NULL, make this operation
* synchronous.
*/
inline void write_async(void* devPtr_base,
std::size_t* size,
Expand All @@ -573,7 +585,7 @@ class FileHandle {
_handle, devPtr_base, size, file_offset, devPtr_offset, bytes_written, stream));
return;
#else
throw CUfileException("KvikIO not compiled with stream support.");
throw CUfileException("cuFile's stream API isn't available, please build with CUDA v12.2+.");
#endif
}

Expand Down
48 changes: 9 additions & 39 deletions cpp/include/kvikio/shim/cufile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ class cuFileAPI {
decltype(cuFileBatchIOCancel)* BatchIOCancel{nullptr};
decltype(cuFileBatchIODestroy)* BatchIODestroy{nullptr};
#endif
bool batch_available = false;

#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
decltype(cuFileGetVersion)* GetVersion{nullptr};
decltype(cuFileReadAsync)* ReadAsync{nullptr};
decltype(cuFileWriteAsync)* WriteAsync{nullptr};
decltype(cuFileStreamRegister)* StreamRegister{nullptr};
Expand Down Expand Up @@ -98,32 +96,18 @@ class cuFileAPI {
get_symbol(BatchIOGetStatus, lib, KVIKIO_STRINGIFY(cuFileBatchIOGetStatus));
get_symbol(BatchIOCancel, lib, KVIKIO_STRINGIFY(cuFileBatchIOCancel));
get_symbol(BatchIODestroy, lib, KVIKIO_STRINGIFY(cuFileBatchIODestroy));

// HACK: we use the mangled name of the `CUfileOpError` to determine if cuFile's
// batch API is available (v12.0.1+). Notice, the symbols of `cuFileBatchIOSetUp` & co.
// exist all the way back to CUDA v11.5 but calling them is undefined behavior.
// TODO: when CUDA v12.2 is released, use `cuFileReadAsync` to determine the availability
// of both the batch and async API.
try {
void* s{};
get_symbol(s, lib, "_ZTS13CUfileOpError");
batch_available = true;
} catch (const std::runtime_error&) {
}
#endif

#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
get_symbol(GetVersion, lib, KVIKIO_STRINGIFY(cuFileGetVersion));
get_symbol(ReadAsync, lib, KVIKIO_STRINGIFY(cuFileReadAsync));
get_symbol(WriteAsync, lib, KVIKIO_STRINGIFY(cuFileWriteAsync));
get_symbol(StreamRegister, lib, KVIKIO_STRINGIFY(cuFileStreamRegister));
get_symbol(StreamDeregister, lib, KVIKIO_STRINGIFY(cuFileStreamDeregister));
try {
void* s{};
get_symbol(s, lib, "cuFileGetVersion");
get_symbol(s, lib, "cuFileReadAsync");
stream_available = true;
} catch (const std::runtime_error&) {
stream_available = false;
}
#endif

Expand Down Expand Up @@ -193,39 +177,25 @@ inline bool is_cufile_available()
}

/**
* @brief Check if cuFile's batch API is available
* @brief Check if cuFile's batch and stream API is available
*
* @return The boolean answer
*/
#ifdef KVIKIO_CUFILE_BATCH_API_FOUND
inline bool is_batch_available()
{
try {
return is_cufile_available() && cuFileAPI::instance().batch_available;
} catch (const std::runtime_error&) {
return false;
}
}
#else
constexpr bool is_batch_available() { return false; }
#endif

/**
* @brief Check if cuFile's stream API is available
* Technically, the batch API is available in CUDA 12.1 but since there is no good
* way to check CUDA version using the driver API, we check for the existing of the
* `cuFileReadAsync` symbol, which is defined in CUDA 12.2+.
*
* @return The boolean answer
*/
#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
inline bool is_stream_available()
#if defined(KVIKIO_CUFILE_STREAM_API_FOUND) && defined(KVIKIO_CUFILE_STREAM_API_FOUND)
inline bool is_batch_and_stream_available()
{
try {
return cuFileAPI::instance().stream_available;
return is_cufile_available() && cuFileAPI::instance().stream_available;
} catch (const std::runtime_error&) {
return false;
}
}
#else
constexpr bool is_stream_available() { return false; }
constexpr bool is_batch_and_stream_available() { return false; }
#endif

} // namespace kvikio