Testing made easier with trio.testing

The trio.testing module provides various utilities to make it easier to test Trio code. Unlike the other submodules in the trio namespace, trio.testing is not automatically imported when you do import trio; you must import trio.testing explicitly.

Test harness integration

@trio.testing.trio_test

Time and timeouts

trio.testing.MockClock is a Clock with a few tricks up its sleeve to help you efficiently test code involving timeouts:

  • By default, it starts at time 0, and clock time only advances when you explicitly call jump(). This provides an extremely controllable clock for testing.

  • You can set rate to 1.0 if you want it to start running in real time like a regular clock. You can stop and start the clock within a test. You can set rate to 10.0 to make clock time pass at 10x real speed (so e.g. await trio.sleep(10) returns after 1 second).

  • But even more interestingly, you can set autojump_threshold to zero or a small value, and then it will watch the execution of the run loop, and any time things have settled down and everyone’s waiting for a timeout, it jumps the clock forward to that timeout. In many cases this allows natural-looking code involving timeouts to be automatically run at near full CPU utilization with no changes. (Thanks to fluxcapacitor for this awesome idea.)

  • And of course these can be mixed and matched at will.

Regardless of these shenanigans, from “inside” Trio the passage of time still seems normal so long as you restrict yourself to Trio’s time functions (see Time and clocks). Below is an example demonstrating two different ways of making time pass quickly. Notice how in both cases, the two tasks keep a consistent view of reality and events happen in the expected order, despite being wildly divorced from real time:

# across-realtime.py

import time
import trio
import trio.testing

YEAR = 365 * 24 * 60 * 60  # seconds

async def task1():
    start = trio.current_time()

    print("task1: sleeping for 1 year")
    await trio.sleep(YEAR)

    duration = trio.current_time() - start
    print("task1: woke up; clock says I've slept {} years"
          .format(duration / YEAR))

    print("task1: sleeping for 1 year, 100 times")
    for _ in range(100):
        await trio.sleep(YEAR)

    duration = trio.current_time() - start
    print("task1: slept {} years total".format(duration / YEAR))

async def task2():
    start = trio.current_time()

    print("task2: sleeping for 5 years")
    await trio.sleep(5 * YEAR)

    duration = trio.current_time() - start
    print("task2: woke up; clock says I've slept {} years"
          .format(duration / YEAR))

    print("task2: sleeping for 500 years")
    await trio.sleep(500 * YEAR)

    duration = trio.current_time() - start
    print("task2: slept {} years total".format(duration / YEAR))

async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(task1)
        nursery.start_soon(task2)

def run_example(clock):
    real_start = time.perf_counter()
    trio.run(main, clock=clock)
    real_duration = time.perf_counter() - real_start
    print("Total real time elapsed: {} seconds".format(real_duration))

print("Clock where time passes at 100 years per second:\n")
run_example(trio.testing.MockClock(rate=100 * YEAR))

print("\nClock where time automatically skips past the boring parts:\n")
run_example(trio.testing.MockClock(autojump_threshold=0))

Output:

Clock where time passes at 100 years per second:

task2: sleeping for 5 years
task1: sleeping for 1 year
task1: woke up; clock says I've slept 1.0365006048232317 years
task1: sleeping for 1 year, 100 times
task2: woke up; clock says I've slept 5.0572111969813704 years
task2: sleeping for 500 years
task1: slept 104.77677842136472 years total
task2: slept 505.25014589075 years total
Total real time elapsed: 5.053582429885864 seconds

Clock where time automatically skips past the boring parts:

task2: sleeping for 5 years
task1: sleeping for 1 year
task1: woke up; clock says I've slept 1.0 years
task1: sleeping for 1 year, 100 times
task2: woke up; clock says I've slept 5.0 years
task2: sleeping for 500 years
task1: slept 101.0 years total
task2: slept 505.0 years total
Total real time elapsed: 0.019298791885375977 seconds

class trio.testing.MockClock(rate=0.0, autojump_threshold=inf)

A user-controllable clock suitable for writing tests.

Parameters
rate

How many seconds of clock time pass per second of real time. Default is 0.0, i.e. the clock only advances through manual calls to jump() or when the autojump_threshold is triggered. You can assign to this attribute to change it.

autojump_threshold

The clock keeps an eye on the run loop, and if at any point it detects that all tasks have been blocked for this many real seconds (i.e., according to the actual clock, not this clock), then the clock automatically jumps ahead to the run loop’s next scheduled timeout. Default is math.inf, i.e., to never autojump. You can assign to this attribute to change it.

Basically the idea is that if you have code or tests that use sleeps and timeouts, you can use this to make them run much faster, totally automatically. (At least, as long as those sleeps/timeouts are happening inside Trio; if your test involves talking to an external service and waiting for it to time out, then obviously we can’t help you there.)

You should set this to the smallest value that lets you reliably avoid “false alarms” where some I/O is in flight (e.g. between two halves of a socketpair) but the threshold gets triggered and time gets advanced anyway. This will depend on the details of your tests and test environment. If you aren’t doing any I/O (like in our sleeping example above) then just set it to zero, and the clock will jump whenever all tasks are blocked.

Note

If you use autojump_threshold and wait_all_tasks_blocked at the same time, then you might wonder how they interact, since they both cause things to happen after the run loop goes idle for some time. The answer is: wait_all_tasks_blocked takes priority. If there’s a task blocked in wait_all_tasks_blocked, then the autojump feature treats that as an active task and does not jump the clock.

jump(seconds)

Manually advance the clock by the given number of seconds.

Parameters

seconds (float) – the number of seconds to jump the clock forward.

Raises

ValueError – if you try to pass a negative value for seconds.

Inter-task ordering

class trio.testing.Sequencer

A convenience class for forcing code in different tasks to run in an explicit linear order.

Instances of this class implement a __call__ method which returns an async context manager. The idea is that you pass a sequence number to __call__ to say where this block of code should go in the linear sequence. Block 0 starts immediately, and then block N doesn’t start until block N-1 has finished.

Example

An extremely elaborate way to print the numbers 0-5, in order:

async def worker1(seq):
    async with seq(0):
        print(0)
    async with seq(4):
        print(4)

async def worker2(seq):
    async with seq(2):
        print(2)
    async with seq(5):
        print(5)

async def worker3(seq):
    async with seq(1):
        print(1)
    async with seq(3):
        print(3)

async def main():
    seq = trio.testing.Sequencer()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(worker1, seq)
        nursery.start_soon(worker2, seq)
        nursery.start_soon(worker3, seq)

await trio.testing.wait_all_tasks_blocked(cushion=0.0)

Block until there are no runnable tasks.

This is useful in testing code when you want to give other tasks a chance to “settle down”. The calling task is blocked, and doesn’t wake up until all other tasks are also blocked for at least cushion seconds. (Setting a non-zero cushion is intended to handle cases like two tasks talking to each other over a local socket, where we want to ignore the potential brief moment between a send and receive when all tasks are blocked.)

Note that cushion is measured in real time, not the Trio clock time.

If there are multiple tasks blocked in wait_all_tasks_blocked(), then the one with the shortest cushion is the one woken (and this task becoming unblocked resets the timers for the remaining tasks). If there are multiple tasks that have exactly the same cushion, then all are woken.

You should also consider trio.testing.Sequencer, which provides a more explicit way to control execution ordering within a test, and will often produce more readable tests.

Example

Here’s an example of one way to test that Trio’s locks are fair: we take the lock in the parent, start a child, wait for the child to be blocked waiting for the lock (!), and then check that we can’t release and immediately re-acquire the lock:

async def lock_taker(lock):
    await lock.acquire()
    lock.release()

async def test_lock_fairness():
    lock = trio.Lock()
    await lock.acquire()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(lock_taker, lock)
        # child hasn't run yet, we have the lock
        assert lock.locked()
        assert lock._owner is trio.lowlevel.current_task()
        await trio.testing.wait_all_tasks_blocked()
        # now the child has run and is blocked on lock.acquire(), we
        # still have the lock
        assert lock.locked()
        assert lock._owner is trio.lowlevel.current_task()
        lock.release()
        try:
            # The child has a prior claim, so we can't have it
            lock.acquire_nowait()
        except trio.WouldBlock:
            assert lock._owner is not trio.lowlevel.current_task()
            print("PASS")
        else:
            print("FAIL")

Streams

Connecting to an in-process socket server

await trio.testing.open_stream_to_socket_listener(socket_listener)

Connect to the given SocketListener.

This is particularly useful in tests when you want to let a server pick its own port, and then connect to it:

listeners = await trio.open_tcp_listeners(0)
client = await trio.testing.open_stream_to_socket_listener(listeners[0])

Parameters

socket_listener (SocketListener) – The SocketListener to connect to.

Returns

a stream connected to the given listener.

Return type

SocketStream

Virtual, controllable streams

One particularly challenging problem when testing network protocols is making sure that your implementation can handle data whose flow gets broken up in weird ways and arrives with weird timings: localhost connections tend to be much better behaved than real networks, so if you only test on localhost then you might get bitten later. To help you out, Trio provides some fully in-memory implementations of the stream interfaces (see The abstract Stream API), that let you write all kinds of interestingly evil tests.

There are a few pieces here, so here’s how they fit together:

memory_stream_pair() gives you a pair of connected, bidirectional streams. It’s like socket.socketpair(), but without any involvement from that pesky operating system and its networking stack.

To build a bidirectional stream, memory_stream_pair() uses two unidirectional streams. It gets these by calling memory_stream_one_way_pair().

memory_stream_one_way_pair(), in turn, is implemented using the low-ish level classes MemorySendStream and MemoryReceiveStream. These are implementations of (you guessed it) trio.abc.SendStream and trio.abc.ReceiveStream that, on their own, aren’t attached to anything – “sending” and “receiving” just put data into and get data out of a private internal buffer that each object owns. They also have some interesting hooks you can set, which let you customize the behavior of their methods. This is where you can insert the evil, if you want it. memory_stream_one_way_pair() takes advantage of these hooks in a relatively boring way: it just sets things up so that when you call send_all, or when you close the send stream, it automatically triggers a call to memory_stream_pump(), which is a convenience function that takes data out of a MemorySendStream’s buffer and puts it into a MemoryReceiveStream’s buffer. But that’s just the default – you can replace this with whatever arbitrary behavior you want.

Trio also provides some specialized functions for testing completely unbuffered streams: lockstep_stream_one_way_pair() and lockstep_stream_pair(). These aren’t customizable, but they do exhibit an extreme kind of behavior that’s good at catching out edge cases in protocol implementations.

API details

class trio.testing.MemorySendStream(send_all_hook=None, wait_send_all_might_not_block_hook=None, close_hook=None)

An in-memory SendStream.

Parameters
  • send_all_hook – An async function, or None. Called from send_all(). Can do whatever you like.

  • wait_send_all_might_not_block_hook – An async function, or None. Called from wait_send_all_might_not_block(). Can do whatever you like.

  • close_hook – A synchronous function, or None. Called from close() and aclose(). Can do whatever you like.

send_all_hook
wait_send_all_might_not_block_hook
close_hook

All of these hooks are also exposed as attributes on the object, and you can change them at any time.

await aclose()

Same as close(), but async.

close()

Marks this stream as closed, and then calls the close_hook (if any).

await get_data(max_bytes=None)

Retrieves data from the internal buffer, blocking if necessary.

Parameters

max_bytes (int or None) – The maximum amount of data to retrieve. None (the default) means to retrieve all the data that’s present (but still blocks until at least one byte is available).

Returns

If this stream has been closed, an empty bytearray. Otherwise, the requested data.

get_data_nowait(max_bytes=None)

Retrieves data from the internal buffer, but doesn’t block.

See get_data() for details.

Raises

trio.WouldBlock – if no data is available to retrieve.

await send_all(data)

Places the given data into the object’s internal buffer, and then calls the send_all_hook (if any).

await wait_send_all_might_not_block()

Calls the wait_send_all_might_not_block_hook (if any), and then returns immediately.

class trio.testing.MemoryReceiveStream(receive_some_hook=None, close_hook=None)

An in-memory ReceiveStream.

Parameters
  • receive_some_hook – An async function, or None. Called from receive_some(). Can do whatever you like.

  • close_hook – A synchronous function, or None. Called from close() and aclose(). Can do whatever you like.

receive_some_hook
close_hook

Both hooks are also exposed as attributes on the object, and you can change them at any time.

await aclose()

Same as close(), but async.

close()

Discards any pending data from the internal buffer, and marks this stream as closed.

put_data(data)

Appends the given data to the internal buffer.

put_eof()

Adds an end-of-file marker to the internal buffer.

await receive_some(max_bytes=None)

Calls the receive_some_hook (if any), and then retrieves data from the internal buffer, blocking if necessary.

trio.testing.memory_stream_pump(memory_send_stream, memory_receive_stream, *, max_bytes=None)

Take data out of the given MemorySendStream’s internal buffer, and put it into the given MemoryReceiveStream’s internal buffer.

Parameters
  • memory_send_stream (MemorySendStream) – The stream to get data from.

  • memory_receive_stream (MemoryReceiveStream) – The stream to put data into.

  • max_bytes (int or None) – The maximum amount of data to transfer in this call, or None to transfer all available data.

Returns

True if it successfully transferred some data, or False if there was no data to transfer.

This is used to implement memory_stream_one_way_pair() and memory_stream_pair(); see the latter’s docstring for an example of how you might use it yourself.

trio.testing.memory_stream_one_way_pair()

Create a connected, pure-Python, unidirectional stream with infinite buffering and flexible configuration options.

You can think of this as being a no-operating-system-involved Trio-streamsified version of os.pipe() (except that os.pipe() returns the streams in the wrong order – we follow the superior convention that data flows from left to right).

Returns

A tuple (MemorySendStream, MemoryReceiveStream), where the MemorySendStream has its hooks set up so that it calls memory_stream_pump() from its send_all_hook and close_hook.

The end result is that data automatically flows from the MemorySendStream to the MemoryReceiveStream. But you’re also free to rearrange things however you like. For example, you can temporarily set the send_all_hook to None if you want to simulate a stall in data transmission. Or see memory_stream_pair() for a more elaborate example.

trio.testing.memory_stream_pair()

Create a connected, pure-Python, bidirectional stream with infinite buffering and flexible configuration options.

This is a convenience function that creates two one-way streams using memory_stream_one_way_pair(), and then uses StapledStream to combine them into a single bidirectional stream.

This is like a no-operating-system-involved, Trio-streamsified version of socket.socketpair().

Returns

A pair of StapledStream objects that are connected so that data automatically flows from one to the other in both directions.

After creating a stream pair, you can send data back and forth, which is enough for simple tests:

left, right = memory_stream_pair()
await left.send_all(b"123")
assert await right.receive_some() == b"123"
await right.send_all(b"456")
assert await left.receive_some() == b"456"

But if you read the docs for StapledStream and memory_stream_one_way_pair(), you’ll see that all the pieces involved in wiring this up are public APIs, so you can adjust to suit the requirements of your tests. For example, here’s how to tweak a stream so that data flowing from left to right trickles in one byte at a time (but data flowing from right to left proceeds at full speed):

left, right = memory_stream_pair()
async def trickle():
    # left is a StapledStream, and left.send_stream is a MemorySendStream
    # right is a StapledStream, and right.recv_stream is a MemoryReceiveStream
    while memory_stream_pump(left.send_stream, right.recv_stream, max_bytes=1):
        # Pause between each byte
        await trio.sleep(1)
# Normally this send_all_hook calls memory_stream_pump directly without
# passing in a max_bytes. We replace it with our custom version:
left.send_stream.send_all_hook = trickle

And here’s a simple test using our modified stream objects:

async def sender():
    await left.send_all(b"12345")
    await left.send_eof()

async def receiver():
    async for data in right:
        print(data)

async with trio.open_nursery() as nursery:
    nursery.start_soon(sender)
    nursery.start_soon(receiver)

By default, this will print b"12345" and then immediately exit; with our trickle stream it instead sleeps 1 second, then prints b"1", then sleeps 1 second, then prints b"2", etc.

Pro-tip: you can insert sleep calls (like in our example above) to manipulate the flow of data across tasks… and then use MockClock and its autojump_threshold functionality to keep your test suite running quickly.

If you want to stress test a protocol implementation, one nice trick is to use the random module (preferably with a fixed seed) to move random numbers of bytes at a time, and insert random sleeps in between them. You can also set up a custom receive_some_hook if you want to manipulate things on the receiving side, and not just the sending side.

trio.testing.lockstep_stream_one_way_pair()

Create a connected, pure-Python, unidirectional stream where data flows in lockstep.

Returns

A tuple (SendStream, ReceiveStream).

This stream has absolutely no buffering. Each call to send_all() will block until all the given data has been returned by a call to receive_some().

This can be useful for testing flow control mechanisms in an extreme case, or for setting up “clogged” streams to use with check_one_way_stream() and friends.

In addition to fulfilling the SendStream and ReceiveStream interfaces, the returned objects also have a synchronous close method.

trio.testing.lockstep_stream_pair()

Create a connected, pure-Python, bidirectional stream where data flows in lockstep.

Returns

A tuple (StapledStream, StapledStream).

This is a convenience function that creates two one-way streams using lockstep_stream_one_way_pair(), and then uses StapledStream to combine them into a single bidirectional stream.

Testing custom stream implementations

Trio also provides some functions to help you test your custom stream implementations:

await trio.testing.check_one_way_stream(stream_maker, clogged_stream_maker)

Perform a number of generic tests on a custom one-way stream implementation.

Parameters
  • stream_maker – An async (!) function which returns a connected (SendStream, ReceiveStream) pair.

  • clogged_stream_maker – Either None, or an async function similar to stream_maker, but with the extra property that the returned stream is in a state where send_all and wait_send_all_might_not_block will block until receive_some has been called. This allows for more thorough testing of some edge cases, especially around wait_send_all_might_not_block.

Raises

AssertionError – if a test fails.

await trio.testing.check_two_way_stream(stream_maker, clogged_stream_maker)

Perform a number of generic tests on a custom two-way stream implementation.

This is similar to check_one_way_stream(), except that the maker functions are expected to return objects implementing the Stream interface.

This function tests a superset of what check_one_way_stream() checks – if you call this, then you don’t need to also call check_one_way_stream().

await trio.testing.check_half_closeable_stream(stream_maker, clogged_stream_maker)

Perform a number of generic tests on a custom half-closeable stream implementation.

This is similar to check_two_way_stream(), except that the maker functions are expected to return objects that implement the HalfCloseableStream interface.

This function tests a superset of what check_two_way_stream() checks – if you call this, then you don’t need to also call check_two_way_stream().

Virtual networking for testing

In the previous section you learned how to use virtual in-memory streams to test protocols that are written against Trio’s Stream abstraction. But what if you have more complicated networking code – the kind of code that makes connections to multiple hosts, or opens a listening socket, or sends UDP packets?

Trio doesn’t itself provide a virtual in-memory network implementation for testing – but the trio.socket module does provide the hooks you need to write your own! And if you’re interested in helping implement a reusable virtual network for testing, then please get in touch.

Note that these APIs are actually in trio.socket and trio.abc, but we document them here because they’re primarily intended for testing.

trio.socket.set_custom_hostname_resolver(hostname_resolver)

Set a custom hostname resolver.

By default, Trio’s getaddrinfo() and getnameinfo() functions use the standard system resolver functions. This function allows you to customize that behavior. The main intended use case is for testing, but it might also be useful for using third-party resolvers like c-ares (though be warned that these rarely make perfect drop-in replacements for the system resolver). See trio.abc.HostnameResolver for more details.

Setting a custom hostname resolver affects all future calls to getaddrinfo() and getnameinfo() within the enclosing call to trio.run(). All other hostname resolution in Trio is implemented in terms of these functions.

Generally you should call this function just once, right at the beginning of your program.

Parameters

hostname_resolver (trio.abc.HostnameResolver or None) – The new custom hostname resolver, or None to restore the default behavior.

Returns

The previous hostname resolver (which may be None).

class trio.abc.HostnameResolver

If you have a custom hostname resolver, then implementing HostnameResolver allows you to register this to be used by Trio.

See trio.socket.set_custom_hostname_resolver().

abstractmethod await getaddrinfo(host, port, family=0, type=0, proto=0, flags=0)

A custom implementation of getaddrinfo().

Called by trio.socket.getaddrinfo().

If host is given as a numeric IP address, then getaddrinfo() may handle the request itself rather than calling this method.

Any required IDNA encoding is handled before calling this function; your implementation can assume that it will never see U-labels like "café.com", and only needs to handle A-labels like b"xn--caf-dma.com".

abstractmethod await getnameinfo(sockaddr, flags)

A custom implementation of getnameinfo().

Called by trio.socket.getnameinfo().

trio.socket.set_custom_socket_factory(socket_factory)

Set a custom socket object factory.

This function allows you to replace Trio’s normal socket class with a custom class. This is very useful for testing, and probably a bad idea in any other circumstance. See trio.abc.SocketFactory for more details.

Setting a custom socket factory affects all future calls to socket() within the enclosing call to trio.run().

Generally you should call this function just once, right at the beginning of your program.

Parameters

socket_factory (trio.abc.SocketFactory or None) – The new custom socket factory, or None to restore the default behavior.

Returns

The previous socket factory (which may be None).

class trio.abc.SocketFactory

If you write a custom class implementing the Trio socket interface, then you can use a SocketFactory to get Trio to use it.

See trio.socket.set_custom_socket_factory().

abstractmethod socket(family=None, type=None, proto=None)

Create and return a socket object.

Your socket object must inherit from trio.socket.SocketType, which is an empty class whose only purpose is to “mark” which classes should be considered valid Trio sockets.

Called by trio.socket.socket().

Note that unlike trio.socket.socket(), this does not take a fileno= argument. If a fileno= is specified, then trio.socket.socket() returns a regular Trio socket object instead of calling this method.

Testing checkpoints

with trio.testing.assert_checkpoints()

Use as a context manager to check that the code inside the with block either exits with an exception or executes at least one checkpoint.

Raises

AssertionError – if no checkpoint was executed.

Example

Check that trio.sleep() is a checkpoint, even if it doesn’t block:

with trio.testing.assert_checkpoints():
    await trio.sleep(0)

with trio.testing.assert_no_checkpoints()

Use as a context manager to check that the code inside the with block does not execute any checkpoints.

Raises

AssertionError – if a checkpoint was executed.

Example

Synchronous code never contains any checkpoints, but we can double-check that:

send_channel, receive_channel = trio.open_memory_channel(10)
with trio.testing.assert_no_checkpoints():
    send_channel.send_nowait(None)