AsyncResult object#
In non-blocking mode, apply() returns an AsyncResult object. This allows the result to be queried with the get() method at a later point in time. In addition, metadata occurring during execution is collected in this object. Beyond the interface it shares with Python’s multiprocessing.pool.AsyncResult, the AsyncResult object provides a number of functions that are useful for parallelisation:
get_dict#
AsyncResult.get_dict() returns the results as a dict keyed by engine ID rather than as a flat list, for example to map each engine to its process ID:
[1]:
import os
import ipyparallel as ipp

rc = ipp.Client()
# ask every engine for its process ID
ar = rc[:].apply_async(os.getpid)
# collect the results as a dict keyed by engine ID
pids = ar.get_dict()
# push the mapping back to the engines as `pid_map`
rc[:]['pid_map'] = pids
Metadata#
The client stores the metadata collected during execution in Client.metadata, a dict keyed by message ID.
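The same per-message dicts are also available on the AsyncResult itself. A minimal sketch of inspecting them, assuming a running cluster; the keys shown here (engine_id, started, completed) are the ones current ipyparallel versions record:
[ ]:
import os
import ipyparallel as ipp

rc = ipp.Client()
ar = rc[:].apply_async(os.getpid)
ar.wait()  # make sure the replies (and their metadata) have arrived

# one metadata dict per reply message
for md in ar.metadata:
    print(md['engine_id'], md['started'], md['completed'])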
Timing#
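The timing information recorded in the metadata is also exposed as attributes on the AsyncResult. As an illustrative sketch, assuming a running cluster: wall_time is the time from submission until all results were received, and serial_time is the summed computation time on the engines:
[ ]:
import time
import ipyparallel as ipp

rc = ipp.Client()
ar = rc[:].apply_async(time.sleep, 1)
ar.wait()

print("wall time:   %.3f s" % ar.wall_time)
print("serial time: %.3f s" % ar.serial_time)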
Iterable map results#
[2]:
from __future__ import print_function

import time
import ipyparallel as ipp

# create client & view
rc = ipp.Client()
dv = rc[:]
v = rc.load_balanced_view()

# scatter 'id', so id=0,1,2 on engines 0,1,2
dv.scatter('id', rc.ids, flatten=True)
print("Engine IDs: ", dv['id'])

# create a Reference to `id`. This will be a different value on each engine
ref = ipp.Reference('id')
print("sleeping for `id` seconds on each engine")
tic = time.time()
ar = dv.apply(time.sleep, ref)
for i, r in enumerate(ar):
    print("%i: %.3f" % (i, time.time() - tic))

def sleep_here(t):
    """Sleep for t seconds on the engine and return (engine id, t)."""
    import time
    time.sleep(t)
    return id, t

# one call per task
print("running with one call per task")
amr = v.map(sleep_here, [.01 * t for t in range(100)])
tic = time.time()
for i, r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r[0], time.time() - tic))

print("running with four calls per task")
# with chunksize, we can have four calls per task
amr = v.map(sleep_here, [.01 * t for t in range(100)], chunksize=4)
tic = time.time()
for i, r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r[0], time.time() - tic))

print("running with two calls per task, with unordered results")
# We can even iterate through faster results first, with ordered=False
amr = v.map(sleep_here, [.01 * t for t in range(100, 0, -1)], ordered=False, chunksize=2)
tic = time.time()
for i, r in enumerate(amr):
    print("slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time() - tic))
Engine IDs: [0, 1, 2]
sleeping for `id` seconds on each engine
0: 0.027
1: 1.022
2: 2.024
running with one call per task
task 0 on engine 2: 0.000
task 1 on engine 1: 0.001
task 2 on engine 0: 0.001
task 3 on engine 2: 0.001
task 4 on engine 1: 0.001
…
slept 0.12s on engine 2: 16.868
slept 0.11s on engine 2: 16.868
slept 0.14s on engine 0: 16.873
slept 0.13s on engine 0: 16.873
slept 0.16s on engine 1: 16.893
…
[4]:
from functools import reduce
from math import sqrt
import numpy as np
X = np.linspace(0, 100)
add = lambda a, b: a + b
sq = lambda x: x * x
sqrt(reduce(add, map(sq, X)) / len(X))
[4]:
58.028845747399714
map(sq, X) computes the square of each element in the list (in the parallel case this is an AsyncMapResult, which can be iterated in the same way).
reduce(add, map(sq, X)) / len(X) calculates the mean by summing the squares and dividing by the number of elements.
sqrt(…) takes the square root of the resulting number.
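Because an AsyncMapResult is iterable, the same expression also works with a parallel map. A minimal sketch, assuming a running cluster and the direct view from the example above; map_async is used to make the non-blocking AsyncMapResult explicit:
[ ]:
from functools import reduce
from math import sqrt

import numpy as np
import ipyparallel as ipp

rc = ipp.Client()
dv = rc[:]

X = np.linspace(0, 100)
add = lambda a, b: a + b
sq = lambda x: x * x

# map_async returns an AsyncMapResult, which reduce() iterates over directly
amr = dv.map_async(sq, X)
print(sqrt(reduce(add, amr) / len(X)))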
See also:
If you want to do your own processing of the results of an AsyncResult or AsyncMapResult, you can do so with the msg_ids attribute. You can find an example of this at ipyparallel/examples/customresults.py.
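As a hypothetical sketch of what such processing could look like (not the customresults.py example itself), the message IDs can be used to index the client’s metadata and results dictionaries:
[ ]:
import os
import ipyparallel as ipp

rc = ipp.Client()
ar = rc[:].apply_async(os.getpid)
ar.wait()

# look up the raw per-message records behind the AsyncResult
for msg_id in ar.msg_ids:
    md = rc.metadata[msg_id]
    print(msg_id, md['engine_id'], rc.results[msg_id])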