Note: you can try this tutorial in Binder.

Channel buffering

Channels are used as meeting points: a pair of put/get operations can only complete when both involved parties are present at the same time. In practice, however, it is sometimes necessary to relax this requirement a little, so that a put can complete immediately even when no one is there to get, and a get can complete without a put waiting on the other side, as long as values from previous puts are available. This behaviour, where we further decouple put and get operations in time, is called buffering.

In principle, buffering can be done without any further support from the library: we can have a pair of channels, ch_in and ch_out, acting as one, and a coroutine busy working in the background, promptly getting values from ch_in whenever they come in and storing them in some data structure, while at the same time feeding stored values to getters of ch_out whenever they come by. You can do this as an exercise (hint: use select).
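If you are curious, here is a minimal sketch of what such a background coroutine could look like (the names pump, ch_in, ch_out and maxsize are our own, and closing of channels is ignored for brevity):

import collections
import aiochan as ac

async def pump(ch_in, ch_out, maxsize):
    # shuttle values from ch_in to ch_out through a bounded deque
    pending = collections.deque()
    while True:
        if not pending:
            # nothing buffered: the only possible action is accepting a value
            pending.append(await ch_in.get())
        elif len(pending) == maxsize:
            # buffer full: the only possible action is handing a value out
            await ch_out.put(pending.popleft())
        else:
            # otherwise, do whichever operation has a party ready first:
            # select takes a bare channel for a get and a (channel, value)
            # tuple for a put, and commits to exactly one of them
            val, ch = await ac.select(ch_in, (ch_out, pending[0]))
            if ch is ch_in:
                pending.append(val)
            else:
                pending.popleft()

Spawning ac.go(pump(ch_in, ch_out, 3)) and then putting into ch_in while getting from ch_out behaves roughly like the buffered channels we are about to introduce, except that this sketch does not propagate closing.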

However, to reduce clutter and to improve performance, Chan has built-in support for buffering. In aiochan, buffering is always bounded: you have to decide at the outset how many pending items you are willing to tolerate in your buffer. Some languages, such as Erlang, nominally default to unbounded buffering, but the limit imposed by the operating system is always there.

Let’s have an example:

[2]:
import asyncio
import aiochan as ac

async def main():
    c = ac.Chan(1)

    await c.put('a')
    result = await c.get()
    print('result', result)

ac.run(main())
result a

As we can see, a buffered channel is created by passing a positive number to the channel constructor. Without the buffer, this example would deadlock: the first await would never complete.

The positive number given to the constructor signifies the size of the buffer. In the example the size is one, so if we had two puts in a row, the second one would block.

This is an example of a fixed-length buffer. The constructor call Chan(1) is actually shorthand for Chan('f', 1), 'f' for fixed-length. These buffers block on put when they are full.
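To see the blocking behaviour for yourself, you can wrap the second put in asyncio.wait_for, which gives up on it after a timeout (the wrapping is our own illustration, not something aiochan requires):

import asyncio
import aiochan as ac

async def main():
    c = ac.Chan('f', 1)  # equivalent to ac.Chan(1)
    await c.put('a')     # completes immediately: the buffer has room
    try:
        # the buffer is now full, so this put blocks;
        # we stop waiting after 0.1 seconds
        await asyncio.wait_for(c.put('b'), 0.1)
    except asyncio.TimeoutError:
        print('the second put is blocked')

ac.run(main())

If you run this, you should see that the second put never completes within the timeout.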

Fixed-length buffers are often used to implement back-pressure:

[3]:
async def worker(c):
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.05)
        print('producing', i)
        await c.put(i)

async def consumer(c):
    while True:
        await asyncio.sleep(0.2)
        result = await c.get()
        print('consuming', result)

async def main():
    c = ac.Chan(3)
    ac.go(worker(c))
    ac.go(consumer(c))
    await asyncio.sleep(1)

ac.run(main())
producing 1
producing 2
producing 3
consuming 1
producing 4
producing 5
consuming 2
producing 6
consuming 3
producing 7
consuming 4
producing 8

Here, the producer and the consumer work at different rates. We want to ensure that the consumer always has something to work with, so the producer has to work ahead of the consumer; but we also want to ensure that the producer doesn't get so far ahead that the consumer can never catch up. A buffer solves this problem well. The buffering solution still works even if the time taken to produce or consume items is somewhat random: within bounds, appropriate buffering ensures minimal waiting while preventing the producing and consuming rates from diverging.
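For instance, the following variation of the example randomizes the production and consumption times (the exact intervals are arbitrary choices of ours); the producer still never runs much more than the buffer size ahead of the consumer:

import asyncio
import random
import aiochan as ac

async def worker(c):
    i = 0
    while True:
        i += 1
        # production takes a random amount of time,
        # but is still faster than consumption on average
        await asyncio.sleep(random.uniform(0.02, 0.08))
        print('producing', i)
        await c.put(i)

async def consumer(c):
    while True:
        # consumption time is random too
        await asyncio.sleep(random.uniform(0.15, 0.25))
        result = await c.get()
        print('consuming', result)

async def main():
    c = ac.Chan(3)  # the buffer bounds how far the producer can run ahead
    ac.go(worker(c))
    ac.go(consumer(c))
    await asyncio.sleep(1)

ac.run(main())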

In situations where getters just can't keep up with putters and you absolutely cannot tolerate blocking the producers (maybe because you don't control them), you have to make a compromise and use another kind of buffer, one that discards elements in exchange for non-blocking puts. We have built-in support for two of these: dropping buffers silently drop any further incoming puts once they become full:

[4]:
async def main():
    c = ac.Chan('d', 2) # 'd' for 'dropping'
    await c.put(1)
    await c.put(2)
    await c.put(3)
    c.close()
    async for v in c:
        print(v)

ac.run(main())
1
2

Look: the last value is missing. We also have sliding buffers, which, when full, drop the earliest pending value to make room for the newest:

[5]:
async def main():
    c = ac.Chan('s', 2) # 's' for 'sliding'
    await c.put(1)
    await c.put(2)
    await c.put(3)
    c.close()
    async for v in c:
        print(v)

ac.run(main())
2
3

At the beginning, we said that channels are used to circumvent locks and semaphores so that our programs are easier to develop and easier to reason about. Well, sometimes locks and semaphores are the most natural solution to a problem. In such situations, buffered channels can be used as locks and semaphores.

An example:

[6]:
async def worker(lock, tag):
    while True:
        await lock.get()
        print('%s is now working' % tag)
        await asyncio.sleep(0.1)
        await lock.put(True)

async def main():
    lock = ac.Chan(1).add(True)
    for i in range(10):
        ac.go(worker(lock, i))
    await asyncio.sleep(1)

ac.run(main())
0 is now working
1 is now working
2 is now working
3 is now working
4 is now working
5 is now working
6 is now working
7 is now working
8 is now working
9 is now working

In 1 second, only 10 operations complete, even though we have 10 workers with a combined maximum productivity of 100 operations per second. In the presence of the lock, work becomes serial.

Using a buffer size greater than 1 gives you a semaphore, which in our case increases the throughput:

[7]:
async def worker(lock, tag):
    while True:
        await lock.get()
        print('%s is now working' % tag)
        await asyncio.sleep(0.1)
        await lock.put(True)

async def main():
    lock = ac.Chan(2).add(True, True)
    for i in range(10):
        ac.go(worker(lock, i))
    await asyncio.sleep(1)

ac.run(main())
0 is now working
1 is now working
2 is now working
3 is now working
4 is now working
5 is now working
6 is now working
7 is now working
8 is now working
9 is now working
0 is now working
1 is now working
2 is now working
3 is now working
4 is now working
5 is now working
6 is now working
7 is now working
8 is now working
9 is now working

But why would you want to use channels as locks when you can use the built-in locks from asyncio? Consistency and flexibility. Remember select? Now we can select on locks! You can do all kinds of funky stuff with select and locks:

[9]:
import random

async def worker(locks, tag):
    while True:
        _, lock = await ac.select(*locks)
        print('%s working' % tag)
        await asyncio.sleep(0.1)
        await lock.put(True)

async def main():
    locks = [ac.Chan(1, name='lock%s' % i).add(True) for i in range(3)]
    for i in range(3):
        ac.go(worker(locks, 'worker-%s' % i))
    await asyncio.sleep(0.5)

ac.run(main())
worker-0 working
worker-1 working
worker-2 working
worker-0 working
worker-1 working
worker-2 working
worker-0 working
worker-1 working
worker-2 working
worker-0 working
worker-1 working
worker-2 working
worker-0 working
worker-1 working
worker-2 working

Now a worker proceeds whenever it can get its hands on any one of a sequence of locks. With 3 locks, we got 15 units of work done in half a second. If you change to 2 locks, only 10 units of work get done.

To recap:

  • Channels support buffering.
  • Fixed-length buffering blocks on put when full, whereas dropping and sliding buffering never block but may throw away items when full.
  • Buffering can be used to implement back-pressure.
  • Buffered channels can be used as locks and semaphores, and you can select on them.

Congratulations! Now you know almost everything you need to write non-trivial concurrent applications with aiochan. You are only limited by your imagination! Still, there are various patterns in concurrent programs that occur so often that we have implemented them as additional functions and methods for you to use readily. None of them is essential, but using the provided convenience functions makes your code easier to read and reason about.