Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial changes to support cufile stream I/O. #259

Merged
merged 8 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ else()
set(cuFile_BATCH_API_FOUND TRUE)
endif()
message(STATUS "Found cuFile's Batch API: ${cuFile_BATCH_API_FOUND}")
# Detect cuFile's Stream API by searching CUFILE_H_STR (presumably the text of
# cufile.h, read earlier in this file) for the `cuFileGetVersion` identifier.
# string(FIND) stores -1 in its output variable when the substring is absent.
# NOTE(review): the probe keys on `cuFileGetVersion` rather than on one of the
# stream entry points such as `cuFileReadAsync` -- confirm this is intentional.
string(FIND "${CUFILE_H_STR}" "cuFileGetVersion" cuFileGetVersion_location)
if(cuFileGetVersion_location EQUAL "-1")
set(cuFile_STREAM_API_FOUND FALSE)
else()
set(cuFile_STREAM_API_FOUND TRUE)
endif()
# Report the detection result at configure time.
message(STATUS "Found cuFile's Stream API: ${cuFile_STREAM_API_FOUND}")
endif()

# library targets
Expand All @@ -91,6 +98,9 @@ if(cuFile_FOUND)
if(cuFile_BATCH_API_FOUND)
target_compile_definitions(kvikio INTERFACE KVIKIO_CUFILE_BATCH_API_FOUND)
endif()
# When the Stream API was detected, add KVIKIO_CUFILE_STREAM_API_FOUND to the
# INTERFACE compile definitions of the `kvikio` target so that the C++ headers
# can guard their stream code paths with `#ifdef`.
if(cuFile_STREAM_API_FOUND)
target_compile_definitions(kvikio INTERFACE KVIKIO_CUFILE_STREAM_API_FOUND)
endif()
endif()
target_link_libraries(kvikio INTERFACE ${CMAKE_DL_LIBS})
target_compile_features(kvikio INTERFACE cxx_std_17)
Expand Down
116 changes: 77 additions & 39 deletions cpp/examples/basic_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <kvikio/buffer.hpp>
#include <kvikio/defaults.hpp>
#include <kvikio/driver.hpp>
#include <kvikio/error.hpp>
#include <kvikio/file_handle.hpp>

using namespace std;
Expand All @@ -34,11 +35,13 @@ void check(bool condition)
}
}

constexpr int NELEM = 1000; // Number of elements used throughout the test
constexpr int SIZE = NELEM * sizeof(int); // Size of the memory allocations (in bytes)
constexpr int NELEM = 1024; // Number of elements used throughout the test
constexpr int SIZE = NELEM * sizeof(int); // Size of the memory allocations (in bytes)
constexpr int LARGE_SIZE = 8 * SIZE; // LARGE SIZE to test partial submit (in bytes)

int main()
{
std::size_t io_size = SIZE;
check(cudaSetDevice(0) == cudaSuccess);

cout << "KvikIO defaults: " << endl;
Expand Down Expand Up @@ -152,44 +155,79 @@ int main()
constexpr int batchsize = SIZE / num_ops_in_batch;
kvikio::DriverProperties props;
check(num_ops_in_batch < props.get_max_batch_io_size());

// We open the file as usual.
kvikio::FileHandle f("/tmp/test-file", "r");

// Then we create a batch
auto batch = kvikio::BatchHandle(num_ops_in_batch);

// And submit 4 operations each with its own offset
std::vector<kvikio::BatchOp> ops;
for (int i = 0; i < num_ops_in_batch; ++i) {
ops.push_back(kvikio::BatchOp{.file_handle = f,
.devPtr_base = b_dev,
.file_offset = i * batchsize,
.devPtr_offset = i * batchsize,
.size = batchsize,
.opcode = CUFILE_READ});
}
batch.submit(ops);

// Finally, we wait on all 4 operations to be finished and check the result
auto statuses = batch.status(num_ops_in_batch, num_ops_in_batch);
check(statuses.size() == num_ops_in_batch);
size_t total_read = 0;
for (auto status : statuses) {
check(status.status == CUFILE_COMPLETE);
check(status.ret == batchsize);
total_read += status.ret;
}
check(cudaMemcpy(b, b_dev, SIZE, cudaMemcpyDeviceToHost) == cudaSuccess);
for (int i = 0; i < NELEM; ++i) {
check(a[i] == b[i]);
{
// We open the file as usual.
kvikio::FileHandle f("/tmp/test-file", "r");

// Then we create a batch
auto batch = kvikio::BatchHandle(num_ops_in_batch);

// And submit 4 operations each with its own offset
std::vector<kvikio::BatchOp> ops;
for (int i = 0; i < num_ops_in_batch; ++i) {
ops.push_back(kvikio::BatchOp{.file_handle = f,
.devPtr_base = b_dev,
.file_offset = i * batchsize,
.devPtr_offset = i * batchsize,
.size = batchsize,
.opcode = CUFILE_READ});
}
batch.submit(ops);

// Finally, we wait on all 4 operations to be finished and check the result
auto statuses = batch.status(num_ops_in_batch, num_ops_in_batch);
check(statuses.size() == num_ops_in_batch);
size_t total_read = 0;
for (auto status : statuses) {
check(status.status == CUFILE_COMPLETE);
check(status.ret == batchsize);
total_read += status.ret;
}
check(cudaMemcpy(b, b_dev, SIZE, cudaMemcpyDeviceToHost) == cudaSuccess);
for (int i = 0; i < NELEM; ++i) {
check(a[i] == b[i]);
}
cout << "Batch read using 4 operations: " << total_read << endl;

batch.submit(ops);
batch.cancel();
statuses = batch.status(num_ops_in_batch, num_ops_in_batch);
check(statuses.empty());
cout << "Batch canceling of all 4 operations" << endl;
}
cout << "Batch read using 4 operations: " << total_read << endl;
}

batch.submit(ops);
batch.cancel();
statuses = batch.status(num_ops_in_batch, num_ops_in_batch);
check(statuses.empty());
cout << "Batch canceling of all 4 operations" << endl;
cout << "stream : " << kvikio::is_stream_available() << endl;
if (kvikio::is_stream_available()) {
  {
    cout << "Performing stream I/O using file handle" << endl;

    // The async calls receive the addresses of these variables (and of
    // `io_size`), so they are kept alive until the stream has been
    // synchronized. `bytes_done` is zero-initialised so that a stale value can
    // never satisfy the checks below.
    off_t f_off        = 0;
    off_t d_off        = 0;
    ssize_t bytes_done = 0;

    CUstream stream{};
    check(cudaStreamCreate(&stream) == cudaSuccess);
    kvikio::FileHandle f_handle("/data/test-file", "w+", kvikio::FileHandle::m644, false);
    check(cudaMemcpy(a_dev, a, SIZE, cudaMemcpyHostToDevice) == cudaSuccess);

    /*
     * For stream based I/Os, buffer registration is not mandatory. However,
     * it gives a better performance.
     */

    /* Write */
    kvikio::buffer_register(a_dev, SIZE);
    f_handle.write_async(a_dev, &io_size, &f_off, &d_off, &bytes_done, stream);
    // The result is only inspected after the stream has been synchronized.
    check(cudaStreamSynchronize(stream) == cudaSuccess);
    check(bytes_done == SIZE);
    cout << "File stream Write : " << bytes_done << endl;
    kvikio::buffer_deregister(a_dev);

    /* Read */
    bytes_done = 0;
    kvikio::buffer_register(c_dev, SIZE);
    f_handle.read_async(c_dev, &io_size, &f_off, &d_off, &bytes_done, stream);
    check(cudaStreamSynchronize(stream) == cudaSuccess);
    check(bytes_done == SIZE);
    cout << "File stream Read : " << bytes_done << endl;
    kvikio::buffer_deregister(c_dev);

    // Release the stream created above; all work on it has completed.
    check(cudaStreamDestroy(stream) == cudaSuccess);
  }
}
}
81 changes: 81 additions & 0 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,87 @@ class FileHandle {
return parallel_io(op, devPtr_base, size, file_offset, task_size, devPtr_offset);
}

/**
* @brief Reads specified bytes from the file into the device memory.
*
* This API reads size bytes asynchronously from the file into device memory writing
* to a specified offset using GDS functionality. The API works correctly for unaligned
* offset and data sizes, although the performance is not on-par with aligned read.
* This is an asynchronous call and will be executed in sequence for the specified stream.
*
* @note For the `devPtr_offset`, if data will be read starting exactly from the
* `devPtr_base` that is registered with `buffer_register`, `devPtr_offset` should
* be set to 0. To read starting from an offset in the registered buffer range,
* the relative offset should be specified in the `devPtr_offset`, and the
* `devPtr_base` must remain set to the base address that was used in the
* `buffer_register` call.
*
* @param devPtr_base Base address of buffer in device memory. For registered buffers,
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Size in bytes to read.
* @param file_offset Offset in the file to read from.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to read into.
* This parameter should be used only with registered buffers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to distinguish registered buffers in the type system at the moment? That would be preferable over this void * C-like interface.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, not ATM but it would be a cool addition: #266

* @param bytes_read number of bytes that were successfully read.
* @param stream associated stream for this I/O.
*/
inline void read_async(void* devPtr_base,
                       std::size_t* size,
                       off_t* file_offset,
                       off_t* devPtr_offset,
                       ssize_t* bytes_read,
                       CUstream stream)
{
  // `size`, `file_offset`, `devPtr_offset` and `bytes_read` are forwarded to
  // cuFile as pointers. The objects they point to should stay valid until the
  // work queued on `stream` has completed, and `*bytes_read` should only be
  // inspected after the stream has been synchronized (see the cuFile API
  // reference for cuFileReadAsync).
#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
  auto const& api = cuFileAPI::instance();
  // Never call through an unresolved function pointer: the shim records in
  // `stream_available` whether the stream API was found at runtime.
  if (!api.stream_available) {
    throw CUfileException("cuFile stream API not available at runtime.");
  }
  CUFILE_TRY(api.ReadAsync(
    _handle, devPtr_base, size, file_offset, devPtr_offset, bytes_read, stream));
#else
  // Built against a cuFile header without the stream API.
  throw CUfileException("KvikIO not compiled with stream support.");
#endif
}

/**
* @brief Writes specified bytes from the device memory into the file.
*
* This API writes asynchronously the data from the GPU memory to the file at a specified offset
* and size bytes by using GDS functionality. The API works correctly for unaligned
* offset and data sizes, although the performance is not on-par with aligned writes.
* This is an asynchronous call and will be executed in sequence for the specified stream.
*
* @note GDS functionality modifies the standard file system metadata in SysMem.
* However, GDS functionality does not take any special responsibility for writing
* that metadata back to permanent storage. The data is not guaranteed to be present
* after a system crash unless the application uses an explicit `fsync(2)` call. If the
* file is opened with an `O_SYNC` flag, the metadata will be written to the disk before
* the call is complete.
* Refer to the note in read for more information about `devPtr_offset`.
*
* @param devPtr_base Base address of buffer in device memory. For registered buffers,
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Size in bytes to write.
* @param file_offset Offset in the file to write at.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to write from.
* This parameter should be used only with registered buffers.
* @param bytes_written number of bytes that were successfully written.
* @param stream associated stream for this I/O.
*/
inline void write_async(void* devPtr_base,
                        std::size_t* size,
                        off_t* file_offset,
                        off_t* devPtr_offset,
                        ssize_t* bytes_written,
                        CUstream stream)
{
  // `size`, `file_offset`, `devPtr_offset` and `bytes_written` are forwarded to
  // cuFile as pointers. The objects they point to should stay valid until the
  // work queued on `stream` has completed, and `*bytes_written` should only be
  // inspected after the stream has been synchronized (see the cuFile API
  // reference for cuFileWriteAsync).
#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
  auto const& api = cuFileAPI::instance();
  // Never call through an unresolved function pointer: the shim records in
  // `stream_available` whether the stream API was found at runtime.
  if (!api.stream_available) {
    throw CUfileException("cuFile stream API not available at runtime.");
  }
  CUFILE_TRY(api.WriteAsync(
    _handle, devPtr_base, size, file_offset, devPtr_offset, bytes_written, stream));
#else
  // Built against a cuFile header without the stream API.
  throw CUfileException("KvikIO not compiled with stream support.");
#endif
}

/**
* @brief Returns `true` if the compatibility mode has been enabled for this file.
*
Expand Down
42 changes: 42 additions & 0 deletions cpp/include/kvikio/shim/cufile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ class cuFileAPI {
#endif
bool batch_available = false;

#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
// Function pointers to cuFile's version query and stream entry points. Each
// has the exact type of the corresponding cuFile declaration, starts out as
// nullptr, and is filled in by get_symbol() in the constructor.
decltype(cuFileGetVersion)* GetVersion{nullptr};
decltype(cuFileReadAsync)* ReadAsync{nullptr};
decltype(cuFileWriteAsync)* WriteAsync{nullptr};
decltype(cuFileStreamRegister)* StreamRegister{nullptr};
decltype(cuFileStreamDeregister)* StreamDeregister{nullptr};
#endif
// Runtime flag for the stream API. It is declared unconditionally and defaults
// to false; the constructor only assigns it when the build defines
// KVIKIO_CUFILE_STREAM_API_FOUND.
bool stream_available = false;

private:
cuFileAPI()
{
Expand Down Expand Up @@ -103,6 +112,21 @@ class cuFileAPI {
}
#endif

#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
// Resolve the version query and the stream entry points from the library
// handle `lib` into the corresponding function-pointer members.
get_symbol(GetVersion, lib, KVIKIO_STRINGIFY(cuFileGetVersion));
get_symbol(ReadAsync, lib, KVIKIO_STRINGIFY(cuFileReadAsync));
get_symbol(WriteAsync, lib, KVIKIO_STRINGIFY(cuFileWriteAsync));
get_symbol(StreamRegister, lib, KVIKIO_STRINGIFY(cuFileStreamRegister));
get_symbol(StreamDeregister, lib, KVIKIO_STRINGIFY(cuFileStreamDeregister));
// Probe for `cuFileGetVersion` and record the outcome in `stream_available`.
// NOTE(review): the same symbol is already resolved, unguarded, a few lines
// above. If get_symbol() throws std::runtime_error for a missing symbol (as
// the catch below suggests), that earlier call would propagate out of the
// constructor first and the `false` branch here would never run -- confirm
// whether the unguarded calls should move inside this try block.
try {
void* s{};
get_symbol(s, lib, "cuFileGetVersion");
stream_available = true;
} catch (const std::runtime_error&) {
stream_available = false;
}
#endif

// cuFile is supposed to open and close the driver automatically but because of a bug in
// CUDA 11.8, it sometimes segfaults. See <https://github.com/rapidsai/kvikio/issues/159>.
CUfileError_t const error = DriverOpen();
Expand Down Expand Up @@ -186,4 +210,22 @@ inline bool is_batch_available()
constexpr bool is_batch_available() { return false; }
#endif

/**
* @brief Check if cuFile's stream API is available
*
* @return The boolean answer
*/
#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
inline bool is_stream_available()
{
  // Start pessimistic; only a successfully obtained cuFileAPI instance can
  // report the stream API as usable.
  bool available = false;
  try {
    available = cuFileAPI::instance().stream_available;
  } catch (const std::runtime_error&) {
    // Obtaining the cuFile shim failed: report the stream API as unavailable.
  }
  return available;
}
#else
// Built without the stream API: the answer is known at compile time.
constexpr bool is_stream_available() { return false; }
#endif

} // namespace kvikio