Curio How-To ============ This document provides some recipes for using Curio to perform common tasks. How do you write a simple TCP server? ------------------------------------- Here is an example of a simple TCP echo server:: from curio import run, spawn, tcp_server async def echo_client(client, addr): print('Connection from', addr) while True: data = await client.recv(100000) if not data: break await client.sendall(data) print('Connection closed') if __name__ == '__main__': run(tcp_server('', 25000, echo_client)) This server uses sockets directly. If you want to a use a file-like streams interface, use the ``as_stream()`` method like this:: from curio import run, spawn, tcp_server async def echo_client(client, addr): print('Connection from', addr) s = client.as_stream() while True: data = await s.read(100000) if not data: break await s.write(data) print('Connection closed') if __name__ == '__main__': run(tcp_server('', 25000, echo_client)) How do you perform a blocking operation? ---------------------------------------- If you need to perform a blocking operation that runs outside of curio, use ``run_in_thread()``. For example:: import time import curio result = await curio.run_in_thread(time.sleep, 100) How do you perform a CPU intensive operation? --------------------------------------------- If you need to run a CPU-intensive operation, you can either run it in a thread (see above) or have it run in a separate process. For example:: import curio def fib(n): if n <= 2: return 1 else: return fib(n-1) + fib(n-2) ... result = await curio.run_in_process(fib, 40) How do you apply a timeout? --------------------------- You can make any curio operation timeout using ``timeout_after(seconds, coro)``. For example:: from curio import timeout_after, TaskTimeout try: result = await timeout_after(5, coro(args)) except TaskTimeout: print('Timed out') Since wrapping a timeout in an exception is common, you can also use ``ignore_after()`` which returns ``None`` instead. For example:: from curio import ignore_after result = await ignore_after(5, coro(args)) if result is None: print('Timeout out') How can a timeout be applied to a block of statements? ------------------------------------------------------ Use the ``timeout_after()`` or ``ignore_after()`` functions as a context manager. For example:: async with timeout_after(5): statement1 statement2 ... This is a cumulative timeout applied to the entire block. After the specified number of seconds has elapsed, a ``TaskTimeout`` exception will be raised in the current operation blocking in curio. How do you shield a coroutine from cancellation? ------------------------------------------------ The easiest way to shield a coroutine from cancellation is to spawn it as a separate task. For example:: async def func(): ... child = await spawn(coro(args)) result = await child.join() ... Cancellation only applies to the immediate task on which it is performed. So, if the outer coroutine ``func()`` is cancelled, the inner task created by ``spawn()`` will continue to run to completion. How do you make cancellation apply to child tasks? -------------------------------------------------- If you want to make a parent coroutine cancel all of its children when it's cancelled, it needs to keep track of the children and cancel them explicitly. For example:: async def func(): ... child = await spawn(coro(args)) try: ... ... await child.join() except CancelledError: await child.cancel() How does a coroutine get its enclosing Task instance? ----------------------------------------------------- Use the ``current_task()`` function like this:: from curio import current_task ... async def func(): ... myself = await current_task() ... Once you have a reference to the ``Task``, it can be passed around and use in other operations. For example, a different task could use it to cancel. How can tasks communicate? -------------------------- Similar to threads, one of the easiest ways to communicate between tasks is to use a queue. For example:: import curio async def producer(queue): for n in range(10): await queue.put(n) await queue.join() print('Producer done') async def consumer(queue): while True: item = await queue.get() print('Consumer got', item) await queue.task_done() async def main(): q = curio.Queue() prod_task = await curio.spawn(producer(q)) cons_task = await curio.spawn(consumer(q)) await prod_task.join() await cons_task.cancel() if __name__ == '__main__': curio.run(main()) How can a task and a thread communicate? ---------------------------------------- The most straightforward way to communicate between curio tasks and threads is to use a thread-safe queue from the built-in ``queue`` module in combination with the curio ``abide()`` function:: import curio import queue import threading # A thread - standard python def producer(queue): for n in range(10): queue.put(n) queue.join() print('Producer done') # A task - Curio async def consumer(queue): while True: item = await curio.abide(queue.get) print('Consumer got', item) await curio.abide(queue.task_done) async def main(): q = queue.Queue() # Thread-safe queue prod_task = threading.Thread(target=producer, args=(q,)).start() cons_task = await curio.spawn(consumer(q)) prod_task.join() await cons_task.cancel() if __name__ == '__main__': curio.run(main()) ``abide()`` is a special function that allows curio to adapt to foreign functions and synchronization primitives typically associated with threads and processes. In this example, the ``queue.get()`` and ``queue.task_done()`` functions will be executed in a separate thread to avoid blocking other running tasks. It is important to note that ``curio.abide(queue.get)`` is not a typo. ``abide()`` will call the supplied function on your behalf. If you try to use ``curio.abide(queue.get())``, you'll not only block the whole kernel loop, you'll also get an error when it finally wakes up. There's one other interesting feature of ``abide()``. If you use it on a coroutine that's native to curio, it will still work. Thus, the ``consumer()`` function above would actually work if the supplied queue is either a ``Queue`` from the built-in ``queue`` module or an async compatible ``Queue`` provided by curio. It's magic. How can coroutines and threads share a common lock? --------------------------------------------------- A lock can be shared as the lock in question is one from the ``threading`` module and you use the curio ``abide()`` function. For example:: import threading import curio lock = threading.Lock() # Must be a thread-lock # Function running in a thread def func(): ... with lock: critical_section ... # Coroutine running curio async def coro(): ... async with curio.abide(lock): critical_section ... ``curio.abide()`` adapts the given lock to work safely inside curio. If given a thread-lock, the various locking operations are executed in threads to avoid blocking other curio tasks. How do you run external commands in a subprocess? ------------------------------------------------- Curio provides it's own version of the subprocess module. Use the ``check_output()`` function as you would in normal Python code. For example:: from curio import subprocess async def func(): ... out = await subprocess.check_output(['cmd','arg1','arg2','arg3']) ... The ``check_output()`` function takes the same arguments and raises the same exceptions as its standard library counterpart. The underlying implementation is built entirely using the async I/O primitives of curio. It's fast and no backing threads are used. How can you communicate with a subprocess over a pipe? ------------------------------------------------------ Use the ``curio.subprocess`` module just like you would use the normal ``subprocess`` module. For example:: from curio import subprocess async def func(): ... p = subprocess.Popen(['cmd', 'arg1', 'arg2', ...], stdin=subprocess.PIPE, stdout=subprocess.PIPE) await p.stdin.write(b'Some data') ... resp = await p.stdout.read(maxsize) In this example, the ``p.stdin`` and ``p.stdout`` streams are replaced by curio-compatible file streams. You use the same I/O operations as before, but make sure you preface them with ``await``.