Curio Reference Manual¶
This manual lists the basic functionality provided by curio.
The Kernel¶
The kernel is responsible for running all of the tasks. It should normally be created and used in the main execution thread.
-
class
Kernel
(selector=None, with_monitor=False)¶ Create an instance of a curio kernel. If selector is given, it should be an instance of a selector from the
selectors
module. If not given, then selectors.DefaultSelector
is used to poll for I/O. If with_monitor is True
, the monitor task executes in the background. The monitor responds to the keyboard-interrupt and allows you to inspect the state of the running kernel.
There are only a few methods that may be used on a Kernel
outside of coroutines.
-
Kernel.
run
(coro=None, pdb=False, log_errors=True)¶ Runs the kernel until all non-daemonic tasks have finished execution. coro is a coroutine to run as a task. If omitted, then tasks should have already been added using the
add_task()
method below. If pdb is True
, then the kernel enters the Python debugger if any task crashes with an uncaught exception. If log_errors is True
, then uncaught exceptions in tasks are logged.
-
Kernel.
add_task
(coro, daemon=False)¶ Adds a new task to the kernel. coro is a newly instantiated coroutine. If daemon is
True
, the task is created without a parent and runs in the background. Returns a Task
instance. This method may not be used to add a task to a running kernel and may not be used inside a coroutine.
-
Kernel.
stop
()¶ Force the kernel to stop execution. Since the kernel normally runs in the main thread, this operation would normally have to be performed in a separate thread or possibly inside a coroutine. This method merely sets a flag in the kernel and returns immediately. The kernel will stop only after the currently running task yields.
-
Kernel.
shutdown
()¶ Performs a clean shutdown of the kernel by issuing a cancellation request to all remaining tasks (including daemonic tasks). This function will not return until all tasks have terminated. This method may only be invoked on a kernel that is not actively running. It may not be used inside coroutines or from separate threads. Normally, you would not call this method since the kernel runs until all tasks have terminated anyway. The main use case would be cleaning up after a premature kernel shutdown due to a crash, system exit, or some other event.
Tasks¶
Once the kernel is running, a coroutine can create a new task using the following function:
-
await
new_task
(coro, daemon=False)¶ Create a new task. coro is a newly called coroutine. Does not return to the caller until the new task has been scheduled and executed for at least one cycle. Returns a
Task
instance as a result. The daemon option, if supplied, creates the new task without a parent. The task will run indefinitely in the background. Note: The kernel only runs as long as there are non-daemonic tasks to execute.
-
class
Task
¶ Tasks created by
new_task()
are represented as a Task
instance. It is illegal to create a Task
instance directly by calling the class. The following methods are available on tasks:
-
await
Task.
join
(timeout=None)¶ Wait for the task to terminate. Returns the value returned by the task or raises a
TaskError
exception if the task failed with an exception. This is a chained exception. The __cause__ attribute of this exception contains the actual exception raised in the task.
-
await
Task.
cancel
(*, timeout=None, exc=CancelledError)¶ Cancels the task. This raises a
CancelledError
exception in the task which may choose to handle it. Does not return until the task is actually cancelled. If you want to change the exception raised, supply a different exception as the exc argument.
-
await
Task.
cancel_children
(*, timeout=None, exc=CancelledError)¶ Cancels all of the immediate children of this task. exc specifies a different exception if desired.
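To make the cancellation behavior more concrete, the following sketch drives a bare coroutine by hand using only the standard library. It is not curio code: CancelledError here is a local stand-in class, and suspend() and worker() are hypothetical names. It only illustrates what it means for an exception to be raised inside a task at the point where the task is suspended, and how the task can perform cleanup before terminating.

```python
import types

class CancelledError(Exception):
    """Local stand-in for curio.CancelledError (illustration only)."""

@types.coroutine
def suspend():
    # Hand control back to whoever is driving the coroutine.
    yield

async def worker(log):
    try:
        while True:
            await suspend()
    except CancelledError:
        log.append('cleanup')    # release resources here
        raise                    # then let the cancellation proceed

log = []
coro = worker(log)
coro.send(None)                  # run the worker up to its first suspension

try:
    coro.throw(CancelledError()) # deliver the exception at the suspension point
except CancelledError:
    log.append('cancelled')
```

The worker catches the exception, cleans up, and re-raises it, which matches the advice that a cancelled coroutine should work toward terminating rather than continue running.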
The following public attributes are available on Task
instances:
-
Task.
id
¶ The task’s integer id.
-
Task.
coro
¶ The coroutine associated with the task.
-
Task.
state
¶ The name of the task’s current state. Printing it can be potentially useful for debugging.
-
Task.
exc_info
¶ A tuple of exception information obtained from
sys.exc_info()
if the task crashes for some reason. Potentially useful for debugging.
-
Task.
children
¶ A set of the immediate child tasks created by this task. Useful if writing code that needs to supervise a collection of tasks. Be aware that the contents of the set may change as tasks are scheduled. To safely iterate and perform asynchronous operations, make a copy first.
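The advice about copying the children set can be illustrated with plain Python. In this sketch the strings stand in for Task objects, and the mutation is simulated inline. In curio the set would presumably change because other tasks run while the iterating task is suspended at an await.

```python
# Strings stand in for Task objects in this illustration.
children = {'task-1', 'task-2', 'task-3'}

# Iterating over the live set while it shrinks fails.
try:
    for child in children:
        children.discard(child)      # simulates a child finishing
    failed = False
except RuntimeError:
    failed = True

# Iterating over a snapshot is safe even if the original changes.
children = {'task-1', 'task-2', 'task-3'}
visited = []
for child in list(children):
    visited.append(child)
    children.discard(child)
```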
If you need to make a task sleep for a while, use the following function:
-
await
sleep
(seconds)¶ Sleep for a specified number of seconds. If the number of seconds is 0, the kernel merely switches to the next task (if any).
Performing External Work¶
Sometimes you need to perform work outside the kernel. This includes CPU-intensive calculations and blocking operations. Use the following functions to do that:
-
await
run_cpu_bound
(callable, *args, timeout=None)¶ Run
callable(*args)
in a process pool created by concurrent.futures.ProcessPoolExecutor
. Returns the result.
-
await
run_blocking
(callable, *args, timeout=None)¶ Run
callable(*args)
in a thread pool created by concurrent.futures.ThreadPoolExecutor
. Returns the result.
-
await
run_in_executor
(exc, callable, *args, timeout=None)¶ Run
callable(*args)
in a user-supplied executor and return the result. exc is an executor from the concurrent.futures
module in the standard library.
-
set_cpu_executor
(exc)¶ Set the default executor used for CPU-bound processing.
-
set_blocking_executor
(exc)¶ Set the default executor used for blocking processing.
Note that the callables supplied to these functions are only given positional
arguments. If you need to pass keyword arguments use
functools.partial()
to do it. For example:
from functools import partial
await run_blocking(partial(callable, arg1=value, arg2=value))
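The same technique can be tried without a running kernel. The sketch below uses a plain concurrent.futures.ThreadPoolExecutor in place of run_blocking(), and fetch() is a hypothetical blocking function invented for the example. The point is only that functools.partial() bakes the keyword arguments into a new callable, so the caller needs to supply positional arguments alone.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def fetch(resource, retries=1, verbose=False):
    # Hypothetical blocking function, used only for illustration.
    return (resource, retries, verbose)

# Bind the keyword arguments ahead of time.
bound = partial(fetch, retries=3, verbose=True)

with ThreadPoolExecutor(max_workers=1) as pool:
    # Pass positional arguments only, mirroring the restriction described above.
    result = pool.submit(bound, 'index.html').result()
```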
I/O Layer¶
I/O in curio is performed by classes in curio.io
that
wrap around existing sockets and streams. These classes manage the
blocking behavior and delegate their methods to an existing socket or
file.
Socket¶
The Socket
class is used to wrap an existing socket. It is compatible with
sockets from the built-in socket
module as well as SSL-wrapped sockets created
by functions in the built-in ssl
module. Sockets in curio should be fully
compatible with timeouts and other common socket features.
-
class
curio.io.
Socket
(sockobj)¶ Creates a wrapper around an existing socket sockobj. The socket is put into non-blocking mode when wrapped.
The following methods are redefined on Socket
objects to be
compatible with coroutines. Any socket method not listed here will be
delegated directly to the underlying socket. Be aware
that not all methods have been wrapped and that using a method not
listed here might block the kernel or raise a BlockingIOError
exception.
-
await
Socket.
recv
(maxbytes, flags=0)¶ Receive up to maxbytes of data.
-
await
Socket.
recv_into
(buffer, nbytes=0, flags=0)¶ Receive up to nbytes of data into a buffer object.
-
await
Socket.
recvfrom
(maxsize, flags=0)¶ Receive up to maxsize bytes of data. Returns a tuple (data, client_address).
-
await
Socket.
recvfrom_into
(buffer, nbytes=0, flags=0)¶ Receive up to nbytes of data into a buffer object.
-
await
Socket.
recvmsg
(bufsize, ancbufsize=0, flags=0)¶ Receive normal and ancillary data.
-
await
Socket.
recvmsg_into
(buffers, ancbufsize=0, flags=0)¶ Receive normal and ancillary data.
-
await
Socket.
send
(data, flags=0)¶ Send data. Returns the number of bytes of data actually sent (which may be less than provided in data).
-
await
Socket.
sendall
(data, flags=0)¶ Send all of the data in data.
-
await
Socket.
sendto
(data, address)¶
-
await
Socket.
sendto
(data, flags, address)¶ Send data to the specified address.
-
await
Socket.
sendmsg
(buffers, ancdata=(), flags=0, address=None)¶ Send normal and ancillary data to the socket.
-
await
Socket.
accept
()¶ Wait for a new connection. Returns a tuple (sock, address).
-
await
Socket.
connect
(address)¶ Make a connection.
-
await
Socket.
connect_ex
(address)¶ Make a connection and return an error code instead of raising an exception.
-
await
Socket.
close
()¶ Close the connection.
-
await
curio.io.
do_handshake
()¶ Perform an SSL client handshake. The underlying socket must have already been wrapped by SSL using the
curio.ssl
module.
-
Socket.
makefile
(mode, buffering=0)¶ Make a file-like object that wraps the socket. The resulting file object is a
curio.io.Stream
instance that supports non-blocking I/O. mode specifies the file mode which must be one of 'rb'
or 'wb'
. buffering specifies the buffering behavior. By default unbuffered I/O is used. Note: It is not currently possible to create a stream with Unicode text encoding/decoding applied to it so those options are not available.
-
Socket.
make_streams
(buffering=0)¶ Make a pair of files for reading and writing. Returns a tuple
(reader, writer)
where reader
and writer
are streams created by the makefile()
method.
-
Socket.
blocking
()¶ A context manager that temporarily places the socket into blocking mode and returns the raw socket object used internally. This can be used if you need to pass the socket to existing synchronous code.
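The BlockingIOError mentioned earlier for methods that have not been wrapped is ordinary non-blocking socket behavior, and it can be observed with the standard socket module alone. The following sketch does not use curio. It puts one end of a socket pair into non-blocking mode, which is the mode a wrapped socket is placed in, and shows that a receive with no data pending raises instead of waiting.

```python
import select
import socket

a, b = socket.socketpair()
a.setblocking(False)             # the mode a wrapped socket is placed in

try:
    a.recv(1024)                 # nothing has been sent yet
    outcome = 'data'
except BlockingIOError:
    outcome = 'would block'

b.sendall(b'hello')
select.select([a], [], [], 5)    # wait (up to 5 s) until a is readable
data = a.recv(1024)
a.close()
b.close()
```

The wrapped coroutine methods exist to turn that exception into a suspension of the calling task instead of an error.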
Socket
objects can be used as asynchronous context managers. The socket is
closed when the block exits. For example:
async with sock:
    # Use the socket
    ...
# socket closed here
Stream¶
The Stream
class puts a non-blocking wrapper around an
existing file-like object. Certain other functions in curio use this
(e.g., the Socket.makefile()
method).
-
class
curio.io.
Stream
(fileobj)¶ Create a file-like wrapper around an existing file. fileobj must be in binary mode. The file is placed into non-blocking mode using
os.set_blocking(fileobj.fileno(), False)
.
The following methods are available on instances of Stream
:
-
await
Stream.
read
(maxbytes=-1)¶ Read up to maxbytes of data from the file. If omitted, reads as much data as is currently available and returns it.
-
await
Stream.
readall
()¶ Return all of the data that’s available on a file up until an EOF is read.
-
await
Stream.
readline
()¶ Read a single line of data from a file.
-
await
Stream.
write
(bytes)¶ Write all of the data in bytes to the file.
-
await
Stream.
writelines
(lines)¶ Writes all of the lines in lines to the file.
-
await
Stream.
flush
()¶ Flush any unwritten data from buffers to the file.
-
await
Stream.
close
()¶ Flush any unwritten data and close the file.
-
Stream.
settimeout
(seconds)¶ Sets a timeout on all file I/O operations. If seconds is None, any previously set timeout is cleared.
-
Stream.
blocking
()¶ A context manager that temporarily places the stream into blocking mode and returns the raw file object used internally. This can be used if you need to pass the file to existing synchronous code.
Other file methods (e.g., tell()
, seek()
, etc.) are available
if the supplied fileobj
also has them.
Streams may be used as an asynchronous context manager. For example:
async with stream:
    # Use the stream object
    ...
# stream closed here
socket wrapper module¶
The curio.socket
module provides a wrapper around the built-in
socket
module, allowing it to be used as a stand-in in
curio-related code. The module provides exactly the same
functionality except that certain operations have been replaced by
coroutine equivalents.
-
curio.socket.
socket
(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)¶ Creates a
curio.io.Socket
wrapper around socket
objects created in the built-in socket
module. The arguments for construction are identical and have the same meaning. The resulting socket
instance is set in non-blocking mode.
The following module-level functions have been modified so that the returned socket objects are compatible with curio:
-
curio.socket.
socketpair
(family=AF_UNIX, type=SOCK_STREAM, proto=0)¶
-
curio.socket.
fromfd
(fd, family, type, proto=0)¶
-
curio.socket.
create_connection
(address, timeout, source_address)¶
The following module-level functions have been redefined as coroutines so that they don’t block the kernel when interacting with DNS:
-
await
curio.socket.
getaddrinfo
(host, port, family=0, type=0, proto=0, flags=0)¶
-
await
curio.socket.
getfqdn
(name)¶
-
await
curio.socket.
gethostbyname
(hostname)¶
-
await
curio.socket.
gethostbyname_ex
(hostname)¶
-
await
curio.socket.
gethostname
()¶
-
await
curio.socket.
gethostbyaddr
(ip_address)¶
-
await
curio.socket.
getnameinfo
(sockaddr, flags)¶
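The text does not say how these coroutines avoid blocking. One common approach, assumed here purely for illustration, is to run the blocking standard-library call in a worker thread so the calling thread stays free. The sketch below uses only the standard library, and resolve() is a hypothetical helper. A numeric address is used so that the example does not depend on a working DNS resolver.

```python
import socket
from concurrent.futures import ThreadPoolExecutor

def resolve(host, port):
    # Blocking call: may wait on the system resolver.
    return socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(resolve, '127.0.0.1', 80)
    # The submitting thread is free to do other work at this point.
    results = future.result(timeout=10)

family, socktype, proto, canonname, sockaddr = results[0]
```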
subprocess wrapper module¶
The curio.subprocess
module provides a wrapper around the built-in subprocess
module.
-
class
curio.subprocess.
Popen
(*args, **kwargs)¶ A wrapper around the
subprocess.Popen
class. The same arguments are accepted. On the resulting Popen
instance, the stdin
, stdout
, and stderr
file attributes have been wrapped by the curio.io.Stream
class. You can use these in an asynchronous context.
Here is an example of using Popen
to read streaming output off of a
subprocess with curio:
import curio
from curio import subprocess

async def main():
    p = subprocess.Popen(['ping', 'www.python.org'], stdout=subprocess.PIPE)
    async for line in p.stdout:
        print('Got:', line.decode('ascii'), end='')

if __name__ == '__main__':
    kernel = curio.Kernel()
    kernel.add_task(main())
    kernel.run()
The following methods of Popen
have been replaced by asynchronous equivalents:
-
await
Popen.
wait
(timeout=None)¶ Wait for a subprocess to exit.
-
await
Popen.
communicate
(input=b'', timeout=None)¶ Communicate with the subprocess, sending the specified input on standard input. Returns a tuple
(stdout, stderr)
with the resulting output of standard output and standard error.
The following functions are also available. They accept the same arguments as their
equivalents in the subprocess
module:
-
await
curio.subprocess.
run
(args, stdin=None, input=None, stdout=None, stderr=None, shell=False, timeout=None, check=False)¶ Run a command in a subprocess. Returns a
subprocess.CompletedProcess
instance.
-
await
curio.subprocess.
check_output
(args, stdout=None, stderr=None, shell=False, timeout=None)¶ Run a command in a subprocess and return the resulting output. Raises a
subprocess.CalledProcessError
exception if an error occurred.
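Because these functions accept the same arguments as their standard-library equivalents, the blocking versions are a convenient way to check a call before moving it into a coroutine. The sketch below uses the built-in subprocess module, not curio.subprocess. Under curio, each call would instead be awaited. The child commands run the current Python interpreter so the example does not depend on any external program.

```python
import subprocess
import sys

# run() returns a CompletedProcess describing the finished command.
done = subprocess.run([sys.executable, '-c', 'print("hello")'],
                      stdout=subprocess.PIPE, check=True)

# check_output() raises CalledProcessError on a non-zero exit status.
try:
    subprocess.check_output([sys.executable, '-c', 'import sys; sys.exit(3)'])
    code = 0
except subprocess.CalledProcessError as e:
    code = e.returncode
```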
ssl wrapper module¶
The curio.ssl
module provides curio-compatible functions for creating an SSL
layer around curio sockets. The following functions are redefined (and have the same
calling signature as their counterparts in the standard ssl
module):
-
curio.ssl.
wrap_socket
(*args, **kwargs)¶
-
await
curio.ssl.
get_server_certificate
(*args, **kwargs)¶
-
curio.ssl.
create_default_context
(*args, **kwargs)¶
The SSLContext
class is also redefined and modified so that the wrap_socket()
method
returns a socket compatible with curio.
Don’t attempt to use the curio.ssl
module without a careful read of Python’s official documentation
at https://docs.python.org/3/library/ssl.html.
For the purposes of curio, it is usually easier to apply SSL to a connection using some of the high level network functions described in the next section. For example, here’s how you make an outgoing SSL connection:
sock = await curio.open_connection('www.python.org', 443,
                                   ssl=True,
                                   server_hostname='www.python.org')
Here’s how you might define a server that uses SSL:
import curio
from curio import ssl

KEYFILE = "privkey_rsa"       # Private key
CERTFILE = "certificate.crt"  # Server certificate

async def handler(client, addr):
    ...

if __name__ == '__main__':
    kernel = curio.Kernel()
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(certfile=CERTFILE, keyfile=KEYFILE)
    kernel.run(curio.run_server('', 10000, handler, ssl=ssl_context))
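The use of Purpose.CLIENT_AUTH in a server can look backwards at first. In the standard ssl module, the purpose names the peer that the context is meant to authenticate, so a server-side context is created with CLIENT_AUTH and a client-side context with the default, SERVER_AUTH. The sketch below uses the standard ssl module only and assumes that curio.ssl.create_default_context() behaves the same way, since the text says the calling signatures match.

```python
import ssl

# Client side: the default purpose is Purpose.SERVER_AUTH.
client_ctx = ssl.create_default_context()

# Server side: same purpose as in the server example above.
server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

client_checks = (client_ctx.check_hostname, client_ctx.verify_mode)
server_checks = (server_ctx.check_hostname, server_ctx.verify_mode)
```

The client context verifies the server certificate and checks the hostname, which is why supplying server_hostname matters for outgoing connections. The server context does not demand a certificate from clients by default.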
High Level Networking¶
The following functions are provided to simplify common tasks related to making network connections and writing servers.
-
await
curio.
open_connection
(host, port, *, ssl=None, source_addr=None, server_hostname=None, timeout=None)¶ Creates an outgoing connection to a server at host and port. This connection is made using the
socket.create_connection()
function and might be IPv4 or IPv6 depending on the network configuration (although you’re not supposed to worry about it). ssl specifies whether or not SSL should be used. ssl can be True
or an instance of curio.ssl.SSLContext
. source_addr specifies the source address to use on the socket. server_hostname specifies the hostname to check against when making SSL connections. It is highly advised that this be supplied to avoid man-in-the-middle attacks.
-
await
curio.
open_unix_connection
(path, *, ssl=None, server_hostname=None)¶ Creates a connection to a Unix domain socket with optional SSL applied.
-
curio.
create_server
(host, port, client_connected_task, *, family=AF_INET, backlog=100, ssl=None, reuse_address=True)¶ Creates a
Server
instance for receiving TCP connections on a given host and port. client_connected_task is a coroutine that is to be called to handle each connection. family specifies the address family and is either socket.AF_INET
or socket.AF_INET6
. backlog is the argument to the socket.socket.listen()
method. ssl specifies a curio.ssl.SSLContext
instance to use. reuse_address specifies whether to reuse a previously used port. This function does not actually start running the created server. To do that, you need to use the Server.serve_forever()
method on the returned Server
instance. Normally, it’s easier to use run_server()
instead. Only use create_server()
if you need to do something else with the Server
instance for some reason.
instance for some reason.
-
await
curio.
run_server
(host, port, client_connected_task, *, family=AF_INET, backlog=100, ssl=None, reuse_address=True)¶ Creates a server using
create_server()
and immediately starts running it.
-
curio.
create_unix_server
(path, client_connected_task, *, backlog=100, ssl=None)¶ Creates a Unix domain server on a given path. client_connected_task is a coroutine to execute on each connection. backlog is the argument given to the
socket.socket.listen()
method. ssl is an optional curio.ssl.SSLContext
to use if setting up an SSL connection. Returns a Server
instance. To start running the server, use Server.serve_forever()
.
-
await
curio.
run_unix_server
(path, client_connected_task, *, backlog=100, ssl=None)¶ Creates a Unix domain server using
create_unix_server()
and immediately starts running it.
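As a rough guide to what the family, backlog, and reuse_address options of create_server() plausibly correspond to at the socket level, here is a sketch built on the standard socket module. It is an assumption about the mapping, not curio's implementation, and make_listener() is a hypothetical helper. The sockets are left in blocking mode to keep the example short.

```python
import socket

def make_listener(host, port, family=socket.AF_INET, backlog=100,
                  reuse_address=True):
    # Hypothetical helper mirroring the keyword options of create_server().
    sock = socket.socket(family, socket.SOCK_STREAM)
    try:
        if reuse_address:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.listen(backlog)
    except OSError:
        sock.close()
        raise
    return sock

listener = make_listener('127.0.0.1', 0)      # port 0: let the OS pick one
host, port = listener.getsockname()

client = socket.create_connection((host, port))
conn, addr = listener.accept()                # one accepted client connection
conn.sendall(b'hi')
reply = client.recv(16)

for s in (conn, client, listener):
    s.close()
```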
Synchronization Primitives¶
The following synchronization primitives are available. Their behavior is
similar to their equivalents in the threading
module. None of these
primitives are safe to use with threads created by the built-in threading
module.
-
class
curio.
Event
¶ An event object.
Event
instances support the following methods:
-
Event.
is_set
()¶ Return
True
if the event is set.
-
Event.
clear
()¶ Clear the event.
-
await
Event.
wait
(timeout=None)¶ Wait for the event with an optional timeout.
-
await
Event.
set
()¶ Set the event. Wake all waiting tasks (if any).
Here is an Event example:
import curio

async def waiter(evt):
    print('Waiting')
    await evt.wait()
    print('Running')

async def main():
    evt = curio.Event()
    # Create a few waiters
    await curio.new_task(waiter(evt))
    await curio.new_task(waiter(evt))
    await curio.new_task(waiter(evt))
    await curio.sleep(5)
    # Set the event. All waiters should wake up
    await evt.set()
-
class
curio.
Lock
¶ This class provides a mutex lock. It can only be used in tasks. It is not thread safe.
Lock
instances support the following methods:
-
await
Lock.
acquire
(timeout=None)¶ Acquire the lock.
-
await
Lock.
release
()¶ Release the lock.
-
Lock.
locked
()¶ Return
True
if the lock is currently held.
The preferred way to use a Lock is as an asynchronous context manager. For example:
import curio

async def child(lck):
    async with lck:
        print('Child has the lock')

async def main():
    lck = curio.Lock()
    async with lck:
        print('Parent has the lock')
        await curio.new_task(child(lck))
        await curio.sleep(5)
-
class
curio.
Semaphore
(value=1)¶ Create a semaphore. Semaphores are based on a counter. If the count is greater than 0, it is decremented and the semaphore is acquired. Otherwise, the task has to wait until the count is incremented by another task.
-
class
curio.
BoundedSemaphore
(value=1)¶ This class is the same as
Semaphore
except that the semaphore value is not allowed to exceed the initial value.
Semaphores support the following methods:
-
await
Semaphore.
acquire
(timeout=None)¶ Acquire the semaphore, decrementing its count. Blocks if the count is 0.
-
await
Semaphore.
release
()¶ Release the semaphore, incrementing its count. Never blocks.
-
Semaphore.
locked
()¶ Return
True
if the Semaphore is locked.
Like locks, semaphores support the async-with statement. A common use of semaphores is to limit the number of tasks performing an operation. For example:
import curio

async def worker(sema):
    async with sema:
        print('Working')
        await curio.sleep(5)

async def main():
    sema = curio.Semaphore(2)     # Allow two tasks at a time
    # Launch a bunch of tasks
    for n in range(10):
        await curio.new_task(worker(sema))
    # After this point, you should see two tasks at a time run. Every 5 seconds.
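Since these primitives are described as behaving like their threading equivalents, the difference between Semaphore and BoundedSemaphore can be seen with the threading versions. The sketch below uses the threading module only. How curio reports an extra release on a BoundedSemaphore is not stated in this manual, so the ValueError shown here is the threading behavior and should be treated as an assumption for curio.

```python
import threading

# A plain semaphore lets the count grow past its initial value.
plain = threading.Semaphore(1)
plain.release()                            # count is now 2
plain_ok = plain.acquire(blocking=False) and plain.acquire(blocking=False)

# A bounded semaphore rejects a release that would exceed the initial value.
bounded = threading.BoundedSemaphore(1)
try:
    bounded.release()
    over_release = 'allowed'
except ValueError:
    over_release = 'rejected'
```

A bounded semaphore is therefore useful for catching code that releases more often than it acquires.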
-
class
curio.
Condition
(lock=None)¶ Condition variable. lock is the underlying lock to use. If none is provided, then a
Lock
object is used.
Condition
objects support the following methods:
-
Condition.
locked
()¶ Return
True
if the condition variable is locked.
-
await
Condition.
acquire
(*, timeout=None)¶ Acquire the condition variable lock.
-
await
Condition.
release
()¶ Release the condition variable lock.
-
await
Condition.
wait
(*, timeout=None)¶ Wait on the condition variable with an optional timeout. The underlying lock is released while waiting.
-
await
Condition.
wait_for
(predicate, *, timeout=None)¶ Wait on the condition variable until a supplied predicate function returns
True
. predicate is a callable that takes no arguments.
-
await
Condition.
notify
(n=1)¶ Notify one or more tasks, causing them to wake from the
Condition.wait()
method.
-
await
Condition.
notify_all
()¶ Notify all tasks waiting on the condition.
Condition variables are often used to signal between tasks. For example, here is a simple producer-consumer scenario:
import curio
from collections import deque

items = deque()

async def consumer(cond):
    while True:
        async with cond:
            while not items:
                await cond.wait()    # Wait for items
            item = items.popleft()
        print('Got', item)

async def producer(cond):
    for n in range(10):
        async with cond:
            items.append(n)
            await cond.notify()
        await curio.sleep(1)

async def main():
    cond = curio.Condition()
    await curio.new_task(producer(cond))
    await curio.new_task(consumer(cond))
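The inner "while not items" loop in the consumer is the pattern that wait_for() packages up. The sketch below shows the equivalent call using threading.Condition from the standard library, on the assumption that the curio version follows the same semantics, as the text says of these primitives in general. Threads stand in for tasks here.

```python
import threading
from collections import deque

items = deque()
cond = threading.Condition()
received = []

def consumer():
    with cond:
        # Equivalent to: while not items: cond.wait()
        cond.wait_for(lambda: len(items) > 0, timeout=5)
        received.append(items.popleft())

t = threading.Thread(target=consumer)
t.start()

with cond:
    items.append('job')
    cond.notify()                # wake one waiter

t.join(timeout=5)
```

Because the predicate is checked before waiting, the consumer works correctly whether it starts waiting before or after the item is appended.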
Queues¶
If you want to communicate between tasks, it’s usually much easier to use
a Queue
instead.
-
class
curio.
Queue
(maxsize=0)¶ Creates a queue with a maximum number of elements in maxsize. If not specified, the queue can hold an unlimited number of items.
A Queue
instance supports the following methods:
-
Queue.
empty
()¶ Returns
True
if the queue is empty.
-
Queue.
full
()¶ Returns
True
if the queue is full.
-
Queue.
qsize
()¶ Return the number of items currently in the queue.
-
await
Queue.
get
(*, timeout=None)¶ Returns an item from the queue with an optional timeout.
-
await
Queue.
put
(item, *, timeout=None)¶ Puts an item on the queue with an optional timeout in the event that the queue is full.
-
await
Queue.
join
(*, timeout=None)¶ Wait for all of the elements put onto a queue to be processed. Consumers must call
Queue.task_done()
to indicate completion.
-
await
Queue.
task_done
()¶ Indicate that processing has finished for an item. If all items have been processed and there are tasks waiting on
Queue.join()
they will be awakened.
Here is an example of using queues in a producer-consumer problem:
import curio

async def producer(queue):
    for n in range(10):
        await queue.put(n)
    await queue.join()
    print('Producer done')

async def consumer(queue):
    while True:
        item = await queue.get()
        print('Consumer got', item)
        await queue.task_done()

async def main():
    q = curio.Queue()
    prod_task = await curio.new_task(producer(q))
    cons_task = await curio.new_task(consumer(q))
    await prod_task.join()
    await cons_task.cancel()
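The join() and task_done() accounting follows the same protocol as queue.Queue in the standard library, so it can be explored with threads. The sketch below is not curio code. It uses queue.Queue and a worker thread, and it stops the worker with a None sentinel instead of cancelling it.

```python
import queue
import threading

q = queue.Queue()
seen = []

def consumer():
    while True:
        item = q.get()
        if item is None:          # sentinel: stop the worker
            q.task_done()
            return
        seen.append(item)
        q.task_done()             # one acknowledgement per get()

worker = threading.Thread(target=consumer)
worker.start()

for n in range(3):
    q.put(n)
q.join()                          # returns once every item is acknowledged
processed_at_join = list(seen)

q.put(None)
worker.join(timeout=5)
```

If the consumer forgot to call task_done(), join() would never return, which is the usual cause of a producer that hangs.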
Signals¶
Unix signals are managed by the SignalSet
class. This class operates
as an asynchronous context manager. The recommended usage looks like this:
import signal
from curio import SignalSet

async def coro():
    ...
    async with SignalSet(signal.SIGUSR1, signal.SIGHUP) as sigset:
        ...
        signo = await sigset.wait()
        print('Got signal', signo)
        ...
For all of the statements inside the context-manager, signals will be queued. The sigset.wait() operation will return received signals one at a time from the signal queue.
Signals can be temporarily ignored using a normal context manager:
async def coro():
    ...
    sigset = SignalSet(signal.SIGINT)
    with sigset.ignore():
        ...
        # Signals temporarily disabled
        ...
Caution: Signal handling only works if the curio kernel is running in Python’s main execution thread. Also, mixing signals with threads, subprocesses, and other concurrency primitives is a well-known way to make your head shatter into small pieces. Tread lightly.
-
class
curio.
SignalSet
(*signals)¶ Represents a set of one or more Unix signals. signals is a list of signals as defined in the built-in
signal
module.
The following methods are available on a SignalSet
instance. They
may only be used in coroutines.
-
await
SignalSet.
wait
(*, timeout=None)¶ Wait for one of the signals in the signal set to arrive. Returns the signal number of the signal received. timeout gives an optional timeout. Normally this method is used inside an
async with
statement because this allows received signals to be properly queued. It can be used in isolation, but be aware that this will only catch a single signal right at that line of code. It’s possible that you might lose signals if you use this method outside of a context manager.
-
SignalSet.
ignore
()¶ Returns a context manager wherein signals from the signal set are temporarily disabled. Note: This is a normal context manager, so use a normal
with
-statement.
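The idea of queuing signals and handing them out one at a time can be sketched with the standard signal module. This is not how SignalSet is necessarily implemented. It is a minimal Unix-only illustration, run from the main thread, in which enqueue() is a hypothetical handler that records each signal instead of acting on it. signal.raise_signal() requires Python 3.8 or later.

```python
import signal
from collections import deque

pending = deque()

def enqueue(signo, frame):
    # Record the signal instead of acting on it right away.
    pending.append(signo)

watched = (signal.SIGUSR1, signal.SIGHUP)
previous = {sig: signal.signal(sig, enqueue) for sig in watched}
try:
    signal.raise_signal(signal.SIGUSR1)
    signal.raise_signal(signal.SIGHUP)
finally:
    # Restore whatever handlers were installed before.
    for sig, handler in previous.items():
        signal.signal(sig, handler)

# Drain the queue one signal at a time, oldest first.
received = []
while pending:
    received.append(pending.popleft())
```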
Exceptions¶
-
exception
curio.
CancelledError
¶ Exception raised in a coroutine if it has been cancelled. If ignored, the coroutine is silently terminated. If caught, a coroutine can continue to run, but should work to terminate execution. Ignoring a cancellation request and continuing to execute will likely cause some other task to hang.
-
exception
curio.
TaskError
¶ Exception raised by the
Task.join()
method if an uncaught exception occurs in a task. It is a chained exception. The __cause__
attribute contains the exception that caused the task to fail.
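Exception chaining is a standard Python feature, so the way a caller digs the original error out of a TaskError can be shown without curio. In this sketch TaskError is a local stand-in class and run_and_wrap() is a hypothetical helper that wraps a failure with "raise ... from ...", which is what populates the __cause__ attribute.

```python
class TaskError(Exception):
    """Local stand-in for curio.TaskError (illustration only)."""

def run_and_wrap(func):
    # Hypothetical helper: run func and report any failure as a TaskError.
    try:
        return func()
    except Exception as e:
        raise TaskError('Task crash') from e

try:
    run_and_wrap(lambda: 1 / 0)
    cause = None
except TaskError as err:
    cause = err.__cause__        # the exception that actually occurred
```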
Low-level Kernel System Calls¶
The following system calls are available, but not typically used directly in user code. They are used to implement higher level objects such as locks, socket wrappers, and so forth. If you find yourself using these, you’re probably doing something wrong–or implementing a new curio primitive.
-
await
curio.
_read_wait
(fileobj, timeout=None)¶ Sleep until data is available for reading on fileobj. fileobj is any file-like object with a fileno() method. timeout gives an optional timeout in seconds.
-
await
curio.
_write_wait
(fileobj, timeout=None)¶ Sleep until data can be written on fileobj. fileobj is any file-like object with a fileno() method. timeout gives an optional timeout in seconds.
-
await
curio.
_future_wait
(future, timeout=None)¶ Sleep until a result is set on future. future is an instance of
concurrent.futures.Future
.
-
await
curio.
_join_task
(task, timeout=None)¶ Sleep until the indicated task completes. The final return value of the task is returned if it completed successfully. If the task failed with an exception, a
TaskError
exception is raised. This is a chained exception. The TaskError.__cause__
attribute of this exception contains the actual exception raised in the task.
-
await
curio.
_cancel_task
(task, exc=CancelledError, timeout=None)¶ Cancel the indicated task. Does not return until the task actually completes the cancellation. Note: It is usually better to use
Task.cancel()
instead of this function.
-
await
curio.
_wait_on_queue
(kqueue, state_name, timeout=None)¶ Go to sleep on a queue. kqueue is an instance of a kernel queue which is typically a
collections.deque
instance. state_name is the name of the wait state (used in debugging).
-
await
curio.
_reschedule_tasks
(kqueue, n=1, value=None, exc=None)¶ Reschedule one or more tasks from a queue. kqueue is an instance of a kernel queue. n is the number of tasks to release. value and exc specify the return value or exception to raise in the task when it resumes execution.
-
await
curio.
_sigwatch
(sigset)¶ Tell the kernel to start queuing signals in the given signal set sigset.
-
await
curio.
_sigunwatch
(sigset)¶ Tell the kernel to stop queuing signals in the given signal set.
-
await
curio.
_sigwait
(sigset, timeout=None)¶ Wait for the arrival of a signal in a given signal set. Returns the signal number of the received signal.
Again, you’re unlikely to use any of these functions directly. However, here’s a small taste
of how they’re used. For example, the curio.io.Socket.recv()
method
looks roughly like this:
class Socket(object):
    ...
    async def recv(self, maxbytes):
        while True:
            try:
                return self._socket.recv(maxbytes)
            except BlockingIOError:
                await _read_wait(self._socket)
    ...
This method first tries to receive data. If none is available, the _read_wait()
call is used to
put the task to sleep until reading can be performed. When it awakes, the receive operation
is retried. To emphasize, _read_wait()
does not perform any I/O itself. It only
suspends the task until the socket becomes readable.
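The same try-then-wait loop can be written against the standard selectors module, which is also where the kernel's selector argument comes from. This is a sketch under a simplifying assumption: instead of suspending one task and letting the kernel run others, recv_when_ready() (a hypothetical function) blocks the whole thread in select() until the socket is readable. A timer thread supplies the data a moment later.

```python
import selectors
import socket
import threading

def recv_when_ready(sock, maxbytes):
    # sock must be non-blocking. Try first; wait for readability only on failure.
    while True:
        try:
            return sock.recv(maxbytes)
        except BlockingIOError:
            with selectors.DefaultSelector() as sel:
                sel.register(sock, selectors.EVENT_READ)
                sel.select(timeout=5)    # waits here; performs no I/O itself

a, b = socket.socketpair()
a.setblocking(False)

# Send from another thread after a short delay so the first recv() fails.
threading.Timer(0.1, b.sendall, args=(b'late data',)).start()
data = recv_when_ready(a, 1024)

a.close()
b.close()
```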
Here’s an example of code that implements a mutex lock:
from collections import deque

class Lock(object):
    def __init__(self):
        self._acquired = False
        self._waiting = deque()

    async def acquire(self):
        if self._acquired:
            # Lock is held: sleep until release() wakes this task
            await _wait_on_queue(self._waiting, 'LOCK_ACQUIRE')
        self._acquired = True

    async def release(self):
        if self._waiting:
            # Wake one waiter; the lock stays held on its behalf
            await _reschedule_tasks(self._waiting, n=1)
        else:
            self._acquired = False
In this code you can see the low-level calls related to managing a wait queue. This code is not significantly different than the actual implementation of a lock in curio. If you wanted to make your own task synchronization objects, the code would look similar.