Customize initialization
Often we want to run custom code when we start up or tear down a scheduler or worker. We might do this manually with functions like Client.run or Client.run_on_scheduler, but this is error-prone and difficult to automate.

To resolve this, Dask includes a few mechanisms to run arbitrary code around the lifecycle of a Scheduler, Worker, Nanny, or Client.
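For context, here is a minimal sketch of that manual approach (the address and function bodies are illustrative, not part of this page):

from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # illustrative address

def set_env():
    # Runs once on every currently connected worker
    import os
    os.environ["A"] = "value"

def announce():
    print("scheduler is up")

client.run(set_env)               # workers that join later are missed
client.run_on_scheduler(announce) # runs once on the scheduler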
Preload Scripts

Both dask-scheduler and dask-worker support a --preload option that allows custom initialization of each scheduler/worker respectively. A module or Python file passed as a --preload value is guaranteed to be imported before establishing any connection. A dask_setup(service) function is called if found, with a Scheduler, Worker, Nanny, or Client instance as the argument. As the service stops, dask_teardown(service) is called if present.
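For instance, a minimal preload file built only on the interface described above might look like this (the file name and print statements are illustrative):

# myfile.py
def dask_setup(service):
    # ``service`` is the Scheduler, Worker, Nanny, or Client being started
    print("Starting", type(service).__name__)


def dask_teardown(service):
    print("Stopping", type(service).__name__)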
To support additional configuration, a single --preload module may register additional command-line arguments by exposing dask_setup as a Click command. This command will be used to parse additional arguments provided to dask-worker or dask-scheduler and will be called before service initialization.
Example

As an example, consider the following file that creates a scheduler plugin and registers it with the scheduler:

# scheduler-setup.py
import click

from distributed.diagnostics.plugin import SchedulerPlugin


class MyPlugin(SchedulerPlugin):
    def __init__(self, print_count):
        self.print_count = print_count
        super().__init__()

    def add_worker(self, scheduler=None, worker=None, **kwargs):
        print("Added a new worker at:", worker)
        if self.print_count and scheduler is not None:
            print("Total workers:", len(scheduler.workers))


@click.command()
@click.option("--print-count/--no-print-count", default=False)
def dask_setup(scheduler, print_count):
    plugin = MyPlugin(print_count)
    scheduler.add_plugin(plugin)
We can then run this preload script by referring to its filename (or module name if it is on the path) when we start the scheduler:
dask-scheduler --preload scheduler-setup.py --print-count
Types

Preloads can be specified as any of the following forms:

A path to a script, like /path/to/myfile.py
A module name that is on the path, like my_module.initialize
The text of a Python script, like import os; os.environ["A"] = "value"
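Assuming the --preload command-line option accepts each of these forms (paths and addresses here are illustrative), invocations would look like:

dask-scheduler --preload /path/to/myfile.py
dask-worker tcp://scheduler:8786 --preload my_module.initialize
dask-worker tcp://scheduler:8786 --preload "import os; os.environ['A'] = 'value'"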
Configuration

Preloads can also be registered with configuration at the following values:

distributed:
  scheduler:
    preload:
      - "import os; os.environ['A'] = 'b'"   # use Python text
      - /path/to/myfile.py                   # or a filename
      - my_module                            # or a module name
    preload-argv:
      - []                                   # pass optional keywords
      - ["--option", "value"]
      - []
  worker:
    preload: []
    preload-argv: []
  nanny:
    preload: []
    preload-argv: []
  client:
    preload: []
    preload-argv: []
Note

Because the dask-worker command needs to accept keywords for both the Worker and the Nanny (if a nanny is used), it has both a --preload and a --preload-nanny keyword. All extra keywords (like --print-count above) will be sent to the workers rather than the nanny. There is no way to specify extra keywords to the nanny preload scripts on the command line. We recommend the use of the more flexible configuration if this is necessary.
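As a sketch of that recommendation, extra arguments for a nanny preload could be supplied through configuration, either in YAML as above or programmatically; the module name and arguments below are hypothetical:

import dask

# Hypothetical nanny preload plus arguments, set via configuration because
# the command line cannot pass extra keywords to nanny preload scripts.
dask.config.set({
    "distributed.nanny.preload": ["my_nanny_module"],
    "distributed.nanny.preload-argv": [["--flag", "value"]],
})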
Worker Lifecycle Plugins

You can also create a class with setup, teardown, and transition methods, and register that class with the scheduler to give to every worker using the Client.register_worker_plugin method.
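The full signature and interface examples follow below; note that Client.register_worker_plugin is deprecated in favor of Client.register_plugin in recent versions, so a minimal sketch (class and plugin names are illustrative) would be:

from dask.distributed import Client, WorkerPlugin

class TaskCounter(WorkerPlugin):
    """Count tasks that finish in memory on each worker."""

    def setup(self, worker):
        self.count = 0

    def transition(self, key, start, finish, **kwargs):
        if finish == "memory":
            self.count += 1

    def teardown(self, worker):
        print(worker.address, "completed", self.count, "tasks")

client = Client("tcp://scheduler-address:8786")  # illustrative address
client.register_plugin(TaskCounter(), name="task-counter")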
Client.register_worker_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, nanny: bool | None = None)

Registers a lifecycle worker plugin for all current and future workers.

Deprecated since version 2023.9.2: Use Client.register_plugin() instead.

This registers a new object to handle setup, task state transitions and teardown for workers in this cluster. The plugin will instantiate itself on all currently connected workers. It will also be run on any worker that connects in the future.

The plugin may include methods setup, teardown, transition, and release_key. See the dask.distributed.WorkerPlugin class or the examples below for the interface and docstrings. It must be serializable with the pickle or cloudpickle modules.

If the plugin has a name attribute, or if the name= keyword is used, then that will control idempotency. If a plugin with that name has already been registered, then it will be removed and replaced by the new one.

For alternatives to plugins, you may also wish to look into preload scripts.
Parameters

plugin : WorkerPlugin or NannyPlugin
    WorkerPlugin or NannyPlugin instance to register.
name : str, optional
    A name for the plugin. Registering a plugin with the same name will have no effect. If plugin has no name attribute a random name is used.
nanny : bool, optional
    Whether to register the plugin with workers or nannies.
See also
distributed.WorkerPlugin
unregister_worker_plugin
Examples
>>> class MyPlugin(WorkerPlugin):
...     def __init__(self, *args, **kwargs):
...         pass  # the constructor is up to you
...     def setup(self, worker: dask.distributed.Worker):
...         pass
...     def teardown(self, worker: dask.distributed.Worker):
...         pass
...     def transition(self, key: str, start: str, finish: str, **kwargs):
...         pass
...     def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool):
...         pass

>>> plugin = MyPlugin(1, 2, 3)
>>> client.register_plugin(plugin)

You can get access to the plugin with the get_worker function

>>> client.register_plugin(other_plugin, name='my-plugin')
>>> def f():
...     worker = get_worker()
...     plugin = worker.plugins['my-plugin']
...     return plugin.my_state
>>> future = client.run(f)