Coroutines and event loops
Programming languages begin in different ways, with different goals. Unfortunately, in the case of python, concurrency wasn’t one of those goals (unlike, say, Erlang and Elixir). As a result, concurrency in python feels a bit foreign, cumbersome and unnatural.
In particular, when you start your python interactive interpreter, or when you run your python script, your program begins in an environment that can be roughly described as a single-threaded process, with no obvious ladder that allows you to climb up onto a concurrent environment. So what are “processes” and “threads”?
“Process” here means something different from the “process” in CSP: here it refers to an instance of your program that is executed for you by the operating system. A process has its own memory space and file descriptors provided by the operating system, and these are by default isolated from the other processes on the same system.
A process may have one or more threads of execution running at the same time. Each thread executes its part of the code in sequence, but different threads can share memory. If the sharing is not done carefully and in a principled way then, as we have seen, it leads to non-deterministic results.
Python supports threads by providing the threading
module. But these are not too useful (compared to other languages, at
least): in python, the danger of threads stepping on each other is so
great that the default implementation, CPython, has a global interpreter
lock, or GIL, that ensures that only one thread can execute python
bytecode at any given instant! Even with such a big fat lock always in
place, we still need locks of our own in order to prevent unwanted
interleaving! So the GIL prevents us from using multiple processors, and
locking has overheads. To make things still worse, python schedules thread
execution in a somewhat unintuitive manner which results in favouring slow
(or CPU-intensive) operations over fast ones, the opposite of what most
operating systems do. The end result: python code that uses threads
often runs slower than code that does not.
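The point about still needing locks under the GIL can be sketched with the standard threading module alone. The names counter, lock and add_many are made up for this illustration; the lock is what makes the final count predictable.

```python
import threading

counter = 0                 # shared state, visible to every thread
lock = threading.Lock()     # hypothetical name; guards updates to counter

def add_many(n):
    """Increment the shared counter n times, holding the lock for each update."""
    global counter
    for _ in range(n):
        with lock:          # without this, the read-modify-write may interleave
            counter += 1

threads = [threading.Thread(target=add_many, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()                # wait for every thread to finish

print(counter)
```

Every increment pays for acquiring and releasing the lock, which is the overhead the paragraph above complains about.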
Python also supports spawning processes within python itself using the
multiprocessing module. But by default processes don’t share memory
or resources, hence inter-process communication is restricted and
cumbersome. The overhead of using processes is even greater than that of
threads. Well, the picture isn’t very pretty.
Still, not being able to deal with multiple things at once is stupid.
Considering the situation we are in, it seems the best way forward
is to have something that is single-threaded (lower overhead, hopefully)
that can imitate multiple threads of execution. And there is something
along these lines built into the language since python 3.4: it is called
asyncio. Compared to plain python,
asyncio utilizes two further keywords
(since python 3.5, at least): async and await. The async keyword is
applied to functions (and methods). An example:
async def silly(x):
    print('executing silly with', x)
    return x+1
It seems normal enough. But when you call it, say as silly(2), you get:
<coroutine object silly at 0x7f902c9c0b48>
It seems that the function doesn’t execute but instead returns something
called a coroutine. Calling an async function is a two-step process: first
you call it normally and obtain a coroutine, then the coroutine needs to
be given to some scheduler, or event loop, for execution. The function
aiochan.run will do the scheduling and executing part for you:
import aiochan as ac

ac.run(silly(2))
executing silly with 2
Every call to
ac.run creates a new event loop, which runs until the
passed in async function finishes executing.
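The two-step process can also be observed with the standard library only. This is a rough sketch, not a description of what aiochan.run does internally: it assumes asyncio.run (python 3.7+) plays a comparable role of creating a loop and running one coroutine to completion.

```python
import asyncio
import inspect

async def silly(x):
    print('executing silly with', x)
    return x + 1

# Step 1: calling the async function only builds a coroutine object.
coro = silly(2)                           # nothing is printed yet
is_coroutine = inspect.iscoroutine(coro)
coro.close()                              # discard it, avoiding a "never awaited" warning

# Step 2: hand a coroutine to an event loop for execution.
result = asyncio.run(silly(2))            # asyncio.run requires python 3.7+
print(is_coroutine, result)
```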
All this ceremony of using
async and running coroutines in a strange
way sets the stage for using the await keyword:
import asyncio

async def count(tag, n_max=3):
    i = 0
    while i < n_max:
        await asyncio.sleep(0.5)
        i += 1
        print(tag, i)

ac.run(count('counter:'))
counter: 1
counter: 2
counter: 3
Whatever comes after the
await keyword must be an awaitable, which
roughly says that “this computation here will eventually produce
something, but maybe not right now, and while waiting you may go off and
do something else: the scheduler will let you continue when the result
is ready”.
Let’s see what happens when we run two counters (remember that
count, when called, produces a coroutine, which is an
awaitable):
async def main():
    await count('counter a:')
    await count('counter b:')

ac.run(main())
counter a: 1
counter a: 2
counter a: 3
counter b: 1
counter b: 2
counter b: 3
Hmm … this doesn’t look very concurrent: the second counter starts counting only after the first counter finishes. But this is what we asked for: we awaited the completion of the first counter!
To make the two counters execute together, we use the aiochan.go
function, which takes a coroutine and schedules it for execution but does
not wait for the result:
async def main():
    ac.go(count('counter a:'))
    await count('counter b:')

ac.run(main())
counter b: 1
counter a: 1
counter b: 2
counter a: 2
counter b: 3
counter a: 3
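For comparison, here is a sketch of the same idea with the standard library alone, assuming asyncio.create_task (python 3.7+) is an acceptable stand-in for aiochan.go. The counters record into a list named log (a made-up name) instead of printing, and the delay is shortened so the sketch runs quickly.

```python
import asyncio

log = []    # hypothetical: collects (tag, i) pairs in the order they happen

async def count(tag, n_max=3):
    for i in range(1, n_max + 1):
        await asyncio.sleep(0.01)    # short delay to keep the sketch fast
        log.append((tag, i))

async def main():
    task = asyncio.create_task(count('a'))   # scheduled, but not waited for
    await count('b')                          # 'a' makes progress while we wait here
    await task                                # make sure 'a' is done before returning

asyncio.run(main())
print(log)
```

Awaiting the task at the end is a deliberate addition: it keeps main from returning while the scheduled counter is still running.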
Much better now. Note that you must pass the coroutine to
aiochan.go for execution: calling the function itself has no effect
(other than a possible warning):
async def main():
    count('counter a:')
    await count('counter b:')

ac.run(main())
/home/zh217/.pyenv/versions/3.6.6/lib/python3.6/site-packages/ipykernel_launcher.py:2: RuntimeWarning: coroutine 'count' was never awaited
counter b: 1
counter b: 2
counter b: 3
What happens if we replace both counter calls with ac.go?
async def main():
    ac.go(count('counter a:'))
    ac.go(count('counter b:'))

ac.run(main())
Nothing happens! Remember that
ac.run returns when the coroutine
passed in returns, and our
main returns after having two counters
scheduled for execution, without actually executing them!
To make this clearer, note that if we sleep in the
main function at
the end, the two counters will be executed:
async def main():
    ac.go(count('counter a:'))
    ac.go(count('counter b:'))
    await asyncio.sleep(3)

ac.run(main())
counter a: 1
counter b: 1
counter a: 2
counter b: 2
counter a: 3
counter b: 3
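Sleeping for a fixed time only works when you can guess how long the counters need. One alternative, sketched here with the standard library's asyncio.gather rather than anything from aiochan, is to wait for the scheduled coroutines themselves. In this variant count returns what it has seen instead of printing it.

```python
import asyncio

async def count(tag, n_max=3):
    seen = []
    for i in range(1, n_max + 1):
        await asyncio.sleep(0.01)    # short delay to keep the sketch fast
        seen.append((tag, i))
    return seen

async def main():
    # gather schedules both coroutines and completes only when both are done;
    # the results come back in the order the coroutines were passed in
    return await asyncio.gather(count('a'), count('b'))

results = asyncio.run(main())        # asyncio.run requires python 3.7+
print(results)
```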
If you have done thread-based programming before, you may think now that asyncio is no different from threading. This is not true. To illustrate, consider:
async def main():
    async def work():
        print('do work')

    print('before')
    ac.go(work())
    print('after')

ac.run(main())
before
after
do work
What you get is always
before, after, do work, in that
order. In some languages, using threads, it is possible to get garbled
text, since the various calls to the printing function can interleave.
With asyncio, by contrast, unless you await, nothing
else will get in your way when you are executing. (If you read the
documentation for asyncio, you will find that even things like locks
and semaphores are marked “not thread-safe”: they are only safe with
respect to the non-interrupting guarantees provided by asyncio.)
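A small sketch of what this guarantee buys you, using only the standard library: ten coroutines update a shared counter with no lock at all, and the total is still exact because there is no await between the read and the write. The names counter and add_many are made up for the illustration.

```python
import asyncio

counter = 0    # shared state, deliberately left unprotected

async def add_many(n):
    global counter
    for _ in range(n):
        current = counter         # read
        counter = current + 1     # write: no await in between, so nothing else runs here
    await asyncio.sleep(0)        # the only point where this coroutine can be suspended

async def main():
    await asyncio.gather(*(add_many(1000) for _ in range(10)))

asyncio.run(main())               # asyncio.run requires python 3.7+
print(counter)
```

If an await were placed between the read and the write, another coroutine could run in the gap and updates could be lost.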
So asyncio guarantees “no break unless await”. In other words, it
implements co-operative multitasking, in contrast to the pre-emptive
multitasking provided by threads (and your operating system), where your
work may be interrupted at any time. Programming with co-operative
multitasking is in general easier, but it is sometimes necessary to
explicitly give up control to the scheduler in order for other tasks to
have a chance to run. In aiochan, you can await ac.nop() to do this:
async def main():
    async def work():
        print('do work')

    print('before')
    ac.go(work())
    await ac.nop()
    print('after')

ac.run(main())
before
do work
after
Note the order. Also note that in this case, theoretically the order isn’t guaranteed — that you always get this order back should be considered an implementation detail.
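With the standard library alone, a common way to give up control for one turn of the event loop is to await asyncio.sleep(0). The sketch below assumes this is a fair analogue of the nop call above; it records events in a list named log (a made-up name). As the note above says, the relative order of the last two entries is best treated as an implementation detail.

```python
import asyncio

log = []    # hypothetical: records the order in which things happen

async def work():
    log.append('do work')

async def main():
    log.append('before')
    task = asyncio.create_task(work())   # scheduled, not yet run
    await asyncio.sleep(0)               # give other tasks a chance to run
    log.append('after')
    await task                           # returns at once if work() already finished

asyncio.run(main())                      # asyncio.run requires python 3.7+
print(log)
```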
Now you know how to make coroutines and run them. That roughly corresponds to the “sequential processes” that we talked about before (and remember not to touch global states). In the next section, we will learn about the “communicating” part.
- Python was not designed for concurrency.
- There are a number of ways to do concurrency in python: processes, threads, and asyncio event-loops.
- Asyncio event loops are single-threaded schedulers responsible for executing coroutines.
- Coroutine functions are made with the async and await keywords. No interleaving of execution can occur unless an await keyword is encountered.
There is also aiochan.run_in_thread.
aiochan.run is recommended when programming interactively.
ac.run in Jupyter notebooks
If you use
ac.run in Jupyter notebooks to run examples, sometimes
you will see warnings saying something like:
Task was destroyed but it is pending!
In the notebook, this is mostly harmless and it is due to our not always closing all our coroutines when our main coroutine exits. We do this in order to make our examples simple, and to avoid using constructs not already introduced. However, in production, any warning is a cause for concern.
Appendix: running async functions without aiochan
In our exposition we used the function
aiochan.run to run all our
async functions. How do we do it with native python libraries? We use
asyncio event loops to do the execution:
import asyncio

loop = asyncio.get_event_loop()
result = loop.run_until_complete(silly(2))
print('the result is', result)
You can try running the above code. What you get depends on how you run it: if you run it in a script or in an interactive interpreter, then you will see printed:
executing silly with 2
the result is 3
However, if you run it in jupyter notebooks or jupyterlab, there is a possibility that you will get an exception thrown at your face (or maybe not, it all depends):
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-6-69c4a90e423b> in <module>()
      2
      3 loop = asyncio.get_event_loop()
----> 4 result = loop.run_until_complete(silly(2))
      5 print('the result is', result)

~/.pyenv/versions/3.6.6/lib/python3.6/asyncio/base_events.py in run_until_complete(self, future)
    453         future.add_done_callback(_run_until_complete_cb)
    454         try:
--> 455             self.run_forever()
    456         except:
    457             if new_task and future.done() and not future.cancelled():

~/.pyenv/versions/3.6.6/lib/python3.6/asyncio/base_events.py in run_forever(self)
    407         self._check_closed()
    408         if self.is_running():
--> 409             raise RuntimeError('This event loop is already running')
    410         if events._get_running_loop() is not None:
    411             raise RuntimeError(

RuntimeError: This event loop is already running
So we already have a loop running? Ok, it is still possible to proceed in this case:
import asyncio

loop = asyncio.get_event_loop()
result = loop.create_task(silly(2))
print('the result is', result)
the result is <Task pending coro=<silly() running at <ipython-input-2-709c439f84a4>:1>>
executing silly with 2
So apparently our async function is executed now, but now we only get a task back, not the result itself! To get the result:
result.result()
… which seems to be fine, but that is only because you are executing
it interactively. If you put this line directly below
the print call in the same cell, you get:
---------------------------------------------------------------------------
InvalidStateError                         Traceback (most recent call last)
<ipython-input-17-cda1a6adb807> in <module>()
      4 result = loop.create_task(silly(2))
      5 print('the result is', result)
----> 6 result.result()

InvalidStateError: Result is not set.
which tells you that you are calling the function too soon! You will need to wait a little bit (but if you do it wrong it will deadlock), or create some future and set its result using a callback. If you really want to figure it out you can read the python documentation.
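One way of "waiting a little bit" without deadlocking is to await the task from inside another coroutine. The sketch below is for a script or a plain interpreter (no event loop already running) and uses asyncio.run, which requires python 3.7 or later; it does not address the notebook case.

```python
import asyncio

async def silly(x):
    print('executing silly with', x)
    return x + 1

async def main():
    task = asyncio.ensure_future(silly(2))   # schedule the coroutine as a task
    # calling task.result() here would raise InvalidStateError: the task has not run yet
    return await task                         # suspend main() until the task is done

result = asyncio.run(main())                  # asyncio.run requires python 3.7+
print('the result is', result)
```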
So now you believe me when I say that doing concurrency in python feels foreign, cumbersome and unnatural. Running everything in a script is a solution, but one of the appeals of python is its interactivity.
You can also replace calls to aiochan.go with
asyncio.get_running_loop().create_task, but … what a mouthful!
asyncio.ensure_future is also a possibility, but in addition to its
questionable name, its use in spawning tasks for execution is discouraged
since python 3.7 in favour of asyncio.create_task. However,
asyncio.create_task doesn’t exist prior to python 3.7. So … if you
intend to use
aiochan at all, we urge you to stay with aiochan.go.