Note: you can try this tutorial in Binder.

Coroutines and event loops

Programming languages begin in different ways, with different goals. Unfortunately, in the case of python, concurrency isn’t one of the goals (unlike, say, in the case of Erlang and Elixir). As a result, concurrency in python feels a bit foreign, cumbersome and unnatural.

In particular, when you start your python interactive interpreter, or when you run your python script, your program begins in an environment that can be roughly described as a single-threaded process, with no obvious ladder that allows you to climb up onto a concurrent environment. So what are “processes” and “threads”?

“Process” here means something different from the “process” in CSP: here it refers to an instance of your program that is executed for you by the operating system. A process has its own memory space and file descriptors provided by the operating system, and these are by default isolated from the other processes on the same system.

A process may have one or more threads of execution running at the same time. Each thread executes its part of the code in sequence, but different threads can share memory. If the sharing is not done carefully and in a principled way, then, as we have seen, it will lead to non-deterministic results.

Python supports threads through the _thread (called thread in python 2) and threading modules. But these are not too useful (compared to other languages, at least): in python, the danger of threads stepping on each other is so great that the default implementation, CPython, has a global interpreter lock, or GIL, which ensures that only one thread can execute python bytecode at any given instant! Even with such a big fat lock always in place, we still need our own locks to prevent unwanted interleaving, since a thread can still be switched out in the middle of a multi-step operation. Meanwhile, the GIL prevents threads from running python code on multiple processors in parallel, and locking has its own overhead. To make things still worse, python schedules thread execution in a somewhat unintuitive manner that favours slow (or CPU-intensive) operations over fast ones, the opposite of what most operating systems do. The end result: CPU-bound python code utilizing threads often runs slower than equivalent code that does not.
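As a small illustration of why we still need locks despite the GIL, here is a sketch (run_counter is a hypothetical name) in which several threads increment one shared counter using the standard threading module. counter += 1 is a read-modify-write that the GIL alone does not make atomic, so each increment is guarded by a threading.Lock.

```python
import threading

def run_counter(n_threads=4, n_increments=10_000):
    counter = 0
    lock = threading.Lock()

    def worker():
        nonlocal counter
        for _ in range(n_increments):
            # read-modify-write spanning several bytecodes: guard it explicitly
            with lock:
                counter += 1

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every thread to finish before reading the total
    return counter

print(run_counter())  # → 40000
```

With the lock in place the total is always n_threads × n_increments; without it, updates may be lost, and how often that happens depends on the interpreter version and timing.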

Python also supports spawning processes from within python itself using the multiprocessing module. But by default processes don’t share memory or resources, hence inter-process communication is restricted and cumbersome. The overhead of using processes is even greater than that of threads. Well, the picture isn’t very pretty.

Still, not being able to deal with multiple things at once is stupid. Considering the situation we are in, the best way forward seems to be something single-threaded (hopefully with lower overhead) that can imitate multiple threads of execution. Something along these lines has been part of the standard library since python 3.4: it is called asyncio.

Compared to plain python, asyncio utilizes two further keywords (since python 3.5, at least): async and await. async is applied to functions (and methods). An example:

In [2]:
async def silly(x):
    print('executing silly with', x)
    return x+1

It seems normal enough. But when you call it:

In [3]:
silly(2)
Out[3]:
<coroutine object silly at 0x7f902c9c0b48>

It seems that the function doesn’t execute but instead returns something called a coroutine. Calling an async function is a two-step process: first you call it normally and obtain a coroutine, then the coroutine needs to be given to some scheduler, or event loop, for execution. The function aiochan.run will do the scheduling and executing part for you:

In [4]:
import aiochan as ac
ac.run(silly(2))
executing silly with 2
Out[4]:
3
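To demystify the scheduling part a little, here is a deliberately naive sketch that “runs” a coroutine by hand with the coroutine’s send method. It only works because silly never suspends: the first send runs it to completion, and the return value comes back attached to a StopIteration exception. A real event loop (such as the one ac.run creates) must also handle coroutines that suspend at an await, which this hypothetical drive helper simply rejects.

```python
async def silly(x):
    print('executing silly with', x)
    return x + 1

def drive(coro):
    """Hypothetical helper: run a coroutine that never suspends."""
    try:
        coro.send(None)  # resume the coroutine until it finishes or suspends
    except StopIteration as stop:
        return stop.value  # a finished coroutine reports its return value here
    coro.close()  # it suspended at an await: a real event loop would be needed
    raise RuntimeError('coroutine suspended; this naive driver cannot resume it')

coro = silly(2)     # step 1: nothing is printed yet, we only hold a coroutine
print(drive(coro))  # step 2: prints "executing silly with 2", then 3
```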

Every call to ac.run creates a new event loop, which runs until the passed-in coroutine finishes executing.

All this ceremony of using async and running coroutines in a strange way sets the stage for using the await keyword:

In [5]:
import asyncio

async def count(tag, n_max=3):
    i = 0
    while i < n_max:
        await asyncio.sleep(0.5)
        i += 1
        print(tag, i)

ac.run(count('counter:'))
counter: 1
counter: 2
counter: 3

Whatever follows the await keyword must be an awaitable, which roughly says that “this computation here will eventually produce something, but maybe not right now, and while waiting you may go off and do something else: the scheduler will let you continue when the result is ready”.
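To make “awaitable” a bit more concrete, here is a sketch using only the standard library (assuming python 3.7+ for asyncio.run, asyncio.create_task and asyncio.get_running_loop) that awaits the three kinds of awaitable objects asyncio itself works with: coroutines, tasks and futures. The names compute and main are just for illustration.

```python
import asyncio

async def compute():
    await asyncio.sleep(0.05)
    return 42

async def main():
    loop = asyncio.get_running_loop()
    # 1. a coroutine object is awaitable
    a = await compute()
    # 2. a Task (a coroutine already scheduled on the loop) is awaitable
    task = asyncio.create_task(compute())
    b = await task
    # 3. a bare Future is awaitable: its value only appears when a timer
    #    callback sets it 0.05 s later, and meanwhile the loop is free
    fut = loop.create_future()
    loop.call_later(0.05, fut.set_result, 'ready')
    c = await fut
    return a, b, c

print(asyncio.run(main()))  # → (42, 42, 'ready')
```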

Let’s see what happens when we run two counters (remember that the function count, when called, produces a coroutine, which is an awaitable):

In [6]:
async def main():
    await count('counter a:')
    await count('counter b:')

ac.run(main())
counter a: 1
counter a: 2
counter a: 3
counter b: 1
counter b: 2
counter b: 3

Hmm … this doesn’t look very concurrent: the second counter starts counting only after the first counter finishes. But this is what we asked for: we awaited the completion of the first counter!

To make the two counters execute together, we use the aiochan.go function, which takes a coroutine and schedules it for execution but does not wait for the result:

In [7]:
async def main():
    ac.go(count('counter a:'))
    await count('counter b:')

ac.run(main())
counter b: 1
counter a: 1
counter b: 2
counter a: 2
counter b: 3
counter a: 3
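For comparison, here is a minimal sketch of the same two-counter program using only the standard library, assuming python 3.7+ for asyncio.create_task and asyncio.run. Here asyncio.create_task plays a role similar to ac.go (schedule without waiting); this is not a claim about how aiochan.go is implemented. To make the result checkable, this hypothetical variant of count records its ticks in a list instead of printing them.

```python
import asyncio

async def count(tag, log, n_max=3):
    # same idea as count above, but records ticks instead of printing them
    for i in range(1, n_max + 1):
        await asyncio.sleep(0.05)
        log.append((tag, i))

async def main():
    log = []
    # schedule counter a without waiting for it
    task_a = asyncio.create_task(count('counter a:', log))
    await count('counter b:', log)
    # wait for counter a explicitly so that it is guaranteed to finish
    await task_a
    return log

log = asyncio.run(main())
print(log)
```

The two counters’ ticks come out interleaved, as in the aiochan version.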

Much better now. Note that you must pass the coroutine to aiochan.go for execution: merely calling the async function creates a coroutine that never runs (other than a possible warning, nothing happens):

In [8]:
async def main():
    count('counter a:')
    await count('counter b:')

ac.run(main())
/home/zh217/.pyenv/versions/3.6.6/lib/python3.6/site-packages/ipykernel_launcher.py:2: RuntimeWarning: coroutine 'count' was never awaited

counter b: 1
counter b: 2
counter b: 3

What happens if we replace both counter calls with aiochan.go?

In [9]:
async def main():
    ac.go(count('counter a:'))
    ac.go(count('counter b:'))

ac.run(main())

Nothing happens! Remember that ac.run returns as soon as the coroutine passed to it returns, and our main returns right after scheduling the two counters, before they ever get a chance to run!

To make this clearer, note that if we sleep in the main function at the end, the two counters will be executed:

In [10]:
async def main():
    ac.go(count('counter a:'))
    ac.go(count('counter b:'))
    await asyncio.sleep(3)

ac.run(main())
counter a: 1
counter b: 1
counter a: 2
counter b: 2
counter a: 3
counter b: 3

If you have done thread-based programming before, you may think now that asyncio is no different from threading. This is not true. To illustrate, consider:

In [11]:
async def main():
    async def work():
        print('do work')
    print('before')
    ac.go(work())
    print('after')

ac.run(main())
before
after
do work

What you get is always before, after, and do work, in that order. In some languages, using threads, it is possible to get garbled text, since the various calls to print (or whatever it is called) can step on each other. By contrast, an asyncio event loop uses only a single thread, and it is guaranteed that unless you await, nothing else will get in your way while you are executing. (If you read the documentation for asyncio, you will find that even things like locks and semaphores are marked “not thread-safe”: they are only safe with respect to the non-interrupting guarantees provided by asyncio.)
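The “nothing gets in your way unless you await” guarantee can be seen directly with a shared counter. In the sketch below (standard library only, python 3.7+; the function names are hypothetical), ten tasks each increment a counter 100 times by reading it and writing back the value plus one. When nothing is awaited between the read and the write, no update is lost. Inserting await asyncio.sleep(0) in between lets other tasks run in the gap, and their updates get overwritten.

```python
import asyncio

async def run_workers(worker, n_workers=10, n_steps=100):
    state = {'counter': 0}
    await asyncio.gather(*(worker(state, n_steps) for _ in range(n_workers)))
    return state['counter']

async def no_yield(state, n):
    for _ in range(n):
        value = state['counter']
        state['counter'] = value + 1  # no await between read and write

async def yield_between(state, n):
    for _ in range(n):
        value = state['counter']
        await asyncio.sleep(0)        # other tasks may run right here...
        state['counter'] = value + 1  # ...so this may overwrite their updates

safe = asyncio.run(run_workers(no_yield))
racy = asyncio.run(run_workers(yield_between))
print(safe)  # → 1000
print(racy)  # far fewer than 1000: updates were lost
```

The first version needs no lock at all, which would not be true of the same code run in threads.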

So asyncio guarantees “no break unless await”. In other words, it implements co-operative multitasking, in contrast to the pre-emptive multitasking provided by threads (and your operating system), where your work may be interrupted at any time. Co-operative multitasking is in general easier to program, but it is sometimes necessary to explicitly give up control to the scheduler so that other tasks get a chance to run. In aiochan, you can await aiochan.nop() to achieve this:

In [12]:
async def main():
    async def work():
        print('do work')
    print('before')
    ac.go(work())
    await ac.nop()
    print('after')

ac.run(main())
before
do work
after
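In plain asyncio, the documented way to give up control for one iteration of the event loop is await asyncio.sleep(0). Here is a sketch of the previous example rewritten with it (python 3.7+), recording to a list instead of printing. This is not a claim about how aiochan.nop is implemented.

```python
import asyncio

async def main(log):
    async def work():
        log.append('do work')

    log.append('before')
    task = asyncio.create_task(work())  # keep a reference to the task
    await asyncio.sleep(0)              # yield control for one loop iteration
    log.append('after')
    await task                          # normally already finished by now

log = []
asyncio.run(main(log))
print(log)  # → ['before', 'do work', 'after'] in CPython's asyncio
```

As with aiochan, this ordering is what the current implementation happens to produce; the safe assumption is only that work gets a chance to run at the await.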

Note the order. Also note that, strictly speaking, the order isn’t guaranteed in this case: that you always get this order back should be considered an implementation detail.

Now you know how to make coroutines and run them. That roughly corresponds to the “sequential processes” that we talked about before (and remember not to touch global state). In the next section, we will learn about the “communicating” part.

To recap:

  • Python was not designed for concurrency.
  • There are a number of ways to do concurrency in python: processes, threads, and asyncio event-loops.
  • Asyncio event loops are single-threaded schedulers responsible for executing coroutines.
  • Coroutine functions are made with async and await keywords. No interleaving of execution can occur unless an await keyword is encountered.

Useful functions:

  • aiochan.run
  • aiochan.go
  • aiochan.nop
  • asyncio.sleep

There is also aiochan.run_in_thread, which is recommended for scripts. aiochan.run is recommended when programming interactively.

Note about ac.run in Jupyter notebooks

If you use ac.run in Jupyter notebooks to run examples, sometimes you will see warnings saying something like:

Task was destroyed but it is pending!

In the notebook, this is mostly harmless and it is due to our not always closing all our coroutines when our main coroutine exits. We do this in order to make our examples simple, and to avoid using constructs not already introduced. However, in production, any warning is a cause for concern.

Appendix: running async functions without aiochan

In our exposition we used the function aiochan.run to run all our async functions. How do we do it with only the standard library? We use asyncio event loops to do the execution:

import asyncio

loop = asyncio.get_event_loop()
result = loop.run_until_complete(silly(2))
print('the result is', result)

You can try running the above code. What you get depends on how you run it: if you run it in a script or in an interactive interpreter, then you will see printed:

executing silly with 2
the result is 3
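On python 3.7 and later, the standard library bundles these steps into asyncio.run, which creates a fresh event loop, runs the coroutine to completion and then closes the loop, in much the same spirit as ac.run. Like the run_until_complete version, it refuses to work inside an already running loop (it raises a RuntimeError), so it does not help in jupyter either.

```python
import asyncio

async def silly(x):
    print('executing silly with', x)
    return x + 1

# python 3.7+: create a new event loop, run the coroutine to completion,
# then close the loop, all in one call
result = asyncio.run(silly(2))
print('the result is', result)  # → the result is 3
```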

However, if you run it in jupyter notebooks or jupyterlab, there is a possibility that you will get an exception thrown in your face (or maybe not, it all depends):

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-6-69c4a90e423b> in <module>()
      2
      3 loop = asyncio.get_event_loop()
----> 4 result = loop.run_until_complete(silly(2))
      5 print('the result is', result)

~/.pyenv/versions/3.6.6/lib/python3.6/asyncio/base_events.py in run_until_complete(self, future)
    453         future.add_done_callback(_run_until_complete_cb)
    454         try:
--> 455             self.run_forever()
    456         except:
    457             if new_task and future.done() and not future.cancelled():

~/.pyenv/versions/3.6.6/lib/python3.6/asyncio/base_events.py in run_forever(self)
    407         self._check_closed()
    408         if self.is_running():
--> 409             raise RuntimeError('This event loop is already running')
    410         if events._get_running_loop() is not None:
    411             raise RuntimeError(

RuntimeError: This event loop is already running

So we already have a loop running? OK, it is still possible to proceed in this case:

In [13]:
import asyncio

loop = asyncio.get_event_loop()
result = loop.create_task(silly(2))
print('the result is', result)
the result is <Task pending coro=<silly() running at <ipython-input-2-709c439f84a4>:1>>
executing silly with 2

So our async function does get executed now, but we only get a task back, not the result itself! To get the result:

In [14]:
result.result()
Out[14]:
3

… which seems to be fine, but only because you are executing it interactively, in a separate cell. If you put this line directly below the print call in the same cell, you will almost certainly get:

---------------------------------------------------------------------------
InvalidStateError                         Traceback (most recent call last)
<ipython-input-17-cda1a6adb807> in <module>()
      4 result = loop.create_task(silly(2))
      5 print('the result is', result)
----> 6 result.result()

InvalidStateError: Result is not set.

which tells you that you are asking for the result too soon! You will need to wait a little bit (but if you do it wrong, it will deadlock), or create a future and set its result using a callback. If you really want to figure it out, you can read the python documentation.
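Here is a minimal sketch of the callback route (on_done and results are hypothetical names): register a done-callback on the task and read the result only inside it, once the task has finished. To keep the sketch runnable as a plain script, we create and drive a fresh loop ourselves. In a notebook you would instead use the already-running loop from asyncio.get_event_loop() and skip run_until_complete, letting the callback fire on its own once the cell finishes, as the create_task example above suggests.

```python
import asyncio

async def silly(x):
    print('executing silly with', x)
    return x + 1

results = []

def on_done(task):
    # the event loop calls this only after the task has finished,
    # so the result is already available and no InvalidStateError occurs
    results.append(task.result())

loop = asyncio.new_event_loop()
task = loop.create_task(silly(2))
task.add_done_callback(on_done)

# in a plain script nothing runs the loop for us, so we drive it ourselves
loop.run_until_complete(task)
loop.close()
print(results)  # → [3]
```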

So now you believe me when I say that doing concurrency in python feels foreign, cumbersome and unnatural. Running everything in a script is a solution, but one of python’s main appeals is its interactivity.

You can also replace calls to aiochan.go with asyncio.get_event_loop().create_task (or asyncio.get_running_loop().create_task, which only exists in python 3.7 and later), but … what a mouthful! asyncio.ensure_future is also a possibility, but in addition to its questionable name, since python 3.7 the documentation recommends asyncio.create_task instead for spawning tasks. However, asyncio.create_task doesn’t exist prior to python 3.7. So … if you intend to use aiochan at all, we urge you to stay with aiochan.go.