API¶
Scheduler¶
- class aiojobs.Scheduler(*, close_timeout=0.1, limit=100, pending_limit=10000, exception_handler=None)¶
A container for managed jobs.
Jobs are created by spawn(). close() should be used for finishing all scheduled jobs.
The class implements the collections.abc.Collection contract; jobs can be iterated over, and the len(scheduler), for job in scheduler and job in scheduler operations are supported.
The class must be instantiated within a running event loop (e.g. inside an async function).
close_timeout is a timeout for job closing, 0.1 by default. If closing a job takes longer than this timeout, a message is logged by Scheduler.call_exception_handler().
limit is a limit on the number of jobs spawned by the scheduler, 100 by default.
pending_limit is a limit on the number of jobs awaiting start, 10000 by default. Use 0 for an infinite pending queue size.
exception_handler is a callable with the handler(scheduler, context) signature used to log unhandled exceptions from jobs (see Scheduler.call_exception_handler() for documentation about context and the default implementation).
Note
close_timeout is pinned to 0.1 second, which may look too small at first glance. But it is a timeout for waiting for cancelled jobs: normally a job finishes immediately if it doesn't swallow asyncio.CancelledError.
In the opposite case there is no single timeout value that suits everybody, so the user should pass a value suitable for their environment anyway.
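For orientation, a minimal sketch of the typical lifecycle, assuming the scheduler is created inside a coroutine as required (the background_task coroutine is purely illustrative):

    import asyncio
    import aiojobs

    async def background_task():
        await asyncio.sleep(1)

    async def main():
        # Must be created inside a running event loop.
        scheduler = aiojobs.Scheduler(limit=100, pending_limit=10000)
        job = await scheduler.spawn(background_task())
        await job.wait()         # wait for this particular job
        await scheduler.close()  # finish all remaining jobs

    asyncio.run(main())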
- limit¶
Concurrency limit (100 by default) or None if the limit is disabled.
- pending_limit¶
A limit for the pending queue size (0 for an unlimited queue). See spawn() for details.
New in version 0.2.
- close_timeout¶
Timeout for waiting for jobs to close, 0.1 by default.
- active_count¶
Count of active (currently executing) jobs.
- pending_count¶
Count of scheduled but not yet executed jobs.
- async spawn(coro)¶
Spawn a new job to execute the coro coroutine.
Return a new Job object.
The job may be started immediately, or pushed into the pending list if the concurrency limit is exceeded.
If pending_count is greater than pending_limit and the limit is finite (not 0), the method suspends execution without scheduling a new job (i.e. without adding it to the pending queue) until the pending queue size is reduced enough to have a free slot.
Changed in version 0.2: The method respects pending_limit now.
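As a sketch of the queueing behaviour described above (the small limit and pending_limit values are chosen purely for illustration):

    import asyncio
    import aiojobs

    async def main():
        scheduler = aiojobs.Scheduler(limit=1, pending_limit=1)
        first = await scheduler.spawn(asyncio.sleep(10))   # starts immediately
        second = await scheduler.spawn(asyncio.sleep(10))  # goes to the pending queue
        # A third spawn() would suspend here until the pending
        # queue has a free slot again.
        await scheduler.close()

    asyncio.run(main())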
- async close()¶
Close the scheduler and all its jobs.
If the finishing time for a particular job exceeds close_timeout, this job is logged by call_exception_handler().
- exception_handler¶
A callable with the (scheduler, context) signature, or None for the default handler.
Used by call_exception_handler().
- call_exception_handler(context)¶
Log information about errors in jobs that were not explicitly awaited and jobs whose close procedure exceeded close_timeout.
By default calls asyncio.AbstractEventLoop.call_exception_handler(); the behavior can be overridden by passing the exception_handler parameter to Scheduler.
context is a dict with the following keys:
- message: error message, str
- job: failed job, Job instance
- exception: caught exception, Exception instance
- source_traceback: a traceback at the moment of job creation (present only for debug event loops, see also PYTHONASYNCIODEBUG).
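A sketch of a custom handler matching the (scheduler, context) signature described above; the logging destination is just an example:

    import logging
    import aiojobs

    def exception_handler(scheduler, context):
        # context carries "message", "job", "exception" and, for debug
        # event loops, "source_traceback" (see the key list above).
        logging.error("Job failed: %s", context["message"],
                      exc_info=context.get("exception"))

    async def make_scheduler():
        return aiojobs.Scheduler(exception_handler=exception_handler)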
Job¶
- class aiojobs.Job¶
A wrapper around a spawned async function.
A job has three states:
- pending: spawned but not executed yet because of the concurrency limit
- active: currently executing
- closed: the job has finished or been stopped.
All exceptions not explicitly awaited via wait() or close() are logged by Scheduler.call_exception_handler().
- active¶
The job is currently executing.
- pending¶
The job was spawned, but actual execution is delayed because the scheduler has reached its concurrency limit.
- closed¶
Job is finished.
- async wait(*, timeout=None)¶
Wait for the job to finish.
If the timeout is exceeded, asyncio.TimeoutError is raised.
The job is in the closed state after this method completes.
- async close(*, timeout=None)¶
Close the job.
If the timeout is exceeded, asyncio.TimeoutError is raised.
The job is in the closed state after this method completes.
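A sketch of waiting with a timeout and falling back to closing the job; the long-running asyncio.sleep() stands in for real work, and the surrounding scheduler is assumed to exist:

    import asyncio

    async def wait_or_cancel(scheduler):
        job = await scheduler.spawn(asyncio.sleep(60))
        try:
            await job.wait(timeout=0.5)
        except asyncio.TimeoutError:
            # Still running after 0.5 seconds; cancel it instead.
            await job.close()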
Integration with aiohttp web server¶
To use the project with an aiohttp server, a scheduler should be installed into the application, and a dedicated function should be used for spawning new jobs.
- aiojobs.aiohttp.setup(app, **kwargs)¶
Register aiohttp.web.Application.on_startup and aiohttp.web.Application.on_cleanup hooks for creating an aiojobs.Scheduler on the application initialization stage and closing it on web server shutdown.
app - an aiohttp.web.Application instance.
kwargs - additional named parameters passed to aiojobs.Scheduler.
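A sketch of the registration step; any keyword arguments are forwarded to the Scheduler constructor:

    from aiohttp import web
    from aiojobs.aiohttp import setup as setup_aiojobs

    app = web.Application()
    # Extra kwargs are passed through to aiojobs.Scheduler.
    setup_aiojobs(app, limit=100, pending_limit=10000)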
- async aiojobs.aiohttp.spawn(request, coro)¶
Spawn a new job using the scheduler registered into request.app or a parent aiohttp.web.Application.
request – an aiohttp.web.Request given to the web handler.
coro – a coroutine to be executed inside a new job.
Return an aiojobs.Job instance.
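A sketch of spawning a fire-and-forget job from a web handler; send_email() is a hypothetical coroutine used only for illustration:

    from aiohttp import web
    from aiojobs.aiohttp import spawn

    async def send_email(address):
        ...  # hypothetical long-running work

    async def handler(request):
        # The job keeps running after the response is returned;
        # the scheduler registered by setup() tracks it.
        await spawn(request, send_email(request.query.get("to")))
        return web.Response(text="scheduled")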
Helpers¶
- aiojobs.aiohttp.get_scheduler(request)¶
Return a scheduler from the request; raise RuntimeError if no scheduler was registered on the application startup phase (see setup()). The scheduler is resolved from the current or any parent aiohttp.web.Application, if available.
- aiojobs.aiohttp.get_scheduler_from_app(app)¶
Return a scheduler from an aiohttp application, or None if no scheduler was registered on the application startup phase (see setup()).
- aiojobs.aiohttp.get_scheduler_from_request(request)¶
Return a scheduler from an aiohttp request, or None if no scheduler was registered on any application in the hierarchy of parent applications (see setup()).
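For example, a handler might inspect the registered scheduler directly; a sketch using the counters documented above:

    from aiohttp import web
    from aiojobs.aiohttp import get_scheduler

    async def status(request):
        scheduler = get_scheduler(request)
        return web.Response(
            text=f"active: {scheduler.active_count}, "
                 f"pending: {scheduler.pending_count}")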
- @aiojobs.aiohttp.atomic¶
Wrap a web-handler to execute the entire handler as a new job.
    @atomic
    async def handler(request):
        return web.Response()

is a functional equivalent of

    async def inner(request):
        return web.Response()

    async def handler(request):
        job = await spawn(request, inner(request))
        return await job.wait()