This post explores using the ITK suite of image processing utilities in parallel with Dask Array.
We cover …
Let’s start with a full example applying Richardson-Lucy deconvolution to a stack of light sheet microscopy data. This is the same data that we showed how to load in our last blog post on image loading:
# Load our data from last time
import dask.array as da
imgs = da.from_zarr("AOLLSMData_m4_raw.zarr/", "data")
        Array                       Chunk
Bytes   188.74 GB                   316.15 MB
Shape   (3, 199, 201, 1024, 768)    (1, 1, 201, 1024, 768)
Count   598 Tasks                   597 Chunks
Type    uint16                      numpy.ndarray
# Load our Point Spread Function (PSF)
import dask.array.image
psf = dask.array.image.imread("AOLLSMData/m4/psfs_z0p1/*.tif")[:, None, ...]
        Array                  Chunk
Bytes   2.48 MB                827.39 kB
Shape   (3, 1, 101, 64, 64)    (1, 1, 101, 64, 64)
Count   6 Tasks                3 Chunks
Type    uint16                 numpy.ndarray
# Convert data to float32 for computation
import numpy as np
imgs = imgs.astype(np.float32)
# Note: the psf needs to be sampled with a voxel spacing
# consistent with the image's sampling
psf = psf.astype(np.float32)
# Apply Richardson-Lucy Deconvolution
def richardson_lucy_deconvolution(img, psf, iterations=1):
""" Apply deconvolution to a single chunk of data """
import itk
img = img[0, 0, ...] # remove leading two length-one dimensions
psf = psf[0, 0, ...] # remove leading two length-one dimensions
image = itk.image_view_from_array(img) # Convert to ITK object
kernel = itk.image_view_from_array(psf) # Convert to ITK object
deconvolved = itk.richardson_lucy_deconvolution_image_filter(
image,
kernel_image=kernel,
number_of_iterations=iterations
)
result = itk.array_from_image(deconvolved) # Convert back to Numpy array
result = result[None, None, ...] # Add back the leading length-one dimensions
return result
out = da.map_blocks(richardson_lucy_deconvolution, imgs, psf, dtype=np.float32)
# Create a local cluster of dask worker processes
# (this could also point to a distributed cluster if you have it)
from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=20, threads_per_worker=1)
client = Client(cluster) # now dask operations use this cluster by default
# Trigger computation and store
out.to_zarr("AOLLSMData_m4_raw.zarr", "deconvolved", overwrite=True)
So in the example above we …
From the perspective of an imaging scientist, the new piece of technology here is the dask.array.map_blocks function. Given a Dask array composed of many NumPy arrays and a function, map_blocks applies that function across each block in parallel, returning a Dask array as a result. It’s a great tool whenever you want to apply an operation across many blocks in a simple fashion. Because Dask arrays are just made out of NumPy arrays, it’s an easy way to compose Dask with the rest of the Scientific Python ecosystem.
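As a toy illustration (separate from the pipeline above), here is map_blocks applying an ordinary NumPy function across every chunk of a Dask array:

import dask.array as da
import numpy as np

x = da.ones((4, 1024, 1024), chunks=(1, 1024, 1024))  # four single-slab chunks

def double(block):
    # any NumPy -> NumPy function of one chunk works here
    return block * 2

y = x.map_blocks(double, dtype=x.dtype)  # lazily applies `double` to each chunk
y.sum().compute()                        # triggers the parallel computation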
However, in this case there are a few challenges to constructing the right NumPy -> NumPy function, due to idiosyncrasies in both ITK and Dask Array. Let’s look at our function again:
def richardson_lucy_deconvolution(img, psf, iterations=1):
""" Apply deconvolution to a single chunk of data """
import itk
img = img[0, 0, ...] # remove leading two length-one dimensions
psf = psf[0, 0, ...] # remove leading two length-one dimensions
image = itk.image_view_from_array(img) # Convert to ITK object
kernel = itk.image_view_from_array(psf) # Convert to ITK object
deconvolved = itk.richardson_lucy_deconvolution_image_filter(
image,
kernel_image=kernel,
number_of_iterations=iterations
)
result = itk.array_from_image(deconvolved) # Convert back to Numpy array
result = result[None, None, ...] # Add back the leading length-one dimensions
return result
out = da.map_blocks(richardson_lucy_deconvolution, imgs, psf, dtype=np.float32)
This is longer than we would like. Instead, we would have preferred to just use the itk function directly, without all of the steps before and after.
deconvolved = da.map_blocks(itk.richardson_lucy_deconvolution_image_filter, imgs, psf)
What were the extra steps in our function, and why were they necessary?

1. Convert to and from ITK image objects: the ITK filter doesn’t accept NumPy arrays directly, so we wrap each chunk with itk.image_view_from_array and convert the result back with itk.array_from_image (see the round-trip sketch below).
2. Unpack and re-pack singleton dimensions: our chunks are five-dimensional, with two leading length-one axes, while the ITK filter expects three-dimensional volumes.
3. Import itk inside the function: this sidesteps serialization issues, which we discuss below.
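Here is a minimal round trip between NumPy and ITK, as a standalone sketch (it assumes itk is installed; the array contents are arbitrary):

import itk
import numpy as np

arr = np.random.random((8, 16, 16)).astype(np.float32)
image = itk.image_view_from_array(arr)  # NumPy -> ITK image view (shares memory)
back = itk.array_from_image(image)      # ITK image -> NumPy array (copies)
assert back.shape == arr.shape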
But if you’re comfortable working around things like this, then ITK and map_blocks can be a powerful combination if you want to parallelize ITK operations across a cluster.
Above we used dask.distributed.LocalCluster to set up 20 single-threaded workers on our local workstation:
from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=20, threads_per_worker=1)
client = Client(cluster) # now dask operations use this cluster by default
If you had a distributed resource, this is where you would connect it. You would swap out LocalCluster with one of Dask’s other deployment options.
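For example, on a SLURM batch system you might use the dask-jobqueue project instead. A sketch, assuming dask-jobqueue is installed (the resource parameters here are illustrative):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=1, processes=1, memory="8GB")  # one thread per worker
cluster.scale(20)  # request twenty workers from the job queue
client = Client(cluster)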
Also, we found that we needed to use many single-threaded processes rather than one multi-threaded process because ITK functions seem to still hold onto the GIL. This is fine, we just need to be aware of it so that we set up our Dask workers appropriately, with one thread per process, for maximum efficiency. See ITK #1134 for an active GitHub issue on this topic.
We had some difficulty when using the ITK library across multiple processes, because the library itself didn’t serialize well. (If you don’t understand what that means, don’t worry.) We solved a bit of this in ITK #1090, but some issues still remain.
We got around this by including the import in the function rather than outside of it.
def richardson_lucy_deconvolution(img, psf, iterations=1):
import itk # <--- we work around serialization issues by importing within the function
That way each task imports itk individually, and we sidestep this issue.
We also tried out the Richardson-Lucy deconvolution operation in Scikit-Image. Scikit-Image is known for being more SciPy/NumPy native, but not always as fast as ITK. Our experience confirmed this perception.
First, we were glad to see that the scikit-image function worked with map_blocks immediately, without any packing/unpacking, dimensionality, or serialization issues:
import skimage.restoration
out = da.map_blocks(skimage.restoration.richardson_lucy, imgs, psf) # just works
So all of that converting to and from image objects or removing and adding singleton dimensions isn’t necessary here.
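map_blocks also forwards extra keyword arguments to the wrapped function, so the iteration count can be passed straight through. A small sketch (note that the keyword name depends on your scikit-image version: iterations in older releases, num_iter in newer ones):

out = da.map_blocks(
    skimage.restoration.richardson_lucy, imgs, psf,
    num_iter=10,  # spelled `iterations` in older scikit-image releases
    dtype=np.float32,
)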
In terms of performance, we were also happy to see that Scikit-Image released the GIL, so we were able to get very high reported CPU utilization when using a small number of multi-threaded processes. However, even though CPU utilization was high, our parallel performance was poor enough that we stuck with the ITK solution, warts and all. More information about this is available in GitHub issue scikit-image #4083.
Note: sequentially on a single chunk, ITK ran in around 2 minutes while scikit-image ran in 3 minutes. It was only once we started parallelizing that things became slow.
Regardless, our goal in this experiment was to see how well ITK and Dask Array played together. It was nice to see what smooth integration looks like, if only to motivate future development in ITK+Dask relations.
An alternative to da.map_blocks is Generalized Universal Functions (gufuncs). These are functions that have many magical properties, one of which is that they operate equally well on both NumPy and Dask arrays. If libraries like ITK or Scikit-Image make their functions into gufuncs, then they work without users having to do anything special.
The easiest way to implement gufuncs today is with Numba. I did this on our wrapped richardson_lucy function, just to show how it could work, in case other libraries want to take this on in the future.
import itk
import numba
@numba.guvectorize(
["float32[:,:,:], float32[:,:,:], float32[:,:,:]"], # we have to specify types
"(i,j,k),(a,b,c)->(i,j,k)", # and dimensionality explicitly
forceobj=True,
)
def richardson_lucy_deconvolution(img, psf, out):
# <---- no dimension unpacking!
iterations = 1
image = itk.image_view_from_array(np.ascontiguousarray(img))
kernel = itk.image_view_from_array(np.ascontiguousarray(psf))
deconvolved = itk.richardson_lucy_deconvolution_image_filter(
image, kernel_image=kernel, number_of_iterations=iterations
)
out[:] = itk.array_from_image(deconvolved)
# Now this function works natively on either NumPy or Dask arrays
out = richardson_lucy_deconvolution(imgs, psf) # <-- no map_blocks call!
Note that we’ve lost both the dimension unpacking and the map_blocks call. Our function now knows enough about how it broadcasts that Dask can do the parallelization without being told what to do explicitly.
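As a quick sanity check on plain NumPy inputs (a toy example with random data, assuming itk is installed), the gufunc loops over the two leading dimensions by itself:

np_imgs = np.random.random((3, 1, 32, 64, 64)).astype(np.float32)
np_psf = np.random.random((3, 1, 8, 16, 16)).astype(np.float32)

result = richardson_lucy_deconvolution(np_imgs, np_psf)  # plain NumPy in, NumPy out
result.shape  # (3, 1, 32, 64, 64)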
This adds some burden onto library maintainers, but makes the user experience much smoother.
When doing some user research on image processing and Dask, almost everyone we interviewed said that they wanted faster deconvolution. This seemed to be a major pain point. Now we know why. It’s both very common and very slow.
Running deconvolution on a single chunk of this size takes around 2-4 minutes, and we have hundreds of chunks in a single dataset. Multi-core parallelism can help a bit here, but this problem may also be ripe for GPU acceleration. Similar operations typically have 100x speedups on GPUs. This might be a more pragmatic solution than scaling out to large distributed clusters.
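As a very rough sketch of a first step in that direction (assuming CuPy and a CUDA-capable GPU are available; this only moves the chunks to the GPU, and a GPU implementation of the deconvolution itself would still be needed):

import cupy

gpu_imgs = imgs.map_blocks(cupy.asarray)  # each chunk becomes a CuPy array on the GPU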
This experiment both …
We’re also going to continue with our imaging experiment while these technical issues get worked out in the background. Next up, segmentation!