
an example that shows the need for memory backpressure #2602

Closed
@rabernat

Description

In my work with large climate datasets, I often concoct calculations that cause my dask workers to run out of memory, start dumping to disk, and eventually grind my computation to a halt. There are many ways to mitigate this by e.g. using more workers, more memory, better disk-spilling settings, simpler jobs, etc. and these have all been tried over the years with some degree of success. But in this issue, I would like to address what I believe is the root of my problems within the dask scheduler algorithms.
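As an aside, the disk-spilling settings I mean are the worker memory fractions in the distributed configuration. Tweaking them programmatically looks roughly like this (the values are just illustrative, not a recommendation):

import dask

# Illustrative values only: the fractions of worker memory at which the
# worker starts spilling data to disk, pauses executing new tasks, and is
# finally restarted. These normally live in the distributed YAML config.
dask.config.set({
    "distributed.worker.memory.target": 0.60,
    "distributed.worker.memory.spill": 0.70,
    "distributed.worker.memory.pause": 0.80,
    "distributed.worker.memory.terminate": 0.95,
})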

The core problem is that the tasks early in my graph generate data faster than it can be consumed downstream, causing data to pile up, eventually overwhelming my workers. Here is a self contained example:

import math

import dask.array as dsa

# create some random data
# assume chunk structure is not under my control, because it originates
# from the way the data is laid out in the underlying files
shape = (500000, 100, 500)
chunks = (100, 100, 500)
data = dsa.random.random(shape, chunks=chunks)

# now rechunk the data to permit me to do some computations along different axes
# this aggregates chunks along axis 0 and dis-aggregates along axis 1
data_rc = data.rechunk((1000, 1, 500))
FACTOR = 15


def my_custom_function(f):
    # a pretend custom function that would do a bunch of stuff along
    # axis 0 and 2 and then reduce the data heavily
    return f.ravel()[::FACTOR][None, :]

# expected length of each output chunk: the total reduced size divided by
# the total number of blocks (each block yields one output row)
c0 = data_rc.numblocks[0] * data_rc.numblocks[1] * data_rc.numblocks[2]
c1 = math.ceil(data_rc.ravel()[::FACTOR].size / c0)

# apply that function to each chunk
res = data_rc.map_blocks(my_custom_function, dtype=data.dtype,
                         drop_axis=[1, 2], new_axis=[1], chunks=(1, c1))

res.compute()

(Perhaps this could be simplified further, but I have done my best to preserve the basic structure of my real problem.)

When I watch this execute on my dashboard, I see the workers just keep generating data until they reach their memory thresholds, at which point they start writing data to disk, before my_custom_function ever gets called to relieve the memory buildup. Depending on the size of the problem and the speed of the disks where they are spilling, sometimes we can recover and manage to finish after a very long time. Usually the workers just stop working.

This failure mode is frustrating, because I can often get a reasonable result by just doing the naive thing:

for n in range(500):
    res[n].compute()

i.e. evaluating the computation serially, one block at a time.
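
A middle ground I sometimes fall back to, sketched below purely as an illustration (the batch size is a hand-tuned guess), is to evaluate the blocks in small batches rather than strictly one at a time. This amounts to throttling the data-producing tasks by hand:

import dask

# Sketch only: evaluate the output blocks in bounded batches so that only a
# limited amount of intermediate data has to exist at any one time.
BATCH = 20  # hand-tuned; there is no principled way to choose this
results = []
for start in range(0, 500, BATCH):
    batch = [res[n] for n in range(start, min(start + BATCH, 500))]
    results.extend(dask.compute(*batch))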

I wish the dask scheduler knew to stop scheduling the data-generating tasks once the downstream tasks can no longer keep up with consuming their output. I am not an expert, but I believe the term for this is backpressure. I see this term has come up in #641, and also in this blog post by @mrocklin regarding streaming data.

I have a hunch that resolving this problem would resolve many of the pervasive but hard-to-diagnose problems we have in the xarray / pangeo sphere. But I also suspect it is not easy and requires major changes to core algorithms.

Dask version 1.1.4
