API¶
Scheduler¶
- class aiojobs.Scheduler(*, close_timeout=0.1, limit=100, pending_limit=10000, exception_handler=None)¶
A container for managed jobs.
Jobs are created by
spawn().close()should be used for finishing all scheduled jobs.The class implements
collections.abc.Collectioncontract, jobs could be iterated etc.:len(scheduler),for job in scheduler,job in scheduleroperations are supported.Class must be instantiated within a running event loop (e.g. in an
asyncfunction).close_timeout is a timeout for job closing,
0.1by default. If job’s closing time takes more than timeout a message is logged byScheduler.call_exception_handler().limit is a limit for jobs spawned by scheduler,
100by default.pending_limit is a limit for amount of jobs awaiting starting,
10000by default. Use0for infinite pending queue size.exception_handler is a callable with
handler(scheduler, context)signature to log unhandled exceptions from jobs (seeScheduler.call_exception_handler()for documentation about context and default implementation).
Note
close_timeout pinned down to
0.1second, it looks too small at first glance. But it is a timeout for waiting cancelled jobs. Normally job is finished immediately if it doesn’t swallowasyncio.CancelledError.But in last case there is no reasonable timeout with good number for everybody, user should pass a value suitable for his environment anyway.
- limit¶
Concurrency limit (
100by default) orNoneif the limit is disabled.
- pending_limit¶
A limit for pending queue size (
0for unlimited queue).See
spawn()for details.New in version 0.2.
- close_timeout¶
Timeout for waiting for jobs closing,
0.1by default.
- active_count¶
Count of active (executed) jobs.
- pending_count¶
Count of scheduled but not executed yet jobs.
- async spawn(coro)¶
Spawn a new job for execution coro coroutine.
Return a new
Jobobject.The job might be started immediately or pushed into pending list if concurrency
limitexceeded.If
pending_countis greater thanpending_limitand the limit is finite (not0) the method suspends execution without scheduling a new job (adding it into pending queue) until penging queue size will be reduced to have a free slot.Changed in version 0.2: The method respects
pending_limitnow.
- async close()¶
Close scheduler and all its jobs.
It finishing time for particular job exceeds
close_timeoutthis job is logged bycall_exception_handler().
- exception_handler¶
A callable with signature
(scheduler, context)orNonefor default handler.Used by
call_exception_handler().
- call_exception_handler(context)¶
Log an information about errors in not explicitly awaited jobs and jobs that close procedure exceeds
close_timeout.By default calls
asyncio.AbstractEventLoop.call_exception_handler(), the behavior could be overridden by passing exception_handler parameter intoScheduler.context is a
dictwith the following keys:message: error message,
strjob: failed job,
Jobinstanceexception: caught exception,
Exceptioninstancesource_traceback: a traceback at the moment of job creation (present only for debug event loops, see also
PYTHONASYNCIODEBUG).
Job¶
- class aiojobs.Job¶
A wrapper around spawned async function.
Job has three states:
pending: spawn but not executed yet because of concurrency limit
active: is executing now
closed: job has finished or stopped.
All exception not explicitly awaited by
wait()andclose()are logged byScheduler.call_exception_handler()- active¶
Job is executed now
- pending¶
Job was spawned by actual execution is delayed because scheduler reached concurrency limit.
- closed¶
Job is finished.
- async wait(*, timeout=None)¶
Wait for job finishing.
If timeout exceeded
asyncio.TimeoutErrorraised.The job is in closed state after finishing the method.
- async close(*, timeout=None)¶
Close the job.
If timeout exceeded
asyncio.TimeoutErrorraised.The job is in closed state after finishing the method.
Integration with aiohttp web server¶
For using the project with aiohttp server a scheduler should be installed into app and new function should be used for spawning new jobs.
- aiojobs.aiohttp.setup(app, **kwargs)¶
Register
aiohttp.web.Application.on_startupandaiohttp.web.Application.on_cleanuphooks for creatingaiojobs.Scheduleron application initialization stage and closing it on web server shutdown.app -
aiohttp.web.Applicationinstance.kwargs - additional named parameters passed to
aiojobs.Scheduler.
- aiojobs.aiohttp.spawn(request, coro)¶
- :async:
Spawn a new job using scheduler registered into
request.app, or a parentaiohttp.web.Application.request –
aiohttp.web.Requestgiven from web-handlercoro a coroutine to be executed inside a new job
Return
aiojobs.Jobinstance
Helpers
- aiojobs.aiohttp.get_scheduler(request)¶
Return a scheduler from request, raise
RuntimeErrorif scheduler was not registered on application startup phase (seesetup()). The scheduler will be resolved from the current or any parentaiohttp.web.Application, if available.
- aiojobs.aiohttp.get_scheduler_from_app(app)¶
Return a scheduler from aiohttp application or
Noneif scheduler was not registered on application startup phase (seesetup()).
- aiojobs.aiohttp.get_scheduler_from_request(request)¶
Return a scheduler from aiohttp request or
Noneif scheduler was not registered on any application in the hierarchy of parent applications (seesetup())
- @aiojobs.aiohttp.atomic¶
Wrap a web-handler to execute the entire handler as a new job.
@atomic async def handler(request): return web.Response()
is a functional equivalent of
async def inner(request): return web.Response() async def handler(request): job = await spawn(request, inner(request)) return await job.wait()