Introduction

Rationale
Asyncio has builtin support for starting new tasks, but for many reasons raw tasks are not sufficient for daily needs:

- A fire-and-forget call like asyncio.create_task(f()) gives no control over errors raised from the f() async function: all exceptions are thrown into asyncio.AbstractEventLoop.call_exception_handler().
- Tasks are not grouped into a container. asyncio.all_tasks() returns all of them, but that is not very helpful: a typical asyncio program creates a lot of internal tasks.
- Graceful shutdown requires correct cancellation of the tasks spawned by the user.
- Very often a limit on the number of concurrent user tasks is desirable to prevent over-flooding.
Web servers like aiohttp.web introduce further challenges: code executed by a web handler might be cancelled at any await point by an HTTP client disconnection.

Sometimes that is desirable behavior. If the server makes a long database call while processing an HTTP GET request and the request is cancelled, there is no reason to continue collecting data: the output HTTP channel is closed anyway.

But sometimes HTTP POST processing requires atomicity: data should be put into the database or sent to another server regardless of HTTP client disconnection. That could be done by spawning a new task for sending the data, but see the concerns above.
Solution
aiojobs provides two concepts: Job and Scheduler.

A Job is a wrapper around an asyncio task. Jobs can be spawned to start async function execution, awaited for a result or exception, and closed.

A Scheduler is a container for spawned jobs with an implied concurrency limit. The scheduler's jobs can be enumerated and closed.
Here is a simple usage example:
scheduler = aiojobs.Scheduler()
job = await scheduler.spawn(f())
await scheduler.close()
Every job can be explicitly awaited for its result, or closed:
result = await job.wait(timeout=5.0)
await job.close()
All exceptions raised by the job's async function are propagated to the caller by explicit Job.wait() and Job.close() calls.
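For example, an error raised inside the spawned coroutine surfaces at the wait() call. A minimal sketch (might_fail() and main() are placeholder names, not part of the library):

import asyncio

import aiojobs

async def might_fail():
    await asyncio.sleep(0.1)
    raise RuntimeError("boom")

async def main():
    scheduler = aiojobs.Scheduler()
    job = await scheduler.spawn(might_fail())
    try:
        await job.wait()  # re-raises the RuntimeError from might_fail()
    except RuntimeError as exc:
        print("propagated:", exc)
    await scheduler.close()

asyncio.run(main())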
In the case of a fire-and-forget scheduler.spawn(f()) call without explicit awaiting, the error is passed to Scheduler.call_exception_handler() for logging.
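A custom handler can be installed when creating the scheduler. A minimal sketch, assuming the exception_handler callback receives the scheduler and a context dict (mirroring the event loop's exception handler); fails(), handler() and main() are placeholder names:

import asyncio

import aiojobs

def handler(scheduler, context):
    # assumption: the context dict carries the failed job's exception
    print("job failed:", context.get("exception"))

async def fails():
    raise ValueError("never awaited")

async def main():
    scheduler = aiojobs.Scheduler(exception_handler=handler)
    await scheduler.spawn(fails())  # fire-and-forget: no job.wait()
    await asyncio.sleep(0.1)        # let the job run and fail
    await scheduler.close()

asyncio.run(main())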
Concurrency limit
The Scheduler has an implied limit on the number of concurrent jobs (100 by default).

Suppose we already have 100 active jobs. The next Scheduler.spawn() call pushes the new job onto a pending list; once one of the already running jobs finishes, the next job from the pending list is started.

This prevents the program from being over-flooded by running, say, a billion jobs at the same time.
The limit can be relaxed by passing the limit parameter to Scheduler, e.g. aiojobs.Scheduler(limit=100000), or disabled entirely with limit=None.
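A minimal sketch of the limit in action; it assumes the scheduler's active_count and pending_count properties, and worker() and main() are placeholder names:

import asyncio

import aiojobs

async def worker(n):
    await asyncio.sleep(1)

async def main():
    scheduler = aiojobs.Scheduler(limit=2)  # at most two jobs run at once
    for i in range(5):
        await scheduler.spawn(worker(i))
    # two jobs are running, the other three sit in the pending list
    print(scheduler.active_count, scheduler.pending_count)
    await scheduler.close()

asyncio.run(main())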
Graceful Shutdown
All spawned jobs are stopped and closed by Scheduler.close().

The call waits for jobs to close within a timeout, Scheduler.close_timeout (0.1 seconds by default). If closing a spawned job takes longer than this timeout, a message is logged by Scheduler.call_exception_handler().

The close timeout can be overridden when creating the Scheduler:

aiojobs.Scheduler(close_timeout=10)
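For example, a job may perform cleanup when it is cancelled by close(); the cleanup has to finish within close_timeout, or a message is logged. A minimal sketch (long_job() and main() are placeholder names):

import asyncio

import aiojobs

async def long_job():
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        await asyncio.sleep(0.05)  # cleanup; must fit into close_timeout
        raise

async def main():
    scheduler = aiojobs.Scheduler(close_timeout=1.0)
    await scheduler.spawn(long_job())
    await scheduler.close()  # cancels long_job() and waits for its cleanup

asyncio.run(main())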
Introspection
A scheduler is a container for its jobs.
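For instance, assuming Scheduler supports the standard container protocol over its spawned jobs (and reusing scheduler and job from the usage example above):

len(scheduler)         # number of spawned jobs
job in scheduler       # is the job still scheduled?
for job in scheduler:  # enumerate all spawned jobs
    print(job)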
Atomicity
The library does not guarantee that a job's code even starts executing.

The problem is that

task = asyncio.create_task(coro())
task.cancel()

cancels the task immediately: the code inside coro() gets no chance to run.
Adding a context switch like asyncio.sleep(0) between the create_task() and cancel() calls doesn't solve the problem: the caller itself could be cancelled while waiting on that sleep().
Thus shielding the coroutine, task = asyncio.ensure_future(asyncio.shield(coro())), still doesn't guarantee that coro() will be executed if the caller is cancelled.