Description
TLDR
I would like to introduce a Task
class (name TBD) instead of using tuples to define our graphs. I believe this can help us with many problem spaces like serialization, stringification, graph composition and analysis, etc.
Motivation
Currently, all task graphs are defined as dictionaries that are mapping a hashable to a tuple of things
Example
dsk = {
"key-1": (func, "a", "b"),
"key-2": (func, "key-1", "b"),
"key-3": (func, (func2, "c", "key-1"), "key-2")
}
This examples showcases three different possibilities of defining tasks
key-1
: is the simplest example. This is a function that is supposed to be called with a couple of arguments. This will translate to an expression likekey_1 = func("a", "b")
key-2
: This is a task that defines a dependency tokey-1
by defining the literal key value as an argument. [1, 2] It will evaluate tokey_2 = func(key_1, "b")
.key-3
: It is even possible to define tasks recursively with arbitrary nesting levels. It will evaluate tokey_3 = func(func2("c", key_1), key_2)
.
To handle this structure, several utility functions are typically used through the code base to convert strings, generate tokens or walk the graph recursively. Below a couple of important methods
istask
inspects and object and guesses that if the structure is approximately as described above, the object is a task specget_deps
,get_dependencies
andkeys_in_tasks
which is used to walk the dask graph_execute_task
which is used to execute a task and walk a recursive function likekey-3
. Note that a very similar functionality is provided bySubgraphCallable
.dumps_task
to serialize such a tuple and again recursing through it. The developer has to ensure that this only happens after ``
The important piece to take away is that we are iterating and recursing over almost every iterable we can find in the graph and replace matching keys automatically. Multiple times. This is very error prone and not particularly performance sensitive [3].
Apart from unnecessarily walking the graph, a lot of the HighLevelGraph
and distributed.protocol
complexity stems from the attempt to not deserialize payload data or user functions on the scheduler. The reasons for this are plentiful but also being debated recently (e.g. scheduler environments differ from clients, performance by not serializing data unnecessarily). I consider this only a minor motivator for this change but I think it's a nice to have and comes for free.
Proposal
Most, if not all, of the above described complexity could be avoided if we chose to go for a very explicit task representation that is not based on an encoded tuple which would avoid any semantic confusion and would reduce our need to walk and recurse significantly.
from distributed.utils_comm import WrappedKey
from typing import Callable, Any, NewType
Key = NewType("Key", str)
_RemotePlaceholder = object()
class Task:
func: Callable
key: Key
dependencies: dict[int, Key | WrappedKey]
argspec: list[Any]
__slots__ = tuple(__annotations__)
def __init__(self, key: Key, func, args):
self.key = key
self.func = func
self.dependencies = {}
parsed_args: list[Any] = [None] * len(args)
for ix, arg in enumerate(args):
if isinstance(arg, (Key, WrappedKey)):
self.dependencies[ix] = arg
parsed_args[ix] = _RemotePlaceholder
else:
parsed_args[ix] = arg
self.argspec = parsed_args
def pack(self):
# Serialization could be expresse by just convering between two objects
# ... or this logic would just be implemented in __reduce__
return PackedTask(
key=self.key,
dependencies=self.dependencies,
# This could be just a pickle.dumps
runspec=dumps({"func": self.func, "argspec": self.argspec}),
)
def __call__(self, *args, **kwargs):
# resolve the input and match it to the dependencies
argspec = self.match_args_with_dependencies(*args, **kwargs)
return self.func(*argspec)
# Note how graph related metadata stays untouched between packing/unpacking. In
# the end, the dask scheduler only cares about the graph metadata and everything
# else is a pass through.
# More complicated versions of this could also curry arguments, e.g. user
# functions or payload data, while keeping others still free for later
# optimizations
class PackedTask:
key: Key
runspec: bytes
dependencies: dict[int, Key]
__slots__ = tuple(__annotations__)
def __init__(self, key: Key, runspec: bytes, dependencies: dict[int, Key]):
self.key = key
self.runspec = runspec
self.dependencies = dependencies
def unpack(self):
spec = loads(self.runspec)
return Task(
key=self.key,
# We could implement a fast path that
# is not computing dependencies again
#
# dependencies=self.dependencies,
func=spec["func"],
args=spec["argspec"],
)
@property
def nbytes(self) -> int:
# Some utility things could become object properties
...
I ran a couple of micro benchmarks to estimate the performance impact of using a slotted class instead of tuples and could indeed see a slowdown during graph generation (maybe a factor of 2) but this appears to be easily amortized by just not walking the graph so often and not recursing into everything and matching strings, etc. since this class exposes everything we'd be interested to know.
Cherry on top, this would allow us to have an identical definition of a tasks runspec in vanilla dask, on the client, scheduler and worker. Specifically, we could get rid of all the dumps_function
, loads_function
, dumps_task
, execute_task
, etc. in distributed/worker.py
.
A migration path would be straight forward since we would convert at the outmost layer a legacy task graph to a new task graph such that all internal systems can use this representation instead. Nested tuples would be converted to SubgraphCallable
(in fact, I believe in this world we should make SubgraphCallable
a Task
but that's a detail)
I can genuinely only see benefits. This will be a bit of tedious work but I believe it would make working on internals so much easier, particularly around serialization topics.
Thoughts? Am I missing something?
[1] The fact that we're using simple literals to identify dependencies can cause problems for developers if "stringification" is not properly applied since keys are not necessarily required to be strings until they reach the scheduler. However, if graph construction was not done properly, this can cause spurious errors, e.g. because the user function receives the non-stringified literal instead of the actual data.
[2] Whenever users are providing a key
themselves, e.g. in scatter or compute this can cause dask to resolve dependency structures falsely and confuses literal user argumnets with keys.
[3] All of these functions are written efficiently and are operating on builtins so the performance overhead is OK but still unnecessary.
Activity