Note: you can try this tutorial in .
A ten-minutes introduction¶
You will need to import the module aiochan
and asyncio
first:
[2]:
import aiochan as ac
import asyncio
A channel is like a golang channel or a Clojure core.async chan. Creating a channel is simple:
[3]:
c = ac.Chan()
c
[3]:
Chan<_unk_0 140697829983528>
In the following examples, we use ac.run
to run the main coroutine. You can also run asyncio loops directly.
We can call await c.put(v)
to put value into the channel, await c.get()
to get value from the channel, c.close()
to close the channel, and ac.go(...)
to spawn a coroutine inside another coroutine:
[5]:
async def producer(c):
i = 0
while True:
await asyncio.sleep(0.1) # producing stuff takes time
i += 1
still_open = await c.put('product ' + str(i))
if not still_open:
print('producer goes home')
break
async def consumer(c):
while True:
product = await c.get()
if product is not None:
print('obtained:', product)
else:
print('consumer goes home')
break
async def main():
c = ac.Chan()
ac.go(producer(c))
ac.go(consumer(c))
await asyncio.sleep(0.6)
print('It is late, let us call it a day.')
c.close()
await asyncio.sleep(0.2) # necessary to wait for producer
ac.run(main())
obtained: product 1
obtained: product 2
obtained: product 3
obtained: product 4
obtained: product 5
It is late, let us call it a day.
consumer goes home
producer goes home
Channel works as an async iterator:
[6]:
async def producer(c):
i = 0
while True:
await asyncio.sleep(0.1) # producing stuff takes time
i += 1
still_open = await c.put('product ' + str(i))
if not still_open:
print('producer goes home')
break
async def consumer(c):
async for product in c:
print('obtained:', product)
print('consumer goes home')
async def main():
c = ac.Chan()
ac.go(producer(c))
ac.go(consumer(c))
await asyncio.sleep(0.6)
print('It is late, let us call it a day.')
c.close()
await asyncio.sleep(0.2) # necessary to wait for producer
ac.run(main())
obtained: product 1
obtained: product 2
obtained: product 3
obtained: product 4
obtained: product 5
It is late, let us call it a day.
consumer goes home
producer goes home
select
, which is the whole point of channels, works as in golang or alt!
in Clojure’s core.async to complete one and only one operation non-deterministically:
[8]:
async def worker(out, stop, tag):
i = 0
while True:
i += 1
await asyncio.sleep(0.1)
result, c = await ac.select(stop, (out, '%s-%s' % (tag, i)), priority=True)
if c is stop:
print('%s stopped' % tag)
break
async def consumer(c, stop):
while True:
result, c = await ac.select(stop, c, priority=True)
if c is stop:
print('consumer stopped')
break
else:
print('received', result)
async def main():
c = ac.Chan()
stop = ac.Chan()
for i in range(3):
ac.go(worker(c, stop, 'worker%s' % i))
ac.go(consumer(c, stop))
await asyncio.sleep(0.6)
stop.close()
await asyncio.sleep(0.2)
ac.run(main())
received worker0-1
received worker1-1
received worker2-1
received worker0-2
received worker1-2
received worker2-2
received worker0-3
received worker1-3
received worker2-3
received worker0-4
received worker1-4
received worker2-4
received worker0-5
received worker1-5
received worker2-5
consumer stopped
worker0 stopped
worker1 stopped
worker2 stopped
Channels can use some buffering to implement back-pressure:
[10]:
async def worker(c):
i = 0
while True:
i += 1
await asyncio.sleep(0.05)
print('producing', i)
await c.put(i)
async def consumer(c):
while True:
await asyncio.sleep(0.2)
result = await c.get()
print('consuming', result)
async def main():
c = ac.Chan(3)
ac.go(worker(c))
ac.go(consumer(c))
await asyncio.sleep(1)
ac.run(main())
producing 1
producing 2
producing 3
consuming 1
producing 4
producing 5
consuming 2
producing 6
consuming 3
producing 7
consuming 4
producing 8
Sliding and dropping buffers are also available.
That’s all the basics, but there are much more: functional methods, combination patterns, pipelines, thread or process-based parallelism and so on. Read the in-depth tutorial or the API documentation to find out more.