Testing made easier with trio.testing
¶
The trio.testing module provides various utilities to make it easier to
test Trio code. Unlike the other submodules in the trio namespace,
trio.testing is not automatically imported when you do import trio; you
must import trio.testing explicitly.
Time and timeouts¶
trio.testing.MockClock is a Clock with a few tricks up its sleeve to help you efficiently test code involving timeouts:
- By default, it starts at time 0, and clock time only advances when you explicitly call jump(). This provides an extremely controllable clock for testing.
- You can set rate to 1.0 if you want it to start running in real time like a regular clock. You can stop and start the clock within a test. You can set rate to 10.0 to make clock time pass at 10x real speed (so e.g. await trio.sleep(10) returns after 1 second).
- But even more interestingly, you can set autojump_threshold to zero or a small value, and then it will watch the execution of the run loop, and any time things have settled down and everyone’s waiting for a timeout, it jumps the clock forward to that timeout. In many cases this allows natural-looking code involving timeouts to be automatically run at near full CPU utilization with no changes. (Thanks to fluxcapacitor for this awesome idea.)
- And of course these can be mixed and matched at will.
Regardless of these shenanigans, from “inside” Trio the passage of time still seems normal so long as you restrict yourself to Trio’s time functions (see Time and clocks). Below is an example demonstrating two different ways of making time pass quickly. Notice how in both cases, the two tasks keep a consistent view of reality and events happen in the expected order, despite being wildly divorced from real time:
# across-realtime.py

import time
import trio
import trio.testing

YEAR = 365 * 24 * 60 * 60  # seconds


async def task1():
    start = trio.current_time()

    print("task1: sleeping for 1 year")
    await trio.sleep(YEAR)

    duration = trio.current_time() - start
    print("task1: woke up; clock says I've slept {} years"
          .format(duration / YEAR))

    print("task1: sleeping for 1 year, 100 times")
    for _ in range(100):
        await trio.sleep(YEAR)

    duration = trio.current_time() - start
    print("task1: slept {} years total".format(duration / YEAR))


async def task2():
    start = trio.current_time()

    print("task2: sleeping for 5 years")
    await trio.sleep(5 * YEAR)

    duration = trio.current_time() - start
    print("task2: woke up; clock says I've slept {} years"
          .format(duration / YEAR))

    print("task2: sleeping for 500 years")
    await trio.sleep(500 * YEAR)

    duration = trio.current_time() - start
    print("task2: slept {} years total".format(duration / YEAR))


async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(task1)
        nursery.start_soon(task2)


def run_example(clock):
    real_start = time.perf_counter()
    trio.run(main, clock=clock)
    real_duration = time.perf_counter() - real_start
    print("Total real time elapsed: {} seconds".format(real_duration))


print("Clock where time passes at 100 years per second:\n")
run_example(trio.testing.MockClock(rate=100 * YEAR))

print("\nClock where time automatically skips past the boring parts:\n")
run_example(trio.testing.MockClock(autojump_threshold=0))
Output:
Clock where time passes at 100 years per second:
task2: sleeping for 5 years
task1: sleeping for 1 year
task1: woke up; clock says I've slept 1.0365006048232317 years
task1: sleeping for 1 year, 100 times
task2: woke up; clock says I've slept 5.0572111969813704 years
task2: sleeping for 500 years
task1: slept 104.77677842136472 years total
task2: slept 505.25014589075 years total
Total real time elapsed: 5.053582429885864 seconds
Clock where time automatically skips past the boring parts:
task2: sleeping for 5 years
task1: sleeping for 1 year
task1: woke up; clock says I've slept 1.0 years
task1: sleeping for 1 year, 100 times
task2: woke up; clock says I've slept 5.0 years
task2: sleeping for 500 years
task1: slept 101.0 years total
task2: slept 505.0 years total
Total real time elapsed: 0.019298791885375977 seconds
-
class
trio.testing.
MockClock
(rate=0.0, autojump_threshold=inf)¶ A user-controllable clock suitable for writing tests.
- Parameters
autojump_threshold (float) – the initial
autojump_threshold
.
-
rate
¶ How many seconds of clock time pass per second of real time. Default is 0.0, i.e. the clock only advances through manual calls to jump() or when the autojump_threshold is triggered. You can assign to this attribute to change it.
-
autojump_threshold
¶ The clock keeps an eye on the run loop, and if at any point it detects that all tasks have been blocked for this many real seconds (i.e., according to the actual clock, not this clock), then the clock automatically jumps ahead to the run loop’s next scheduled timeout. Default is math.inf, i.e., to never autojump. You can assign to this attribute to change it.
Basically the idea is that if you have code or tests that use sleeps and timeouts, you can use this to make it run much faster, totally automatically. (At least, as long as those sleeps/timeouts are happening inside Trio; if your test involves talking to an external service and waiting for it to time out then obviously we can’t help you there.)
You should set this to the smallest value that lets you reliably avoid “false alarms” where some I/O is in flight (e.g. between two halves of a socketpair) but the threshold gets triggered and time gets advanced anyway. This will depend on the details of your tests and test environment. If you aren’t doing any I/O (like in our sleeping example above) then just set it to zero, and the clock will jump whenever all tasks are blocked.
Warning
If you’re using wait_all_tasks_blocked() and autojump_threshold together, then you have to be careful. Setting autojump_threshold acts like a background task calling:
    while True:
        await wait_all_tasks_blocked(
            cushion=clock.autojump_threshold, tiebreaker=float("inf"))
This means that if you call wait_all_tasks_blocked() with a cushion larger than your autojump threshold, then your call to wait_all_tasks_blocked() will never return, because the autojump task will keep waking up before your task does, and each time it does it’ll reset your task’s timer. However, if your cushion and the autojump threshold are the same, then the autojump’s tiebreaker will prevent them from interfering (unless you also set your tiebreaker to infinity for some reason. Don’t do that). As an important special case: this means that if you set an autojump threshold of zero and use wait_all_tasks_blocked() with the default zero cushion, then everything will work fine.
Summary: you should set autojump_threshold to be at least as large as the largest cushion you plan to pass to wait_all_tasks_blocked().
-
jump
(seconds)¶ Manually advance the clock by the given number of seconds.
- Parameters
seconds (float) – the number of seconds to jump the clock forward.
- Raises
ValueError – if you try to pass a negative value for
seconds
.
Inter-task ordering¶
-
class
trio.testing.
Sequencer
¶ A convenience class for forcing code in different tasks to run in an explicit linear order.
Instances of this class implement a __call__ method which returns an async context manager. The idea is that you pass a sequence number to __call__ to say where this block of code should go in the linear sequence. Block 0 starts immediately, and then block N doesn’t start until block N-1 has finished.
Example
An extremely elaborate way to print the numbers 0-5, in order:
    async def worker1(seq):
        async with seq(0):
            print(0)
        async with seq(4):
            print(4)

    async def worker2(seq):
        async with seq(2):
            print(2)
        async with seq(5):
            print(5)

    async def worker3(seq):
        async with seq(1):
            print(1)
        async with seq(3):
            print(3)

    async def main():
        seq = trio.testing.Sequencer()
        async with trio.open_nursery() as nursery:
            nursery.start_soon(worker1, seq)
            nursery.start_soon(worker2, seq)
            nursery.start_soon(worker3, seq)
-
await
trio.testing.
wait_all_tasks_blocked
(cushion=0.0, tiebreaker=0)¶ Block until there are no runnable tasks.
This is useful in testing code when you want to give other tasks a chance to “settle down”. The calling task is blocked, and doesn’t wake up until all other tasks are also blocked for at least cushion seconds. (Setting a non-zero cushion is intended to handle cases like two tasks talking to each other over a local socket, where we want to ignore the potential brief moment between a send and receive when all tasks are blocked.)
Note that cushion is measured in real time, not the Trio clock time.
If there are multiple tasks blocked in wait_all_tasks_blocked(), then the one with the shortest cushion is the one woken (and this task becoming unblocked resets the timers for the remaining tasks). If there are multiple tasks that have exactly the same cushion, then the one with the lowest tiebreaker value is woken first. And if there are multiple tasks with the same cushion and the same tiebreaker, then all are woken.
You should also consider trio.testing.Sequencer, which provides a more explicit way to control execution ordering within a test, and will often produce more readable tests.
Example
Here’s an example of one way to test that Trio’s locks are fair: we take the lock in the parent, start a child, wait for the child to be blocked waiting for the lock (!), and then check that we can’t release and immediately re-acquire the lock:
    async def lock_taker(lock):
        await lock.acquire()
        lock.release()

    async def test_lock_fairness():
        lock = trio.Lock()
        await lock.acquire()
        async with trio.open_nursery() as nursery:
            nursery.start_soon(lock_taker, lock)
            # child hasn't run yet, we have the lock
            assert lock.locked()
            assert lock._owner is trio.hazmat.current_task()
            await trio.testing.wait_all_tasks_blocked()
            # now the child has run and is blocked on lock.acquire(), we
            # still have the lock
            assert lock.locked()
            assert lock._owner is trio.hazmat.current_task()
            lock.release()
            try:
                # The child has a prior claim, so we can't have it
                lock.acquire_nowait()
            except trio.WouldBlock:
                assert lock._owner is not trio.hazmat.current_task()
                print("PASS")
            else:
                print("FAIL")
Streams¶
Connecting to an in-process socket server¶
-
await
trio.testing.
open_stream_to_socket_listener
(socket_listener)¶ Connect to the given SocketListener.
This is particularly useful in tests when you want to let a server pick its own port, and then connect to it:
    listeners = await trio.open_tcp_listeners(0)
    client = await trio.testing.open_stream_to_socket_listener(listeners[0])
- Parameters
socket_listener (SocketListener) – The SocketListener to connect to.
- Returns
a stream connected to the given listener.
- Return type
Virtual, controllable streams¶
One particularly challenging problem when testing network protocols is making sure that your implementation can handle data whose flow gets broken up in weird ways and arrives with weird timings: localhost connections tend to be much better behaved than real networks, so if you only test on localhost then you might get bitten later. To help you out, Trio provides some fully in-memory implementations of the stream interfaces (see The abstract Stream API), that let you write all kinds of interestingly evil tests.
There are a few pieces here, so here’s how they fit together:
memory_stream_pair()
gives you a pair of connected,
bidirectional streams. It’s like socket.socketpair()
, but
without any involvement from that pesky operating system and its
networking stack.
To build a bidirectional stream, memory_stream_pair()
uses
two unidirectional streams. It gets these by calling
memory_stream_one_way_pair()
.
memory_stream_one_way_pair()
, in turn, is implemented using the
low-ish level classes MemorySendStream
and
MemoryReceiveStream
. These are implementations of (you
guessed it) trio.abc.SendStream
and
trio.abc.ReceiveStream
that on their own, aren’t attached to
anything – “sending” and “receiving” just put data into and get data
out of a private internal buffer that each object owns. They also have
some interesting hooks you can set, that let you customize the
behavior of their methods. This is where you can insert the evil, if
you want it. memory_stream_one_way_pair()
takes advantage of
these hooks in a relatively boring way: it just sets it up so that
when you call send_all
, or when you close the send stream, then it
automatically triggers a call to memory_stream_pump()
, which is
a convenience function that takes data out of a MemorySendStream’s
buffer and puts it into a MemoryReceiveStream’s buffer. But that’s
just the default – you can replace this with whatever arbitrary
behavior you want.
Trio also provides some specialized functions for testing completely
unbuffered streams: lockstep_stream_one_way_pair()
and
lockstep_stream_pair()
. These aren’t customizable, but they do
exhibit an extreme kind of behavior that’s good at catching out edge
cases in protocol implementations.
API details¶
-
class
trio.testing.
MemorySendStream
(send_all_hook=None, wait_send_all_might_not_block_hook=None, close_hook=None)¶ An in-memory
SendStream.
- Parameters
send_all_hook – An async function, or None. Called from send_all(). Can do whatever you like.
wait_send_all_might_not_block_hook – An async function, or None. Called from wait_send_all_might_not_block(). Can do whatever you like.
close_hook – A synchronous function, or None. Called from close() and aclose(). Can do whatever you like.
-
send_all_hook
¶ -
wait_send_all_might_not_block_hook
¶ -
close_hook
¶ All of these hooks are also exposed as attributes on the object, and you can change them at any time.
-
close
()¶ Marks this stream as closed, and then calls the
close_hook
(if any).
-
await
get_data
(max_bytes=None)¶ Retrieves data from the internal buffer, blocking if necessary.
-
get_data_nowait
(max_bytes=None)¶ Retrieves data from the internal buffer, but doesn’t block.
See get_data() for details.
- Raises
trio.WouldBlock – if no data is available to retrieve.
-
await
send_all
(data)¶ Places the given data into the object’s internal buffer, and then calls the
send_all_hook
(if any).
-
await
wait_send_all_might_not_block
()¶ Calls the
wait_send_all_might_not_block_hook
(if any), and then returns immediately.
-
class
trio.testing.
MemoryReceiveStream
(receive_some_hook=None, close_hook=None)¶ An in-memory
ReceiveStream.
- Parameters
receive_some_hook – An async function, or None. Called from receive_some(). Can do whatever you like.
close_hook – A synchronous function, or None. Called from close() and aclose(). Can do whatever you like.
-
receive_some_hook
¶ -
close_hook
¶ Both hooks are also exposed as attributes on the object, and you can change them at any time.
-
close
()¶ Discards any pending data from the internal buffer, and marks this stream as closed.
-
put_data
(data)¶ Appends the given data to the internal buffer.
-
put_eof
()¶ Adds an end-of-file marker to the internal buffer.
-
await
receive_some
(max_bytes=None)¶ Calls the
receive_some_hook
(if any), and then retrieves data from the internal buffer, blocking if necessary.
-
trio.testing.
memory_stream_pump
(memory_send_stream, memory_receive_stream, *, max_bytes=None)¶ Take data out of the given
MemorySendStream’s internal buffer, and put it into the given MemoryReceiveStream’s internal buffer.
- Parameters
memory_send_stream (MemorySendStream) – The stream to get data from.
memory_receive_stream (MemoryReceiveStream) – The stream to put data into.
max_bytes (int or None) – The maximum amount of data to transfer in this call, or None to transfer all available data.
- Returns
True if it successfully transferred some data, or False if there was no data to transfer.
This is used to implement memory_stream_one_way_pair() and memory_stream_pair(); see the latter’s docstring for an example of how you might use it yourself.
-
trio.testing.
memory_stream_one_way_pair
()¶ Create a connected, pure-Python, unidirectional stream with infinite buffering and flexible configuration options.
You can think of this as being a no-operating-system-involved Trio-streamsified version of os.pipe() (except that os.pipe() returns the streams in the wrong order – we follow the superior convention that data flows from left to right).
- Returns
A tuple (MemorySendStream, MemoryReceiveStream), where the MemorySendStream has its hooks set up so that it calls memory_stream_pump() from its send_all_hook and close_hook.
The end result is that data automatically flows from the MemorySendStream to the MemoryReceiveStream. But you’re also free to rearrange things however you like. For example, you can temporarily set the send_all_hook to None if you want to simulate a stall in data transmission. Or see memory_stream_pair() for a more elaborate example.
-
trio.testing.
memory_stream_pair
()¶ Create a connected, pure-Python, bidirectional stream with infinite buffering and flexible configuration options.
This is a convenience function that creates two one-way streams using memory_stream_one_way_pair(), and then uses StapledStream to combine them into a single bidirectional stream.
This is like a no-operating-system-involved, Trio-streamsified version of socket.socketpair().
- Returns
A pair of StapledStream objects that are connected so that data automatically flows from one to the other in both directions.
After creating a stream pair, you can send data back and forth, which is enough for simple tests:
    left, right = memory_stream_pair()
    await left.send_all(b"123")
    assert await right.receive_some() == b"123"
    await right.send_all(b"456")
    assert await left.receive_some() == b"456"
But if you read the docs for StapledStream and memory_stream_one_way_pair(), you’ll see that all the pieces involved in wiring this up are public APIs, so you can adjust to suit the requirements of your tests. For example, here’s how to tweak a stream so that data flowing from left to right trickles in one byte at a time (but data flowing from right to left proceeds at full speed):
    left, right = memory_stream_pair()
    async def trickle():
        # left is a StapledStream, and left.send_stream is a MemorySendStream
        # right is a StapledStream, and right.receive_stream is a MemoryReceiveStream
        while memory_stream_pump(left.send_stream, right.receive_stream, max_bytes=1):
            # Pause between each byte
            await trio.sleep(1)
    # Normally this send_all_hook calls memory_stream_pump directly without
    # passing in a max_bytes. We replace it with our custom version:
    left.send_stream.send_all_hook = trickle
And here’s a simple test using our modified stream objects:
    async def sender():
        await left.send_all(b"12345")
        await left.send_eof()

    async def receiver():
        async for data in right:
            print(data)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(sender)
        nursery.start_soon(receiver)
By default, this will print b"12345" and then immediately exit; with our trickle stream it instead sleeps 1 second, then prints b"1", then sleeps 1 second, then prints b"2", etc.
Pro-tip: you can insert sleep calls (like in our example above) to manipulate the flow of data across tasks… and then use MockClock and its autojump_threshold functionality to keep your test suite running quickly.
If you want to stress test a protocol implementation, one nice trick is to use the random module (preferably with a fixed seed) to move random numbers of bytes at a time, and insert random sleeps in between them. You can also set up a custom receive_some_hook if you want to manipulate things on the receiving side, and not just the sending side.
-
trio.testing.
lockstep_stream_one_way_pair
()¶ Create a connected, pure-Python, unidirectional stream where data flows in lockstep.
- Returns
A tuple (SendStream, ReceiveStream).
This stream has absolutely no buffering. Each call to send_all() will block until all the given data has been returned by a call to receive_some().
This can be useful for testing flow control mechanisms in an extreme case, or for setting up “clogged” streams to use with check_one_way_stream() and friends.
In addition to fulfilling the SendStream and ReceiveStream interfaces, the return objects also have a synchronous close method.
-
trio.testing.
lockstep_stream_pair
()¶ Create a connected, pure-Python, bidirectional stream where data flows in lockstep.
- Returns
A tuple (StapledStream, StapledStream).
This is a convenience function that creates two one-way streams using lockstep_stream_one_way_pair(), and then uses StapledStream to combine them into a single bidirectional stream.
Testing custom stream implementations¶
Trio also provides some functions to help you test your custom stream implementations:
-
await
trio.testing.
check_one_way_stream
(stream_maker, clogged_stream_maker)¶ Perform a number of generic tests on a custom one-way stream implementation.
- Parameters
stream_maker – An async (!) function which returns a connected (SendStream, ReceiveStream) pair.
clogged_stream_maker – Either None, or an async function similar to stream_maker, but with the extra property that the returned stream is in a state where send_all and wait_send_all_might_not_block will block until receive_some has been called. This allows for more thorough testing of some edge cases, especially around wait_send_all_might_not_block.
- Raises
AssertionError – if a test fails.
-
await
trio.testing.
check_two_way_stream
(stream_maker, clogged_stream_maker)¶ Perform a number of generic tests on a custom two-way stream implementation.
This is similar to check_one_way_stream(), except that the maker functions are expected to return objects implementing the Stream interface.
This function tests a superset of what check_one_way_stream() checks – if you call this, then you don’t need to also call check_one_way_stream().
-
await
trio.testing.
check_half_closeable_stream
(stream_maker, clogged_stream_maker)¶ Perform a number of generic tests on a custom half-closeable stream implementation.
This is similar to check_two_way_stream(), except that the maker functions are expected to return objects that implement the HalfCloseableStream interface.
This function tests a superset of what check_two_way_stream() checks – if you call this, then you don’t need to also call check_two_way_stream().
Virtual networking for testing¶
In the previous section you learned how to use virtual in-memory
streams to test protocols that are written against Trio’s
Stream
abstraction. But what if you have more
complicated networking code – the kind of code that makes connections
to multiple hosts, or opens a listening socket, or sends UDP packets?
Trio doesn’t itself provide a virtual in-memory network implementation
for testing – but the trio.socket module does provide the hooks you
need to write your own! And if you’re interested in helping implement
a reusable virtual network for testing, then please get in touch.
Note that these APIs are actually in trio.socket
and
trio.abc
, but we document them here because they’re primarily
intended for testing.
-
trio.socket.
set_custom_hostname_resolver
(hostname_resolver)¶ Set a custom hostname resolver.
By default, Trio’s getaddrinfo() and getnameinfo() functions use the standard system resolver functions. This function allows you to customize that behavior. The main intended use case is for testing, but it might also be useful for using third-party resolvers like c-ares (though be warned that these rarely make perfect drop-in replacements for the system resolver). See trio.abc.HostnameResolver for more details.
Setting a custom hostname resolver affects all future calls to getaddrinfo() and getnameinfo() within the enclosing call to trio.run(). All other hostname resolution in Trio is implemented in terms of these functions.
Generally you should call this function just once, right at the beginning of your program.
- Parameters
hostname_resolver (trio.abc.HostnameResolver or None) – The new custom hostname resolver, or None to restore the default behavior.
- Returns
The previous hostname resolver (which may be None).
-
class
trio.abc.
HostnameResolver
¶ If you have a custom hostname resolver, then implementing HostnameResolver allows you to register this to be used by Trio.
See trio.socket.set_custom_hostname_resolver().
-
abstractmethod await
getaddrinfo
(host, port, family=0, type=0, proto=0, flags=0)¶ A custom implementation of getaddrinfo().
Called by trio.socket.getaddrinfo().
If host is given as a numeric IP address, then getaddrinfo() may handle the request itself rather than calling this method.
Any required IDNA encoding is handled before calling this function; your implementation can assume that it will never see U-labels like "café.com", and only needs to handle A-labels like b"xn--caf-dma.com".
-
abstractmethod await
getnameinfo
(sockaddr, flags)¶ A custom implementation of getnameinfo().
Called by trio.socket.getnameinfo().
-
trio.socket.
set_custom_socket_factory
(socket_factory)¶ Set a custom socket object factory.
This function allows you to replace Trio’s normal socket class with a custom class. This is very useful for testing, and probably a bad idea in any other circumstance. See trio.abc.SocketFactory for more details.
Setting a custom socket factory affects all future calls to socket() within the enclosing call to trio.run().
Generally you should call this function just once, right at the beginning of your program.
- Parameters
socket_factory (trio.abc.SocketFactory or None) – The new custom socket factory, or None to restore the default behavior.
- Returns
The previous socket factory (which may be None).
-
class
trio.abc.
SocketFactory
¶ If you write a custom class implementing the Trio socket interface, then you can use a SocketFactory to get Trio to use it.
See trio.socket.set_custom_socket_factory().
-
abstractmethod
socket
(family=None, type=None, proto=None)¶ Create and return a socket object.
Your socket object must inherit from trio.socket.SocketType, which is an empty class whose only purpose is to “mark” which classes should be considered valid Trio sockets.
Called by trio.socket.socket().
Note that unlike trio.socket.socket(), this does not take a fileno= argument. If a fileno= is specified, then trio.socket.socket() returns a regular Trio socket object instead of calling this method.
Testing checkpoints¶
-
with
trio.testing.
assert_checkpoints
Use as a context manager to check that the code inside the with block either exits with an exception or executes at least one checkpoint.
- Raises
AssertionError – if no checkpoint was executed.
Example
Check that trio.sleep() is a checkpoint, even if it doesn’t block:
    with trio.testing.assert_checkpoints():
        await trio.sleep(0)
-
with
trio.testing.
assert_no_checkpoints
Use as a context manager to check that the code inside the with block does not execute any checkpoints.
- Raises
AssertionError – if a checkpoint was executed.
Example
Synchronous code never contains any checkpoints, but we can double-check that:
    send_channel, receive_channel = trio.open_memory_channel(10)
    with trio.testing.assert_no_checkpoints():
        send_channel.send_nowait(None)