Note: you can try this tutorial in Binder.

Parallelism and beyond

We discussed async_pipe and async_pipe_unordered in the context of trying to put more “concurrency” into our program by taking advantage of parallelism. What does “parallelism” mean here?

Facing the reality of python concurrency, again

With async_pipe and async_pipe_unordered, by giving them more coroutine instances to work with, we achieved higher throughput. But that is only because our coroutines are, in a quite literal sense, sleeping on the job: to simulate real jobs, we called await on asyncio.sleep. The event loop, faced with this await, just puts the coroutine on hold until it is ready to act again.

Now it is entirely possible that this behaviour (not letting sleeping coroutines block the whole program) is all you need. In particular, if you are dealing with network connections or sockets and you are using a proper asyncio-based library, then “doing network work” isn’t too different from sleeping on the loop.

However, for other operations not tailored for asyncio, asyncio-based concurrency will not give you any speed-up. Crucially, asyncio has no built-in support for file access: a blocking file operation holds up the entire loop.

Let’s see an example:

In [2]:
import asyncio
import time
import aiochan as ac

async def worker(n):
    time.sleep(0.1) # await asyncio.sleep(0.1)
    return n*2

async def main():
    start = asyncio.get_event_loop().time()
    print(await ac.from_range(20).async_pipe(10, worker).collect())
    print(asyncio.get_event_loop().time() - start)

ac.run(main())
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
2.009612141060643

The only difference from before (when we first introduced async_pipe) is that we replaced asyncio.sleep with time.sleep. With this change, we no longer get any speed-up: time.sleep blocks the thread outright instead of yielding control back to the event loop.

In this case, we can recover our speed-up by using the method parallel_pipe instead:

In [3]:
import asyncio
import time
import aiochan as ac

def worker(n):
    time.sleep(0.1)
    return n*2

async def main():
    start = asyncio.get_event_loop().time()
    print(await ac.from_range(20).parallel_pipe(10, worker).collect())
    print(asyncio.get_event_loop().time() - start)

ac.run(main())
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
0.20713990507647395

When using parallel_pipe, our worker has to be a normal function instead of an async function. As before, if order is not important, parallel_pipe_unordered can give you even more throughput:

In [5]:
import asyncio
import time
import random
import aiochan as ac

def worker(n):
    time.sleep(random.uniform(0, 0.2))
    return n*2

async def main():
    start = asyncio.get_event_loop().time()
    print(await ac.from_range(20).parallel_pipe(10, worker).collect())
    print('ordered time:', asyncio.get_event_loop().time() - start)

    start = asyncio.get_event_loop().time()
    print(await ac.from_range(20).parallel_pipe_unordered(10, worker).collect())
    print('unordered time:', asyncio.get_event_loop().time() - start)

ac.run(main())
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
ordered time: 0.35387236496899277
[16, 2, 8, 24, 6, 10, 0, 32, 22, 34, 12, 36, 4, 38, 28, 18, 30, 20, 14, 26]
unordered time: 0.19887939398176968

In fact, parallel_pipe works by starting a thread pool and executing the workers in it. Multiple threads solve the problem of workers sleeping on the job, as in our example. But remember that CPython, the default implementation of python, has a global interpreter lock (GIL) which prevents more than one thread from executing python bytecode at the same time. Will parallel_pipe still help in the presence of the GIL, beyond the case of workers that merely sleep?

It turns out that for the majority of serious cases, multiple threads help even in the presence of the GIL, because most heavy-lifting operations, for example file access, are implemented in C instead of in pure python, and C code can release the GIL while it is not interacting with the python runtime. Beyond file access, if you are doing number-crunching, then hopefully you are not doing it in pure python but are relying on dedicated libraries like numpy and scipy. All of these libraries release the GIL when it makes sense to do so. So using parallel_pipe is usually enough.
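
For instance, here is a minimal sketch of a number-crunching worker that does benefit from threads (assuming numpy is installed, which this tutorial does not otherwise require), since numpy releases the GIL during large matrix operations:

import asyncio
import numpy as np
import aiochan as ac

def worker(n):
    # the matrix multiplication releases the GIL,
    # so the pool threads genuinely run in parallel
    a = np.random.rand(300, 300)
    return float((a @ a).sum())

async def main():
    start = asyncio.get_event_loop().time()
    await ac.from_range(20).parallel_pipe(10, worker).collect()
    print('numpy in threads:', asyncio.get_event_loop().time() - start)

ac.run(main())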

What if you just have to do your CPU-intensive tasks in pure python? Well, parallel_pipe and parallel_pipe_unordered take an argument called mode, which by default has the value 'thread'. If you change it to 'process', then a process pool instead of a thread pool will be used. Let’s see a comparison:

In [6]:
import asyncio
import time
import aiochan as ac

def worker(_):
    total = 0
    for i in range(1000000):
        total += i
    return total

async def main():
    start = asyncio.get_event_loop().time()
    await ac.from_range(20).parallel_pipe(10, worker).collect()
    print('using threads', asyncio.get_event_loop().time() - start)

    start = asyncio.get_event_loop().time()
    await ac.from_range(20).parallel_pipe(10, worker, mode='process').collect()
    print('using processes', asyncio.get_event_loop().time() - start)

ac.run(main())
using threads 1.7299788249656558
using processes 0.20847543003037572

Why not use a process pool in all cases? Processes have much greater overhead than threads, and far more restrictions on their use. Crucially, you cannot share objects between processes without doing some dirty work yourself, and anything you pass to your worker, or return from your worker, must be picklable.
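
To see the pickling constraint concretely, here is a small sketch independent of aiochan; the same restriction applies to workers and to the values flowing through the channel when mode='process' is used:

import pickle

def double(n):
    # a module-level function is picklable (it is pickled by reference)
    return n * 2

print(pickle.loads(pickle.dumps(double))(3))  # prints 6

try:
    pickle.dumps(lambda n: n * 2)  # a lambda has no importable name
except pickle.PicklingError as e:
    print('cannot pickle a lambda:', e)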

In our example, our worker is a pure function. It is also possible to prepare some structure in each worker beforehand. In python 3.7 or above, parallel_pipe and parallel_pipe_unordered accept the initializer and init_args arguments, which are passed on to the constructor of the pool executor to do the setup. Prior to python 3.7, such a setup is still possible with a hack: put the object to be set up in a threading.local object, and on every worker execution, check whether the object exists, doing the initialization if it does not:

In [7]:
import asyncio
import threading
import aiochan as ac

worker_data = threading.local()

def worker(n):
    try:
        processor = worker_data.processor
    except AttributeError:
        print('setting up processor')
        worker_data.processor = lambda x: x*2
        processor = worker_data.processor
    return processor(n)

async def main():
    print(await ac.from_range(20).parallel_pipe(2, worker).collect())

ac.run(main())
setting up processor
setting up processor
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]

Since we used two thread workers, the setup is done twice. This also works for mode='process'.
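
On python 3.7 or above, the same setup can be expressed with the initializer and init_args arguments mentioned earlier. A sketch of what this looks like (the keyword names follow the description above; double-check them against your version of aiochan):

import threading
import aiochan as ac

worker_data = threading.local()

def setup(factor):
    # runs once in each pool worker, before any jobs are processed
    print('setting up processor')
    worker_data.processor = lambda x: x * factor

def worker(n):
    return worker_data.processor(n)

async def main():
    print(await ac.from_range(20)
          .parallel_pipe(2, worker, initializer=setup, init_args=(2,))
          .collect())

ac.run(main())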

What about parallelising work across the network? Or more exotic workflows? At its core, aiochan is a library that facilitates moving data around within the boundary of a single process on a single machine, but there is nothing preventing you from using channels at the end-points of a network-based parallelism framework such as a message queue or a framework like dask. Within its boundary, aiochan aims to give you maximum flexibility in developing concurrent workflows, and you should use it in tandem with other suitable libraries or frameworks when you want to step out of that boundary.

Back to the main thread

Speaking of stepping out of boundaries, one case is exceedingly common: you use an aiochan-based workflow to prepare a stream of values, but you want to consume these values outside of the asyncio event loop. In this case, there are convenience methods for you:

In [8]:
loop = asyncio.new_event_loop()

out = ac.Chan(loop=loop)

async def worker():
    while True:
        await asyncio.sleep(0.1)
        if not (await out.put('work')):
            break

ac.run_in_thread(worker(), loop=loop)

it = out.to_iterable(buffer_size=1)

print(next(it))
print(next(it))

loop.call_soon_threadsafe(out.close);
work
work

Notice how we constructed the channel on the main thread, with an explicit argument specifying on which loop the channel is to be used, and then derived an iterator from the channel. Also, to run the worker, we used run_in_thread with an explicit event loop given.

When creating the iterable, notice we have given it a buffer_size. This is used to construct a queue for inter-thread communication. You can also use a queue directly:

In [9]:
import queue

loop = asyncio.new_event_loop()

out = ac.Chan(loop=loop)

async def worker():
    while True:
        await asyncio.sleep(0.1)
        if not (await out.put('work')):
            break

ac.run_in_thread(worker(), loop=loop)

q = queue.Queue()

out.to_queue(q)

print(q.get())
print(q.get())

loop.call_soon_threadsafe(out.close);
work
work

Other queues can be used as long as they follow the public API of queue.Queue and are thread-safe.

aiochan without asyncio

Finally, before ending this tutorial, let’s reveal a secret: you don’t need asyncio to use aiochan! “Isn’t aiochan based on asyncio?” Well, not really: the core algorithms of aiochan (which are based on those from Clojure’s core.async) do not use any asyncio constructs and run entirely synchronously. It is only in the user-facing methods such as get, put and select that an asyncio facade is placed over the internals.

On the other hand, there are some functions (three of them, in fact) that do not touch anything related to asyncio when given the right arguments:

  • Chan.put_nowait
  • Chan.get_nowait
  • select

Normally, when you call ch.put_nowait(v), the put will succeed if it is possible to do so immediately (for example, if there is a pending get or there is room in the buffer), otherwise it will give up. Note that you never await on put_nowait. However, if you give the argument immediate_only=False, then an operation that cannot be completed immediately will be queued instead (but again, the pending queue can overflow). In addition, you can give a callback to the cb argument, which will be called when the put finally succeeds, with the same argument as the return value of await put(v). The same is true of get_nowait(immediate_only=False, cb=cb).

For select, if you give a callback to the cb argument, then you should not await on it, but instead rely on the callback being called eventually as cb(return_value, which_channel). Note that if you don’t expect to use any event loop at all, you should explicitly pass loop='no_loop' when constructing channels.
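
As a minimal warm-up (a sketch using the same arguments as the fan-in, fan-out example below), here is a single put and a single get driven entirely by callbacks:

import aiochan as ac

c = ac.Chan(1, loop='no_loop')

# cb receives what `await c.put('hello')` would have returned
c.put_nowait('hello', immediate_only=False, cb=lambda ok: print('put succeeded:', ok))

# cb receives what `await c.get()` would have returned
c.get_nowait(immediate_only=False, cb=lambda v: print('got:', v))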

Example: this is our asyncio-based fan-in, fan-out:

In [10]:
import aiochan as ac
import asyncio

async def consumer(c, tag):
    async for v in c:
        print('%s received %s' % (tag, v))

async def producer(c, tag):
    for i in range(5):
        v = '%s-%s' % (tag, i)
        print('%s produces %s' % (tag, v))
        await c.put(v)

async def main():
    c = ac.Chan()
    for i in range(3):
        ac.go(consumer(c, 'c' + str(i)))
    for i in range(3):
        ac.go(producer(c, 'p' + str(i)))
    await asyncio.sleep(0.1)

ac.run(main())
p0 produces p0-0
p0 produces p0-1
p0 produces p0-2
p0 produces p0-3
p1 produces p1-0
p2 produces p2-0
c0 received p0-0
c0 received p0-3
c0 received p1-0
c0 received p2-0
c1 received p0-1
c2 received p0-2
p0 produces p0-4
p1 produces p1-1
p1 produces p1-2
p1 produces p1-3
p2 produces p2-1
c0 received p0-4
c0 received p1-3
c0 received p2-1
c1 received p1-1
c2 received p1-2
p1 produces p1-4
p2 produces p2-2
p2 produces p2-3
p2 produces p2-4
c0 received p1-4
c0 received p2-4
c1 received p2-2
c2 received p2-3

By the appropriate use of callbacks, we can avoid using asyncio completely:

In [12]:
def consumer(c, tag):
    def cb(v):
        if v is not None:
            print('%s received %s' % (tag, v))
            consumer(c, tag)
    c.get_nowait(immediate_only=False, cb=cb)

def producer(c, tag, i=0):
    v = '%s-%s' % (tag, i)
    def cb(ok):
        if ok and i < 4:
            print('%s produces %s' % (tag, v))
            producer(c, tag, i+1)

    c.put_nowait(v, immediate_only=False, cb=cb)

def main():
    c = ac.Chan(loop='no_loop')
    for i in range(3):
        consumer(c, 'c' + str(i))
    for i in range(3):
        producer(c, 'p' + str(i))

main()
c0 received p0-0
p0 produces p0-0
c1 received p0-1
p0 produces p0-1
c2 received p0-2
p0 produces p0-2
c0 received p0-3
p0 produces p0-3
c1 received p0-4
c2 received p1-0
p1 produces p1-0
c0 received p1-1
p1 produces p1-1
c1 received p1-2
p1 produces p1-2
c2 received p1-3
p1 produces p1-3
c0 received p1-4
c1 received p2-0
p2 produces p2-0
c2 received p2-1
p2 produces p2-1
c0 received p2-2
p2 produces p2-2
c1 received p2-3
p2 produces p2-3
c2 received p2-4

The end result is (almost) the same. An example with select:

In [13]:
def select_run():
    c = ac.Chan(1, loop='no_loop', name='c')
    d = ac.Chan(1, loop='no_loop', name='d')
    put_chan = None

    def put_cb(v, c):
        nonlocal put_chan
        put_chan = c

    ac.select((c, 1), (d, 2), cb=put_cb)

    get_val = None

    def get_cb(v, c):
        nonlocal get_val
        get_val = v

    ac.select(c, d, cb=get_cb)

    print('select put into %s, get value %s' % (put_chan, get_val))

def main():
    for _ in range(10):
        select_run()

main()
select put into Chan<c 140356982933192>, get value 1
select put into Chan<c 140356982933192>, get value 1
select put into Chan<d 140356982931944>, get value 2
select put into Chan<c 140356982931944>, get value 1
select put into Chan<c 140356982931944>, get value 1
select put into Chan<c 140356982931944>, get value 1
select put into Chan<d 140356982932672>, get value 2
select put into Chan<c 140356982932672>, get value 1
select put into Chan<d 140356982931944>, get value 2
select put into Chan<c 140356982931944>, get value 1

“But why?” Well, obviously writing callbacks is much harder than using asyncio. But who knows? Maybe you are writing some other, higher-level framework that can make use of the semantics of aiochan. The possibilities are endless! In particular, there are non-asyncio concurrency frameworks in python itself that utilize the same coroutines, an example being python-trio. Since the core of aiochan does not rely on asyncio, porting it to trio is trivial.