Combination operations
Channels can act as versatile conduits for the flow of data, as in our examples of the fan-in and fan-out patterns. Here we discuss some convenient functions and constructs for dealing with more complicated patterns of combining data.
Merging values
In fan-in, we passed the channels to all coroutines interested in producing data. Often, however, the producers provide their own output channels instead. In this case, we can merge these channels into a single one:
```python
import aiochan as ac
import asyncio

async def main():
    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))
    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))
    out = ac.merge(p1, p2)
    async for v in out:
        print(v)

ac.run(main())
```

```
p2-0
p1-0
p2-1
p1-1
p2-2
p1-2
p2-3
p1-3
p2-4
p1-4
p1-5
p2-5
p1-6
p2-6
p2-7
p1-7
p2-8
p1-8
p2-9
p1-9
```
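For intuition, here is a minimal sketch of the idea behind merging, written with plain asyncio rather than aiochan so that it has no dependencies; it is an illustration under that assumption, not aiochan's implementation. The names producer, merge and DONE are hypothetical. Each source gets its own forwarding task, and the merged stream ends (signalled by the DONE sentinel in place of a closed channel) only after every source is exhausted.

```python
import asyncio

DONE = object()  # hypothetical sentinel standing in for "output channel closed"


async def producer(tag, n):
    """Yield n tagged values, giving other tasks a chance to run in between."""
    for i in range(n):
        yield f"{tag}-{i}"
        await asyncio.sleep(0)


async def merge(*sources):
    """Interleave values from several async iterators into a single stream."""
    out = asyncio.Queue()

    async def forward(src):
        async for v in src:
            await out.put(v)

    async def forward_all():
        # signal the end only once every source has been exhausted
        await asyncio.gather(*(forward(s) for s in sources))
        await out.put(DONE)

    task = asyncio.create_task(forward_all())
    while True:
        v = await out.get()
        if v is DONE:
            break
        yield v
    await task


async def main():
    return [v async for v in merge(producer("p1", 3), producer("p2", 3))]


merged = asyncio.run(main())
print(merged)
```

Within each source the order is preserved; how the sources interleave depends on task scheduling.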
As in manual fan-in, the order of the values is somewhat non-deterministic. If you want your channels to produce values in sync, use zip_chans:
```python
async def main():
    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))
    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))
    out = ac.zip_chans(p1, p2)
    async for v in out:
        print(v)

ac.run(main())
```

```
['p1-0', 'p2-0']
['p1-1', 'p2-1']
['p1-2', 'p2-2']
['p1-3', 'p2-3']
['p1-4', 'p2-4']
['p1-5', 'p2-5']
['p1-6', 'p2-6']
['p1-7', 'p2-7']
['p1-8', 'p2-8']
['p1-9', 'p2-9']
```
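zip_chans can be thought of as an asynchronous counterpart of Python's built-in zip. The dependency-free sketch below assumes plain async generators in place of channels and uses a hypothetical zip_sources helper: it waits for the next value from every source before emitting a row, so producers running at different speeds still pair up in lock-step. Stopping as soon as any source runs out mirrors the built-in zip and is an assumption of this sketch, not documented aiochan behavior.

```python
import asyncio


async def producer(tag, n, delay):
    """Yield n tagged values, sleeping `delay` seconds before each one."""
    for i in range(n):
        await asyncio.sleep(delay)
        yield f"{tag}-{i}"


async def zip_sources(*sources):
    """Yield one list per round, holding the next value of every source."""
    iterators = [s.__aiter__() for s in sources]
    while True:
        row = []
        for it in iterators:
            try:
                row.append(await it.__anext__())
            except StopAsyncIteration:
                return  # one source is exhausted: stop, like built-in zip
        yield row


async def main():
    fast = producer("p1", 3, 0.001)
    slow = producer("p2", 5, 0.005)
    return [row async for row in zip_sources(fast, slow)]


rows = asyncio.run(main())
print(rows)  # [['p1-0', 'p2-0'], ['p1-1', 'p2-1'], ['p1-2', 'p2-2']]
```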
An even more complicated use case is when your producers produce items at different rates and you want to keep track of the latest value produced by each of them:
```python
async def main():
    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))
    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))
    out = ac.combine_latest(p1, p2)
    async for v in out:
        print(v)

ac.run(main())
```

```
['p1-0', None]
['p1-0', 'p2-0']
['p1-1', 'p2-0']
['p1-1', 'p2-1']
['p1-1', 'p2-2']
['p1-2', 'p2-2']
['p1-2', 'p2-3']
['p1-3', 'p2-3']
['p1-4', 'p2-3']
['p1-4', 'p2-4']
['p1-5', 'p2-4']
['p1-5', 'p2-5']
['p1-6', 'p2-5']
['p1-6', 'p2-6']
['p1-7', 'p2-6']
['p1-7', 'p2-7']
['p1-7', 'p2-8']
['p1-8', 'p2-8']
['p1-9', 'p2-8']
['p1-9', 'p2-9']
```
Notice that for channels that have yet to produce a value, None is put in their place.
As with the functional methods, all of these functions take optional out and close arguments, which control the output channel and whether to close it when there is nothing more to be done (i.e., when all source channels are closed).
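The bookkeeping behind combine_latest can be sketched without any concurrency by replaying a hypothetical, already-interleaved list of (source_index, value) events; the combine_latest_snapshots name and the event format are assumptions for illustration, not aiochan APIs. Every event updates one slot and emits a copy of all slots, and slots that have not been filled yet stay None.

```python
def combine_latest_snapshots(events, n_sources):
    """Replay (source_index, value) events, emitting a snapshot after each one.

    Sources that have not produced anything yet are represented by None.
    """
    latest = [None] * n_sources
    snapshots = []
    for index, value in events:
        latest[index] = value
        snapshots.append(list(latest))  # copy, so later updates don't alter it
    return snapshots


events = [(0, "p1-0"), (1, "p2-0"), (1, "p2-1"), (0, "p1-1")]
for snapshot in combine_latest_snapshots(events, 2):
    print(snapshot)
# ['p1-0', None]
# ['p1-0', 'p2-0']
# ['p1-0', 'p2-1']
# ['p1-1', 'p2-1']
```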
To recap:

- use merge to combine values from several channels
- use zip_chans to combine values from several channels in sync
- use combine_latest to combine values and monitor the latest value from each channel
Distributing values
We have discussed generalizations of fan-in above. For simple fan-out, we have distribute:
```python
async def worker(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    inputs = [ac.Chan(name='inp%s' % i) for i in range(3)]
    ac.from_range(20).distribute(*inputs)
    for idx, c in enumerate(inputs):
        ac.go(worker(c, 'worker%s' % idx))
    await asyncio.sleep(0.1)

ac.run(main())
```

```
worker0 received 0
worker0 received 1
worker0 received 4
worker1 received 2
worker2 received 3
worker0 received 5
worker0 received 8
worker2 received 6
worker1 received 7
worker0 received 9
worker0 received 12
worker1 received 10
worker2 received 11
worker1 received 13
worker1 received 16
worker0 received 14
worker2 received 15
worker1 received 17
worker2 received 18
worker0 received 19
```
One of the benefits of using distribute instead of a plain fan-out is that, if one of the downstream channels is closed, distribute will try to put the value into another downstream channel, so that no values are lost.
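The no-value-lost property can be modelled synchronously. In the sketch below, Sink and distribute_values are hypothetical stand-ins, and round-robin assignment is an assumption made to keep the result deterministic (the irregular assignment in the output above suggests aiochan instead hands each value to whichever downstream channel is ready). Each value goes to exactly one open sink, and closed sinks are skipped.

```python
from itertools import cycle


class Sink:
    """Hypothetical stand-in for a downstream channel."""

    def __init__(self, name):
        self.name = name
        self.closed = False
        self.items = []


def distribute_values(values, sinks, order):
    """Give each value to exactly one open sink, skipping closed ones."""
    for v in values:
        for _ in range(len(sinks)):
            sink = next(order)
            if not sink.closed:
                sink.items.append(v)
                break
        else:
            raise RuntimeError(f"every sink is closed; cannot place {v!r}")


sinks = [Sink(f"worker{i}") for i in range(3)]
order = cycle(sinks)                     # shared round-robin position
distribute_values(range(6), sinks, order)
sinks[1].closed = True                   # simulate worker1 going away
distribute_values(range(6, 12), sinks, order)

for s in sinks:
    print(s.name, s.items)
# worker0 [0, 3, 6, 8, 10]
# worker1 [1, 4]
# worker2 [2, 5, 7, 9, 11]
```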
In fan-out and distribute, each downstream consumer obtains a non-overlapping subset of the input. Sometimes we want each consumer to consume the whole input instead. In this case, we want a duplicator, or Dup in aiochan. An example:
```python
async def worker(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    dup = ac.from_range(5).dup()
    inputs = [dup.tap() for i in range(3)]
    for idx, c in enumerate(inputs):
        ac.go(worker(c, 'worker%s' % idx))
    await asyncio.sleep(0.1)
    dup.close()

ac.run(main())
```

```
worker0 received 0
worker1 received 0
worker1 received 1
worker2 received 0
worker0 received 1
worker2 received 1
worker2 received 2
worker0 received 2
worker1 received 2
worker0 received 3
worker0 received 4
worker1 received 3
worker2 received 3
worker1 received 4
worker2 received 4
```
We see that ch.dup creates a duplicator, or Dup, and dup.tap() creates a new tap on the duplicator that receives the values put into the source channel. As with the functional methods, dup.tap accepts the arguments out and close, which control the channel used as output and whether to close it when the input is closed.
Note that duplicated elements are put to downstream channels in order. This means that if any one of the downstream channels blocks on put for some reason, all progress will be blocked. You should consider giving downstream inputs some buffer if your downstream processors differ in processing speed.
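That blocking effect can be reproduced with asyncio.Queue objects standing in for tap channels. This is an assumption made to keep the sketch dependency-free; dup_pump, reader and simulate are hypothetical helpers, not aiochan APIs. The pump delivers each value to every tap in turn, so when one tap is full and nobody reads it, the fast reader is starved too; giving the slow tap a buffer large enough for this short input lets every value reach the fast reader.

```python
import asyncio


async def dup_pump(values, taps):
    """Put every value into every tap, one tap after another."""
    for v in values:
        for tap in taps:
            await tap.put(v)  # waits while this tap's buffer is full


async def reader(tap, received):
    """Consume a tap as fast as possible."""
    while True:
        received.append(await tap.get())


async def simulate(slow_buffer):
    fast = asyncio.Queue(maxsize=1)
    slow = asyncio.Queue(maxsize=slow_buffer)  # nobody ever reads this tap
    received = []
    task = asyncio.create_task(reader(fast, received))
    try:
        await asyncio.wait_for(dup_pump(range(5), [fast, slow]), timeout=0.1)
    except asyncio.TimeoutError:
        pass  # the pump is stuck waiting for room in the slow tap
    await asyncio.sleep(0.01)  # let the reader pick up anything already delivered
    task.cancel()
    return received


starved = asyncio.run(simulate(slow_buffer=1))
buffered = asyncio.run(simulate(slow_buffer=10))
print(starved)   # [0, 1]
print(buffered)  # [0, 1, 2, 3, 4]
```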
A Dup also has the method untap, which can be used to untap an existing tapping channel. For example:
```python
async def worker(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    dup = ac.from_range(5).dup()
    inputs = [dup.tap() for i in range(3)]
    dup.untap(inputs[1])
    for idx, c in enumerate(inputs):
        ac.go(worker(c, 'worker%s' % idx))
    await asyncio.sleep(0.1)
    dup.close()

ac.run(main())
```

```
worker0 received 0
worker2 received 0
worker2 received 1
worker0 received 1
worker0 received 2
worker0 received 3
worker2 received 2
worker2 received 3
worker2 received 4
worker0 received 4
```
We see that worker1, which has been untapped, does not receive anything. In more complicated programs, tapping and untapping can be done dynamically, at arbitrary times.
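A duplicator's bookkeeping can be modelled synchronously. The Dup class below is a hypothetical model that uses plain lists in place of tap channels, not aiochan's class. A put copies the value to every current tap, and untap removes a tap by identity, because two distinct empty lists compare equal and list.remove could pick the wrong one.

```python
class Dup:
    """Hypothetical, synchronous model of a duplicator with tap/untap."""

    def __init__(self):
        self.taps = []

    def tap(self):
        out = []  # a list stands in for the tap's output channel
        self.taps.append(out)
        return out

    def untap(self, out):
        # compare by identity: distinct empty lists are equal under ==
        self.taps = [t for t in self.taps if t is not out]

    def put(self, value):
        for out in self.taps:  # every current tap gets the value, in tap order
            out.append(value)


dup = Dup()
inputs = [dup.tap() for _ in range(3)]
dup.untap(inputs[1])
for v in range(5):
    dup.put(v)
late = dup.tap()  # taps can also be added after values have flowed
dup.put(5)

print(inputs)  # [[0, 1, 2, 3, 4, 5], [], [0, 1, 2, 3, 4, 5]]
print(late)    # [5]
```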
Another very common idiom is pub-sub, and this is easy to do as well:
```python
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub(lambda x: x % 3)
    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)
    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))
    source.add(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).close()
    await asyncio.sleep(0.1)
    pub.close()

ac.run(main())
```

```
p0 received 0
px received 0
px received 3
p0 received 3
p1 received 1
p2 received 2
p0 received 6
px received 6
p1 received 4
p2 received 5
p1 received 7
p2 received 8
p0 received 9
px received 9
```
In this case, the topic is defined by the lambda, which gives the remainder when the item is divided by three. Processors subscribe to the topics they are interested in, and we see that p0 and px received all numbers with remainder 0, p1 all numbers with remainder 1, and p2 all numbers with remainder 2.
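Topic-based routing boils down to a lookup from topic to subscribers. The route function below is a hypothetical, synchronous model that uses subscriber names in place of channels: every value goes to all subscribers of its topic, so p0 and px both see the multiples of three.

```python
def route(values, topic_fn, subscriptions):
    """Deliver each value to every subscriber of its topic.

    subscriptions maps a topic to a list of subscriber names; values whose
    topic has no subscribers are not delivered to anyone.
    """
    received = {name: [] for names in subscriptions.values() for name in names}
    for v in values:
        for name in subscriptions.get(topic_fn(v), []):
            received[name].append(v)
    return received


received = route(range(10), lambda x: x % 3,
                 {0: ["p0", "px"], 1: ["p1"], 2: ["p2"]})
print(received)
# {'p0': [0, 3, 6, 9], 'px': [0, 3, 6, 9], 'p1': [1, 4, 7], 'p2': [2, 5, 8]}
```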
A Pub also has a method unsub, which can be used to unsubscribe a currently subscribed channel. For example:
```python
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub(lambda x: x % 3)
    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)
    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))
    pub.unsub(0, px)
    pub.sub(1, px)
    pub.sub(2, px)
    source.add(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).close()
    await asyncio.sleep(0.1)
    pub.close()

ac.run(main())
```

```
p0 received 0
p0 received 3
p1 received 1
px received 1
px received 2
p2 received 2
p1 received 4
px received 4
px received 5
p2 received 5
p2 received 8
px received 8
px received 7
p0 received 6
p1 received 7
p0 received 9
```
In this example, we initially subscribed px to topic 0, but then changed our mind and subscribed it to topics 1 and 2 instead (yes, a channel can subscribe to multiple topics).
There is also unsub_all, which can unsubscribe a whole topic in one go:
```python
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub(lambda x: x % 3)
    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)
    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))
    pub.unsub_all(0)
    source.add(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    await asyncio.sleep(0.1)
    pub.close()

ac.run(main())
```

```
p1 received 1
p1 received 4
p2 received 2
p2 received 5
p2 received 8
p1 received 7
```
Now topic 0 does not have any subscribers after the call to unsub_all. If this method is called without an argument, all subscribers of all topics are unsubscribed.
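The effect of sub, unsub and unsub_all on the subscription table can be modelled with a dictionary from topic to subscriber names. The Subscriptions class is a hypothetical sketch, not aiochan's Pub; it replays the changes from the examples above, including one subscriber registered under two topics.

```python
from collections import defaultdict


class Subscriptions:
    """Hypothetical topic -> subscribers table mirroring sub/unsub/unsub_all."""

    def __init__(self):
        self.table = defaultdict(list)

    def sub(self, topic, name):
        self.table[topic].append(name)

    def unsub(self, topic, name):
        self.table[topic].remove(name)

    def unsub_all(self, topic=None):
        if topic is None:
            self.table.clear()           # no argument: drop every topic
        else:
            self.table.pop(topic, None)  # drop one whole topic

    def subscribers(self, topic):
        return list(self.table.get(topic, []))


subs = Subscriptions()
for topic, name in [(1, "p1"), (2, "p2"), (0, "p0"), (0, "px")]:
    subs.sub(topic, name)
subs.unsub(0, "px")  # px leaves topic 0 ...
subs.sub(1, "px")    # ... and joins topics 1 and 2
subs.sub(2, "px")
print(subs.subscribers(0), subs.subscribers(1))  # ['p0'] ['p1', 'px']
subs.unsub_all(0)
print(subs.subscribers(0))  # []
subs.unsub_all()
print(subs.subscribers(1))  # []
```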
In the above examples, we have passed a lambda as the argument to pub. If we don't pass in anything, then the Pub assumes that values are tuples, and that the first element of each tuple is the topic:
```python
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub()
    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)
    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))
    source.add((0, 0),
               (1, 1),
               (2, 2),
               (0, 3),
               (1, 4),
               (2, 5),
               (0, 6),
               (1, 7),
               (2, 8),
               (3, 9))
    await asyncio.sleep(0.1)
    pub.close()

ac.run(main())
```

```
p0 received (0, 0)
px received (0, 0)
px received (0, 3)
p0 received (0, 3)
p1 received (1, 1)
p2 received (2, 2)
p0 received (0, 6)
px received (0, 6)
p1 received (1, 4)
p2 received (2, 5)
p1 received (1, 7)
p2 received (2, 8)
```
Note that (3, 9) is not received by any processor, since no channel subscribes to topic 3. As before, the sub method takes out and close arguments with their usual meaning.
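When pub is called without a topic function, the topic of an item is its first element, so the default behaves like passing lambda x: x[0], or equivalently operator.itemgetter(0). The short check below (its variable names are hypothetical) applies that rule to the items from the example and separates those that reach a subscriber from those that do not.

```python
from operator import itemgetter

default_topic = itemgetter(0)  # same rule as pub() with no topic function

items = [(0, 0), (1, 1), (2, 2), (0, 3), (1, 4),
         (2, 5), (0, 6), (1, 7), (2, 8), (3, 9)]
subscribed_topics = {0, 1, 2}  # topics that p0/px, p1 and p2 subscribe to

delivered = [it for it in items if default_topic(it) in subscribed_topics]
dropped = [it for it in items if default_topic(it) not in subscribed_topics]
print(len(delivered), dropped)  # 9 [(3, 9)]
```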
To recap:

- use distribute to distribute values to downstream channels
- use dup to duplicate values
- use pub for publisher-subscriber systems