Skip to content

Commit

Permalink
Zarr+CuPy+GDS+nvCOMP made easy (rapidsai#267)
Browse files Browse the repository at this point in the history
Introducing `open_cupy_array()`, which is a CUDA friendly version of `zarr.open_array` that reads and writes to CuPy arrays.

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Benjamin Zaitlen (https://github.com/quasiben)
  - Lawrence Mitchell (https://github.com/wence-)

URL: rapidsai#267
  • Loading branch information
madsbk authored Aug 25, 2023
1 parent 35424d2 commit 37b9d00
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 11 deletions.
58 changes: 58 additions & 0 deletions python/examples/zarr_cupy_nvcomp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import cupy
import numpy
import zarr

import kvikio
import kvikio.zarr


def main(path):
a = cupy.arange(20)

# Let's use KvikIO's convenience function `open_cupy_array()` to create
# a new Zarr file on disk. Its semantic is the same as `zarr.open_array()`
# but uses a GDS file store, nvCOMP compression, and CuPy arrays.
z = kvikio.zarr.open_cupy_array(store=path, mode="w", shape=(20,), chunks=(5,))

# `z` is a regular Zarr Array that we can write to as usual
z[0:10] = numpy.arange(0, 10)
# but it also support direct reads and writes of CuPy arrays
z[10:20] = cupy.arange(10, 20)

# Reading `z` returns a CuPy array
assert isinstance(z[:], cupy.ndarray)
assert (a == z[:]).all()

# Normally, we cannot assume that GPU and CPU compressors are compatible.
# E.g., `open_cupy_array()` uses nvCOMP's Snappy GPU compression by default,
# which, as far as we know, isn’t compatible with any CPU compressor. Thus,
# let’s re-write our Zarr array using a CPU and GPU compatible compressor.
z = kvikio.zarr.open_cupy_array(
store=path,
mode="w",
shape=(20,),
chunks=(5,),
compressor=kvikio.zarr.CompatCompressor.lz4(),
)
z[:] = a

# Because we are using a CompatCompressor, it is now possible to open the file
# using Zarr's built-in LZ4 decompressor that uses the CPU.
z = zarr.open_array(path)
# `z` is now read as a regular NumPy array
assert isinstance(z[:], numpy.ndarray)
assert (a.get() == z[:]).all()
# and we can write to is as usual
z[:] = numpy.arange(20, 40)

# And we can read the Zarr file back into a CuPy array.
z = kvikio.zarr.open_cupy_array(store=path, mode="r")
assert isinstance(z[:], cupy.ndarray)
assert (cupy.arange(20, 40) == z[:]).all()


if __name__ == "__main__":
main("/tmp/zarr-cupy-nvcomp")
6 changes: 3 additions & 3 deletions python/kvikio/nvcomp_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from numcodecs.abc import Codec
from numcodecs.compat import ensure_contiguous_ndarray_like

import kvikio._lib.libnvcomp_ll as _ll
from kvikio._lib.libnvcomp_ll import SUPPORTED_ALGORITHMS


class NvCompBatchCodec(Codec):
Expand All @@ -34,11 +34,11 @@ def __init__(
stream: Optional[cp.cuda.Stream] = None,
) -> None:
algo_id = algorithm.lower()
algo_t = _ll.SUPPORTED_ALGORITHMS.get(algo_id, None)
algo_t = SUPPORTED_ALGORITHMS.get(algo_id, None)
if algo_t is None:
raise ValueError(
f"{algorithm} is not supported. "
f"Must be one of: {list(_ll.SUPPORTED_ALGORITHMS.keys())}"
f"Must be one of: {list(SUPPORTED_ALGORITHMS.keys())}"
)

self.algorithm = algo_id
Expand Down
176 changes: 170 additions & 6 deletions python/kvikio/zarr.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.
from __future__ import annotations

import contextlib
import os
import os.path
from abc import abstractmethod
from typing import Any, Mapping, Sequence
from typing import Any, Literal, Mapping, Optional, Sequence, Union

import cupy
import numcodecs
import numpy
import numpy as np
import zarr
Expand All @@ -20,6 +22,9 @@

import kvikio
import kvikio.nvcomp
import kvikio.nvcomp_codec
import kvikio.zarr
from kvikio.nvcomp_codec import NvCompBatchCodec

MINIMUM_ZARR_VERSION = "2.15"

Expand All @@ -37,22 +42,60 @@ class GDSStore(zarr.storage.DirectoryStore):
It uses KvikIO for reads and writes, which in turn will use GDS
when applicable.
Parameters
----------
path : string
Location of directory to use as the root of the storage hierarchy.
normalize_keys : bool, optional
If True, all store keys will be normalized to use lower case characters
(e.g. 'foo' and 'FOO' will be treated as equivalent). This can be
useful to avoid potential discrepancies between case-sensitive and
case-insensitive file system. Default value is False.
dimension_separator : {'.', '/'}, optional
Separator placed between the dimensions of a chunk.
compressor_config_overwrite
If not None, use this `Mapping` to specify what is written to the Zarr metadata
file on disk (`.zarray`). Normally, Zarr writes the configuration[1] given by
the `compressor` argument to the `.zarray` file. Use this argument to overwrite
the normal configuration and use the specified `Mapping` instead.
decompressor_config_overwrite
If not None, use this `Mapping` to specify what compressor configuration[1] is
used for decompressing no matter the configuration found in the Zarr metadata
on disk (the `.zarray` file).
[1] https://github.com/zarr-developers/numcodecs/blob/cb155432/numcodecs/abc.py#L79
Notes
-----
GDSStore doesn't implement `_fromfile()` thus non-array data such as
meta data is always read into host memory.
This is because only zarr.Array use getitems() to retrieve data.
Atomic writes are used, which means that data are first written to a
temporary file, then moved into place when the write is successfully
completed. Files are only held open while they are being read or written and are
closed immediately afterwards, so there is no need to manually close any files.
Safe to write in multiple threads or processes.
"""

# The default output array type used by getitems().
default_meta_array = numpy.empty(())

def __init__(self, *args, **kwargs) -> None:
def __init__(
self,
path,
normalize_keys=False,
dimension_separator=None,
*,
compressor_config_overwrite: Optional[Mapping] = None,
decompressor_config_overwrite: Optional[Mapping] = None,
) -> None:
if not kvikio.zarr.supported:
raise RuntimeError(
f"GDSStore requires Zarr >={kvikio.zarr.MINIMUM_ZARR_VERSION}"
)
super().__init__(*args, **kwargs)
super().__init__(
path, normalize_keys=normalize_keys, dimension_separator=dimension_separator
)
self.compressor_config_overwrite = compressor_config_overwrite
self.decompressor_config_overwrite = decompressor_config_overwrite

def __eq__(self, other):
return isinstance(other, GDSStore) and self.path == other.path
Expand All @@ -62,6 +105,23 @@ def _tofile(self, a, fn):
written = f.write(a)
assert written == a.nbytes

def __getitem__(self, key):
ret = super().__getitem__(key)
if self.decompressor_config_overwrite and key == ".zarray":
meta = self._metadata_class.decode_array_metadata(ret)
if meta["compressor"]:
meta["compressor"] = self.decompressor_config_overwrite
ret = self._metadata_class.encode_array_metadata(meta)
return ret

def __setitem__(self, key, value):
if self.compressor_config_overwrite and key == ".zarray":
meta = self._metadata_class.decode_array_metadata(value)
if meta["compressor"]:
meta["compressor"] = self.compressor_config_overwrite
value = self._metadata_class.encode_array_metadata(meta)
super().__setitem__(key, value)

def getitems(
self,
keys: Sequence[str],
Expand Down Expand Up @@ -237,3 +297,107 @@ def get_nvcomp_manager(self):
nvcomp_compressors = [ANS, Bitcomp, Cascaded, Gdeflate, LZ4, Snappy]
for c in nvcomp_compressors:
register_codec(c)


class CompatCompressor:
"""A pair of compatible compressors one using the CPU and one using the GPU"""

def __init__(self, cpu: Codec, gpu: Codec) -> None:
self.cpu = cpu
self.gpu = gpu

@classmethod
def lz4(cls) -> CompatCompressor:
"""A compatible pair of LZ4 compressors"""
return cls(cpu=numcodecs.LZ4(), gpu=NvCompBatchCodec("lz4"))


def open_cupy_array(
store: Union[os.PathLike, str],
mode: Literal["r", "r+", "a", "w", "w-"] = "a",
compressor: Codec | CompatCompressor = Snappy(device_ordinal=0),
meta_array=cupy.empty(()),
**kwargs,
) -> zarr.Array:
"""Open an Zarr array as a CuPy-like array using file-mode-like semantics.
This function is a CUDA friendly version of `zarr.open_array` that reads
and writes to CuPy arrays. Beside the arguments listed below, the arguments
have the same semantics as in `zarr.open_array`.
Parameters
----------
store
Path to directory in file system. As opposed to `zarr.open_array`,
Store and path to zip files isn't supported.
mode
Persistence mode: 'r' means read only (must exist); 'r+' means
read/write (must exist); 'a' means read/write (create if doesn't
exist); 'w' means create (overwrite if exists); 'w-' means create
(fail if exists).
compressor
The compressor used when creating a Zarr file or None if no compressor
is to be used. If a `CompatCompressor` is given, `CompatCompressor.gpu`
is used for compression and decompression; and `CompatCompressor.cpu`
is written as the compressor in the Zarr file metadata on disk.
This argument is ignored in "r" and "r+" mode. By default the
Snappy compressor by nvCOMP is used.
meta_array : array-like, optional
An CuPy-like array instance to use for determining arrays to create and
return to users. It must implement `__cuda_array_interface__`.
**kwargs
The rest of the arguments are forwarded to `zarr.open_array` as-is.
Returns
-------
Zarr array backed by a GDS file store, nvCOMP compression, and CuPy arrays.
"""

if not isinstance(store, (str, os.PathLike)):
raise ValueError("store must be a path")
store = str(os.fspath(store))
if not hasattr(meta_array, "__cuda_array_interface__"):
raise ValueError("meta_array must implement __cuda_array_interface__")

if mode in ("r", "r+"):
ret = zarr.open_array(
store=kvikio.zarr.GDSStore(path=store),
mode=mode,
meta_array=meta_array,
**kwargs,
)
# If we are reading a LZ4-CPU compressed file, we overwrite the metadata
# on-the-fly to make Zarr use LZ4-GPU for both compression and decompression.
compat_lz4 = CompatCompressor.lz4()
if ret.compressor == compat_lz4.cpu:
ret = zarr.open_array(
store=kvikio.zarr.GDSStore(
path=store,
compressor_config_overwrite=compat_lz4.cpu.get_config(),
decompressor_config_overwrite=compat_lz4.gpu.get_config(),
),
mode=mode,
meta_array=meta_array,
**kwargs,
)
return ret

if isinstance(compressor, CompatCompressor):
compressor_config_overwrite = compressor.cpu.get_config()
decompressor_config_overwrite = compressor.gpu.get_config()
compressor = compressor.gpu
else:
compressor_config_overwrite = None
decompressor_config_overwrite = None

return zarr.open_array(
store=kvikio.zarr.GDSStore(
path=store,
compressor_config_overwrite=compressor_config_overwrite,
decompressor_config_overwrite=decompressor_config_overwrite,
),
mode=mode,
meta_array=meta_array,
compressor=compressor,
**kwargs,
)
12 changes: 11 additions & 1 deletion python/tests/test_examples.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import os
Expand All @@ -16,3 +16,13 @@ def test_hello_world(tmp_path, monkeypatch):

monkeypatch.syspath_prepend(str(examples_path))
import_module("hello_world").main(tmp_path / "test-file")


def test_zarr_cupy_nvcomp(tmp_path, monkeypatch):
"""Test examples/zarr_cupy_nvcomp.py"""

# `examples/zarr_cupy_nvcomp.py` requires the Zarr submodule
pytest.importorskip("kvikio.zarr")

monkeypatch.syspath_prepend(str(examples_path))
import_module("zarr_cupy_nvcomp").main(tmp_path / "test-file")
Loading

0 comments on commit 37b9d00

Please sign in to comment.