Skip to content

Abandon encoded tuples as task definition in dsk graphs #9969

Open
dask/distributed
#8797
@fjetter

Description

TLDR

I would like to introduce a Task class (name TBD) instead of using tuples to define our graphs. I believe this can help us with many problem spaces like serialization, stringification, graph composition and analysis, etc.

Motivation

Currently, all task graphs are defined as dictionaries that are mapping a hashable to a tuple of things

Example

dsk = {
    "key-1": (func, "a", "b"),
    "key-2": (func, "key-1", "b"),
    "key-3": (func, (func2, "c", "key-1"), "key-2")
}

This examples showcases three different possibilities of defining tasks

  • key-1: is the simplest example. This is a function that is supposed to be called with a couple of arguments. This will translate to an expression like key_1 = func("a", "b")
  • key-2: This is a task that defines a dependency to key-1 by defining the literal key value as an argument. [1, 2] It will evaluate to key_2 = func(key_1, "b").
  • key-3: It is even possible to define tasks recursively with arbitrary nesting levels. It will evaluate to key_3 = func(func2("c", key_1), key_2).

To handle this structure, several utility functions are typically used through the code base to convert strings, generate tokens or walk the graph recursively. Below a couple of important methods

  • istask inspects and object and guesses that if the structure is approximately as described above, the object is a task spec
  • get_deps, get_dependencies and keys_in_tasks which is used to walk the dask graph
  • _execute_task which is used to execute a task and walk a recursive function like key-3. Note that a very similar functionality is provided by SubgraphCallable.
  • dumps_task to serialize such a tuple and again recursing through it. The developer has to ensure that this only happens after ``

The important piece to take away is that we are iterating and recursing over almost every iterable we can find in the graph and replace matching keys automatically. Multiple times. This is very error prone and not particularly performance sensitive [3].

Apart from unnecessarily walking the graph, a lot of the HighLevelGraph and distributed.protocol complexity stems from the attempt to not deserialize payload data or user functions on the scheduler. The reasons for this are plentiful but also being debated recently (e.g. scheduler environments differ from clients, performance by not serializing data unnecessarily). I consider this only a minor motivator for this change but I think it's a nice to have and comes for free.

Proposal

Most, if not all, of the above described complexity could be avoided if we chose to go for a very explicit task representation that is not based on an encoded tuple which would avoid any semantic confusion and would reduce our need to walk and recurse significantly.

from distributed.utils_comm import WrappedKey
from typing import Callable, Any, NewType

Key = NewType("Key", str)

_RemotePlaceholder = object()


class Task:
    func: Callable
    key: Key
    dependencies: dict[int, Key | WrappedKey]
    argspec: list[Any]

    __slots__ = tuple(__annotations__)

    def __init__(self, key: Key, func, args):
        self.key = key
        self.func = func
        self.dependencies = {}
        parsed_args: list[Any] = [None] * len(args)
        for ix, arg in enumerate(args):
            if isinstance(arg, (Key, WrappedKey)):
                self.dependencies[ix] = arg
                parsed_args[ix] = _RemotePlaceholder
            else:
                parsed_args[ix] = arg
        self.argspec = parsed_args

    def pack(self):
        # Serialization could be expresse by just convering between two objects
        # ... or this logic would just be implemented in __reduce__
        return PackedTask(
            key=self.key,
            dependencies=self.dependencies,
            # This could be just a pickle.dumps
            runspec=dumps({"func": self.func, "argspec": self.argspec}),
        )

    def __call__(self, *args, **kwargs):
        # resolve the input and match it to the dependencies
        argspec = self.match_args_with_dependencies(*args, **kwargs)
        return self.func(*argspec)

# Note how graph related metadata stays untouched between packing/unpacking. In
# the end, the dask scheduler only cares about the graph metadata and everything
# else is a pass through.
# More complicated versions of this could also curry arguments, e.g. user
# functions or payload data, while keeping others still free for later
# optimizations

class PackedTask:
    key: Key
    runspec: bytes
    dependencies: dict[int, Key]
    __slots__ = tuple(__annotations__)

    def __init__(self, key: Key, runspec: bytes, dependencies: dict[int, Key]):
        self.key = key
        self.runspec = runspec
        self.dependencies = dependencies

    def unpack(self):
        spec = loads(self.runspec)
        return Task(
            key=self.key,
            # We could implement a fast path that
            # is not computing dependencies again
            #
            # dependencies=self.dependencies,
            func=spec["func"],
            args=spec["argspec"],
        )

    @property
    def nbytes(self) -> int:
        # Some utility things could become object properties
        ...

I ran a couple of micro benchmarks to estimate the performance impact of using a slotted class instead of tuples and could indeed see a slowdown during graph generation (maybe a factor of 2) but this appears to be easily amortized by just not walking the graph so often and not recursing into everything and matching strings, etc. since this class exposes everything we'd be interested to know.

Cherry on top, this would allow us to have an identical definition of a tasks runspec in vanilla dask, on the client, scheduler and worker. Specifically, we could get rid of all the dumps_function, loads_function, dumps_task, execute_task, etc. in distributed/worker.py.

A migration path would be straight forward since we would convert at the outmost layer a legacy task graph to a new task graph such that all internal systems can use this representation instead. Nested tuples would be converted to SubgraphCallable (in fact, I believe in this world we should make SubgraphCallable a Task but that's a detail)

I can genuinely only see benefits. This will be a bit of tedious work but I believe it would make working on internals so much easier, particularly around serialization topics.

Thoughts? Am I missing something?

[1] The fact that we're using simple literals to identify dependencies can cause problems for developers if "stringification" is not properly applied since keys are not necessarily required to be strings until they reach the scheduler. However, if graph construction was not done properly, this can cause spurious errors, e.g. because the user function receives the non-stringified literal instead of the actual data.
[2] Whenever users are providing a key themselves, e.g. in scatter or compute this can cause dask to resolve dependency structures falsely and confuses literal user argumnets with keys.
[3] All of these functions are written efficiently and are operating on builtins so the performance overhead is OK but still unnecessary.

cc @rjzamora, @madsbk, @jrbourbeau, @mrocklin, @crusaderky

Activity

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Metadata

Assignees

No one assigned

    Labels

    corediscussionDiscussing a topic with no specific actions yetenhancementImprove existing functionality or make things work betterhighlevelgraphIssues relating to HighLevelGraphs.needs attentionIt's been a while since this was pushed on. Needs attention from the owner or a maintainer.scheduler

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions