Parallelism and beyond
We discussed async_pipe and async_pipe_unordered in the context of trying to put more “concurrency” into our program by taking advantage of parallelism. What does “parallelism” mean here?
Facing the reality of python concurrency, again
With async_pipe and async_pipe_unordered, by giving them more coroutine instances to work with, we achieved higher throughput. But that is only because our coroutines are, in a quite literal sense, sleeping on the job: to simulate real jobs, we called await on asyncio.sleep. The event loop, faced with this await, just puts the coroutine on hold until it is ready to act again.
Now it is entirely possible that this behaviour — of not letting sleeping coroutines block the whole program — is all you need. In particular, if you are dealing with network connections or sockets and you are using a proper asyncio-based library, then “doing network work” isn’t too different from sleeping on the loop.
However, for operations not tailored for asyncio, you will not get any speed-up from asyncio-based concurrency. Crucially, asyncio has no built-in support for file access.
Let’s see an example:
In [2]:
import asyncio
import time

import aiochan as ac

async def worker(n):
    time.sleep(0.1)  # await asyncio.sleep(0.1)
    return n*2

async def main():
    start = asyncio.get_event_loop().time()
    print(await ac.from_range(20).async_pipe(10, worker).collect())
    print(asyncio.get_event_loop().time() - start)

ac.run(main())
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
2.009612141060643
The only difference from before (when we first introduced async_pipe) is that we replaced asyncio.sleep with time.sleep. With this change, we did not get any speed-up.
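To see why, here is a stdlib-only sketch (no aiochan involved): even under asyncio.gather, five coroutines that call the blocking time.sleep run strictly one after another, because the event loop never regains control during the blocking call.

```python
import asyncio
import time

async def blocking_worker(n):
    time.sleep(0.1)  # blocks the whole event loop; no other coroutine can run
    return n * 2

async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    results = await asyncio.gather(*(blocking_worker(n) for n in range(5)))
    return results, loop.time() - start

results, elapsed = asyncio.run(main())
print(results)
print(elapsed)  # roughly 0.5s: the five sleeps were serialized
```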
In this case, we can recover our speed-up by using the method parallel_pipe instead:
In [3]:
import asyncio
import time

import aiochan as ac

def worker(n):
    time.sleep(0.1)
    return n*2

async def main():
    start = asyncio.get_event_loop().time()
    print(await ac.from_range(20).parallel_pipe(10, worker).collect())
    print(asyncio.get_event_loop().time() - start)

ac.run(main())
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
0.20713990507647395
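For comparison, the standard library offers the same escape hatch directly: loop.run_in_executor hands a blocking call to a thread pool so the calls overlap. A stdlib-only sketch (not how parallel_pipe is implemented internally, just the analogous idea):

```python
import asyncio
import time

def worker(n):
    time.sleep(0.1)  # the same blocking call as above
    return n * 2

async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    # None selects the loop's default thread-pool executor
    results = await asyncio.gather(
        *(loop.run_in_executor(None, worker, n) for n in range(20)))
    return results, loop.time() - start

results, elapsed = asyncio.run(main())
print(results)
print(elapsed)
```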
When using parallel_pipe, our worker has to be a normal function instead of an async function. As before, if order is not important, parallel_pipe_unordered can give you even more throughput:
In [5]:
import asyncio
import time
import random

import aiochan as ac

def worker(n):
    time.sleep(random.uniform(0, 0.2))
    return n*2

async def main():
    start = asyncio.get_event_loop().time()
    print(await ac.from_range(20).parallel_pipe(10, worker).collect())
    print('ordered time:', asyncio.get_event_loop().time() - start)

    start = asyncio.get_event_loop().time()
    print(await ac.from_range(20).parallel_pipe_unordered(10, worker).collect())
    print('unordered time:', asyncio.get_event_loop().time() - start)

ac.run(main())
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
ordered time: 0.35387236496899277
[16, 2, 8, 24, 6, 10, 0, 32, 22, 34, 12, 36, 4, 38, 28, 18, 30, 20, 14, 26]
unordered time: 0.19887939398176968
In fact, parallel_pipe works by starting a thread pool and executing the workers in it. Multiple threads solve the problem of workers sleeping on the thread, as in our example. But remember that the default implementation of python, CPython, has a global interpreter lock (GIL), which prevents more than one thread from executing python bytecode at the same time. Will parallel_pipe help in the presence of the GIL, beyond the case of workers that just sleep?
It turns out that for the majority of serious cases, multiple threads help even in the presence of the GIL, because most heavy-lifting operations, for example file access, are implemented in C instead of pure python, and C code can release the GIL when it is not interacting with the python runtime. In addition to file access, if you are doing number-crunching, then hopefully you are not doing it in pure python but relying on dedicated libraries like numpy, scipy, etc. All of these libraries release the GIL when it makes sense to do so. So using parallel_pipe is usually enough.
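The point about C code releasing the GIL can be checked with the standard library alone. In this sketch (function names are illustrative), ten threads sleeping concurrently finish in roughly one sleep interval, while ten pure-python counting loops are serialized by the GIL:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def sleeping(n):
    time.sleep(0.05)  # blocks in C and releases the GIL
    return n

def crunching(n):
    total = 0
    for i in range(500000):  # pure-python loop: holds the GIL throughout
        total += i
    return total

def timed(fn):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=10) as ex:
        list(ex.map(fn, range(10)))
    return time.perf_counter() - start

sleep_elapsed = timed(sleeping)
crunch_elapsed = timed(crunching)
print('sleeping workers:', round(sleep_elapsed, 3))   # close to a single 0.05s sleep
print('crunching workers:', round(crunch_elapsed, 3)) # no parallel speed-up
```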
What if you just have to do your CPU-intensive tasks in python? Well, parallel_pipe and parallel_pipe_unordered take an argument called mode, which by default has the value 'thread'. If you change it to 'process', a process pool will be used instead of a thread pool. Let’s see a comparison:
In [6]:
import asyncio
import time

import aiochan as ac

def worker(_):
    total = 0
    for i in range(1000000):
        total += i
    return total

async def main():
    start = asyncio.get_event_loop().time()
    await ac.from_range(20).parallel_pipe(10, worker).collect()
    print('using threads', asyncio.get_event_loop().time() - start)

    start = asyncio.get_event_loop().time()
    await ac.from_range(20).parallel_pipe(10, worker, mode='process').collect()
    print('using processes', asyncio.get_event_loop().time() - start)

ac.run(main())
using threads 1.7299788249656558
using processes 0.20847543003037572
Why not use a process pool in all cases? Processes have much greater overhead than threads, and also far more restrictions on their use. Crucially, you cannot share any object unless you do some dirty work yourself, and anything you pass to your worker, or return from your worker, must be picklable.
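To make the picklability constraint concrete, here is a stdlib-only sketch: a module-level function pickles fine, while a lambda does not, because it has no importable name:

```python
import pickle

def double(n):  # defined at module top level: picklable by reference
    return n * 2

square = lambda x: x * x  # lambdas cannot be pickled

# the named function round-trips through pickle
assert pickle.loads(pickle.dumps(double))(3) == 6

try:
    pickle.dumps(square)
    lambda_picklable = True
except (pickle.PicklingError, AttributeError):
    lambda_picklable = False
print('lambda picklable?', lambda_picklable)
```

This is why a worker destined for mode='process' should be a plain top-level function rather than a lambda or a closure.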
In our example, our worker is a pure function. It is also possible to prepare some structures in each worker beforehand. In python 3.7 or above, parallel_pipe and parallel_pipe_unordered accept the initializer and init_args arguments, which are passed to the constructor of the pool executor to do the setup. Prior to python 3.7, such a setup is still possible with a hack: you can put the object to be set up in a threading.local object, and for every worker execution, check if the object exists, and if not, do the initialization:
In [7]:
import asyncio
import threading

import aiochan as ac

worker_data = threading.local()

def worker(n):
    try:
        processor = worker_data.processor
    except AttributeError:
        print('setting up processor')
        worker_data.processor = lambda x: x*2
        processor = worker_data.processor
    return processor(n)

async def main():
    print(await ac.from_range(20).parallel_pipe(2, worker).collect())

ac.run(main())
setting up processor
setting up processor
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
Since we used two thread workers, the setup is done twice. This also works for mode='process'.
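The initializer mechanism described above is the same one exposed by the standard pool executors since python 3.7. A stdlib-only sketch (init_worker and factor are illustrative names, not aiochan API):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

worker_data = threading.local()

def init_worker(factor):
    # runs once in each pool thread, before it processes any item
    worker_data.processor = lambda x: x * factor

def worker(n):
    return worker_data.processor(n)

with ThreadPoolExecutor(max_workers=2,
                        initializer=init_worker,
                        initargs=(2,)) as ex:
    results = list(ex.map(worker, range(20)))
print(results)
```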
What about parallelising work across the network? Or more exotic workflows? At its core, aiochan is a library that facilitates moving data around within the boundary of a single process on a single machine, but there is nothing preventing you from using channels at the end-points of network-based parallelism machinery such as message queues or a framework like dask. Within its boundary, aiochan aims to give you maximum flexibility in developing concurrent workflows, and you should use aiochan in tandem with other suitable libraries or frameworks when you want to step out of that boundary.
Back to the main thread
Speaking of stepping out of boundaries, one case is exceedingly common: you use an aiochan-based workflow to prepare a stream of values, but you want to consume these values outside of the asyncio event loop. In this case, there are convenience methods for you:
In [8]:
loop = asyncio.new_event_loop()

out = ac.Chan(loop=loop)

async def worker():
    while True:
        await asyncio.sleep(0.1)
        if not (await out.put('work')):
            break

ac.run_in_thread(worker(), loop=loop)

it = out.to_iterable(buffer_size=1)

print(next(it))
print(next(it))

loop.call_soon_threadsafe(out.close);
work
work
Notice how we constructed the channel on the main thread, with an explicit argument specifying on which loop the channel is to be used, and then derived an iterator from the channel. Also, to run the worker, we used run_in_thread with an explicit event loop given.

When creating the iterable, notice we have given it a buffer_size. This is used to construct a queue for inter-thread communication. You can also use a queue directly:
In [9]:
import queue

loop = asyncio.new_event_loop()

out = ac.Chan(loop=loop)

async def worker():
    while True:
        await asyncio.sleep(0.1)
        if not (await out.put('work')):
            break

ac.run_in_thread(worker(), loop=loop)

q = queue.Queue()
out.to_queue(q)

print(q.get())
print(q.get())

loop.call_soon_threadsafe(out.close);
work
work
Other queues can be used as long as they follow the public API of queue.Queue and are thread-safe.
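As an illustration (stdlib-only; LoggingQueue is a made-up name), any thread-safe object exposing the queue.Queue interface qualifies, for instance a subclass that observes its puts:

```python
import queue

class LoggingQueue(queue.Queue):
    # queue.Queue is already thread-safe; we only override put
    # to watch items as they are enqueued
    def put(self, item, block=True, timeout=None):
        print('enqueue:', item)
        super().put(item, block, timeout)

q = LoggingQueue()
q.put('a')
q.put('b')
first = q.get()
second = q.get()
print(first, second)
```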
aiochan without asyncio
Finally, before ending this tutorial, let’s reveal a secret: you don’t need asyncio to use aiochan! “Isn’t aiochan based on asyncio?” Well, not really: the core algorithms of aiochan (which are based on those from Clojure’s core.async) do not use any asyncio constructs; they run entirely synchronously. It is only in user-facing methods such as get, put and select that an asyncio facade is placed over the internals.
On the other hand, there are some functions (actually, three of them) that do not touch anything related to asyncio, given the correct arguments:
Chan.put_nowait
Chan.get_nowait
select
Normally, when you call ch.put_nowait(v), the put will succeed if it is possible to do so immediately (for example, if there is a pending get or a buffer can be used); otherwise it will give up. Note that you never await on put_nowait. However, if you give the argument immediate_only=False, then an operation that cannot be completed immediately will be queued instead (but again, the pending queue can overflow). In addition, you can give a callback to the cb argument, which will be called when the put finally succeeds, with the same argument as the return value of await put(v). The same goes for get_nowait(immediate_only=False, cb=cb). For select, if you give a callback to the cb argument, then you should not call await on it, but instead rely on the callback being called eventually as cb(return_value, which_channel). Note that if you don’t expect to use any event loops, you should explicitly pass loop='no_loop' when constructing channels.
Example: this is our asyncio-based fan-in, fan-out:
In [10]:
import aiochan as ac
import asyncio

async def consumer(c, tag):
    async for v in c:
        print('%s received %s' % (tag, v))

async def producer(c, tag):
    for i in range(5):
        v = '%s-%s' % (tag, i)
        print('%s produces %s' % (tag, v))
        await c.put(v)

async def main():
    c = ac.Chan()
    for i in range(3):
        ac.go(consumer(c, 'c' + str(i)))
    for i in range(3):
        ac.go(producer(c, 'p' + str(i)))
    await asyncio.sleep(0.1)

ac.run(main())
p0 produces p0-0
p0 produces p0-1
p0 produces p0-2
p0 produces p0-3
p1 produces p1-0
p2 produces p2-0
c0 received p0-0
c0 received p0-3
c0 received p1-0
c0 received p2-0
c1 received p0-1
c2 received p0-2
p0 produces p0-4
p1 produces p1-1
p1 produces p1-2
p1 produces p1-3
p2 produces p2-1
c0 received p0-4
c0 received p1-3
c0 received p2-1
c1 received p1-1
c2 received p1-2
p1 produces p1-4
p2 produces p2-2
p2 produces p2-3
p2 produces p2-4
c0 received p1-4
c0 received p2-4
c1 received p2-2
c2 received p2-3
With the appropriate use of callbacks, we can avoid using asyncio completely:
In [12]:
def consumer(c, tag):
    def cb(v):
        if v is not None:
            print('%s received %s' % (tag, v))
            consumer(c, tag)
    c.get_nowait(immediate_only=False, cb=cb)

def producer(c, tag, i=0):
    v = '%s-%s' % (tag, i)
    def cb(ok):
        if ok and i < 4:
            print('%s produces %s' % (tag, v))
            producer(c, tag, i+1)
    c.put_nowait(v, immediate_only=False, cb=cb)

def main():
    c = ac.Chan(loop='no_loop')
    for i in range(3):
        consumer(c, 'c' + str(i))
    for i in range(3):
        producer(c, 'p' + str(i))

main()
c0 received p0-0
p0 produces p0-0
c1 received p0-1
p0 produces p0-1
c2 received p0-2
p0 produces p0-2
c0 received p0-3
p0 produces p0-3
c1 received p0-4
c2 received p1-0
p1 produces p1-0
c0 received p1-1
p1 produces p1-1
c1 received p1-2
p1 produces p1-2
c2 received p1-3
p1 produces p1-3
c0 received p1-4
c1 received p2-0
p2 produces p2-0
c2 received p2-1
p2 produces p2-1
c0 received p2-2
p2 produces p2-2
c1 received p2-3
p2 produces p2-3
c2 received p2-4
The end result is (almost) the same. An example with select:
In [13]:
def select_run():
    c = ac.Chan(1, loop='no_loop', name='c')
    d = ac.Chan(1, loop='no_loop', name='d')

    put_chan = None
    def put_cb(v, c):
        nonlocal put_chan
        put_chan = c
    ac.select((c, 1), (d, 2), cb=put_cb)

    get_val = None
    def get_cb(v, c):
        nonlocal get_val
        get_val = v
    ac.select(c, d, cb=get_cb)

    print('select put into %s, get value %s' % (put_chan, get_val))

def main():
    for _ in range(10):
        select_run()

main()
select put into Chan<c 140356982933192>, get value 1
select put into Chan<c 140356982933192>, get value 1
select put into Chan<d 140356982931944>, get value 2
select put into Chan<c 140356982931944>, get value 1
select put into Chan<c 140356982931944>, get value 1
select put into Chan<c 140356982931944>, get value 1
select put into Chan<d 140356982932672>, get value 2
select put into Chan<c 140356982932672>, get value 1
select put into Chan<d 140356982931944>, get value 2
select put into Chan<c 140356982931944>, get value 1
“But why?” Well, obviously writing callbacks is much harder than using asyncio. But who knows? Maybe you are writing some other, higher-level framework that can make use of the semantics of aiochan. The possibilities are endless! In particular, there are non-asyncio concurrency frameworks in python itself that utilize the same coroutines, an example being python-trio. Since the core of aiochan does not rely on asyncio, porting it to trio would be straightforward.