Note: you can try this tutorial in Binder.

Channels

Now we know how to make coroutines and schedule them for execution. As we said before, for coroutines to do IO safely and in a principled way, we use meeting points, which in aiochan are called Chan, for “channel”. Constructing a channel is easy:

In [2]:
import aiochan as ac
import asyncio

c = ac.Chan()
c
Out[2]:
Chan<_unk_0 140664273474752>

Now suppose we have a producer that can be tasked with producing items, and a consumer that can consume items. The IO in this case consists of the outputs of the producer and the inputs of the consumer, and the two are linked by a channel. In code:

In [3]:
async def producer(c):
    i = 0
    while True:
        await asyncio.sleep(0.1) # producing stuff takes time
        i += 1
        await c.put('product ' + str(i))

async def consumer(c):
    while True:
        product = await c.get()
        print('obtained:', product)

async def main():
    c = ac.Chan()
    ac.go(producer(c))
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')

ac.run(main())
obtained: product 1
obtained: product 2
obtained: product 3
obtained: product 4
obtained: product 5
It is late, let us call it a day.

We see that Chan has two methods, put and get. put is used to put stuff into the channel, and get is for getting stuff out. Both of these return awaitables, signaling that doing IO with channels involves potential waiting, as two parties need to come together for either of them to proceed. Awaiting a get produces the value that was just put into the channel.

In aiochan, you cannot put a value that turns out to be None into a channel (other falsy values such as 0, 0.0, [], {}, False are ok). The reason is that a channel can be closed, and we need some way to signal to the users of the channel that it is closed; we use None for that signal. Another possibility is throwing exceptions, but throwing exceptions in async code quickly gets very confusing. So, following Clojure’s core.async, we don’t allow None values in channels.
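Because falsy values are legal products, a consumer has to look for the closed signal with an identity test (is None) and not with plain truthiness. The following is a minimal sketch of that distinction. It uses asyncio.Queue from the standard library as a stand-in for a channel; the names drain and demo, and the use of None as an end marker on the queue, are assumptions made for illustration and are not part of aiochan.

```python
import asyncio


async def drain(queue):
    """Collect values until the None end marker arrives."""
    received = []
    while True:
        value = await queue.get()
        if value is None:  # identity test: only None ends the loop
            break
        received.append(value)  # 0, [], False, ... are ordinary values
    return received


async def demo():
    # Stand-in for a channel (assumption: not an aiochan object).
    queue = asyncio.Queue()
    for value in (0, 0.0, [], {}, False, None):
        queue.put_nowait(value)
    return await drain(queue)


print(asyncio.run(demo()))  # [0, 0.0, [], {}, False]
```

Had the loop tested "if not value" instead, it would have stopped at the very first item, 0.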

Speaking of closing channels, note that in our previous example, main just walks away when it is determined that everyone should go home. But producer and consumer are just left there dangling, which is very rude of main. Closing the channel is a polite way of notifying them both:

In [4]:
async def producer(c):
    i = 0
    while True:
        await asyncio.sleep(0.1) # producing stuff takes time
        i += 1
        still_open = await c.put('product ' + str(i))
        if not still_open:
            print('producer goes home')
            break


async def consumer(c):
    while True:
        product = await c.get()
        if product is not None:
            print('obtained:', product)
        else:
            print('consumer goes home')
            break

async def main():
    c = ac.Chan()
    ac.go(producer(c))
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer

ac.run(main())
obtained: product 1
obtained: product 2
obtained: product 3
obtained: product 4
obtained: product 5
It is late, let us call it a day.
consumer goes home
producer goes home

We see that after the channel is closed with c.close(), awaiting a get will produce a None, whereas awaiting a put will produce False (before closing it will return True).
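To make these return values concrete, here is a toy unbuffered channel written with plain asyncio futures. It is a sketch under stated assumptions, not how aiochan is implemented: ToyChan and demo are hypothetical names, and releasing already-waiting putters with False on close is a simplification chosen for this sketch.

```python
import asyncio
from collections import deque


class ToyChan:
    """Hypothetical unbuffered channel; NOT aiochan's implementation."""

    def __init__(self):
        self._putters = deque()  # (value, future) pairs waiting for a getter
        self._getters = deque()  # futures waiting for a value
        self._closed = False

    async def put(self, value):
        if value is None:
            raise TypeError('None is reserved as the closed signal')
        if self._closed:
            return False
        if self._getters:  # a getter is already waiting: hand over directly
            self._getters.popleft().set_result(value)
            return True
        fut = asyncio.get_running_loop().create_future()
        self._putters.append((value, fut))
        return await fut  # resolved by a later get() or by close()

    async def get(self):
        if self._putters:  # a putter is already waiting: take its value
            value, fut = self._putters.popleft()
            fut.set_result(True)
            return value
        if self._closed:
            return None
        fut = asyncio.get_running_loop().create_future()
        self._getters.append(fut)
        return await fut  # resolved by a later put() or by close()

    def close(self):
        self._closed = True
        while self._getters:
            self._getters.popleft().set_result(None)
        while self._putters:  # simplification: waiting puts give up
            self._putters.popleft()[1].set_result(False)


async def demo():
    c = ToyChan()
    log = []

    async def getter():
        log.append(await c.get())  # receives the value that was put
        log.append(await c.get())  # receives None once the channel closes

    task = asyncio.ensure_future(getter())
    log.append(await c.put('hello'))  # True: the channel was open
    c.close()
    await task
    log.append(await c.put('late'))   # False: the channel is closed
    return log


print(asyncio.run(demo()))  # ['hello', True, None, False]
```

Note how put only returns after a getter has picked the value up, which is the "two parties need to come together" behaviour described earlier.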

By the way, on Python 3.6 and above, we can simplify our consumer a bit: here we are just iterating over the values in the channel one by one, which is exactly what an asynchronous iterator does. So we can write:

In [5]:
async def producer(c):
    i = 0
    while True:
        await asyncio.sleep(0.1) # producing stuff takes time
        i += 1
        still_open = await c.put('product ' + str(i))
        if not still_open:
            print('producer goes home')
            break


async def consumer(c):
    async for product in c:
        print('obtained:', product)
    print('consumer goes home')

async def main():
    c = ac.Chan()
    ac.go(producer(c))
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer

ac.run(main())
obtained: product 1
obtained: product 2
obtained: product 3
obtained: product 4
obtained: product 5
It is late, let us call it a day.
consumer goes home
producer goes home

It is also no longer necessary to test whether product is None: the iteration stops automatically when the channel is closed.
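One way to picture what the async for loop relies on: in Python, an object supports async for by providing __aiter__ and __anext__, and __anext__ ends the loop by raising StopAsyncIteration. The sketch below wires that protocol to a get that returns None once the object is closed. ClosableQueue is a hypothetical stand-in built on asyncio.Queue, not aiochan's Chan, and aiochan may well implement iteration differently.

```python
import asyncio


class ClosableQueue:
    """Hypothetical stand-in for a channel; None marks 'closed'."""

    def __init__(self):
        self._queue = asyncio.Queue()

    def put_nowait(self, value):
        self._queue.put_nowait(value)

    def close(self):
        self._queue.put_nowait(None)

    async def get(self):
        value = await self._queue.get()
        if value is None:
            # Put the marker back so every later get() also sees 'closed'.
            self._queue.put_nowait(None)
        return value

    def __aiter__(self):
        return self

    async def __anext__(self):
        value = await self.get()
        if value is None:
            raise StopAsyncIteration  # this is what ends 'async for'
        return value


async def demo():
    c = ClosableQueue()
    for i in range(3):
        c.put_nowait('product %s' % (i + 1))
    c.close()
    collected = []
    async for product in c:  # no explicit None test needed here
        collected.append(product)
    return collected


print(asyncio.run(demo()))  # ['product 1', 'product 2', 'product 3']
```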

Note that in aiochan, a channel is just an object; in some circles, this is called a “first-class construct”. This means that it can be passed as an argument to functions (which we just did), returned from functions, or stored in a data structure for later use (unlike, say, in Erlang). It is even possible to go meta: a channel containing channels.
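The “channel containing channels” idea is often used for request and reply: each request carries the place where its answer should be delivered. Below is a minimal sketch of that pattern. It uses asyncio.Queue as a stand-in for a channel, and the names squarer, demo and reply_to are made up for illustration; nothing here is aiochan API.

```python
import asyncio


async def squarer(requests):
    """Answer each request on the reply queue that came with it."""
    while True:
        request = await requests.get()
        if request is None:  # our own end marker, by convention
            break
        number, reply_to = request
        await reply_to.put(number * number)


async def demo():
    requests = asyncio.Queue()  # a queue whose items contain queues
    worker = asyncio.ensure_future(squarer(requests))
    answers = []
    for number in (1, 2, 3):
        reply_to = asyncio.Queue()  # fresh reply queue per request
        await requests.put((number, reply_to))
        answers.append(await reply_to.get())
    await requests.put(None)  # tell the worker to stop
    await worker
    return answers


print(asyncio.run(demo()))  # [1, 4, 9]
```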

As an example of returning a channel from a function, we can have the producer create the channel itself:

In [6]:
async def producer():
    c = ac.Chan()

    async def work():
        i = 0
        while True:
            await asyncio.sleep(0.1) # producing stuff takes time
            i += 1
            still_open = await c.put('product ' + str(i))
            if not still_open:
                print('producer goes home')
                break

    ac.go(work())
    return c


async def consumer(c):
    async for product in c:
        print('obtained:', product)
    print('consumer goes home')

async def main():
    c = await producer()
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer

ac.run(main())
obtained: product 1
obtained: product 2
obtained: product 3
obtained: product 4
obtained: product 5
It is late, let us call it a day.
consumer goes home
producer goes home

But in this case, not letting the producer produce its own channel actually has a benefit: we can easily have several producers working in parallel:

In [8]:
async def producer(c, tag):
    i = 0
    while True:
        await asyncio.sleep(0.1) # producing stuff takes time
        i += 1
        still_open = await c.put('product %s from %s' % (i, tag))
        if not still_open:
            print('producer %s goes home' % tag)
            break


async def consumer(c):
    async for product in c:
        print('obtained:', product)
    print('consumer goes home')

async def main():
    c = ac.Chan()
    for i in range(3):
        ac.go(producer(c, 'p%s' % i))
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer

ac.run(main())
obtained: product 1 from p0
obtained: product 1 from p1
obtained: product 1 from p2
obtained: product 2 from p0
obtained: product 2 from p1
obtained: product 2 from p2
obtained: product 3 from p0
obtained: product 3 from p1
obtained: product 3 from p2
obtained: product 4 from p0
obtained: product 4 from p1
obtained: product 4 from p2
obtained: product 5 from p0
obtained: product 5 from p1
obtained: product 5 from p2
It is late, let us call it a day.
consumer goes home
producer p0 goes home
producer p1 goes home
producer p2 goes home

This is called fan-in: different producers fanning their products into the same channel. We can also have fan-out:

In [10]:
async def producer(c, tag):
    i = 0
    while True:
        await asyncio.sleep(0.1) # producing stuff takes time
        i += 1
        still_open = await c.put('product %s from %s' % (i, tag))
        if not still_open:
            print('producer %s goes home' % tag)
            break


async def consumer(c, tag):
    async for product in c:
        print('%s obtained: %s' % (tag, product))
    print('consumer %s goes home' % tag)

async def main():
    c = ac.Chan()
    for i in range(3):
        ac.go(producer(c, 'p%s' % i))
    for i in range(3):
        ac.go(consumer(c, 'c%s' % i))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer

ac.run(main())
c0 obtained: product 1 from p0
c1 obtained: product 1 from p1
c2 obtained: product 1 from p2
c0 obtained: product 2 from p0
c1 obtained: product 2 from p1
c2 obtained: product 2 from p2
c0 obtained: product 3 from p0
c1 obtained: product 3 from p1
c2 obtained: product 3 from p2
c0 obtained: product 4 from p0
c1 obtained: product 4 from p1
c2 obtained: product 4 from p2
c0 obtained: product 5 from p0
c1 obtained: product 5 from p1
c2 obtained: product 5 from p2
It is late, let us call it a day.
consumer c0 goes home
consumer c1 goes home
consumer c2 goes home
producer p0 goes home
producer p1 goes home
producer p2 goes home

We see that the work is automatically divided evenly between producers and consumers. Even if producers produce things at different rates, this fan-in, fan-out pattern will automatically do the right thing:

In [11]:
async def producer(c, tag, interval):
    i = 0
    while True:
        await asyncio.sleep(interval) # producing stuff takes time
        i += 1
        still_open = await c.put('product %s from %s' % (i, tag))
        if not still_open:
            print('producer %s goes home' % tag)
            break


async def consumer(c, tag):
    async for product in c:
        print('%s obtained: %s' % (tag, product))
    print('consumer %s goes home' % tag)

async def main():
    c = ac.Chan()
    for i in range(3):
        ac.go(producer(c, ('p%s' % i), interval=(i+1)*0.1))
    for i in range(3):
        ac.go(consumer(c, 'c%s' % i))
    await asyncio.sleep(1)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer

ac.run(main())
c0 obtained: product 1 from p0
c1 obtained: product 1 from p1
c2 obtained: product 2 from p0
c0 obtained: product 1 from p2
c1 obtained: product 3 from p0
c2 obtained: product 2 from p1
c0 obtained: product 4 from p0
c1 obtained: product 5 from p0
c2 obtained: product 2 from p2
c0 obtained: product 3 from p1
c1 obtained: product 6 from p0
c2 obtained: product 7 from p0
c0 obtained: product 4 from p1
c1 obtained: product 8 from p0
c2 obtained: product 3 from p2
c0 obtained: product 9 from p0
It is late, let us call it a day.
consumer c1 goes home
consumer c2 goes home
consumer c0 goes home
producer p1 goes home
producer p0 goes home

We see that jobs are still divided evenly between consumers, but more jobs come from faster producers.
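The counts in the output above can be checked with a little arithmetic. The sketch below is an idealized model, not a simulation of aiochan: it measures time in ticks of 0.1 seconds, assumes a product that would be ready exactly at the closing time (tick 10) is not delivered, and assumes the three consumers simply take turns. The function names are made up for illustration.

```python
def products_before(deadline_ticks, interval_ticks):
    """Products finished strictly before the deadline, one per interval."""
    return (deadline_ticks - 1) // interval_ticks


def round_robin_shares(total, consumers):
    """Split `total` items among consumers that take turns in order."""
    base, extra = divmod(total, consumers)
    return [base + (1 if i < extra else 0) for i in range(consumers)]


# Producers p0, p1, p2 sleep 0.1, 0.2 and 0.3 seconds: 1, 2 and 3 ticks.
per_producer = [products_before(10, ticks) for ticks in (1, 2, 3)]
per_consumer = round_robin_shares(sum(per_producer), 3)

print(per_producer)  # [9, 4, 3]
print(per_consumer)  # [6, 5, 5]
```

These numbers agree with the run shown above: p0 delivers 9 products, p1 delivers 4 and p2 delivers 3, while c0 handles 6 of the 16 products and c1 and c2 handle 5 each.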

To recap:

  • The construct for inter-coroutine communication is the channel.
  • Getting from and putting to channels facilitates IO between coroutines.
  • Channels are first-class constructs: we can pass them around, return them, or store them.
  • Channels can be closed.
  • None values are not allowed on channels.
  • Strategically closing channels can be used for execution control.
  • Fan-in and fan-out can be used for distributing work among different coroutines.

Useful constructs:

  • aiochan.Chan
  • aiochan.Chan.put
  • aiochan.Chan.get
  • aiochan.Chan.close