Curio Reference Manual¶
This manual describes the basic concepts and functionality provided by curio.
Coroutines¶
Curio is solely concerned with the execution of coroutines. A coroutine
is a function defined using async def
. For example:
async def hello(name):
return 'Hello ' + name
Coroutines call other coroutines using await
. For example:
async def main():
s = await hello('Guido')
print(s)
Unlike a normal function, a coroutine can never run all on its own.
It always has to execute under the supervision of a manager (e.g., an
event-loop, a kernel, etc.). In curio, an initial coroutine is
executed by a low-level kernel using the run()
function. For
example:
import curio
curio.run(main())
When executed by curio, a coroutine is considered to be a “Task.” Whenever the word “task” is used, it refers to the execution of a coroutine.
The Kernel¶
All coroutines in curio are executed by an underlying kernel. Normally, you would run a top-level coroutine using the following function:
-
run(coro, *, log_errors=True, selector=None,
-
with_monitor=False,
-
warn_if_task_blocks_for=None, **other_kernel_args)
Run the coroutine coro to completion and return its final return value. If log_errors is
True
, a traceback is written to the log on crash. If with_monitor isTrue
, then the monitor debugging task executes in the background. If selector is given, it should be an instance of a selector from theselectors
module. If warn_if_task_blocks_for is given, then any tasks which blocks the event loop for the given number of seconds will trigger acurio.BlockingTaskWarning
.
If you are going to repeatedly run coroutines one after the other, it
will be more efficient to create a Kernel
instance and submit
them using its run()
method as described below:
-
class
Kernel
(log_errors=True, selector=None, with_monitor=False)¶ Create an instance of a curio kernel. The arguments are the same as described above for the
run()
function.
There is only one method that may be used on a Kernel
outside of coroutines.
-
Kernel.
run
(coro=None, *, shutdown=False)¶ Runs the kernel until all non-daemonic tasks have finished execution. coro is a coroutine to run as a task. If shutdown is
True
, the kernel will cancel all daemonic tasks and perform a clean shutdown once all regular tasks have completed. Calling this method with no coroutine and shutdown set toTrue
will make the kernel cancel all remaining tasks and perform a clean shut down.
If submitting multiple tasks, one after another, from synchronous code, consider using a kernel as a context manager. For example:
with Kernel() as kernel:
kernel.run(coro1())
kernel.run(coro2())
...
# Kernel shuts down here
Tasks¶
The following functions are defined to help manage the execution of tasks.
-
await
spawn
(coro, daemon=False)¶ Create a new task that runs the coroutine coro. Returns a
Task
instance as a result. The daemon option, if supplied, specifies that the new task will run indefinitely in the background. Curio only runs as long as there are non-daemonic tasks to execute. Note: a daemonic task will still be cancelled if the underlying kernel is shut down.
-
await
current_task
()¶ Returns a reference to the
Task
instance corresponding to the caller. A coroutine can use this to get a self-reference to its currentTask
instance if needed.
The spawn()
and current_task()
both return a Task
instance
that serves as a kind of wrapper around the underlying coroutine that’s executing.
-
class
Task
¶ A class representing an executing coroutine. This class cannot be created directly.
-
await
Task.
join
()¶ Wait for the task to terminate. Returns the value returned by the task or raises a
curio.TaskError
exception if the task failed with an exception. This is a chained exception. The__cause__
attribute of this exception contains the actual exception raised by the task when it crashed. If called on a task that has been cancelled, the__cause__
attribute is set tocurio.TaskCancelled
.
-
await
Task.
cancel
(blocking=True)¶ Cancels the task. This raises a
curio.TaskCancelled
exception in the task which may choose to handle it in order to perform cleanup actions. Ifblocking=True
(the default), does not return until the task actually terminates. Curio only allows a task to be cancelled once. If this method is somehow invoked more than once on a still running task, the second request will merely wait until the task is cancelled from the first request. If the task has already run to completion, this method does nothing and returns immediately. ReturnsTrue
if the task was actually cancelled.False
is returned if the task was already finished prior to the cancellation request. Cancelling a task also cancels any previously set timeout.
The following public attributes are available of Task
instances:
-
Task.
id
¶ The task’s integer id.
-
Task.
coro
¶ The underlying coroutine associated with the task.
-
Task.
daemon
¶ Boolean flag that indicates whether or not a task is daemonic.
-
Task.
state
¶ The name of the task’s current state. Printing it can be potentially useful for debugging.
-
Task.
cycles
¶ The number of scheduling cycles the task has completed. This might be useful if you’re trying to figure out if a task is running or not. Or if you’re trying to monitor a task’s progress.
-
Task.
exc_info
¶ A tuple of exception information obtained from
sys.exc_info()
if the task crashes for some reason. Potentially useful for debugging.
-
Task.
cancelled
¶ A boolean flag that indicates whether or not the task was cancelled.
-
Task.
terminated
¶ A boolean flag that indicates whether or not the task has run to completion.
Task local storage¶
Curio supports “task local storage”. The API is modeled after the
“thread local storage” provided by threading.local
.
-
class
Local
¶ A class representing a bundle of task-local values. Objects of this class have no particular attributes or methods. Instead, they serve as a blank slate to which you can add whatever attributes you like. Modifications made from within one task will only be visible to that task – with one exception: when you create a new task using
curio.spawn
, then any values assigned toLocal
objects in the parent task will be inherited by the child. This inheritance takes the form of a shallow copy – further changes in the parent won’t affect the child, and further changes in the child won’t affect the parent.
Time¶
The following functions are used by tasks to help manage time.
-
await
sleep
(seconds)¶ Sleep for a specified number of seconds. If the number of seconds is 0, the kernel merely switches to the next task (if any).
-
await
wake_at
(clock)¶ Sleep until the monotonic clock reaches the given absolute clock value. Returns the value of the monotonic clock at the time the task awakes. Use this function if you need to have more precise interval timing.
-
await
clock
()¶ Returns the current value of the kernel clock. This is often used in conjunction with the
wake_at()
function (you’d use this to get an initial clock value for passing an argument).
Timeouts¶
Any blocking operation in curio can be cancelled after a timeout. The following functions can be used for this purpose:
-
await
timeout_after
(seconds, coro=None)¶ Execute the specified coroutine and return its result. However, issue a cancellation request to the calling task after seconds have elapsed. When this happens, a
curio.TaskTimeout
exception is raised. If coro isNone
, the result of this function serves as an asynchronous context manager that applies a timeout to a block of statements.timeout_after()
may be composed with othertimeout_after()
operations (i.e., nested timeouts). If an outer timeout expires first, thencurio.TimeoutCancellationError
is raised instead ofcurio.TaskTimeout
. If an inner timeout expires and fails to properly catchcurio.TaskTimeout
, acurio.UncaughtTimeoutError
is raised in the outer timeout.
-
await
ignore_after
(seconds, coro=None, *, timeout_result=None)¶ Execute the specified coroutine and return its result. Issue a cancellation request after seconds have elapsed. When a timeout occurs, no exception is raised. Instead,
None
or the value of timeout_result is returned. If coro isNone
, the result is an asynchronous context manager that applies a timeout to a block of statements. For the context manager case,result
attribute of the manager is set toNone
or the value of timeout_result if the block was cancelled.Note:
ignore_after()
may also be composed with other timeout operations.curio.TimeoutCancellationError
andcurio.UncaughtTimeoutError
exceptions might be raised according to the same rules as fortimeout_after()
.
Here is an example that shows how these functions can be used:
# Execute coro(args) with a 5 second timeout
try:
result = await timeout_after(5, coro(args))
except TaskTimeout as e:
result = None
# Execute multiple statements with a 5 second timeout
try:
async with timeout_after(5):
await coro1(args)
await coro2(args)
...
except TaskTimeout as e:
# Handle the timeout
...
The difference between timeout_after()
and ignore_after()
concerns
the exception handling behavior when time expires. The latter function
returns None
instead of raising an exception which might be more
convenient in certain cases. For example:
result = await ignore_after(5, coro(args))
if result is None:
# Timeout occurred (if you care)
...
# Execute multiple statements with a 5 second timeout
async with ignore_after(5) as s:
await coro1(args)
await coro2(args)
...
s.result = successful_result
if s.result is None:
# Timeout occurred
It’s important to note that every curio operation can be cancelled by timeout.
Rather than having every possible call take an explicit timeout argument,
you should wrap the call using timeout_after()
or ignore_after()
as
appropriate.
Cancellation Control¶
-
disable_cancellation
(coro=None)¶ Disables the delivery of cancellation-related exceptions to the calling task. Cancellations will be delivered to the first blocking operation that’s performed once cancellation delivery is reenabled. This function may be used to shield a single coroutine or used as a context manager (see example below).
-
enable_cancellation
(coro=None)¶ Reenables the delivery of cancellation-related exceptions. This function is used as a context manager. It may only be used inside a context in which cancellation has been disabled. This function may be used to shield a single coroutine or used as a context manager (see example below).
-
await
check_cancellation
()¶ Checks to see if any cancellation is pending for the calling task. If cancellation is allowed, a cancellation exception is raised immediately. If cancellation is not allowed, it returns the pending cancellation exception instance (if any). Returns
None
if no cancellation is pending.
Use of these functions is highly specialized and is probably best avoided. Here is an example that shows typical usage:
async def coro():
async with disable_cancellation():
while True:
await coro1()
await coro2()
async with enable_cancellation():
await coro3() # May be cancelled
await coro4() # May be cancelled
if await check_cancellation():
break # Bail out!
await blocking_op() # Cancellation (if any) delivered here
If you only need to shield a single operation, you can write statements like this:
async def coro():
...
await disabled_cancellation(some_operation())
...
This is shorthand for writing the following:
async def coro():
...
async with disable_cancellation():
await some_operation()
...
See the section on cancellation in the Curio Developer’s Guide for more detailed information.
Performing External Work¶
Sometimes you need to perform work outside the kernel. This includes CPU-intensive calculations and blocking operations. Use the following functions to do that:
-
await
curio.workers.
run_in_process
(callable, *args, **kwargs)¶ Run
callable(*args, **kwargs)
in a separate process and returns the result. If cancelled, the underlying worker process (if started) is immediately cancelled by aSIGTERM
signal.
-
await
curio.workers.
run_in_thread
(callable, *args, **kwargs)¶ Run
callable(*args, **kwargs)
in a separate thread and return the result. If the calling task is cancelled, the underlying worker thread (if started) is set aside and sent a termination request. However, since there is no underlying mechanism to forcefully kill threads, the thread won’t recognize the termination request until it runs the requested work to completion. It’s important to note that a cancellation won’t block other tasks from using threads. Instead, cancellation produces a kind of “zombie thread” that executes the requested work, discards the result, and then disappears. For reliability, work submitted to threads should have a timeout or some other mechanism that puts a bound on execution time.
-
await
curio.workers.
block_in_thread
(callable, *args, **kwargs)¶ The same as
run_in_thread()
, but guarantees that only one background thread is used for each unique callable regardless of how many tasks simultaneously try to carry out the same operation at once. Only use this function if there is an expectation that the provided callable is going to block for an undetermined amount of time and that there might be a large amount of contention from multiple tasks on the same resource. The primary use is on waiting operations involving foreign locks and queues. For example, if you launched a hundred Curio tasks and they all decided to block on a shared thread queue, using this would be much more efficient thanrun_in_thread()
.
-
await
curio.workers.
run_in_executor
(exc, callable, *args, **kwargs)¶ Run
callable(*args, **kwargs)
callable in a user-supplied executor and returns the result. exc is an executor from theconcurrent.futures
module in the standard library. This executor is expected to implement asubmit()
method that executes the given callable and returns aFuture
instance for collecting its result.
When performing external work, it’s almost always better to use the
run_in_process()
and run_in_thread()
functions instead
of run_in_executor()
. These functions have no external library
dependencies, have substantially less communication overhead, and more
predictable cancellation semantics.
The following values in curio.workers
define how many
worker threads and processes are used. If you are going to
change these values, do it before any tasks are executed.
-
curio.workers.
MAX_WORKER_THREADS
¶ Specifies the maximum number of threads used by a single kernel using the
run_in_thread()
function. Default value is 64.
-
curio.workers.
MAX_WORKER_PROCESSES
¶ Specifies the maximum number of processes used by a single kernel using the
run_in_process()
function. Default value is the number of CPUs on the host system.
I/O Layer¶
I/O in curio is performed by classes in curio.io
that
wrap around existing sockets and streams. These classes manage the
blocking behavior and delegate their methods to an existing socket or
file.
Socket¶
The Socket
class is used to wrap existing an socket. It is compatible with
sockets from the built-in socket
module as well as SSL-wrapped sockets created
by functions by the built-in ssl
module. Sockets in curio should be fully
compatible most common socket features.
-
class
curio.io.
Socket
(sockobj)¶ Creates a wrapper the around an existing socket sockobj. This socket is set in non-blocking mode when wrapped. sockobj is not closed unless the created instance is explicitly closed or used as a context manager.
The following methods are redefined on Socket
objects to be
compatible with coroutines. Any socket method not listed here will be
delegated directly to the underlying socket. Be aware
that not all methods have been wrapped and that using a method not
listed here might block the kernel or raise a BlockingIOError
exception.
-
await
Socket.
recv
(maxbytes, flags=0)¶ Receive up to maxbytes of data.
-
await
Socket.
recv_into
(buffer, nbytes=0, flags=0)¶ Receive up to nbytes of data into a buffer object.
-
await
Socket.
recvfrom
(maxsize, flags=0)¶ Receive up to maxbytes of data. Returns a tuple (data, client_address).
-
await
Socket.
recvfrom_into
(buffer, nbytes=0, flags=0)¶ Receive up to nbytes of data into a buffer object.
-
await
Socket.
recvmsg
(bufsize, ancbufsize=0, flags=0)¶ Receive normal and ancillary data.
-
await
Socket.
recvmsg_into
(buffers, ancbufsize=0, flags=0)¶ Receive normal and ancillary data.
-
await
Socket.
send
(data, flags=0)¶ Send data. Returns the number of bytes of data actually sent (which may be less than provided in data).
-
await
Socket.
sendall
(data, flags=0)¶ Send all of the data in data. If cancelled, the
bytes_sent
attribute of the resulting exception contains the actual number of bytes sent.
-
await
Socket.
sendto
(data, address)¶
-
await
Socket.
sendto
(data, flags, address)¶ Send data to the specified address.
-
await
Socket.
sendmsg
(buffers, ancdata=(), flags=0, address=None)¶ Send normal and ancillary data to the socket.
-
await
Socket.
accept
()¶ Wait for a new connection. Returns a tuple (sock, address).
-
await
Socket.
connect
(address)¶ Make a connection.
-
await
Socket.
connect_ex
(address)¶ Make a connection and return an error code instead of raising an exception.
-
await
Socket.
writeable
()¶ Waits until the socket is writeable.
-
await
Socket.
close
()¶ Close the connection. This method is not called on garbage collection.
-
await
curio.io.
do_handshake
()¶ Perform an SSL client handshake. The underlying socket must have already be wrapped by SSL using the
curio.ssl
module.
-
Socket.
makefile
(mode, buffering=0)¶ Make a file-like object that wraps the socket. The resulting file object is a
curio.io.FileStream
instance that supports non-blocking I/O. mode specifies the file mode which must be one of'rb'
or'wb'
. buffering specifies the buffering behavior. By default unbuffered I/O is used. Note: It is not currently possible to create a stream with Unicode text encoding/decoding applied to it so those options are not available. If you are trying to put a file-like interface on a socket, it is usually better to use theSocket.as_stream()
method below.
-
Socket.
as_stream
()¶ Wrap the socket as a stream using
curio.io.SocketStream
. The result is a file-like object that can be used for both reading and writing on the socket.
-
Socket.
blocking
()¶ A context manager that temporarily places the socket into blocking mode and returns the raw socket object used internally. This can be used if you need to pass the socket to existing synchronous code.
Socket
objects may be used as an asynchronous context manager
which cause the underlying socket to be closed when done. For
example:
async with sock:
# Use the socket
...
# socket closed here
FileStream¶
The FileStream
class puts a non-blocking wrapper around an
existing file-like object. Certain other functions in curio use this
(e.g., the Socket.makefile()
method).
-
class
curio.io.
FileStream
(fileobj)¶ Create a file-like wrapper around an existing file. fileobj must be in in binary mode. The file is placed into non-blocking mode using
os.set_blocking(fileobj.fileno())
. fileobj is not closed unless the resulting instance is explicitly closed or used as a context manager.
The following methods are available on instances of FileStream
:
-
await
FileStream.
read
(maxbytes=- 1)¶ Read up to maxbytes of data on the file. If omitted, reads as much data as is currently available and returns it.
-
await
FileStream.
readall
()¶ Return all of the data that’s available on a file up until an EOF is read.
-
await
FileStream.
readline
()¶ Read a single line of data from a file.
-
await
FileStream.
readlines
()¶ Read all of the lines from a file. If cancelled, the
lines_read
attribute of the resulting exception contains all of the lines that were read so far.
-
await
FileStream.
write
(bytes)¶ Write all of the data in bytes to the file.
-
await
FileStream.
writelines
(lines)¶ Writes all of the lines in lines to the file. If cancelled, the
bytes_written
attribute of the exception contains the total bytes written so far.
-
await
FileStream.
flush
()¶ Flush any unwritten data from buffers to the file.
-
await
FileStream.
close
()¶ Flush any unwritten data and close the file. This method is not called on garbage collection.
-
FileStream.
blocking
()¶ A context manager that temporarily places the stream into blocking mode and returns the raw file object used internally. This can be used if you need to pass the file to existing synchronous code.
Other file methods (e.g., tell()
, seek()
, etc.) are available
if the supplied fileobj
also has them.
A FileStream
may be used as an asynchronous context manager. For example:
async with stream:
# Use the stream object
...
# stream closed here
SocketStream¶
The SocketStream
class puts a non-blocking file-like interface
around a socket. This is normally created by the Socket.as_stream()
method.
-
class
curio.io.
SocketStream
(sock)¶ Create a file-like wrapper around an existing socket. sock must be a
socket
instance from Python’s built-insocket
module. The socket is placed into non-blocking mode. sock is not closed unless the resulting instance is explicitly closed or used as a context manager.
A SocketStream
instance supports the same methods as FileStream
above.
One subtle issue concerns the blocking()
method below.
-
SocketStream.
blocking
()¶ A context manager that temporarily places the stream into blocking mode and returns a raw file object that wraps the underlying socket. It is important to note that the return value of this operation is a file created
open(sock.fileno(), 'rb+', closefd=False)
. You can pass this object to code that is expecting to work with a file. The file is not closed when garbage collected.
socket wrapper module¶
The curio.socket
module provides a wrapper around the built-in
socket
module–allowing it to be used as a stand-in in
curio-related code. The module provides exactly the same
functionality except that certain operations have been replaced by
coroutine equivalents.
-
curio.socket.
socket
(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)¶ Creates a
curio.io.Socket
wrapper the aroundsocket
objects created in the built-insocket
module. The arguments for construction are identical and have the same meaning. The resultingsocket
instance is set in non-blocking mode.
The following module-level functions have been modified so that the returned socket objects are compatible with curio:
-
curio.socket.
socketpair
(family=AF_UNIX, type=SOCK_STREAM, proto=0)¶
-
curio.socket.
fromfd
(fd, family, type, proto=0)¶
-
curio.socket.
create_connection
(address, source_address)¶
The following module-level functions have been redefined as coroutines so that they don’t block the kernel when interacting with DNS:
-
await
curio.socket.
getaddrinfo
(host, port, family=0, type=0, proto=0, flags=0)¶
-
await
curio.socket.
getfqdn
(name)¶
-
await
curio.socket.
gethostbyname
(hostname)¶
-
await
curio.socket.
gethostbyname_ex
(hostname)¶
-
await
curio.socket.
gethostname
()¶
-
await
curio.socket.
gethostbyaddr
(ip_address)¶
-
await
curio.socket.
getnameinfo
(sockaddr, flags)¶
ssl wrapper module¶
The curio.ssl
module provides curio-compatible functions for creating an SSL
layer around curio sockets. The following functions are redefined (and have the same
calling signature as their counterparts in the standard ssl
module:
-
curio.ssl.
wrap_socket
(*args, **kwargs)¶
-
await
curio.ssl.
get_server_certificate
(*args, **kwargs)¶
-
curio.ssl.
create_default_context
(*args, **kwargs)¶
-
class
curio.ssl.
SSLContext
¶ A redefined and modified variant of
ssl.SSLContext
so that thewrap_socket()
method returns a socket compatible with curio.
Don’t attempt to use the curio.ssl
module without a careful read of Python’s official documentation
at https://docs.python.org/3/library/ssl.html.
For the purposes of curio, it is usually easier to apply SSL to a connection using some of the high level network functions described in the next section. For example, here’s how you make an outgoing SSL connection:
sock = await curio.open_connection('www.python.org', 443,
ssl=True,
server_hostname='www.python.org')
Here’s how you might define a server that uses SSL:
import curio
from curio import ssl
KEYFILE = "privkey_rsa" # Private key
CERTFILE = "certificate.crt" # Server certificat
async def handler(client, addr):
...
if __name__ == '__main__':
kernel = curio.Kernel()
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(certfile=CERTFILE, keyfile=KEYFILE)
kernel.run(curio.tcp_server('', 10000, handler, ssl=ssl_context))
High Level Networking¶
The following functions are provided to simplify common tasks related to making network connections and writing servers.
-
await
curio.
open_connection
(host, port, *, ssl=None, source_addr=None, server_hostname=None, alpn_protocols=None)¶ Creates an outgoing connection to a server at host and port. This connection is made using the
socket.create_connection()
function and might be IPv4 or IPv6 depending on the network configuration (although you’re not supposed to worry about it). ssl specifies whether or not SSL should be used. ssl can beTrue
or an instance ofcurio.ssl.SSLContext
. source_addr specifies the source address to use on the socket. server_hostname specifies the hostname to check against when making SSL connections. It is highly advised that this be supplied to avoid man-in-the-middle attacks. alpn_protocols specifies a list of protocol names for use with the TLS ALPN extension (RFC7301). A typical value might be['h2', 'http/1.1']
for negotiating either a HTTP/2 or HTTP/1.1 connection.
-
await
curio.
open_unix_connection
(path, *, ssl=None, server_hostname=None, alpn_protocols=None)¶ Creates a connection to a Unix domain socket with optional SSL applied.
-
await
curio.
tcp_server
(host, port, client_connected_task, *, family=AF_INET, backlog=100, ssl=None, reuse_address=True, reuse_port=False)¶ Creates a server for receiving TCP connections on a given host and port. client_connected_task is a coroutine that is to be called to handle each connection. Family specifies the address family and is either
socket.AF_INET
orsocket.AF_INET6
. backlog is the argument to thesocket.socket.listen()
method. ssl specifies ancurio.ssl.SSLContext
instance to use. reuse_address specifies whether to reuse a previously used port. reuse_port specifies whether to use theSO_REUSEPORT
socket option prior to binding.
-
await
curio.
unix_server
(path, client_connected_task, *, backlog=100, ssl=None)¶ Creates a Unix domain server on a given path. client_connected_task is a coroutine to execute on each connection. backlog is the argument given to the
socket.socket.listen()
method. ssl is an optionalcurio.ssl.SSLContext
to use if setting up an SSL connection.
Message Passing and Channels¶
Curio provides a Channel
class that can be used to perform message
passing between interpreters running in separate processes.
-
class
curio.channel.
Channel
(address, family=socket.AF_INET)¶ Represents a communications endpoint for message passing. address is the address and family is the protocol family.
The following methods are used to establish a connection on a Channel
instance.
-
await
Channel.
accept
(*, authkey=None)¶ Wait for an incoming connection. authkey is an optional authentication key that can be used to authenticate the client. Authentication involves computing an HMAC-based cryptographic digest. The key itself is not transmitted. Returns an
Connection
instance.
-
await
Channel.
connect
(*, authkey=None)¶ Make an outgoing connection. authkey is an optional authentication key. This method repeatedly attempts to make a connection if the other endpoint is not responding. Returns a
Connection
instance.
-
Channel.
bind
()¶ Performs the address binding step of the
accept()
method and returns. Can use this if you want the host operating system to assign a port number for you. For example, you can supply an initial address of('localhost', socket.INADDR_ANY)
and callbind()
. Afterwards, theaddress
attribute of theChannel
instance contains the assigned address.
-
await
Channel.
close
()¶ Close the channel.
The connect()
and accept()
methods of Channel
instances return a
Connection
instance.
-
class
curio.channel.
Connection
(reader, writer)¶ Represents a connection on which message passing of Python objects is supported. reader and writer are Curio I/O streams on which reading and writing are to take place.
Instances of Connection
support the following methods:
-
await
curio.channel.
close
()¶ Close the connection by closing both the reader and writer streams.
-
await
curio.channel.
recv
()¶ Receive a Python object. The received object is unserialized using the
pickle
module.
-
await
curio.channel.
recv_bytes
(maxlength=None)¶ Receive a raw message of bytes. maxlength specifies a maximum message size. By default messages may be of arbitrary size.
-
await
curio.channel.
send
(obj)¶ Send a Python object. The object must be compatible with the
pickle
module.
-
await
curio.channel.
send_bytes
(buf, offset=0, size=None)¶ Send a buffer of bytes as a single message. offset and size specify an optional byte offset and size into the underlying memory buffer.
-
await
curio.channel.
authenticate_server
(authkey)¶ Authenticate the connection for a server.
-
await
curio.channel.
authenticate_client
(authkey)¶ Authenticate the connection for a client.
A Connection
instance may also be used as a context manager.
Here is an example of a producer program using channels:
# producer.py
from curio import Channel, run
async def producer(ch):
c = await ch.accept(authkey=b'peekaboo')
for i in range(10):
await c.send(i)
await c.send(None) # Sentinel
if __name__ == '__main__':
ch = Channel(('localhost', 30000))
run(producer(ch))
Here is an example of a correspoinding consumer program using a channel:
# consumer.py
from curio import Channel, run
async def consumer(ch):
c = await ch.connect(authkey=b'peekaboo')
while True:
msg = await c.recv()
if msg is None:
break
print('Got:', msg)
if __name__ == '__main__':
ch = Channel(('localhost', 30000))
run(consumer(ch))
subprocess wrapper module¶
The curio.subprocess
module provides a wrapper around the built-in
subprocess
module.
-
class
curio.subprocess.
Popen
(*args, **kwargs)¶ A wrapper around the
subprocess.Popen
class. The same arguments are accepted. On the resultingPopen
instance, thestdin
,stdout
, andstderr
file attributes have been wrapped by thecurio.io.FileStream
class. You can use these in an asynchronous context.
Here is an example of using Popen
to read streaming output off of a
subprocess with curio:
import curio
from curio import subprocess
async def main():
p = subprocess.Popen(['ping', 'www.python.org'], stdout=subprocess.PIPE)
async for line in p.stdout:
print('Got:', line.decode('ascii'), end='')
if __name__ == '__main__':
kernel = curio.Kernel()
kernel.add_task(main())
kernel.run()
The following methods of Popen
have been replaced by asynchronous equivalents:
-
await
Popen.
wait
()¶ Wait for a subprocess to exit. Cancellation does not terminate the process.
-
await
Popen.
communicate
(input=b'')¶ Communicate with the subprocess, sending the specified input on standard input. Returns a tuple
(stdout, stderr)
with the resulting output of standard output and standard error. If cancelled, the resulting exception hasstdout
andstderr
attributes that contain the output read prior to cancellation. Cancellation does not terminate the underlying subprocess.
The following functions are also available. They accept the same arguments as their
equivalents in the subprocess
module:
-
await
curio.subprocess.
run
(args, stdin=None, input=None, stdout=None, stderr=None, shell=False, check=False)¶ Run a command in a subprocess. Returns a
subprocess.CompletedProcess
instance. If cancelled, the underlying process is terminated using the processkill()
method. The resulting exception will havestdout
andstderr
attributes containing output read prior to cancellation.
-
await
curio.subprocess.
check_output
(args, stdout=None, stderr=None, shell=False)¶ Run a command in a subprocess and return the resulting output. Raises a
subprocess.CalledProcessError
exception if an error occurred. The behavior on cancellation is the same as forrun()
.
file wrapper module¶
One problem concerning coroutines and async concerns access to files on the
normal file system. Yes, you can use the built-in open()
function, but
what happens afterwards is hard to predict. Under the covers, the operating
system might have to access a disk drive or perform networking of its own.
Either way, the operation might take a long time to complete and while it does,
the whole Curio kernel will be blocked. You really don’t want that–especially
if the system is under heavy load.
The curio.file
module provides an asynchronous compatible
replacement for the built-in open()
function and associated file
objects, should you want to read and write traditional files on the
filesystem. The underlying implementation avoids blocking. How this
is accomplished is an implementation detail (although threads are used
in the initial version).
-
curio.file.
aopen
(*args, **kwargs)¶ Creates a
curio.file.AsyncFile
wrapper around a traditional file object as returned by Python’s builtinopen()
function. The arguments are exactly the same as foropen()
. The returned file object must be used as an asynchronous context manager.
-
class
curio.file.
AsyncFile
(fileobj)¶ This class represents an asynchronous file as returned by the
aopen()
function. Normally, instances are created by theaopen()
function. However, it can be wrapped around an already-existing file object that was opened using the built-inopen()
function.
The following methods are redefined on AsyncFile
objects to be
compatible with coroutines. Any method not listed here will be
delegated directly to the underlying file. These methods take the same arguments
as the underlying file object. Be aware that not all of these methods are
available on all kinds of files (e.g., read1()
, readinto()
and similar
methods are only available in binary-mode files).
-
await
AsyncFile.
read
(*args, **kwargs)¶
-
await
AsyncFile.
read1
(*args, **kwargs)¶
-
await
AsyncFile.
readline
(*args, **kwargs)¶
-
await
AsyncFile.
readlines
(*args, **kwargs)¶
-
await
AsyncFile.
readinto
(*args, **kwargs)¶
-
await
AsyncFile.
readinto1
(*args, **kwargs)¶
-
await
AsyncFile.
write
(*args, **kwargs)¶
-
await
AsyncFile.
writelines
(*args, **kwargs)¶
-
await
AsyncFile.
truncate
(*args, **kwargs)¶
-
await
AsyncFile.
seek
(*args, **kwargs)¶
-
await
AsyncFile.
tell
(*args, **kwargs)¶
-
await
AsyncFile.
flush
()¶
-
await
AsyncFile.
close
()¶
AsyncFile
objects should always be used as an asynchronous
context manager. For example:
async with aopen(filename) as f:
# Use the file
data = await f.read()
AsyncFile
objects may also be used with asynchronous iteration.
For example:
async with open(filename) as f:
async for line in f:
...
AsyncFile
objects are intentionally incompatible with code
that uses files in a synchronous manner. Partly, this is to help
avoid unintentional errors in your program where blocking might
occur with you realizing it. If you know what you’re doing and you
need to access the underlying file in synchronous code, use the
blocking() context manager like this:
async with open(filename) as f:
...
# Pass to synchronous code (danger: might block)
with f.blocking() as sync_f:
# Use synchronous I/O operations
data = sync_f.read()
...
Synchronization Primitives¶
The following synchronization primitives are available. Their behavior
is similar to their equivalents in the threading
module. None
of these primitives are safe to use with threads created by the
built-in threading
module.
-
class
Event
¶ An event object.
Event
instances support the following methods:
-
Event.
is_set
()¶ Return
True
if the event is set.
-
Event.
clear
()¶ Clear the event.
-
await
Event.
wait
()¶ Wait for the event.
-
await
Event.
set
()¶ Set the event. Wake all waiting tasks (if any).
Here is an Event example:
import curio
async def waiter(evt):
print('Waiting')
await evt.wait()
print('Running')
async def main():
evt = curio.Event()
# Create a few waiters
await curio.spawn(waiter(evt))
await curio.spawn(waiter(evt))
await curio.spawn(waiter(evt))
await curio.sleep(5)
# Set the event. All waiters should wake up
await evt.set()
curio.run(main)
-
class
Lock
¶ This class provides a mutex lock. It can only be used in tasks. It is not thread safe.
Lock
instances support the following methods:
-
await
Lock.
acquire
()¶ Acquire the lock.
-
await
Lock.
release
()¶ Release the lock.
-
Lock.
locked
()¶ Return
True
if the lock is currently held.
The preferred way to use a Lock is as an asynchronous context manager. For example:
import curio
async def child(lck):
async with lck:
print('Child has the lock')
async def main():
lck = curio.Lock()
async with lck:
print('Parent has the lock')
await curio.spawn(child(lck))
await curio.sleep(5)
curio.run(main())
-
class
RLock
¶ This class provides a recursive lock funtionality, that could be acquired multiple times within the same task. The behavior of this lock is identical to the
threading.RLock
, except that the owner of the lock will be a task, wich acquired it, instead of a thread.
RLock
instances support the following methods:
-
await
Lock.
acquire
()¶ Acquire the lock, incrementing the recursion by 1. Can be used multiple times within the same task, that owns this lock.
-
await
Lock.
release
()¶ Release the lock, decrementing the recursion level by 1. If recursion level reaches 0, the lock is unlocked. Raises
RuntimeError
if called not by the owner or if lock is not locked.
-
Lock.
locked
()¶ Return
True
if the lock is currently held, i.e. recursion level is greater than 0.
-
class
Semaphore
(value=1)¶ Create a semaphore. Semaphores are based on a counter. If the count is greater than 0, it is decremented and the semaphore is acquired. Otherwise, the task has to wait until the count is incremented by another task.
-
class
BoundedSemaphore
(value=1)¶ This class is the same as
Semaphore
except that the semaphore value is not allowed to exceed the initial value.
Semaphores support the following methods:
-
await
Semaphore.
acquire
()¶ Acquire the semaphore, decrementing its count. Blocks if the count is 0.
-
await
Semaphore.
release
()¶ Release the semaphore, incrementing its count. Never blocks.
-
Semaphore.
locked
()¶ Return
True
if the Semaphore is locked.
Like locks, semaphores support the async-with statement. A common use of semaphores is to limit the number of tasks performing an operation. For example:
import curio
async def worker(sema):
async with sema:
print('Working')
await curio.sleep(5)
async def main():
sema = curio.Semaphore(2) # Allow two tasks at a time
# Launch a bunch of tasks
for n in range(10):
await curio.spawn(worker(sema))
# After this point, you should see two tasks at a time run. Every 5 seconds.
curio.run(main())
-
class
Condition
(lock=None)¶ Condition variable. lock is the underlying lock to use. If none is provided, then a
Lock
object is used.
Condition
objects support the following methods:
-
Condition.
locked
()¶ Return
True
if the condition variable is locked.
-
await
Condition.
acquire
()¶ Acquire the condition variable lock.
-
await
Condition.
release
()¶ Release the condition variable lock.
-
await
Condition.
wait
()¶ Wait on the condition variable. This releases the underlying lock.
-
await
Condition.
wait_for
(predicate)¶ Wait on the condition variable until a supplied predicate function returns
True
. predicate is a callable that takes no arguments.
-
await
notify
(n=1)¶ Notify one or more tasks, causing them to wake from the
Condition.wait()
method.
-
await
notify_all
()¶ Notify all tasks waiting on the condition.
Condition variables are often used to signal between tasks. For example, here is a simple producer-consumer scenario:
import curio
from collections import deque
items = deque()
async def consumer(cond):
while True:
async with cond:
while not items:
await cond.wait() # Wait for items
item = items.popleft()
print('Got', item)
async def producer(cond):
for n in range(10):
async with cond:
items.append(n)
await cond.notify()
await curio.sleep(1)
async def main():
cond = curio.Condition()
await curio.spawn(producer(cond))
await curio.spawn(consumer(cond))
curio.run(main())
Queues¶
If you want to communicate between tasks, it’s usually much easier to use
a Queue
instead.
-
class
Queue
(maxsize=0)¶ Creates a queue with a maximum number of elements in maxsize. If not specified, the queue can hold an unlimited number of items.
A Queue
instance supports the following methods:
-
Queue.
empty
()¶ Returns
True
if the queue is empty.
-
Queue.
full
()¶ Returns
True
if the queue is full.
-
Queue.
qsize
()¶ Return the number of items currently in the queue.
-
await
Queue.
get
()¶ Returns an item from the queue.
-
await
Queue.
put
(item)¶ Puts an item on the queue.
-
await
Queue.
join
()¶ Wait for all of the elements put onto a queue to be processed. Consumers must call
Queue.task_done()
to indicate completion.
-
await
Queue.
task_done
()¶ Indicate that processing has finished for an item. If all items have been processed and there are tasks waiting on
Queue.join()
they will be awakened.
Here is an example of using queues in a producer-consumer problem:
import curio
async def producer(queue):
for n in range(10):
await queue.put(n)
await queue.join()
print('Producer done')
async def consumer(queue):
while True:
item = await queue.get()
print('Consumer got', item)
await queue.task_done()
async def main():
q = curio.Queue()
prod_task = await curio.spawn(producer(q))
cons_task = await curio.spawn(consumer(q))
await prod_task.join()
await cons_task.cancel()
curio.run(main())
-
class
PriorityQueue
(maxsize=0)¶ Creates a priority queue with a maximum number of elements in maxsize.
In a PriorityQueue
items are retrieved in priority order with the
lowest priority first:
import curio
async def main():
q = curio.PriorityQueue()
await q.put((0, 'highest priority'))
await q.put((100, 'very low priority'))
await q.put((3, 'higher priority'))
while not q.empty():
print(await q.get())
curio.run(main())
This will output
(0, 'highest priority')
(3, 'higher priority')
(100, 'very low priority')
-
class
LifoQueue
(maxsize=0)¶ A queue with “Last In First Out” retrieving policy
import curio
async def main():
q = curio.LifoQueue()
await q.put('first')
await q.put('second')
await q.put('last')
while not q.empty():
print(await q.get())
curio.run(main())
This will output
last
second
first
Here is an example a producer-consumer problem with a UniversalQueue
:
from curio import run, UniversalQueue, spawn, run_in_thread
import time
import threading
# An async task
async def consumer(q):
print('Consumer starting')
while True:
item = await q.get()
if item is None:
break
print('Got:', item)
await q.task_done()
print('Consumer done')
# A threaded producer
def producer(q):
for i in range(10):
q.put(i)
time.sleep(1)
q.join()
print('Producer done')
async def main():
q = UniversalQueue()
t1 = await spawn(consumer(q))
t2 = threading.Thread(target=producer, args=(q,))
t2.start()
await run_in_thread(t2.join)
await q.put(None)
await t1.join()
await q.shutdown()
run(main())
In this code, the consumer()
is a Curio task and producer()
is a thread.
If the withfd=True
option is given to a UniveralQueue
, it additionally
has a fileno()
method that can be passed to various functions that might
poll for I/O events. When enabled, putting something in the queue will also
write a byte of I/O. This might be useful if trying to pass data from Curio
to a foreign event loop.
Synchronizing with Threads and Processes¶
Curio’s synchronization primitives aren’t safe to use with externel threads or
processes. However, Curio can work with existing thread or process-level
synchronization primitives if you use the abide()
function.
-
await
abide
(op, *args, **kwargs)¶ Execute an operation in a manner that safely works with async code. If
op
is a coroutine function, thenop(*args, **kwargs)
is returned. Ifop
is a synchronous function, thenblock_in_thread(op, *args, **kwargs)
is returned. In both cases, you would useawait
to obtain the result. Ifop
is an asynchronous context manager, it is returned unmodified. Ifop
is a synchronous context manager, it is wrapped in a manner that carries out its execution in a backing thread. For this latter case, a special keyword argumentreserve_thread=True
may be given that instructs Curio to use the same backing thread for the entire duraction of the context manager.
The main use of this function is in code that wants to safely synchronize curio with threads and processes. For example, here is how you would synchronize a thread with a curio task using a threading lock:
import curio
import threading
import time
# A curio task
async def child(lock):
async with curio.abide(lock):
print('Child has the lock')
# A thread
def parent(lock):
with lock:
print('Parent has the lock')
time.sleep(5)
lock = threading.Lock()
threading.Thread(target=parent, args=(lock,)).start()
curio.run(child(lock))
If you wanted to trigger or wait for a thread-event, you might do this:
import curio
import threading
evt = threading.Event()
async def worker():
await abide(evt.wait)
print('Working')
...
def main():
...
evt.set()
...
For condition variables and reentrant locks, you should use
reserve_thread=True
keyword argument to make sure the same thread is used
throughout the block. For example:
import curio
import threading
import collections
# A thread
def producer(cond, items):
for n in range(10):
with cond:
items.append(n)
cond.notify()
print('Producer done')
# A curio task
async def consumer(cond, items):
while True:
async with abide(cond, reserve_thread=True) as c:
while not items:
await c.wait()
item = items.popleft()
if item is None:
break
print('Consumer got:', item)
print('Consumer done')
cond = threading.Condition()
items = collections.deque()
threading.Thread(target=producer, args=(cond, items)).start()
curio.run(consumer(cond, items))
A notable feature of abide()
is that it also accepts Curio’s
own synchronization primitives. Thus, you can write code that
works independently of the lock type. For example, the first locking
example could be rewritten as follows and the child would still work:
import curio
# A curio task (works with any lock)
async def child(lock):
async with curio.abide(lock):
print('Child has the lock')
# Another curio task
async def main():
lock = curio.Lock()
async with lock:
print('Parent has the lock')
await curio.spawn(child(lock))
await curio.sleep(5)
curio.run(main())
A special circle of hell awaits code that combines the use of
the abide()
function with task cancellation. Although
cancellation is mostly supported, there are a few things to keep in mind
about it. First, if you are using abide(func, arg1, arg2, ...)
to
run a synchronous function, that function will fully run to completion
in a separate thread regardless of the cancellation. So, if there are
any side-effects associated with that code executing, you’ll need to
take them into account. Second, if you are using async with
abide(lock)
with a thread-lock and a cancellation request is
received while waiting for the lock.__enter__()
method to execute,
a background thread continues to run waiting for the eventual lock
acquisition. Once acquired, curio releases it again. However, fully
figuring out what’s happening might be mind-bending.
The abide()
function can be used to synchronize with a thread
reentrant lock (e.g., threading.RLock
). However, reentrancy is
not supported. Each lock acquisition using abide()
involves a
different backing thread. Repeated acquisitions would violate the
constraint that reentrant locks have on only being acquired by a
single thread.
All things considered, it’s probably best to try and avoid code that synchronizes Curio tasks with threads. However, if you must, Curio abides.
Asynchronous Threads¶
If you need to perform a lot of synchronous operations, but still
interact with Curio, you might consider launching an asynchronous
thread. An asynchronous thread flips the whole world around–instead
of executing synchronous operations using run_in_thread()
, you
kick everything out to a thread and perform the asynchronous
operations using a magic AWAIT()
function.
-
class
AsyncThread
(target, args=(), kwargs={}, daemon=True)¶ Creates an asynchronous thread. The arguments are the same as for the
threading.Thread
class.target
is a synchronous callable.args
andkwargs
are its arguments.daemon
specifies if the thread runs in daemonic mode.
-
await
AsyncThread.
start
()¶ Starts the asynchronous thread.
-
await
join
()¶ Waits for the thread to terminate, returning the callables final result. The final result is returned in the same manner as the usual
Task.join()
method used on Curio tasks.
-
await
cancel
()¶ Cancels the asynchronous thread. The behavior is the same as cancellation performed on Curio tasks. An asynchronous thread can only be cancelled when it performs blocking operations on asynchronous objects (e.g., using
AWAIT()
.
Within a thread, the following function can be used to execute a coroutine.
-
AWAIT
(coro)¶ Execute a coroutine on behalf of an asynchronous thread. The requested coroutine executes in Curio’s main execution thread. The caller is blocked until it completes. If used outside of an asynchronous thread, an
AsyncOnlyError
exception is raised. Ifcoro
is not a coroutine, it is returned unmodified. The reasonAWAIT
is all-caps is to make it more easily heard when there are all of these coders yelling at you to just use pure async code instead of launching a thread. Also,await
is likely to be a reserved keyword in Python 3.7.
Here is a simple example of an asynchronous thread that reads data off a Curio queue:
from curio import run, Queue, sleep, CancelledError
from curio.thread import AsyncThread, AWAIT
def consumer(queue):
try:
while True:
item = AWAIT(queue.get())
print('Got:', item)
AWAIT(queue.task_done())
except CancelledError:
print('Consumer goodbye!')
raise
async def main():
q = Queue()
t = AsyncThread(target=consumer, args=(q,))
await t.start()
for i in range(10):
await q.put(i)
await sleep(1)
await q.join()
await t.cancel()
run(main())
Asynchronous threads can also be created using the following decorator.
-
async_thread
(callable)¶ A decorator that adapts a synchronous callable into an asynchronous function that runs an asynchronous thread.
Using this decorator, you can write a function like this:
@async_thread
def consumer(queue):
try:
while True:
item = AWAIT(queue.get())
print('Got:', item)
AWAIT(queue.task_done())
except CancelledError:
print('Consumer goodbye!')
raise
Now, whenever the code executes (e.g., await consumer(q)
), a
thread will automatically be created. The decorator might also
be useful in combination with spawn()
like this:
def consumer(queue):
try:
while True:
item = AWAIT(queue.get())
print('Got:', item)
AWAIT(queue.task_done())
except CancelledError:
print('Consumer goodbye!')
raise
async def main():
q = Queue()
t = await spawn(async_thread(consumer)(q))
...
Asynchronous threads can use all of Curio’s features including
coroutines, asynchronous context managers, asynchronous iterators,
timeouts and more. For coroutines, use the AWAIT()
function. For
context managers and iterators, use the synchronous counterpart. For
example, you could write this:
from curio.thread import async_thread, AWAIT
from curio import run, tcp_server
@async_thread
def echo_client(client, addr):
print('Connection from:', addr)
with client:
f = client.as_stream()
for line in f:
AWAIT(client.sendall(line))
print('Client goodbye')
run(tcp_server('', 25000, echo_client))
In this code, the with client
and for line in f
statements are
actually executing asynchronous code behind the scenes.
Asynchronous threads can perform any combination of blocking operations
including those that might involve normal thread-related primitives such
as locks and queues. These operations will block the thread itself, but
will not block the Curio kernel loop. In a sense, this is the whole
point–if you run things in an async threads, the rest of Curio is
protected. Asynchronous threads can be cancelled in the same manner
as normal Curio tasks. However, the same rules apply–an asynchronous
thread can only be cancelled on blocking operations involving AWAIT()
.
A final curious thing about async threads is that the AWAIT()
function is no-op if you don’t give it a coroutine. This means that
code, in many cases, can be made to be compatible with regular Python
threads. For example, this code actually runs:
from curio.thread import AWAIT
from curio import CancelledError
from threading import Thread
from queue import Queue
from time import sleep
def consumer(queue):
try:
while True:
item = AWAIT(queue.get())
print('Got:', item)
AWAIT(queue.task_done())
except CancelledError:
print('Consumer goodbye!')
raise
def main():
q = Queue()
t = Thread(target=consumer, args=(q,), daemon=True)
t.start()
for i in range(10):
q.put(i)
sleep(1)
q.join()
main()
In this code, consumer()
is simply launched in a regular thread
with a regular thread queue. The AWAIT()
operations do
nothing–the queue operations aren’t coroutines and their results
return unmodified. Certain Curio features such as cancellation aren’t
supported by normal threads so that would be ignored. However, it’s
interesting that you can write a kind of hybrid code that works in
both a threaded and asynchronous world.
Signals¶
Unix signals are managed by the SignalSet
class. This class operates
as an asynchronous context manager. The recommended usage looks like this:
import signal
async def coro():
...
async with SignalSet(signal.SIGUSR1, signal.SIGHUP) as sigset:
...
signo = await sigset.wait()
print('Got signal', signo)
...
For all of the statements inside the context-manager, signals will be queued. The sigset.wait() operation will return received signals one at a time from the signal queue.
Signals can be temporarily ignored using a normal context manager:
async def coro():
...
sigset = SignalSet(signal.SIGINT)
with sigset.ignore():
...
# Signals temporarily disabled
...
Caution: Signal handling only works if the curio kernel is running in Python’s main execution thread. Also, mixing signals with threads, subprocesses, and other concurrency primitives is a well-known way to make your head shatter into small pieces. Tread lightly.
-
class
SignalSet
(*signals)¶ Represents a set of one or more Unix signals. signals is a list of signals as defined in the built-in
signal
module.
The following methods are available on a SignalSet
instance. They
may only be used in coroutines.
-
await
SignalSet.
wait
()¶ Wait for one of the signals in the signal set to arrive. Returns the signal number of the signal received. Normally this method is used inside an
async with
statement because this allows received signals to be properly queued. It can be used in isolation, but be aware that this will only catch a single signal right at that line of code. It’s possible that you might lose signals if you use this method outside of a context manager.
-
SignalSet.
ignore
()¶ Returns a context manager wherein signals from the signal set are temporarily disabled. Note: This is a normal context manager– use a normal
with
-statement.
Asynchronous Metaprogramming¶
The curio.meta
module provides some decorators and metaclasses that might
be useful if writing larger programs involving coroutines.
-
class
curio.meta.
AsyncABC
¶ A base class that provides the functionality of a normal abstract base class, but additionally enforces coroutine-correctness on methods in subclasses. That is, if a method is defined as a coroutine in a parent class, then it must also be a coroutine in child classes.
Here is an example:
from curio.abc import AsyncABC, abstractmethod
class Base(AsyncABC):
@abstractmethod
async def spam(self):
pass
@abstractmethod
async def grok(self):
pass
class Child(Base):
async def spam(self):
pass
c = Child() # Error -> grok() not defined
class Child2(Base):
def spam(self): # Error -> Not defined using async def
pass
async def grok(self):
pass
The enforcement of coroutines is applied to all methods. Thus, the following classes would also generate an error:
class Base(AsyncABC):
async def spam(self):
pass
async def grok(self):
pass
class Child(Base):
def spam(self): # Error -> Not defined using async def
pass
-
class
curio.meta.
AsyncObject
¶ A base class that provides all of the functionality of
AsyncABC
, but additionally requires instances to be created inside of coroutines. The__init__()
method must be defined as a coroutine and may call other coroutines.
Here is an example using AsyncObject
:
from curio.meta import AsyncObject
class Spam(AsyncObject):
async def __init__(self, x, y):
self.x = x
self.y = y
# To create an instance
async def func():
s = await Spam(2, 3)
...
-
curio.meta.
blocking
(func)¶ A decorator that indicates that the function performs a blocking operation. If the function is called from within a coroutine, the function is executed in a separate thread and
await
is used to obtain the result. If the function is called from normal synchronous code, then the function executes normally. The Curiorun_in_thread()
coroutine is used to execute the function in a thread.
-
curio.meta.
cpubound
(func)¶ A decorator that indicates that the function performs CPU intensive work. If the function is called from within a coroutine, the function is executed in a separate process and
await
is used to obtain the result. If the function is called from normal synchronous code, then the function executes normally. The Curiorun_in_process()
coroutine is used to execute the function in a process.
The @blocking
and @cpubound
decorators are interesting in that they make
normal Python functions usable from both asynchronous and synchronous code.
For example, consider this example:
import curio
from curio.meta import blocking
import time
@blocking
def slow(name):
time.sleep(30)
return 'Hello ' + name
async def main():
result = await slow('Dave') # Async execution
print(result)
if __name__ == '__main__':
result = slow('Guido') # Sync execution
print(result)
curio.run(main())
In this example, the slow()
function can be used from both
coroutines and normal synchronous code. However, when called in
a coroutine, await
must be used. Behind the scenes, the function
runs in a thread–preventing the function from blocking the
execution of other coroutines.
-
curio.meta.
awaitable
(syncfunc)¶ A decorator that allows an asynchronous implementation of a function to be attached to an existing synchronous function. If the resulting function is called from synchronous code, the synchronous function is used. If the function is called from asynchronous code, the asynchronous function is used.
Here is an example that illustrates:
import curio
from curio.meta import awaitable
def spam(x, y):
print('Synchronous ->', x, y)
@awaitable(spam)
async def spam(x, y):
print('Asynchronous ->', x, y)
async def main():
await spam(2, 3) # Calls asynchronous spam()
if __name__ == '__main__':
spam(2, 3) # Calls synchronous spam()
curio.run(main())
Exceptions¶
The following exceptions are defined. All are subclasses of the
CurioError
base class.
-
exception
curio.
CurioError
¶ Base class for all Curio-specific exceptions.
-
exception
curio.
CancelledError
¶ Base class for all cancellation-related exceptions.
-
exception
curio.
TaskCancelled
¶ Exception raised in a coroutine if it has been cancelled using the
Task.cancel()
method. If ignored, the coroutine is silently terminated. If caught, a coroutine can continue to run, but should work to terminate execution. Ignoring a cancellation request and continuing to execute will likely cause some other task to hang.
-
exception
curio.
TaskTimeout
¶ Exception raised in a coroutine if it has been cancelled by timeout.
-
exception
curio.
TimeoutCancellationError
¶ Exception raised in a coroutine if it has been cancelled due to a timeout, but not one related to the inner-most timeout operation.
-
exception
curio.
TaskError
¶ Exception raised by the
Task.join()
method if an uncaught exception occurs in a task. It is a chained exception. The__cause__
attribute contains the exception that causes the task to fail.
Low-level Kernel System Calls¶
The following system calls are available, but not typically used
directly in user code. They are used to implement higher level
objects such as locks, socket wrappers, and so forth. If you find
yourself using these, you’re probably doing something wrong–or
implementing a new curio primitive. These calls are found in the
curio.traps
submodule.
Traps come in two flavors: blocking and synchronous. A blocking trap might block for an indefinite period of time while allowing other tasks to run, and always checks for and raises any pending timeouts or cancellations. A synchronous trap is implemented by trapping into the kernel, but semantically it acts like a regular synchronous function call. Specifically, this means that it always returns immediately without running any other task, and that it does not act as a cancellation point.
-
await
curio.traps.
_read_wait
(fileobj)¶ Blocking trap. Sleep until data is available for reading on fileobj. fileobj is any file-like object with a fileno() method.
-
await
curio.traps.
_write_wait
(fileobj)¶ Blocking trap. Sleep until data can be written on fileobj. fileobj is any file-like object with a fileno() method.
-
await
curio.traps.
_future_wait
(future)¶ Blocking trap. Sleep until a result is set on future. future is an instance of
concurrent.futures.Future
.
-
await
curio.traps.
_join_task
(task)¶ Blocking trap. Sleep until the indicated task completes. After this trap completes, then the task’s return value or raised exception information is available in
task.next_value
ortask.exc_info
, respectively.
-
await
curio.traps.
_cancel_task
(task)¶ Synchronous trap. Cancel the indicated task.
-
await
curio.traps.
_wait_on_ksync
(ksync, state_name)¶ Blocking trap. Go to sleep on a kernel synchronization primitive. ksync is an instance of
curio.kernel.KernelSyncBase
. state_name is the name of the wait state (used in debugging).
-
await
curio.traps.
_reschedule_tasks
(ksync, n=1, value=None, exc=None)¶ Synchronous trap. Reschedule one or more tasks from a kernel synchronization primitive. n is the number of tasks to release. value and exc specify the return value or exception to raise in the task when it resumes execution.
-
await
curio.traps.
_sigwatch
(sigset)¶ Synchronous trap. Tell the kernel to start queuing signals in the given signal set sigset.
-
await
curio.traps.
_sigunwatch
(sigset)¶ Synchronous trap. Tell the kernel to stop queuing signals in the given signal set.
-
await
curio.traps.
_sigwait
(sigset)¶ Blocking trap. Wait for the arrival of a signal in a given signal set. Returns the signal number of the received signal.
-
await
curio.traps.
_get_kernel
()¶ Synchronous trap. Get a reference to the running
Kernel
object.
-
await
curio.traps.
_get_current
()¶ Synchronous trap. Get a reference to the currently running
Task
instance.
-
await
curio.traps.
_set_timeout
(seconds)¶ Synchronous trap. Set a timeout in the currently running task. Returns the previous timeout (if any)
-
await
curio.traps.
_unset_timeout
(previous)¶ Synchronous trap. Unset a timeout in the currently running task. previous is the value returned by the _set_timeout() call used to set the timeout.
-
await
curio.traps.
_ksync_reschedule_function
(queue)¶ Synchronous trap. Return a function that allows tasks to be rescheduled on a kernel sychronization primitive without the use of await. Can be used in synchronous code as long as it runs in the same thread as the Curio kernel.
-
_clock():
Synchronous trap. Returns the current time according to the Curio kernel’s clock.
Again, you’re unlikely to use any of these functions directly. However, here’s a small taste
of how they’re used. For example, the curio.io.Socket.recv()
method
looks roughly like this:
class Socket(object):
...
def recv(self, maxbytes):
while True:
try:
return self._socket.recv(maxbytes)
except BlockingIOError:
await _read_wait(self._socket)
...
This method first tries to receive data. If none is available, the
_read_wait()
call is used to put the task to sleep until reading
can be performed. When it awakes, the receive operation is
retried. Just to emphasize, the _read_wait()
doesn’t actually
perform any I/O. It’s just scheduling a task for it.