Scheduler Overview
After we create a dask graph, we use a scheduler to run it. Dask currently implements a few different schedulers:
dask.threaded.get: a scheduler backed by a thread pool
dask.multiprocessing.get: a scheduler backed by a process pool
dask.get: a synchronous scheduler, good for debugging
distributed.Client.get: a distributed scheduler for executing graphs on multiple machines. This lives in the external distributed project.
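The first three ship with dask itself. As a minimal sketch of the distributed entry point (assuming the external distributed package is installed), the Client exposes the same get interface as the local schedulers:

>>> from operator import add
>>> from dask.distributed import Client
>>> client = Client()  # start a small cluster of workers on the local machine
>>> client.get({'x': 1, 'y': (add, 'x', 10)}, 'y')
11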
The get function
The entry point for all schedulers is a get function. This takes a dask graph, and a key or list of keys to compute:
>>> from operator import add
>>> from dask.threaded import get  # any scheduler's get works here
>>> dsk = {'a': 1,
...        'b': 2,
...        'c': (add, 'a', 'b'),
...        'd': (sum, ['a', 'b', 'c'])}
>>> get(dsk, 'c')
3
>>> get(dsk, 'd')
6
>>> get(dsk, ['a', 'b', 'c'])
[1, 2, 3]
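Because every scheduler shares this get signature, the same graph runs unchanged under a different scheduler. For example, with the synchronous and process-pool schedulers:

>>> import dask
>>> import dask.multiprocessing
>>> dask.get(dsk, 'd')                  # synchronous scheduler
6
>>> dask.multiprocessing.get(dsk, 'd')  # process-pool scheduler
6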
Using compute methods
When working with dask collections, you will rarely need to interact with scheduler get functions directly. Each collection has a default scheduler, and a built-in compute method that calculates the output of the collection:
>>> import dask.array as da
>>> x = da.arange(100, chunks=10)
>>> x.sum().compute()
4950
The compute method takes a number of keywords:
scheduler: the name of the desired scheduler as a string ("threads", "processes", "single-threaded", etc.), a get function, or a dask.distributed.Client object. Overrides the default for the collection.
**kwargs: extra keywords to pass on to the scheduler get function.
See also: Configuring the schedulers.
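For example, both keywords can be combined in one call; here the scheduler-specific num_workers keyword (covered under Configuring the schedulers) is passed through **kwargs:

>>> x.sum().compute(scheduler='threads', num_workers=2)
4950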
The compute function
You may wish to compute results from multiple dask collections at once. Similar to the compute method on each collection, there is a general compute function that takes multiple collections and returns multiple results. This merges the graphs from each collection, so intermediate results are shared:
>>> y = (x + 1).sum()
>>> z = (x + 1).mean()
>>> da.compute(y, z) # Compute y and z, sharing intermediate results
(5050, 50.5)
Here the x + 1 intermediate was only computed once, while calling y.compute() and z.compute() would compute it twice. For large graphs that share many intermediates, this can be a big performance gain.
The compute function works with any dask collection, and is found in dask.base. For convenience it has also been imported into the top level namespace of each collection.
>>> from dask.base import compute
>>> compute is da.compute
True
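Since compute works across collection types, a single call can even merge graphs from different collections; a small sketch mixing the array above with a dask.bag:

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> compute(x.sum(), b.sum())  # one merged graph across collection types
(4950, 10)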
Configuring the schedulers
The dask collections each have a default scheduler:
dask.array and dask.dataframe use the threaded scheduler by default
dask.bag uses the multiprocessing scheduler by default.
For most cases, the default settings are good choices. However, sometimes you may want to use a different scheduler. There are two ways to do this.
Using the scheduler keyword in the compute method:

>>> x.sum().compute(scheduler='processes')

Using dask.config.set. This can be used either as a context manager, or to set the scheduler globally:

>>> import dask

# As a context manager
>>> with dask.config.set(scheduler='processes'):
...     x.sum().compute()

# Set globally
>>> dask.config.set(scheduler='processes')
>>> x.sum().compute()
Additionally, each scheduler may take a few extra keywords specific to that scheduler. For example, the multiprocessing and threaded schedulers each take a num_workers keyword, which sets the number of processes or threads to use (defaults to the number of cores). This can be set by passing the keyword when calling compute:
# Compute with 4 threads
>>> x.compute(num_workers=4)
Alternatively, the multiprocessing and threaded schedulers will check for a global pool set with dask.config.set:
>>> from concurrent.futures import ThreadPoolExecutor
>>> with dask.config.set(pool=ThreadPoolExecutor(4)):
... x.compute()
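Similarly, a process pool can be supplied for the multiprocessing scheduler; a sketch assuming a ProcessPoolExecutor is accepted through the same pool key:

>>> from concurrent.futures import ProcessPoolExecutor
>>> with dask.config.set(scheduler='processes', pool=ProcessPoolExecutor(4)):
...     x.sum().compute()
4950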
The multiprocessing scheduler also supports different contexts (“spawn”, “forkserver”, “fork”) which you can set with dask.config.set. The default context is “spawn”, but you can set a different one:
>>> with dask.config.set({"multiprocessing.context": "forkserver"}):
... x.compute()
For more information on the individual options for each scheduler, see the
docstrings for each scheduler get
function.
Debugging the schedulers
Debugging parallel code can be difficult, as conventional tools such as pdb
don’t work well with multiple threads or processes. To get around this when
debugging, we recommend using the synchronous scheduler found at dask.get. This runs everything serially, allowing it to work well with pdb:
>>> dask.config.set(scheduler='single-threaded')
>>> x.sum().compute() # This computation runs serially instead of in parallel
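With everything running in a single thread, a standard post-mortem session works as usual; a sketch using a hypothetical buggy_function:

>>> import pdb
>>> try:
...     x.map_blocks(buggy_function).compute(scheduler='single-threaded')
... except Exception:
...     pdb.post_mortem()  # buggy_function is a stand-in for your failing code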
The shared memory schedulers also provide a set of callbacks that can be used for diagnosing and profiling. You can learn more about scheduler callbacks and diagnostics in the diagnostics documentation.
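For example, the ProgressBar callback from dask.diagnostics can be registered around any computation on these schedulers:

>>> from dask.diagnostics import ProgressBar
>>> with ProgressBar():  # prints progress as tasks complete
...     x.sum().compute()
4950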
More Information
See Shared Memory for information on the design of the shared memory (threaded or multiprocessing) schedulers
See distributed for information on the distributed memory scheduler