Reference Manual
Coroutines
Curio executes coroutines. A coroutine is a function defined using async def:

async def hello(name):
    return 'Hello ' + name
Coroutines call other coroutines using await:

async def main(name):
    s = await hello(name)
    print(s)
Coroutines never run on their own. They always execute under the supervision of a manager (e.g., an event-loop, a kernel, etc.). In Curio, the initial coroutine is executed using run():

import curio
curio.run(main, 'Guido')
When executing, a coroutine is encapsulated by a “Task.”
Basic Execution
The following function runs an initial coroutine:
run(corofunc, *args, debug=None, selector=None, with_monitor=False, taskcls=Task)
    Run corofunc and return its result. args are the arguments provided to corofunc. with_monitor enables the task monitor. selector is an optional selector from the selectors standard library. debug is a list of debugging features (see the section on debugging). taskcls is the class used to encapsulate coroutines. If run() is called when a task is already running, a RuntimeError is raised.
If you are going to repeatedly execute coroutines one after the other, it
is more efficient to create a Kernel
instance and submit
them using the run()
method.
Kernel(selector=None, debug=None, taskcls=Task)
    Create a runtime kernel. The arguments are the same as described above for run().
There is only one method that may be used on a Kernel
instance.
Kernel.run(corofunc=None, *args, shutdown=False)
    Run corofunc and return its result. args are the arguments given to corofunc. If shutdown is True, the kernel cancels all remaining tasks and performs a clean shutdown upon return. Calling this method with corofunc set to None executes a single scheduling cycle of background tasks before returning immediately. Raises a RuntimeError if called on an already running kernel or if an attempt is made to run more than one kernel in the same thread.
A kernel is commonly used as a context manager. For example:
with Kernel() as kernel:
    kernel.run(corofunc1)
    kernel.run(corofunc2)
    ...
# Kernel shuts down here
When submitting work, you can either provide an async
function and arguments or you can provide an already instantiated
coroutine. Both of these run()
invocations work:
async def hello(name):
    print('hello', name)

run(hello, 'Guido')      # Preferred
run(hello('Guido'))      # Ok
This convention is observed by nearly all other functions that accept coroutines (e.g., spawning tasks, waiting for timeouts, etc.).
Tasks
The following functions manage the execution of concurrent tasks.
await spawn(corofunc, *args, daemon=False)
    Create a new task that concurrently executes the async function corofunc. args are the arguments provided to corofunc. Returns a Task instance as a result. The daemon option specifies that the task is never joined and that its result may be disregarded.
await current_task()
    Returns the Task instance corresponding to the caller.
spawn()
and current_task()
return a Task
instance t
with the following methods and attributes:
await t.join()
    Wait for the task to terminate and return its result. Raises a TaskError exception if the task failed with an exception.

await t.wait()
    Waits for the task to terminate, but returns no value.

await t.cancel()
    Cancels the task by raising a TaskCancelled exception in it. Does not return until the task actually terminates.

t.traceback()
    Creates a stack traceback string. Useful for debugging.

await t.where()
    Return (filename, lineno) where the task is executing.

t.id
    The task's integer id. Monotonically increases.

t.coro
    The coroutine associated with the task.

t.daemon
    Boolean flag that indicates whether or not a task is daemonic.

t.state
    The name of the task's current state. Useful for debugging.

t.cycles
    The number of scheduling cycles the task has completed.

t.result
    A property holding the task result. If accessed before the task terminates, a RuntimeError exception is raised.

t.exception
    Exception raised by a task, if any.

t.cancelled
    A boolean flag that indicates whether or not the task was cancelled.

t.terminated
    A boolean flag that indicates whether or not the task has terminated.
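For example, here is a minimal sketch of spawning a task and collecting its result with the methods above (the coroutine names are illustrative):

import curio

async def child(x, y):
    await curio.sleep(1)
    return x + y

async def main():
    t = await curio.spawn(child, 2, 3)
    print('Result:', await t.join())    # -> 5

curio.run(main)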
Task Groups
Tasks may be grouped together to better manage their execution and
collect results. To do this, create a TaskGroup
instance.
class TaskGroup(tasks=(), *, wait=all)
    A class representing a group of executing tasks. tasks is an optional set of existing tasks to put into the group. wait specifies the policy used by join() to wait for tasks. If wait is all, then wait for all tasks to complete. If wait is any, then wait for any task to terminate and cancel any remaining tasks. If wait is object, then wait for any task to return a non-None object, cancelling all remaining tasks afterwards. If wait is None, then immediately cancel all running tasks. Task groups do not form a hierarchy or have any kind of relationship to other previously created task groups or tasks. Moreover, tasks created by the top-level spawn() function are not placed into any task group. To create a task in a group, it should be created using TaskGroup.spawn() or explicitly added using TaskGroup.add_task().
The following methods and attributes are supported on a TaskGroup
instance g
:
await g.spawn(corofunc, *args)
    Create a new task in the group. Returns a Task instance.

await g.add_task(task)
    Add an already existing task to the group.

await g.next_done()
    Wait for and return the next completed task. Return None if no more tasks remain.

await g.next_result()
    Wait for and return the result of the next completed task. If the task failed with an exception, the exception is raised.

await g.join()
    Wait for all tasks in the group to terminate according to the wait policy set for the group. If any of the monitored tasks exits with an exception or if the join() operation itself is cancelled, all remaining tasks are cancelled.

await g.cancel_remaining()
    Cancel and remove all remaining non-daemonic tasks from the group.

g.completed
    The first task that completed with a valid result after calling join().

g.result
    The result of the first task that completed after calling join().

g.exception
    Exception raised by the first task that completed (if any).

g.results
    A list of all results collected by join().

g.exceptions
    A list of all exceptions collected by join().

g.tasks
    A list of all non-daemonic tasks managed by the group, ordered by task id. Does not include tasks whose results have already been collected.
The preferred way to use a TaskGroup
is as a context manager.
Here are a few common usage patterns:
# Spawn multiple tasks and collect all of their results
async with TaskGroup(wait=all) as g:
    await g.spawn(coro1)
    await g.spawn(coro2)
    await g.spawn(coro3)
print('Results:', g.results)

# Spawn multiple tasks and collect the result of the first one
# that completes--cancelling other tasks
async with TaskGroup(wait=any) as g:
    await g.spawn(coro1)
    await g.spawn(coro2)
    await g.spawn(coro3)
print('Result:', g.result)

# Spawn multiple tasks and collect their results as they complete
async with TaskGroup() as g:
    await g.spawn(coro1)
    await g.spawn(coro2)
    await g.spawn(coro3)
    async for task in g:
        print(task, 'completed.', task.result)
In these examples, access to the result
or results
attribute
may raise an exception if a task failed for some reason.
If an exception is raised inside the task group context, all managed tasks are cancelled and the exception is propagated. For example:
try:
    async with TaskGroup() as g:
        t1 = await g.spawn(func1)
        t2 = await g.spawn(func2)
        t3 = await g.spawn(func3)
        raise RuntimeError()
except RuntimeError:
    # All launched tasks will have terminated or been cancelled here
    assert t1.terminated
    assert t2.terminated
    assert t3.terminated
It is important to emphasize that no tasks placed in a task group survive past
the join()
operation or exit from a context manager. This includes
any daemonic tasks running in the background.
Time
Curio manages time with an internal monotonic clock. The following functions are provided:
await sleep(seconds)
    Sleep for a specified number of seconds. If the number of seconds is 0, execution switches to the next ready task (if any). Returns the current clock value.

await clock()
    Returns the current value of the monotonic clock. Use this to get a base clock value for the wake_at() function.
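As a small illustration, a task can measure an elapsed interval using these two functions:

from curio import run, sleep, clock

async def main():
    start = await clock()
    await sleep(2)
    print('Elapsed:', await clock() - start)

run(main)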
Timeouts
Any blocking operation can be cancelled by a timeout.
await timeout_after(seconds, corofunc=None, *args)
    Execute corofunc(*args) and return its result. If no result is returned before seconds have elapsed, a curio.TaskTimeout exception is raised on the current blocking operation. If corofunc is None, the function returns an asynchronous context manager that applies a timeout to a block of statements.
Every call to timeout_after()
must have a matching exception handler
to catch the resulting timeout. For example:
try:
    result = await timeout_after(10, coro, arg1, arg2)
except TaskTimeout:
    ...

# Alternative (context-manager)
try:
    async with timeout_after(10):
        result = await coro(arg1, arg2)
        ...
except TaskTimeout:
    ...
When timeout operations are nested, the resulting TaskTimeout
exception is paired to the matching timeout_after()
operation that
produced it. Consider this subtle example:
async def main():
    try:
        async with timeout_after(1):      # Expires first
            try:
                async with timeout_after(5):
                    await sleep(1000)
            except TaskTimeout:           # (a) Does NOT match
                print("Inner timeout")
    except TaskTimeout:                   # (b) Matches!
        print("Outer timeout")

run(main)
If you run this, you will see output of “Outer timeout” from the
exception handler at (b). This is because the outer timeout is the one
that expired. The exception handler at (a) does not match (at that
point, the exception being reported is
curio.TimeoutCancellationError
which indicates
that a timeout/cancellation has occurred somewhere, but that it is NOT
due to the inner-most timeout).
If a nested timeout_after()
is used without a matching except
clause, a timeout is reported as a
curio.UncaughtTimeoutError
exception. Remember that all
timeouts should have a matching exception handler.
If you don’t care about exception handling, you can also use the following functions:
await ignore_after(seconds, corofunc=None, *args, timeout_result=None)
    Execute corofunc(*args) and return its result. If seconds elapse, the operation is cancelled with a curio.TaskTimeout exception, but the exception is discarded and the value of timeout_result is returned. If corofunc is None, returns an asynchronous context manager that applies a timeout to a block of statements. For this case, the resulting context manager object has an expired attribute set to True if time expired.
Here are some examples:
result = await ignore_after(5, coro, args)
if result is None:
    # Timeout occurred (if you care)
    ...

# Execute multiple statements with a 5 second timeout
async with ignore_after(5) as s:
    await coro1(args)
    await coro2(args)
if s.expired:
    # Timeout occurred
    ...
The ignore_after() function is just a convenience layer to simplify exception handling. All of the timeout-related functions can be composed and layered together in any configuration and will continue to work as expected.
Cancellation Control
Sometimes it is necessary to disable or control cancellation on critical operations. The following functions can control this:
await disable_cancellation(corofunc=None, *args)
    Disables the delivery of cancellation-related exceptions while executing corofunc. args are the arguments to corofunc. The result of corofunc is returned. Any pending cancellation is delivered to the first blocking operation after cancellation is reenabled. If corofunc is None, a context manager is returned that shields a block of statements from cancellation.

await check_cancellation(exc=None)
    Explicitly check if a cancellation is pending for the calling task. If cancellation is enabled, any pending exception is raised immediately. If cancellation is disabled, it returns the pending cancellation exception instance (if any) or None. If exc is supplied and it matches the type of the pending exception, the exception is returned and any pending cancellation exception is cleared.

await set_cancellation(exc)
    Set the pending cancellation exception for the calling task to exc. If cancellation is enabled, it will be raised immediately on the next blocking operation. Returns any previously set, but still pending, cancellation exception.
A common use of these functions is to more precisely control cancellation points. Here is an example that shows how to check for cancellation at a specific code location (a):
async def coro():
    async with disable_cancellation():
        while True:
            await coro1()
            await coro2()
            if await check_cancellation():    # (a)
                break                         # Bail out!

    await check_cancellation()                # Cancellation (if any) delivered here
If you only need to shield a single operation, you can write statements like this:
async def coro():
    ...
    await disable_cancellation(some_operation, x, y, z)
    ...
Note: It is not possible for cancellation to be reenabled inside code where it has been disabled.
Synchronization Primitives
The following synchronization primitives are available. Their behavior
is identical to their equivalents in the threading
module. However, none
of these primitives are safe to use with threads.
class Event
    An event object.
An Event
instance e
supports the following methods:
e.is_set()
    Return True if the event is set.

e.clear()
    Clear the event value.

await e.wait()
    Wait for the event to be set.

await e.set()
    Set the event. Wake all waiting tasks (if any).
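A brief sketch of typical Event usage (the coroutine names are illustrative):

import curio

async def waiter(evt):
    print('Waiting...')
    await evt.wait()
    print('The event was set')

async def main():
    evt = curio.Event()
    t = await curio.spawn(waiter, evt)
    await curio.sleep(1)
    await evt.set()
    await t.join()

curio.run(main)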
class Result
    A synchronized result object. This is like an Event except that it additionally carries a value or exception.

A Result instance r supports the following methods:
r.is_set()
    Return True if the result is set.

await r.set_value(value)
    Set the result value, waking any waiters.

await r.set_exception(exc)
    Set the result exception, waking any waiters.

await r.unwrap()
    Wait and return the set value. If an exception was set, it is reraised.
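Here is a minimal sketch of handing a value from one task to another with a Result (this assumes Result is importable from the top-level curio package; the coroutine names are illustrative):

import curio

async def worker(r):
    await curio.sleep(1)
    await r.set_value(42)

async def main():
    r = curio.Result()
    await curio.spawn(worker, r)
    print('Got:', await r.unwrap())

curio.run(main)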
The Lock, RLock, and Semaphore classes allow for mutual exclusion and inter-task coordination.
class Lock
    A mutual exclusion lock.

class RLock
    A recursive mutual-exclusion lock that can be acquired multiple times within the same task.

class Semaphore(value=1)
    Semaphores are based on a counter. acquire() and release() decrement and increment the counter respectively. If the counter is 0, acquire() blocks until the value is incremented by another task. The value attribute of a semaphore is a read-only property holding the current value of the internal counter.
An instance lock
of any of the above classes supports the following methods:
await lock.acquire()
    Acquire the lock.

await lock.release()
    Release the lock.

lock.locked()
    Return True if the lock is currently held.
The preferred way to use a Lock is as an asynchronous context manager. For example:
import curio

lock = curio.Lock()

async def sometask():
    async with lock:
        print("Have the lock")
        ...
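A Semaphore can be used the same way to bound concurrency. The following sketch assumes that Semaphore supports the same asynchronous context manager protocol as Lock (the worker coroutine is illustrative):

import curio

sema = curio.Semaphore(2)       # At most two holders at a time

async def worker(n):
    async with sema:
        print('Working', n)
        await curio.sleep(1)

async def main():
    async with curio.TaskGroup() as g:
        for n in range(6):
            await g.spawn(worker, n)

curio.run(main)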
class Condition(lock=None)
    Condition variable. lock is the underlying lock to use. If None, then a Lock object is used.
An instance cv
of Condition
supports the following methods:
await cv.acquire()
    Acquire the underlying lock.

await cv.release()
    Release the underlying lock.

cv.locked()
    Return True if the underlying lock is currently held.

await cv.wait()
    Wait on the condition variable. Releases the underlying lock.

await cv.wait_for(predicate)
    Wait on the condition variable until a supplied predicate function returns True.

await cv.notify(n=1)
    Notify one or more tasks, causing them to wake from cv.wait().

await cv.notify_all()
    Notify all waiting tasks.
Proper use of a condition variable is tricky. The following example shows how to implement
producer-consumer synchronization on top of a collections.deque
object:
import curio
from collections import deque

async def consumer(items, cond):
    while True:
        async with cond:
            while not items:            # (a)
                await cond.wait()       # Wait for items
            item = items.popleft()
            print('Got', item)

async def producer(items, cond):
    for n in range(10):
        async with cond:
            items.append(n)
            await cond.notify()
        await curio.sleep(1)

async def main():
    items = deque()
    cond = curio.Condition()
    prod_task = await curio.spawn(producer, items, cond)
    cons_task = await curio.spawn(consumer, items, cond)
    await prod_task.join()
    await cons_task.cancel()

curio.run(main())
In this code, it is critically important that the wait() and notify() operations take place in a block where the condition variable has been properly acquired. Also, the while-loop at (a) is not a typo. Condition variables are often used to “signal” that some condition has become true, but it is standard practice to re-test the condition before proceeding (it might be the case that a condition was only briefly transient and by the time a notified task awakes, the condition no longer holds).
Queues
To communicate between tasks, use a Queue
.
class Queue(maxsize=0)
    Creates a queue with a maximum number of elements in maxsize. If not specified, the queue can hold an unlimited number of items.
An instance q
of Queue
supports the following methods:
q.empty()
    Return True if the queue is empty.

q.full()
    Return True if the queue is full.

q.qsize()
    Return the number of items currently in the queue.

await q.get()
    Return an item from the queue. Block if no items are available.

await q.put(item)
    Put an item on the queue. Blocks if the queue is at capacity.

await q.join()
    Wait for all elements to be processed. Consumers must call q.task_done() to indicate completion.

await q.task_done()
    Indicate that the processing has finished for an item. If all items have been processed and there are tasks waiting on q.join(), they will be awakened.
Here is an example of using queues in a producer-consumer problem:
import curio

async def producer(queue):
    for n in range(10):
        await queue.put(n)
    await queue.join()
    print('Producer done')

async def consumer(queue):
    while True:
        item = await queue.get()
        print('Consumer got', item)
        await queue.task_done()

async def main():
    q = curio.Queue()
    prod_task = await curio.spawn(producer(q))
    cons_task = await curio.spawn(consumer(q))
    await prod_task.join()
    await cons_task.cancel()

curio.run(main())
The following variants of the basic Queue
class are also provided:
class PriorityQueue(maxsize=0)
    Creates a priority queue with a maximum number of elements in maxsize. The priority of items is determined by standard relational operators such as < and <=. Lowest priority items are returned first.

class LifoQueue(maxsize=0)
    A queue with “Last In First Out” retrieval policy. In other words, a stack.
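A quick sketch of the retrieval order for a PriorityQueue (the items are arbitrary (priority, data) tuples):

import curio

async def main():
    q = curio.PriorityQueue()
    await q.put((2, 'world'))
    await q.put((1, 'hello'))
    print(await q.get())     # (1, 'hello') is returned first
    print(await q.get())     # (2, 'world')

curio.run(main)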
Universal Synchronization
Sometimes it is necessary to synchronize Curio with threads and foreign event loops. For this, use the following queue and event classes.
class UniversalQueue(maxsize=0, withfd=False)
    A queue that can be simultaneously used from Curio tasks, threads, and asyncio. The same programming API is used in all environments, but await is required for asynchronous operations. If used to coordinate Curio and asyncio, they must be executing in separate threads. The withfd option specifies whether or not the queue should optionally set up an I/O loopback that allows it to be polled by a foreign event loop. When withfd is True, adding something to the queue writes a single byte of data to the I/O loopback. Removing an item with get() reads this byte.
class UniversalEvent
    An event object that can be used from Curio tasks, threads, and asyncio. The same programming interface is used in all environments. Asynchronous operations must be prefaced by await. If used to coordinate Curio and asyncio, they must be executing in separate threads.
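A small sketch of a thread setting a UniversalEvent that a Curio task waits on (the worker function is illustrative; the thread uses the synchronous form of set() while the Curio task awaits wait()):

import threading
import time
import curio

def worker(evt):
    time.sleep(2)
    evt.set()                # Synchronous use from a thread

async def main():
    evt = curio.UniversalEvent()
    threading.Thread(target=worker, args=[evt]).start()
    await evt.wait()         # Asynchronous use from Curio
    print('The event was set')

curio.run(main)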
class UniversalResult
    A result object that can be used from Curio tasks, threads, and asyncio. A result is somewhat similar to an event, but it additionally carries an attached value or exception. To set the result, use set_value() or set_exception(). To return the result, blocking if necessary, use unwrap(). If used in an asynchronous environment, these operations must be prefaced by await. If used to coordinate Curio and asyncio, they must be executing in separate threads. A UniversalResult is somewhat similar to a Future in usage, but it has a much more restrictive API.
Here is an example of a producer-consumer problem with a UniversalQueue
involving Curio, threads, and asyncio
all running at once:
from curio import run, UniversalQueue, spawn, run_in_thread

import time
import threading
import asyncio

# An async task
async def consumer(name, q):
    print(f'{name} consumer starting')
    while True:
        item = await q.get()
        if item is None:
            break
        print(f'{name} got: {item}')
        await q.task_done()
    print(f'{name} consumer done')
    await q.put(None)

# A threaded producer
def producer(q):
    for i in range(10):
        q.put(i)
        time.sleep(1)
    q.join()
    print('Producer done')

async def main():
    q = UniversalQueue()

    # A Curio consumer
    t1 = await spawn(consumer('curio', q))

    # An asyncio consumer
    t2 = threading.Thread(target=asyncio.run, args=[consumer('asyncio', q)])
    t2.start()

    # A threaded producer
    t3 = threading.Thread(target=producer, args=[q])
    t3.start()
    await run_in_thread(t3.join)

    # Shutdown with a sentinel
    await q.put(None)
    await t1.join()
    await run_in_thread(t2.join)

run(main())
In this code, the consumer()
coroutine is used simultaneously in
Curio and asyncio
. producer()
is an ordinary thread.
Here is an example that has Curio wait for the result produced by a thread using a UniversalResult object:
import threading
import curio
import time

def worker(x, y, result):
    time.sleep(10)
    try:
        result.set_value(x+y)
    except Exception as err:
        result.set_exception(err)

async def main():
    result = curio.UniversalResult()
    threading.Thread(target=worker, args=[2,3,result]).start()
    print("Result:", await result.unwrap())

curio.run(main)
When in doubt, queues, events, and results are the preferred mechanism of coordinating Curio with foreign environments. Higher-level abstractions can often be built from these.
Blocking Operations and External Work
Sometimes you need to perform work that takes a long time to complete or otherwise blocks the progress of other tasks. This includes CPU-intensive calculations and blocking operations carried out by foreign libraries. Use the following functions to do that:
await run_in_process(callable, *args)
    Run callable(*args) in a separate process and return the result. If cancelled, the underlying worker process is immediately cancelled by a SIGTERM signal. The given callable executes in an entirely independent Python interpreter and there is no shared global state. The separate process is launched using the “spawn” method of the multiprocessing module.

await run_in_thread(callable, *args)
    Run callable(*args) in a separate thread and return the result. If the calling task is cancelled, the underlying worker thread (if started) is set aside and sent a termination request. However, since there is no underlying mechanism to forcefully kill threads, the thread won't recognize the termination request until it runs the requested work to completion. It's important to note that a cancellation won't block other tasks from using threads. Instead, cancellation produces a kind of “zombie thread” that executes the requested work, discards the result, and then disappears. For reliability, work submitted to threads should have a timeout or some other mechanism that puts a bound on execution time.

await block_in_thread(callable, *args)
    The same as run_in_thread(), but guarantees that only one background thread is used for each unique callable regardless of how many tasks simultaneously try to carry out the same operation at once. Only use this function if there is an expectation that the provided callable is going to block for an undetermined amount of time and that there might be a large amount of contention from multiple tasks on the same resource. The primary use is on waiting operations involving foreign locks and queues. For example, if you launched a hundred Curio tasks and they all decided to block on a shared thread queue, using this would be much more efficient than run_in_thread().

await run_in_executor(exc, callable, *args)
    Run callable(*args) in a user-supplied executor and return the result. exc is an executor from the concurrent.futures module in the standard library. This executor is expected to implement a submit() method that executes the given callable and returns a Future instance for collecting its result.
When performing external work, it's almost always better to use the run_in_process() and run_in_thread() functions instead of run_in_executor(). These functions have no external library dependencies, less communication overhead, and more predictable cancellation semantics.
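For instance, a blocking call can be offloaded to a thread like this (a sketch; time.sleep stands in for any blocking operation):

import time
import curio

def blocking_work(n):
    time.sleep(n)            # Stands in for any blocking call
    return n * 2

async def main():
    # Runs in a worker thread; other Curio tasks keep running meanwhile
    result = await curio.run_in_thread(blocking_work, 5)
    print('Result:', result)

curio.run(main)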
The following values in curio.workers
define how many
worker threads and processes are used. If you are going to
change these values, do it before any tasks are executed.
MAX_WORKER_THREADS
    Specifies the maximum number of threads used by a single kernel using the run_in_thread() function. Default value is 64.

MAX_WORKER_PROCESSES
    Specifies the maximum number of processes used by a single kernel using the run_in_process() function. Default value is the number of CPUs on the host system.
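A sketch of adjusting these limits, assuming direct assignment to the curio.workers module attributes before any tasks run is sufficient:

import curio
from curio import workers

# Adjust limits before any tasks are executed
workers.MAX_WORKER_THREADS = 16
workers.MAX_WORKER_PROCESSES = 2

async def main():
    ...

curio.run(main)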
I/O Classes
I/O in Curio is managed by a collection of classes in curio.io
.
These classes act as asynchronous proxies around sockets, streams, and
ordinary files. The programming interface is meant to be the same as
in normal synchronous Python code.
Socket
The Socket
class wraps an existing socket-like object with
an async interface.
class Socket(sockobj)
    Creates a proxy around an existing socket sockobj. sockobj is put in non-blocking mode when wrapped. sockobj is not closed unless the created Socket instance is explicitly closed or used as a context manager.
The following methods are redefined on an instance s
of Socket
.
await s.recv(maxbytes, flags=0)
    Receive up to maxbytes of data.

await s.recv_into(buffer, nbytes=0, flags=0)
    Receive up to nbytes of data into a buffer.

await s.recvfrom(maxbytes, flags=0)
    Receive up to maxbytes of data. Returns a tuple (data, client_address).

await s.recvfrom_into(buffer, nbytes=0, flags=0)
    Receive up to nbytes of data into a buffer.

await s.recvmsg(bufsize, ancbufsize=0, flags=0)
    Receive normal and ancillary data.

await s.recvmsg_into(buffers, ancbufsize=0, flags=0)
    Receive normal and ancillary data into a buffer.

await s.send(data, flags=0)
    Send data. Returns the number of bytes sent.

await s.sendall(data, flags=0)
    Send all of the data in data. If cancelled, the resulting exception carries a bytes_sent attribute with the number of bytes actually sent.

await s.sendto(data, address)
    Send data to the specified address.

await s.sendto(data, flags, address)
    Send data to the specified address (alternate).

await s.sendmsg(buffers, ancdata=(), flags=0, address=None)
    Send normal and ancillary data to the socket.

await s.accept()
    Wait for a new connection. Returns a tuple (sock, address).

await s.connect(address)
    Make a connection.

await s.connect_ex(address)
    Make a connection and return an error code instead of raising an exception.

await s.close()
    Close the connection.

await s.shutdown(how)
    Shutdown the socket. how is one of SHUT_RD, SHUT_WR, or SHUT_RDWR.

await s.do_handshake()
    Perform an SSL client handshake (only on SSL sockets).

s.makefile(mode, buffering=0)
    Make a file-like object that wraps the socket.

s.as_stream()
    Wrap the socket as a stream using the SocketStream class (described below).

s.blocking()
    A context manager that returns the internal socket placed into blocking mode.
Any socket method not listed here (e.g., s.setsockopt()) will be delegated directly to the underlying socket as an ordinary method. Socket objects may be used as an asynchronous context manager, which causes the underlying socket to be closed when done.
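As a sketch, here is a simple client written directly against the Socket methods above, using the curio.socket module described later (the host name is a placeholder):

from curio import run, socket

async def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    async with sock:
        await sock.connect(('example.com', 80))
        await sock.sendall(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
        response = b''
        while True:
            data = await sock.recv(10000)
            if not data:
                break
            response += data
    print(response.decode('latin-1'))

run(main)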
Streams
A stream is an asynchronous file-like object that wraps around an object that natively implements non-blocking I/O. Curio implements two basic classes:
class FileStream(fileobj)
    Create a file-like wrapper around an existing file as might be created by the built-in open() function or socket.makefile(). fileobj must be in binary mode and must support non-blocking I/O. The file is placed into non-blocking mode using os.set_blocking(fileobj.fileno()). fileobj is not closed unless the resulting instance is explicitly closed or used as a context manager. Not supported on Windows.

class SocketStream(sockobj)
    Create a file-like wrapper around a socket. sockobj is an existing socket-like object. The socket is put into non-blocking mode. sockobj is not closed unless the resulting instance is explicitly closed or used as a context manager. Instantiated by Socket.as_stream().
An instance s of either stream class implements the following methods:

await s.read(maxbytes=-1)
    Read up to maxbytes of data on the file. If omitted, reads as much data as is currently available.

await s.readall()
    Return all data up to EOF.

await s.read_exactly(n)
    Read exactly n bytes of data.

await s.readline()
    Read a single line of data.

await s.readlines()
    Read all of the lines. If cancelled, the resulting exception carries the lines read prior to cancellation.

await s.write(bytes)
    Write all of the data in bytes.

await s.writelines(lines)
    Writes all of the lines in lines. If cancelled, the resulting exception carries the number of bytes written prior to cancellation.

await s.flush()
    Flush any unwritten data from buffers.

await s.close()
    Flush any unwritten data and close the file. Not called on garbage collection.

s.blocking()
    A context manager that temporarily places the stream into blocking mode and returns the raw file object used internally. Note: for SocketStream, a file-like wrapper must first be created around the underlying socket.
Other methods (e.g., tell()
, seek()
, setsockopt()
, etc.) are available
if the underlying fileobj
or sockobj
provides them. A Stream
may be used as an asynchronous context manager.
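Here is the same request as in the earlier socket example, rewritten as a sketch against the stream API (open_connection() is described in the Networking section below; the host name is a placeholder):

from curio import run, open_connection

async def main():
    sock = await open_connection('example.com', 80)
    async with sock.as_stream() as s:
        await s.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
        response = await s.readall()
    print(response.decode('latin-1'))

run(main)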
Files
The curio.file
module provides an asynchronous compatible
replacement for the built-in open()
function and associated file
objects. Use this to read and write traditional files on the
filesystem while avoiding blocking. How this is accomplished is an
implementation detail (although threads are used in the initial
version).
aopen(*args, **kwargs)
    Creates a curio.file.AsyncFile wrapper around a traditional file object as returned by Python's builtin open() function. The arguments are exactly the same as for open(). The returned file object must be used as an asynchronous context manager.

class AsyncFile(fileobj)
    This class represents an asynchronous file as returned by the aopen() function. Normally, instances are created by the aopen() function. However, it can be wrapped around an already-existing file object.
The following methods are redefined on AsyncFile
objects to be
compatible with coroutines. Any method not listed here will be
delegated directly to the underlying file. These methods take the same arguments
as the underlying file object. Be aware that not all of these methods are
available on all kinds of files (e.g., read1()
, readinto()
and similar
methods are only available in binary-mode files).
await f.read(maxbytes=-1)
    Read up to maxbytes of data on the file. If omitted, reads as much data as is currently available.

await f.read1(maxbytes=-1)
    Same as read(), but uses at most one low-level read operation.

await f.readline()
    Read a line of input.

await f.readlines()
    Read all lines of input data.

await f.readinto(buffer)
    Read data into a buffer.

await f.readinto1(buffer)
    Read data into a buffer using a single system call.

await f.readall()
    Read all available data up to EOF.

await f.write(data)
    Write data.

await f.writelines(lines)
    Write all lines.

await f.truncate(size=None)
    Truncate the file to a given size/position. If size is omitted, the current file position is used.

await f.seek(offset)
    Seek to a new file position.

await f.tell()
    Report current file pointer.

await f.flush()
    Flush data to a file.

await f.close()
    Flush remaining data and close.
The preferred way to use an AsyncFile
object is as an asynchronous context manager.
For example:
async with aopen(filename) as f:
    # Use the file
    data = await f.read()
AsyncFile
objects may also be used with asynchronous iteration.
For example:
async with aopen(filename) as f:
    async for line in f:
        ...
AsyncFile
objects are intentionally incompatible with code
that uses files in a synchronous manner. Partly, this is to help
avoid unintentional errors in your program where blocking might
occur without you realizing it. If you know what you’re doing and you
need to access the underlying file in synchronous code, use the
blocking() context manager like this:
async with aopen(filename) as f:
    ...
    # Pass to synchronous code (danger: might block)
    with f.blocking() as sync_f:
        # Use synchronous I/O operations
        data = sync_f.read()
        ...
At first glance, the API to streams and files might look identical. The
difference concerns internal implementation. A stream works natively
with non-blocking I/O. An AsyncFile
uses a combination of threads and
synchronous calls to provide an async-compatible API. Given a choice,
you should use streams. However, some systems don’t provide non-blocking
implementations of certain system calls. In those cases, an AsyncFile
is a
fallback.
Networking
Curio provides a number of submodules for different kinds of network programming.
High Level Networking
The following functions are used to make network connections and implement socket-based servers.
await open_connection(host, port, *, ssl=None, source_addr=None, server_hostname=None, alpn_protocols=None)
    Creates an outgoing connection to a server at host and port. This connection is made using the socket.create_connection() function and might be IPv4 or IPv6 depending on the network configuration (although you're not supposed to worry about it). ssl specifies whether or not SSL should be used. ssl can be True or an instance of curio.ssl.SSLContext. source_addr specifies the source address to use on the socket. server_hostname specifies the hostname to check against when making SSL connections. It is highly advised that this be supplied to avoid man-in-the-middle attacks. alpn_protocols specifies a list of protocol names for use with the TLS ALPN extension (RFC 7301). A typical value might be ['h2', 'http/1.1'] for negotiating either an HTTP/2 or HTTP/1.1 connection.

await open_unix_connection(path, *, ssl=None, server_hostname=None, alpn_protocols=None)
    Creates a connection to a Unix domain socket with optional SSL applied.

await tcp_server(host, port, client_connected_task, *, family=AF_INET, backlog=100, ssl=None, reuse_address=True, reuse_port=False)
    Runs a server for receiving TCP connections on a given host and port. client_connected_task is a coroutine that is to be called to handle each connection. family specifies the address family and is either socket.AF_INET or socket.AF_INET6. backlog is the argument to the socket.socket.listen() method. ssl specifies a curio.ssl.SSLContext instance to use. reuse_address specifies whether to use the SO_REUSEADDR socket option. reuse_port specifies whether to use the SO_REUSEPORT socket option.

await unix_server(path, client_connected_task, *, backlog=100, ssl=None)
    Runs a Unix domain server on a given path. client_connected_task is a coroutine to execute on each connection. backlog is the argument given to the socket.socket.listen() method. ssl is an optional curio.ssl.SSLContext to use if setting up an SSL connection.

await run_server(sock, client_connected_task, ssl=None)
    Runs a server on a given socket. sock is a socket already configured to receive incoming connections. client_connected_task and ssl have the same meaning as for the tcp_server() and unix_server() functions. If you need to perform some kind of special socket setup not possible with the normal tcp_server() function, you can create the underlying socket yourself and then call this function to run a server on it.

tcp_server_socket(host, port, family=AF_INET, backlog=100, reuse_address=True, reuse_port=False)
    Creates and returns a TCP socket. Arguments are the same as for the tcp_server() function. The socket is suitable for use with other async operations as well as the run_server() function.

unix_server_socket(path, backlog=100)
    Creates and returns a Unix domain socket. Arguments are the same as for the unix_server() function. The socket is suitable for use with other async operations as well as the run_server() function.
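Putting these pieces together, a complete echo server might look like this sketch (the port number is arbitrary):

from curio import run, tcp_server

async def echo_handler(client, addr):
    print('Connection from', addr)
    async with client:
        while True:
            data = await client.recv(100000)
            if not data:
                break
            await client.sendall(data)
    print('Connection closed')

run(tcp_server, '', 25000, echo_handler)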
Message Passing and Channels
Curio provides a Channel
class that can be used to perform message
passing between interpreters running in separate processes. Message passing
uses the same protocol as the multiprocessing
standard library.
class Channel(address, family=socket.AF_INET)
    Represents a communications endpoint for message passing. address is the address and family is the protocol family.
The following methods are used to establish a connection on a Channel
instance ch
.
await ch.accept(*, authkey=None)
    Wait for an incoming connection and return a Connection instance. authkey is an optional authentication key.

await ch.connect(*, authkey=None)
    Make an outgoing connection and return a Connection instance. authkey is an optional authentication key.

ch.bind()
    Performs the address binding step of the accept() process.

await ch.close()
    Close the channel.
The connect()
and accept()
methods of Channel
instances return an
instance of the Connection
class:
class Connection(reader, writer)
    Represents a connection on which message passing of Python objects is supported. reader and writer are I/O streams on which reading and writing are to take place (for example, instances of SocketStream or FileStream).
An instance c
of Connection
supports the following methods:
await c.close()
    Close the connection.

await c.recv()
    Receive a Python object.

await c.recv_bytes()
    Receive a raw message of bytes.

await c.send(obj)
    Send a Python object.

await c.send_bytes(buf, offset=0, size=None)
    Send a buffer of bytes as a single message. offset and size specify an optional byte offset and size into the underlying memory buffer.

await c.authenticate_server(authkey)
    Authenticate server endpoint.

await c.authenticate_client(authkey)
    Authenticate client endpoint.
A Connection
instance may also be used as a context manager.
Here is an example of a producer program using channels:
# producer.py
from curio import Channel, run

async def producer(ch):
    c = await ch.accept(authkey=b'peekaboo')
    for i in range(10):
        await c.send(i)
    await c.send(None)   # Sentinel

if __name__ == '__main__':
    ch = Channel(('localhost', 30000))
    run(producer(ch))
Here is an example of a corresponding consumer program using a channel:
# consumer.py
from curio import Channel, run

async def consumer(ch):
    c = await ch.connect(authkey=b'peekaboo')
    while True:
        msg = await c.recv()
        if msg is None:
            break
        print('Got:', msg)

if __name__ == '__main__':
    ch = Channel(('localhost', 30000))
    run(consumer(ch))
socket module
The curio.socket
module provides a wrapper around selected functions in the built-in
socket
module–allowing it to be used as a stand-in in
Curio-related code. The module provides exactly the same
functionality except that certain operations have been replaced by
asynchronous equivalents.
socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)
    Creates a curio.io.Socket wrapper around socket objects created in the built-in socket module. The arguments for construction are identical and have the same meaning. The resulting socket instance is set in non-blocking mode.
The following module-level functions have been modified so that the returned socket objects are compatible with Curio:
socketpair(family=AF_UNIX, type=SOCK_STREAM, proto=0)

fromfd(fd, family, type, proto=0)

create_connection(address, source_address)
The following module-level functions have been redefined as coroutines so that they don’t block the kernel when interacting with DNS. This is accomplished through the use of threads.
await getaddrinfo(host, port, family=0, type=0, proto=0, flags=0)

await getfqdn(name)

await gethostbyname(hostname)

await gethostbyname_ex(hostname)

await gethostname()

await gethostbyaddr(ip_address)

await getnameinfo(sockaddr, flags)
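For example, a name lookup can be performed without stalling other tasks (a sketch; the host name is a placeholder):

from curio import run, socket

async def main():
    info = await socket.getaddrinfo('www.python.org', 443)
    for family, socktype, proto, canonname, sockaddr in info:
        print(sockaddr)

run(main)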
ssl module
The curio.ssl module provides Curio-compatible functions for creating an SSL wrapped Curio socket. The following functions are redefined (and have the same calling signature as their counterparts in the standard ssl module):
await wrap_socket(*args, **kwargs)

await get_server_certificate(*args, **kwargs)

create_default_context(*args, **kwargs)
class SSLContext
    A redefined and modified variant of ssl.SSLContext so that the wrap_socket() method returns a socket compatible with Curio.
Don’t attempt to use the curio.ssl
module without a careful read of Python’s official documentation
at https://docs.python.org/3/library/ssl.html.
It is usually easier to apply SSL to a connection using the high level network functions previously described. For example, here’s how you make an outgoing SSL connection:
sock = await curio.open_connection('www.python.org', 443,
                                   ssl=True,
                                   server_hostname='www.python.org')
Here’s how you create a server that uses SSL:
import curio
from curio import ssl

KEYFILE = "privkey_rsa"        # Private key
CERTFILE = "certificate.crt"   # Server certificate

async def handler(client, addr):
    ...

if __name__ == '__main__':
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(certfile=CERTFILE, keyfile=KEYFILE)
    curio.run(curio.tcp_server('', 10000, handler, ssl=ssl_context))
Subprocesses
The curio.subprocess
module implements the same functionality as the built-in
subprocess
module.
class Popen(*args, **kwargs)
    A wrapper around the subprocess.Popen class. The same arguments are accepted. On the resulting Popen instance, the stdin, stdout, and stderr file attributes have been wrapped by the curio.io.FileStream class. You can use these in an asynchronous context.
The following methods of Popen
have been replaced by asynchronous equivalents:
await Popen.wait()
    Wait for a subprocess to exit. Cancellation does not terminate the process.

await Popen.communicate(input=b'')
    Communicate with the subprocess, sending the specified input on standard input. Returns a tuple (stdout, stderr) with the resulting output of standard output and standard error. If cancelled, the resulting exception has stdout and stderr attributes that contain the output read prior to cancellation. Cancellation does not terminate the underlying subprocess.
The following functions are also available. They accept the same arguments as their
equivalents in the subprocess
module:
await run(args, stdin=None, input=None, stdout=None, stderr=None, shell=False, check=False)
    Run a command in a subprocess. Returns a subprocess.CompletedProcess instance. If cancelled, the underlying process is terminated using the process kill() method. The resulting exception will have stdout and stderr attributes containing output read prior to cancellation.

await check_output(args, stdout=None, stderr=None, shell=False)
    Run a command in a subprocess and return the resulting output. Raises a subprocess.CalledProcessError exception if an error occurred. The behavior on cancellation is the same as for run().
Here is an example of using Popen
to read streaming output off of a
subprocess with Curio:
import curio
from curio import subprocess

async def main():
    p = subprocess.Popen(['ping', 'www.python.org'], stdout=subprocess.PIPE)
    async for line in p.stdout:
        print('Got:', line.decode('ascii'), end='')

if __name__ == '__main__':
    kernel = curio.Kernel()
    kernel.run(main)
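The higher-level functions are used the same way. For instance, here is a sketch using check_output() (the command is arbitrary and Unix-specific):

import curio
from curio import subprocess

async def main():
    # Blocks this task (but not the kernel) until the command completes
    out = await subprocess.check_output(['ls', '-l'])
    print(out.decode('utf-8'))

curio.run(main)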
Asynchronous Threads
If you need to perform a lot of synchronous operations, but still
interact with Curio, you can launch an async-thread.
An asynchronous thread flips the whole world around–instead
of executing selected synchronous operations using run_in_thread()
, you
run everything in a thread and perform selected async operations using the
AWAIT()
function.
To create an asynchronous thread, use spawn_thread()
:
await spawn_thread(func, *args, daemon=False)
    Launch an asynchronous thread that runs the callable func(*args). daemon specifies if the thread runs in daemonic mode. Returns an AsyncThread instance.
An instance t
of AsyncThread
supports the following methods.
await t.join()
    Waits for the thread to terminate, returning the final result. The final result is returned in the same manner as Task.join().

await t.wait()
    Waits for the thread to terminate, but does not return any result.

await t.cancel()
    Cancels the asynchronous thread. The behavior is the same as cancellation performed on a normal Curio task.

t.result
    The final result of the thread. If the thread crashed with an exception, that exception is reraised on access.

t.exception
    The final exception (if any).

t.id
    Thread ID. A monotonically increasing integer.
Within a thread, the following function is used to execute any coroutine.
AWAIT(coro)
    Execute a coroutine on behalf of an asynchronous thread. The requested coroutine executes in Curio's main execution thread. The caller is blocked until it completes. If used outside of an asynchronous thread, an AsyncOnlyError exception is raised. If coro is not a coroutine, it is returned unmodified. The reason AWAIT is all-caps is to make it more easily heard when there are all of these coders yelling at you to just use pure async code instead of launching a thread. Also, await is a reserved keyword in Python 3.7.
Here is an example of an asynchronous thread reading off a Curio queue:
from curio import run, Queue, sleep, CancelledError
from curio.thread import spawn_thread, AWAIT

def consumer(queue):
    try:
        while True:
            item = AWAIT(queue.get())
            print('Got:', item)
            AWAIT(queue.task_done())
    except CancelledError:
        print('Consumer goodbye!')
        raise

async def main():
    q = Queue()
    t = await spawn_thread(consumer, q)
    for i in range(10):
        await q.put(i)
        await sleep(1)
    await q.join()
    await t.cancel()

run(main())
Asynchronous threads can perform any combination of blocking operations including those that might involve normal thread-related primitives such as locks and queues. These operations will block the thread itself, but will not block the Curio kernel loop. In a sense, this is the whole point--if you run things in an async thread, the rest of Curio is protected. Asynchronous threads can be cancelled in the same manner as normal Curio tasks. However, the same rules apply--an asynchronous thread can only be cancelled on blocking operations involving AWAIT().
Scheduler Activations
Every task in Curio goes through a life-cycle of creation, running,
suspension, and termination. These steps are managed by an internal
scheduler. A scheduler activation is a mechanism for monitoring these
steps. To do this, you define a class that inherits from
Activation
in the submodule curio.activation
.
class Activation
    Base class for defining scheduler activations.
An instance a
of Activation
implements the following methods:
a.activate(kernel)
    Executed once upon initialization of the Curio kernel. kernel is a reference to the Kernel instance.

a.created(task)
    Called when a new task is created. task is the newly created Task instance.

a.running(task)
    Called immediately prior to the execution cycle of a task.

a.suspended(task, trap)
    Called when a task has suspended execution. trap is the trap executed.

a.terminated(task)
    Called when a task has terminated execution. Note: the suspended() method is also called immediately prior to termination.
Activations are used to implement debugging and diagnostic tools. As an example, here is a scheduler activation that monitors for long-execution times and reports warnings:
from curio.activation import Activation
import time

class LongBlock(Activation):
    def __init__(self, maxtime):
        self.maxtime = maxtime

    def running(self, task):
        self.start = time.time()

    def suspended(self, task, trap):
        end = time.time()
        if end - self.start > self.maxtime:
            print(f'Long blocking in {task.name}: {end - self.start}')
Scheduler activations are registered when a Kernel
is created or with the
top-level run()
function:
kern = Kernel(activations=[LongBlock(0.05)])
with kern:
    kern.run(coro)

# Alternative
run(coro, activations=[LongBlock(0.05)])
Asynchronous Metaprogramming
The curio.meta
module provides some functions that might be useful if
implementing more complex programs and APIs involving coroutines.
curio_running()
    Return True if Curio is running in the current thread.

iscoroutinefunction(func)
    Return True if the supplied func is a coroutine function or is known to resolve into a coroutine. Unlike a similar function in inspect, this function knows about functools.partial(), awaitable objects, and async generators.

instantiate_coroutine(corofunc, *args, **kwargs)
    Instantiate a coroutine from corofunc. If corofunc is already a coroutine object, it is returned unmodified. If it's a coroutine function, it's executed within an async context using the given arguments. If it's not a coroutine, corofunc is called with the given arguments with the expectation that whatever is returned will be a coroutine instance.

from_coroutine(level=2)
    Returns True if the calling function is being invoked from inside a coroutine. This is primarily of use when writing decorators and other advanced metaprogramming features. The implementation requires stack-frame inspection. The level argument controls the stack frame in which information is obtained and might need to be adjusted depending on the nature of the code calling this function.

awaitable(syncfunc)
    A decorator that allows an asynchronous implementation of a function to be attached to an existing synchronous function. If the resulting function is called from synchronous code, the synchronous function is used. If the function is called from asynchronous code, the asynchronous function is used.
Here is an example that illustrates:
import curio
from curio.meta import awaitable

def spam(x, y):
    print('Synchronous ->', x, y)

@awaitable(spam)
async def spam(x, y):
    print('Asynchronous ->', x, y)

async def main():
    await spam(2, 3)        # Calls asynchronous spam()

if __name__ == '__main__':
    spam(2, 3)              # Calls synchronous spam()
    curio.run(main())
Exceptions
The following exceptions are defined. All are subclasses of the
CurioError
base class.
CurioError
    Base class for all Curio-specific exceptions.

CancelledError
    Base class for all cancellation-related exceptions.

TaskCancelled
    Exception raised in a coroutine if it has been cancelled using the Task.cancel() method.

TaskTimeout
    Exception raised in a coroutine if it has been cancelled by timeout. A subclass of CancelledError.

TimeoutCancellationError
    Exception raised in a coroutine if it has been cancelled due to a timeout, but not one related to the inner-most timeout operation. A subclass of CancelledError.

UncaughtTimeoutError
    Exception raised if a timeout from an inner timeout operation has propagated to an outer timeout, indicating the lack of a proper try-except block. A subclass of CurioError.

TaskError
    Exception raised by the Task.join() method if the task being joined exited with an unhandled exception.

SyncIOError
    Exception raised if a task attempts to perform a synchronous I/O operation on an object that only supports asynchronous I/O.

AsyncOnlyError
    Exception raised by the AWAIT() function if it is used outside of an asynchronous thread.

ResourceBusy
    Exception raised if an I/O operation is requested on a resource, but the resource is already busy performing the same operation on behalf of another task. The exceptions ReadResourceBusy and WriteResourceBusy are subclasses of ResourceBusy that distinguish the direction of the conflicting operation.
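As a brief sketch, cleanup on cancellation typically looks like this:

from curio import run, spawn, sleep, CancelledError

async def worker():
    try:
        await sleep(1000)
    except CancelledError:
        print('Worker cancelled, cleaning up')
        raise                # Re-raise so that cancellation completes normally

async def main():
    t = await spawn(worker)
    await sleep(1)
    await t.cancel()

run(main)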
Low-level Traps and Scheduling
The following system calls are available in curio.traps
, but not typically used
directly in user code. They are used to implement higher level
objects such as locks, socket wrappers, and so forth. If you find
yourself using these, you’re probably doing something wrong–or
implementing a new Curio primitive.
Unless otherwise indicated, all traps are potentially blocking and may raise a cancellation exception.
await _read_wait(fileobj)
    Sleep until data is available for reading on fileobj. fileobj is any file-like object with a fileno() method.

await _write_wait(fileobj)
    Sleep until data can be written on fileobj. fileobj is any file-like object with a fileno() method.

await _io_release(fileobj)
    Release any kernel resources associated with fileobj. Should be called prior to closing any file.

await _io_waiting(fileobj)
    Returns a tuple (rtask, wtask) of tasks currently sleeping on fileobj (if any). Returns immediately.

await _future_wait(fut)
    Sleep until a result is set on fut. fut is an instance of concurrent.futures.Future.

await _cancel_task(task, exc, val)
    Cancel task. Returns immediately. exc and val specify the exception type and value.

await _scheduler_wait(sched, state)
    Go to sleep on a kernel scheduler primitive. sched is an instance of a class that inherits from SchedBase. state is the name of the wait state (used for debugging).

await _scheduler_wake(sched, n, value=None, exc=None)
    Reschedule one or more tasks from a kernel scheduler primitive. n is the number of tasks to release. value and exc specify the return value or exception to raise in the task when it resumes execution. Returns immediately.

await _get_kernel()
    Get a reference to the running Kernel instance.

await _get_current()
    Get a reference to the currently running Task instance.

await _sleep(seconds)
    Sleep for a given number of seconds.

await _set_timeout(seconds)
    Set a timeout in the currently running task. Returns immediately with the previous timeout (if any).

await _unset_timeout(previous)
    Unset a timeout in the currently running task. previous is the value returned by the _set_timeout() call used to set the timeout. Returns immediately.

await _clock()
    Immediately returns the current monotonic clock value.
Again, you’re unlikely to use any of these functions directly.
However, here’s a small taste of how they get used. For example, the
curio.io.Socket.recv()
method looks roughly like this:
class Socket:
    ...
    async def recv(self, maxbytes):
        while True:
            try:
                return self._socket.recv(maxbytes)
            except BlockingIOError:
                await _read_wait(self._socket)
    ...
This method first tries to receive data. If none is available, the
_read_wait()
call is used to put the task to sleep until reading
can be performed. When it awakes, the receive operation is
retried. Just to emphasize, the _read_wait()
doesn’t actually
perform any I/O. It’s just scheduling a task for it.
The _scheduler_wait()
and _scheduler_wake()
traps are used to
implement high-level synchronization and queuing primitives. The
sched
argument to these calls is an instance of a class that
inherits from SchedBase
defined in the curio.sched
submodule.
The following specific classes are defined:
class SchedFIFO
    A scheduling FIFO queue. Used to implement locks and queues.

class SchedBarrier
    A scheduling barrier. Used to implement events.
The following public methods are defined on an instance s
of these classes:
await s.suspend(reason)
    Suspend the calling task. reason is a string describing the wait state.

await s.wake(n=1)
    Wake one or more suspended tasks.

len(s)
    Number of tasks suspended.
Here is an example of how a scheduler primitive is used to implement an Event
:
from curio.sched import SchedBarrier

class Event:
    def __init__(self):
        self._value = 0
        self._sched = SchedBarrier()

    async def wait(self):
        if self._value == 0:
            await self._sched.suspend('EVENT_WAIT')

    async def set(self):
        self._value = 1
        await self._sched.wake(len(self._sched))
Debugging and Diagnostics
Curio provides a few facilities for basic debugging and diagnostics. If you print a Task instance, it will tell you the name of the associated coroutine along with the current file/line number where the task is currently executing. The output might look similar to this:
Task(id=3, name='child', state='TIME_SLEEP') at filename.py:9
You can additionally use the Task.traceback()
method to create a current
stack traceback of any given task. For example:
t = await spawn(coro)
...
print(t.traceback())
Instead of a full traceback, you can also get the current filename and line number:
filename, lineno = await t.where()
To find out more detailed information about what the kernel is doing, you can
supply one or more debugging modules to the run()
function. To trace
all task scheduling events, use the schedtrace
debugger as follows:
from curio.debug import schedtrace
run(coro, debug=schedtrace)
To additionally include information on low-level kernel traps, use the traptrace
debugger
instead:
from curio.debug import traptrace
run(coro, debug=traptrace)
To report all exceptions from crashed tasks, use the logcrash
debugger:
from curio.debug import logcrash
run(coro, debug=logcrash)
To report warnings about long-running tasks that appear to be stalling the
event loop, use the longblock
debugger:
from curio.debug import longblock
run(coro, debug=longblock(max_time=0.1))
The different debuggers may be combined together if you provide a list. For example:
run(coro, debug=[schedtrace, traptrace, logcrash])
The amount of output produced by the different debugging modules might be considerable. You
can filter it to a specific set of coroutine names using the filter
keyword argument.
For example:
async def spam():
    ...

async def coro():
    t = await spawn(spam)
    ...

run(coro, debug=schedtrace(filter={'spam'}))
The logging level used by the different debuggers can be changed using the
level
keyword argument:
run(coro, debug=schedtrace(level=logging.DEBUG))
A different Logger instance can be supplied using the log keyword argument:
import logging
run(coro, debug=schedtrace(log=logging.getLogger('spam')))
Be aware that all diagnostic logging is synchronous. As such, all
logging operations might temporarily block the event loop–especially
if logging output involves file I/O or network operations. If this is
a concern, you should take steps to mitigate it in the configuration
of logging. For example, you might use the QueueHandler
and
QueueListener
objects from the logging
module to offload log
handling to a separate thread.