Combination operations¶
Channels can act as versatile conduits for the flow of data, as in our examples of the fan-in and fan-out patterns. Here we discuss some convenient functions and constructs for handling more complicated ways of combining data.
Merging values¶
In fan-in, we passed the channels to all coroutines interested in
producing data. Often we find that these producers provide their own
output channels instead. In this case, we can merge these channels
into a single one:
In [2]:
import aiochan as ac
import asyncio

async def main():
    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))
    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))
    out = ac.merge(p1, p2)
    async for v in out:
        print(v)

ac.run(main())
p2-0
p1-0
p2-1
p1-1
p2-2
p1-2
p2-3
p1-3
p2-4
p1-4
p1-5
p2-5
p1-6
p2-6
p2-7
p1-7
p2-8
p1-8
p2-9
p1-9
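To make the idea behind merging concrete, here is a minimal sketch in plain asyncio. It is not how aiochan implements merge; the names source, forward and merge_queues and the _DONE end marker are hypothetical and exist only for this illustration. Each source gets its own forwarding task that copies items into one shared output queue.

```python
import asyncio

_DONE = object()  # hypothetical end-of-stream marker for this sketch


def source(tag, n):
    """Build a pre-filled queue standing in for a producer's own output."""
    q = asyncio.Queue()
    for i in range(n):
        q.put_nowait(f"{tag}-{i}")
    q.put_nowait(_DONE)
    return q


async def forward(src, out):
    """Copy every item from src to out, including the end marker."""
    while True:
        item = await src.get()
        await out.put(item)
        if item is _DONE:
            return
        await asyncio.sleep(0)  # give the other forwarders a turn


async def merge_queues(*sources):
    """Collect the items of all sources through a single shared queue."""
    out = asyncio.Queue()
    tasks = [asyncio.ensure_future(forward(s, out)) for s in sources]
    merged, open_sources = [], len(sources)
    while open_sources:
        item = await out.get()
        if item is _DONE:
            open_sources -= 1  # one source has finished
        else:
            merged.append(item)
    await asyncio.gather(*tasks)
    return merged


async def main():
    # The queues are created inside the running event loop on purpose.
    return await merge_queues(source("p1", 3), source("p2", 3))


merged = asyncio.run(main())
print(merged)
```

In this sketch the items of each source keep their relative order, but how the two sources interleave depends on task scheduling.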
As in manual fan-in, the order of the values is somewhat
non-deterministic. If you want your channels to produce values in sync,
use zip_chans:
In [3]:
async def main():
    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))
    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))
    out = ac.zip_chans(p1, p2)
    async for v in out:
        print(v)

ac.run(main())
['p1-0', 'p2-0']
['p1-1', 'p2-1']
['p1-2', 'p2-2']
['p1-3', 'p2-3']
['p1-4', 'p2-4']
['p1-5', 'p2-5']
['p1-6', 'p2-6']
['p1-7', 'p2-7']
['p1-8', 'p2-8']
['p1-9', 'p2-9']
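The in-sync behaviour can be sketched with plain asyncio queues: take exactly one item from every source per round and emit the round as a list. This is an illustration only, not aiochan's implementation. The helper names and the _DONE marker are hypothetical, and stopping as soon as any source is exhausted is an assumption made for the sketch.

```python
import asyncio

_DONE = object()  # hypothetical end-of-stream marker for this sketch


def source(tag, n):
    """Build a pre-filled queue standing in for a producer's own output."""
    q = asyncio.Queue()
    for i in range(n):
        q.put_nowait(f"{tag}-{i}")
    q.put_nowait(_DONE)
    return q


async def zip_queues(*sources):
    """Take one item from every source per round, in source order."""
    rows = []
    while True:
        row = [await s.get() for s in sources]
        if any(item is _DONE for item in row):
            return rows  # assumption: stop once any source has ended
        rows.append(row)


async def main():
    # The queues are created inside the running event loop on purpose.
    return await zip_queues(source("p1", 3), source("p2", 3))


rows = asyncio.run(main())
print(rows)
```

Because every round waits for all sources, the slowest source sets the pace of the combined output.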
An even more complicated use case is when your producers produce items at different rates and you want to keep track of the latest value produced by each of them:
In [4]:
async def main():
    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))
    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))
    out = ac.combine_latest(p1, p2)
    async for v in out:
        print(v)

ac.run(main())
['p1-0', None]
['p1-0', 'p2-0']
['p1-1', 'p2-0']
['p1-1', 'p2-1']
['p1-1', 'p2-2']
['p1-2', 'p2-2']
['p1-2', 'p2-3']
['p1-3', 'p2-3']
['p1-4', 'p2-3']
['p1-4', 'p2-4']
['p1-5', 'p2-4']
['p1-5', 'p2-5']
['p1-6', 'p2-5']
['p1-6', 'p2-6']
['p1-7', 'p2-6']
['p1-7', 'p2-7']
['p1-7', 'p2-8']
['p1-8', 'p2-8']
['p1-9', 'p2-8']
['p1-9', 'p2-9']
Notice that for a channel that has yet to produce a value, None is put in
its place.
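The bookkeeping behind this output can be sketched without any concurrency. The function below is a hypothetical helper, not part of aiochan: it takes a list of (source index, value) arrivals and records a snapshot of the latest values after each arrival, with None for sources that have not produced anything yet. The arrival order used here is an assumption, chosen to match the first few rows printed above.

```python
def latest_snapshots(events, n_sources):
    """Return one snapshot of the latest values per arrival.

    events is an iterable of (source_index, value) pairs in arrival order.
    """
    latest = [None] * n_sources  # None marks "no value seen yet"
    snapshots = []
    for index, value in events:
        latest[index] = value
        snapshots.append(list(latest))  # copy, so later updates do not alter it
    return snapshots


# Assumed arrival order for illustration.
events = [(0, "p1-0"), (1, "p2-0"), (0, "p1-1"), (1, "p2-1"), (1, "p2-2")]
snapshots = latest_snapshots(events, 2)
for row in snapshots:
    print(row)
```

Each arrival yields exactly one snapshot, which is why the combined output has as many rows as there are values across all sources.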
As with the functional methods, all of these functions take optional
out and close arguments, which control which channel is used as the
output and whether to close the output channel when there is nothing
more to be done (i.e., when all source channels are closed).
To recap:
- use merge to combine values from several channels
- use zip_chans to combine values from several channels in sync
- use combine_latest to combine values and monitor the latest value from each channel
Distributing values¶
We have discussed generalizations of fan-in above. For simple fan-out,
we have distribute:
In [5]:
async def worker(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    inputs = [ac.Chan(name='inp%s' % i) for i in range(3)]
    ac.from_range(20).distribute(*inputs)

    for idx, c in enumerate(inputs):
        ac.go(worker(c, 'worker%s' % idx))
    await asyncio.sleep(0.1)

ac.run(main())
worker0 received 0
worker0 received 1
worker0 received 4
worker1 received 2
worker2 received 3
worker0 received 5
worker0 received 8
worker2 received 6
worker1 received 7
worker0 received 9
worker0 received 12
worker1 received 10
worker2 received 11
worker1 received 13
worker1 received 16
worker0 received 14
worker2 received 15
worker1 received 17
worker2 received 18
worker0 received 19
One benefit of using distribute instead of a plain fan-out is
that, if one of the downstream channels is closed,
distribute will try to put the value into another downstream channel
so that no values are lost.
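The "skip closed outputs" idea can be sketched synchronously. Everything here is hypothetical: Output is a stand-in for a downstream channel, and the round-robin order is an assumption made to keep the sketch deterministic. As the output above shows, aiochan does not hand out values in strict rotation.

```python
from itertools import cycle


class Output:
    """Hypothetical stand-in for a downstream channel."""

    def __init__(self, name):
        self.name = name
        self.closed = False
        self.items = []


def distribute(values, outputs):
    """Hand each value to the next open output, skipping closed ones."""
    ring = cycle(outputs)
    for value in values:
        for _ in range(len(outputs)):
            out = next(ring)
            if not out.closed:
                out.items.append(value)
                break
        else:
            # Every output was tried and all of them are closed.
            raise RuntimeError("all outputs are closed")


a, b, c = Output("a"), Output("b"), Output("c")
b.closed = True  # simulate a downstream channel that has been closed
distribute(range(6), [a, b, c])
print(a.items, b.items, c.items)
```

The closed output receives nothing, and every value still ends up in exactly one of the open outputs.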
In fan-out and distribute, each downstream consumer obtains a
non-overlapping subset of the input. Sometimes we want each consumer to
consume the whole input instead. In this case, we want a duplicator, or
Dup in aiochan. An example:
In [6]:
async def worker(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    dup = ac.from_range(5).dup()
    inputs = [dup.tap() for i in range(3)]

    for idx, c in enumerate(inputs):
        ac.go(worker(c, 'worker%s' % idx))
    await asyncio.sleep(0.1)
    dup.close()

ac.run(main())
worker0 received 0
worker1 received 0
worker1 received 1
worker2 received 0
worker0 received 1
worker2 received 1
worker2 received 2
worker0 received 2
worker1 received 2
worker0 received 3
worker0 received 4
worker1 received 3
worker2 received 3
worker1 received 4
worker2 received 4
We see that ch.dup creates a duplicator, or Dup, and
dup.tap() creates a new tap on the duplicator that receives the
values put into the source channel. As with the functional methods,
dup.tap accepts the arguments out and close, which control
which channel is used as the output and whether to close the output
channel when the input is closed.
Note that duplicated elements are put to downstream channels in order. This means that if any one of the downstream channels blocks on put for some reason, the whole process is blocked. Consider giving the downstream channels some buffer if your downstream processors are uneven in their processing speed.
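The effect of buffering can be illustrated with plain asyncio queues. This is a sketch under stated assumptions, not aiochan code: the duplicate and fast_consumer_sees helpers are hypothetical, and asyncio.Queue with maxsize=1 stands in for a barely buffered tap. One consumer reads as fast as it can while the other tap is never read at all.

```python
import asyncio


async def duplicate(values, queues):
    """Put every value into every queue, in order."""
    for value in values:
        for q in queues:
            await q.put(value)  # waits here while q is full


async def fast_consumer_sees(buffer_size, n=5):
    """Return what a fast consumer receives while a second tap is never read."""
    fast = asyncio.Queue(maxsize=buffer_size)
    stalled = asyncio.Queue(maxsize=buffer_size)  # nothing ever reads this one
    received = []

    async def consume():
        while True:
            received.append(await fast.get())

    tasks = [
        asyncio.ensure_future(duplicate(range(n), [fast, stalled])),
        asyncio.ensure_future(consume()),
    ]
    await asyncio.sleep(0.05)  # let the tasks run until they finish or stall
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return received


small = asyncio.run(fast_consumer_sees(buffer_size=1))
large = asyncio.run(fast_consumer_sees(buffer_size=5))
print(small)
print(large)
```

With the small buffer the fast consumer is cut off early, because the duplicator is stuck waiting on the stalled queue. With a buffer large enough to hold all values, the fast consumer receives everything.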
A Dup also has the method untap, which can be used to untap an
existing tapping channel. For example:
In [7]:
async def worker(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    dup = ac.from_range(5).dup()
    inputs = [dup.tap() for i in range(3)]
    dup.untap(inputs[1])

    for idx, c in enumerate(inputs):
        ac.go(worker(c, 'worker%s' % idx))
    await asyncio.sleep(0.1)
    dup.close()

ac.run(main())
worker0 received 0
worker2 received 0
worker2 received 1
worker0 received 1
worker0 received 2
worker0 received 3
worker2 received 2
worker2 received 3
worker2 received 4
worker0 received 4
We see that worker1, which has been untapped, does not receive
anything. In more complicated programs, tapping and untapping can be
done dynamically, at arbitrary times.
Another very common idiom is pub-sub, and this is easy to do as well:
In [9]:
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub(lambda x: x % 3)

    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)

    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))

    source.add(0,1,2,3,4,5,6,7,8,9).close()
    await asyncio.sleep(0.1)
    pub.close()

ac.run(main())
p0 received 0
px received 0
px received 3
p0 received 3
p1 received 1
p2 received 2
p0 received 6
px received 6
p1 received 4
p2 received 5
p1 received 7
p2 received 8
p0 received 9
px received 9
In this case, the topic is defined by the lambda, which gives the
remainder when the item is divided by three. Processors subscribe to the
topics they are interested in, and we see that p0 and px received
all numbers with remainder 0, p1 all numbers with remainder 1, and
p2 all numbers with remainder 2.
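The routing rule itself is small enough to sketch synchronously. TinyPub below is a hypothetical toy, not aiochan's Pub: it keeps a mapping from topic to subscriber inboxes (plain lists here) and appends each published value to every inbox registered for the value's topic.

```python
from collections import defaultdict


class TinyPub:
    """Hypothetical, synchronous stand-in for a publisher."""

    def __init__(self, topic_fn):
        self.topic_fn = topic_fn
        self.subscribers = defaultdict(list)  # topic -> list of inboxes

    def sub(self, topic):
        """Register and return a new inbox for the given topic."""
        inbox = []
        self.subscribers[topic].append(inbox)
        return inbox

    def publish(self, value):
        """Deliver value to every inbox subscribed to its topic."""
        topic = self.topic_fn(value)
        for inbox in self.subscribers.get(topic, []):
            inbox.append(value)


pub = TinyPub(lambda x: x % 3)
p1, p2, p0, px = pub.sub(1), pub.sub(2), pub.sub(0), pub.sub(0)
for value in range(10):
    pub.publish(value)
print(p0, px, p1, p2)
```

Values whose topic has no subscriber are simply dropped in this sketch, and two subscribers to the same topic each receive their own copy of every matching value.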
A Pub also has a method unsub, which can be used to unsubscribe
a currently subscribed channel. For example:
In [11]:
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub(lambda x: x % 3)

    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)

    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))

    pub.unsub(0, px)
    pub.sub(1, px)
    pub.sub(2, px)

    source.add(0,1,2,3,4,5,6,7,8,9).close()
    await asyncio.sleep(0.1)
    pub.close()

ac.run(main())
p0 received 0
p0 received 3
p1 received 1
px received 1
px received 2
p2 received 2
p1 received 4
px received 4
px received 5
p2 received 5
p2 received 8
px received 8
px received 7
p0 received 6
p1 received 7
p0 received 9
In this example, we initially subscribed px to the topic 0, but then
changed our mind and subscribed it to 1 and 2 instead (yes, a channel can
subscribe to multiple topics).
There is also unsub_all, which can unsubscribe a whole topic in one
go:
In [13]:
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub(lambda x: x % 3)

    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)

    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))

    pub.unsub_all(0)

    source.add(0,1,2,3,4,5,6,7,8,9)
    await asyncio.sleep(0.1)
    pub.close()

ac.run(main())
p1 received 1
p1 received 4
p2 received 2
p2 received 5
p2 received 8
p1 received 7
Now the 0 topic does not have any subscribers after the call to
unsub_all. If this method is called without an argument, all
subscribers for all topics are unsubscribed.
In the above examples, we have passed in a lambda as the argument to
pub. If we don't pass in anything, then the Pub assumes that
values are tuples and that the first element of the tuple is the topic:
In [15]:
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub()

    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)

    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))

    source.add((0, 0),
               (1, 1),
               (2, 2),
               (0, 3),
               (1, 4),
               (2, 5),
               (0, 6),
               (1, 7),
               (2, 8),
               (3, 9))
    await asyncio.sleep(0.1)
    pub.close()

ac.run(main())
p0 received (0, 0)
px received (0, 0)
px received (0, 3)
p0 received (0, 3)
p1 received (1, 1)
p2 received (2, 2)
p0 received (0, 6)
px received (0, 6)
p1 received (1, 4)
p2 received (2, 5)
p1 received (1, 7)
p2 received (2, 8)
As before, the sub methods all take out and close arguments that
have their usual meaning.
To recap:
- use distribute to distribute values to downstream channels
- use dup to duplicate values
- use pub for publisher-subscriber systems