Note: you can try this tutorial in Binder.

Select: the quitessential operation

Channels with their put and get operations can already be used to build rather complicated systems. Now we introduce the operation select, which hugely increases the expressive power of channels further.

Basically, if we have channels c1, c2 and c3 and we write

result = await select(c1, c2, c3)

then result will hold the result of one and only one get operation on c1, c2 and c3. Only one operation will be attempted. If we have several operations that can be completed at the same time, only one will complete, and the non-completing ones will not run at all. This is in constrast with, say, asyncio.wait.

Let’s have some examples:

In [2]:
import asyncio
import aiochan as ac

async def main():
    c1 = ac.Chan(name='c1').add(1, 2, 3).close()
    c2 = ac.Chan(name='c2').add('a', 'b', 'c').close()
    c3 = ac.Chan(name='c3').add('x', 'y', 'z').close()

    result, chan = await, c2, c3)
    print('the result is', result)
    print('the result is from', chan)

    async for v in c1:
        print('c1 still has value:', v)

    async for v in c2:
        print('c2 still has value:', v)

    async for v in c3:
        print('c3 still has value:', v)
the result is 1
the result is from Chan<c1 140594564470264>
c1 still has value: 2
c1 still has value: 3
c2 still has value: a
c2 still has value: b
c2 still has value: c
c3 still has value: x
c3 still has value: y
c3 still has value: z

Here we have also used some new operations on channels:

  • We can give names to channels: Chan(name='some name'),
  • ch.add(...) adds elements to channels on the background when it is possible to do so,
  • close closes the channel immediately, but all pending puts (here those by add) will still have an opportunity to complete,
  • add and close can be chained as both these methods return the channel.

And for our select:

  • it returns a tuple: the value together with the channel that is involved,
  • if several operations can all be completed, which one is completed is non-deterministic (try running the above script several times to see).

Actually, it is not only get operations that can be selected:

In [3]:
async def receive(c):
    r = await c.get()
    print('received', r, 'on', c)

async def main():
    c1 = ac.Chan(name='c1')
    c2 = ac.Chan(name='c2')


    await ac.nop()

    result, chan = await, 'A'), (c2, 'B'))
    print('select completes on', chan)
select completes on Chan<c2 140594564470264>
received B on Chan<c2 140594564470264>

we see that if we give an argument like (chan, value) it is interpreted as a put operation akin to chan.put(value). Again, one and only one operation will complete. You can also mix get operations with put operations.

Also, if you are careful, you will have noticed that we have inserted a nop above. If it is not there, the select will always complete on c1. You may want to think about why.

The more non-trivial the application is, the more use of select you can find. One of its simplest use is for stopping many workers at once:

In [5]:
async def worker(out, stop, tag):
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.1)
        result, c = await, (out, '%s-%s' % (tag, i)), priority=True)
        if c is stop:
            print('%s stopped' % tag)

async def consumer(c, stop):
    while True:
        result, c = await, c, priority=True)
        if c is stop:
            print('consumer stopped')
            print('received', result)

async def main():
    c = ac.Chan()
    stop = ac.Chan()
    for i in range(3):
        ac.go(worker(c, stop, 'worker%s' % i))
    ac.go(consumer(c, stop))
    await asyncio.sleep(0.6)
    await asyncio.sleep(0.2)
received worker0-1
received worker1-1
received worker2-1
received worker0-2
received worker1-2
received worker2-2
received worker0-3
received worker1-3
received worker2-3
received worker0-4
received worker1-4
received worker2-4
received worker0-5
received worker1-5
received worker2-5
consumer stopped
worker0 stopped
worker1 stopped
worker2 stopped

Here stopping can actually be signaled by simply closing the fan-in-fan-out channel, but in more complicated situations (for example, closing down in response to any one of several conditions) select is essential.

We have also seen that select takes an argument priority, which defaults to False. Here we set it to true, so when several operations become completable at the same time, it is guaranteed that the leftmost one will complete. Here we use this priority select to make sure that the operation stops at the earliest instance.

There is also a default argument to select, which if set, will produce the set value immediately when none of the operations can be completed immediately, with None in the place where you usually find the completed channel. The following snippet completes the put only if it can be done immediately:

In [6]:
async def main():
    ch = ac.Chan()
    result, c = await, 'value'), default='giveup')
    if c is None:
        print('put cannot complete immediately and was given up')
put cannot complete immediately and was given up

By now you should know how to use select. It certainly seems a simple enough operation to understand. However, select is non-trivial. What we mean by that is that, using only channels and put and get operations on channels, it is not possible to write a select clone that has the correct semantics. The semantics of select has three requirements:

  • at least one operation is completed;
  • at most one operation is completed;
  • an operation is completed at the earliest possible time (no unnecessary waiting).

Writing an operation satisfying any two of the above is easy. But to satisfy all three, you need to submit your operations to the involved channels at the time of calling, and at the time of completion of any operation, you will need to notify all other operations to cancel themselves. Thus the semantics of select must be implemented inside Chan, not outside.

select is actually the whole point of aiochan: asyncio do provide us with futures, locks and things, which are somewhat like our channels superficially. But select is conspicuously missing. Channels are made to make select possible. Rob Pike, the inventor of golang, mentions select as the reason why channels in golang is provided by the language itself instead of as a library.

Another way of putting this is: in the hierarchy of concurrency operations, select is on the highest level of abstraction. Consider the following:

  • unlike python, Java was designed with concurrency (with threads) in mind, so thread primitives exist from the beginning;
  • but as working with the primitives were too low-level, java.util.concurrent was added as a libray;
  • Clojure runs on the JVM so can use all the Java concurrency libraries. Clojure also adds its own flavour of concurrency-friendly constructs in the form of refs (atoms, agents, and even STM)
  • BUT Clojure still needs core.async as a library, since writing a select that works well on all the previous stuff is not possible! (By the way, select is called alt!, alts!, alt!! and alts!! in core.async. Yes there are four of them.)

By the way, python has a built-in library called select, and a higher-level one doing essentially the same thing called selectors. But these libraries only work with files or sockets, not plain python objects, and the availability of the various operations in theses libraries depend on the operating system. That is because the library just offloads it work to system calls. Usually we think of system calls as pretty low level. How many times have you encountered some abstraction that is provided by the lower-level operating system but not by the higher-level programming language?

To recap:

  • The select operator completes exactly one operation from the given operations,
  • select can be used as a control structure,
  • select is non-trivial.

Useful constructs:

  • select
  • aiochan.Chan.add
  • Channel operations can be chained (more to come)