Note: you can try this tutorial in Binder.

Combination operations

Channels can act as versatile conduits for the flow of data, as in our examples of the fan-in and fan-out patterns. Here we discuss some convenient functions and constructs for dealing with more complicated patterns for combining data.

Merging values

In fan-in, we passed the channels to all coroutines interested in producing data. Often we find that these producers provide their own output channels instead. In this case, we can merge these channels into a single one:

In [2]:
import aiochan as ac
import asyncio

async def main():
    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))
    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))
    out = ac.merge(p1, p2)
    async for v in out:
        print(v)

ac.run(main())
p2-0
p1-0
p2-1
p1-1
p2-2
p1-2
p2-3
p1-3
p2-4
p1-4
p1-5
p2-5
p1-6
p2-6
p2-7
p1-7
p2-8
p1-8
p2-9
p1-9
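
The idea behind merging can be sketched without aiochan. The following is a minimal plain-asyncio illustration, not aiochan's implementation: asyncio.Queue objects stand in for channels, and the names _DONE, forward and demo_merge are made up for this sketch. One forwarder per source copies values into a shared queue, so values from different sources interleave while each source keeps its own order.

```python
import asyncio

_DONE = object()  # hypothetical end marker, standing in for a closed channel

async def forward(source, merged):
    """Copy values from one source queue into the shared queue until the end marker."""
    while True:
        item = await source.get()
        if item is _DONE:
            return
        await merged.put(item)
        await asyncio.sleep(0)  # yield so the other forwarders get a turn

async def demo_merge():
    # Two producers that each own their output queue, pre-filled for simplicity.
    sources = [asyncio.Queue(), asyncio.Queue()]
    for queue, tag in zip(sources, ('p1', 'p2')):
        for i in range(3):
            queue.put_nowait(f'{tag}-{i}')
        queue.put_nowait(_DONE)

    merged = asyncio.Queue()
    # Run one forwarder per source concurrently and wait for all of them.
    await asyncio.gather(*(forward(queue, merged) for queue in sources))

    results = []
    while not merged.empty():
        results.append(merged.get_nowait())
    return results

print(asyncio.run(demo_merge()))
```

The interleaving across sources depends on how the event loop schedules the forwarders, so only the relative order within each source should be relied on.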

As in manual fan-in, the order of the values is somewhat non-deterministic. If you want your channels to produce values in sync, use zip_chans:

In [3]:
async def main():
    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))
    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))
    out = ac.zip_chans(p1, p2)
    async for v in out:
        print(v)

ac.run(main())
['p1-0', 'p2-0']
['p1-1', 'p2-1']
['p1-2', 'p2-2']
['p1-3', 'p2-3']
['p1-4', 'p2-4']
['p1-5', 'p2-5']
['p1-6', 'p2-6']
['p1-7', 'p2-7']
['p1-8', 'p2-8']
['p1-9', 'p2-9']
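
To see why zipping gives a deterministic pairing, here is a minimal plain-asyncio sketch of the idea. It is not aiochan's implementation: asyncio.Queue objects stand in for channels, and _DONE, produce, zip_queues and demo_zip are names invented for this illustration. Each round waits for one value from every input, in argument order, before emitting anything. Stopping at the first exhausted input is a choice made for this sketch.

```python
import asyncio

_DONE = object()  # hypothetical end marker, standing in for a closed channel

async def produce(queue, tag, n):
    """Put n tagged values on the queue, followed by the end marker."""
    for i in range(n):
        await queue.put(f'{tag}-{i}')
    await queue.put(_DONE)

async def zip_queues(*queues):
    """Yield one list per round, holding one value from every queue."""
    while True:
        row = []
        for queue in queues:
            item = await queue.get()  # wait for this input before moving on
            if item is _DONE:
                return  # sketch assumption: stop when any input is exhausted
            row.append(item)
        yield row

async def demo_zip():
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    producers = [asyncio.create_task(produce(q1, 'p1', 3)),
                 asyncio.create_task(produce(q2, 'p2', 3))]
    rows = [row async for row in zip_queues(q1, q2)]
    await asyncio.gather(*producers)
    return rows

for row in asyncio.run(demo_zip()):
    print(row)
```

Because every round blocks until all inputs have delivered, the slowest producer sets the pace of the zipped output.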

An even more complicated use case is when your producers produce items at different rates and you want to keep track of the latest value produced by each of them:

In [4]:
async def main():
    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))
    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))
    out = ac.combine_latest(p1, p2)
    async for v in out:
        print(v)

ac.run(main())
['p1-0', None]
['p1-0', 'p2-0']
['p1-1', 'p2-0']
['p1-1', 'p2-1']
['p1-1', 'p2-2']
['p1-2', 'p2-2']
['p1-2', 'p2-3']
['p1-3', 'p2-3']
['p1-4', 'p2-3']
['p1-4', 'p2-4']
['p1-5', 'p2-4']
['p1-5', 'p2-5']
['p1-6', 'p2-5']
['p1-6', 'p2-6']
['p1-7', 'p2-6']
['p1-7', 'p2-7']
['p1-7', 'p2-8']
['p1-8', 'p2-8']
['p1-9', 'p2-8']
['p1-9', 'p2-9']

Notice that for a channel that has yet to produce a value, None is put in its place.
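
The bookkeeping behind this output can be sketched in a few lines of plain Python. This is an illustration only, not aiochan's code: the function latest_snapshots and the (source_index, value) event format are assumptions made for the sketch. Every arriving value overwrites its source's slot, and a copy of all slots is emitted.

```python
def latest_snapshots(events, n_sources):
    """Return one snapshot of the latest values per event.

    events is an iterable of (source_index, value) pairs in arrival order.
    Slots for sources that have not produced anything yet hold None.
    """
    latest = [None] * n_sources
    snapshots = []
    for source_index, value in events:
        latest[source_index] = value
        snapshots.append(list(latest))  # copy, so later updates do not alter it
    return snapshots

# Arrival order taken from the first five lines of the output above.
events = [(0, 'p1-0'), (1, 'p2-0'), (0, 'p1-1'), (1, 'p2-1'), (1, 'p2-2')]
for snapshot in latest_snapshots(events, 2):
    print(snapshot)
```

Feeding in the arrival order from the run above reproduces its first five output lines, including the leading ['p1-0', None].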

As with the functional methods, all of these functions take optional out and close arguments, which control the output channel and whether to close it when there is nothing more to be done (i.e., when all source channels are closed).

To recap:

  • use merge to combine values from several channels
  • use zip_chans to combine values from several channels in sync
  • use combine_latest to combine values and monitor the latest value from each channel

Distributing values

We have discussed generalizations of fan-in above. For simple fan-out, we have distribute:

In [5]:
async def worker(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    inputs = [ac.Chan(name='inp%s' % i) for i in range(3)]
    ac.from_range(20).distribute(*inputs)
    for idx, c in enumerate(inputs):
        ac.go(worker(c, 'worker%s' % idx))
    await asyncio.sleep(0.1)

ac.run(main())
worker0 received 0
worker0 received 1
worker0 received 4
worker1 received 2
worker2 received 3
worker0 received 5
worker0 received 8
worker2 received 6
worker1 received 7
worker0 received 9
worker0 received 12
worker1 received 10
worker2 received 11
worker1 received 13
worker1 received 16
worker0 received 14
worker2 received 15
worker1 received 17
worker2 received 18
worker0 received 19

One of the benefits of using distribute instead of a plain fan-out is that, if one of the downstream channels is closed, distribute will try to put the value into another downstream channel so that no values are lost.
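
The retry behaviour described here can be illustrated with a small synchronous sketch. It is not aiochan's implementation: the Sink class and the round-robin choice of the next target are assumptions made for the example (the output above suggests aiochan's choice is not strictly round-robin). What matters is that a value rejected by a closed sink is offered to another sink rather than dropped.

```python
class Sink:
    """Hypothetical stand-in for a downstream channel that may be closed."""

    def __init__(self, name):
        self.name = name
        self.closed = False
        self.items = []

    def put(self, value):
        """Accept the value and return True, or return False if closed."""
        if self.closed:
            return False
        self.items.append(value)
        return True

def distribute(values, sinks):
    """Hand each value to exactly one open sink, skipping closed ones."""
    live = list(sinks)
    turn = 0
    for value in values:
        while live:
            sink = live[turn % len(live)]
            if sink.put(value):
                turn += 1
                break
            live.remove(sink)  # closed: forget it and offer the value elsewhere
        else:
            return  # no open sinks remain, so stop distributing

a, b, c = Sink('a'), Sink('b'), Sink('c')
b.closed = True
distribute(range(6), [a, b, c])
print(a.items, b.items, c.items)
```

All six values end up in either a or c. Nothing is lost to the closed sink b.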

In fan-out and distribute, each downstream consumer obtains a non-overlapping subset of the input. Sometimes we want each consumer to consume the whole input instead. In this case, we want a duplicator, or Dup in aiochan. An example:

In [6]:
async def worker(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    dup = ac.from_range(5).dup()
    inputs = [dup.tap() for i in range(3)]

    for idx, c in enumerate(inputs):
        ac.go(worker(c, 'worker%s' % idx))
    await asyncio.sleep(0.1)
    dup.close()

ac.run(main())
worker0 received 0
worker1 received 0
worker1 received 1
worker2 received 0
worker0 received 1
worker2 received 1
worker2 received 2
worker0 received 2
worker1 received 2
worker0 received 3
worker0 received 4
worker1 received 3
worker2 received 3
worker1 received 4
worker2 received 4

We see that ch.dup creates a duplicator, or Dup, and dup.tap() creates a new tap on the duplicator that contains the values put into the source channel. As with the functional methods, dup.tap accepts the arguments out and close, which control what is used as the output channel and whether to close the output channel when the input is closed.

Note that duplicated elements are put to downstream channels in order. This means that if any one of the downstream channels blocks on put for some reason, all progress is blocked. You should consider giving downstream inputs some buffer if your downstream processors are uneven in their processing speed.
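
The effect of buffering can be demonstrated with a plain-asyncio sketch. This is an illustration, not aiochan's Dup: asyncio.Queue objects stand in for tap channels, and duplicate and delivered_to_fast_tap are names invented here. A slow tap that nobody reads stalls the duplicator once its buffer is full, which also starves the fast tap.

```python
import asyncio
import contextlib

async def duplicate(values, taps):
    """Copy every value to every tap in order, waiting for each put to finish."""
    for value in values:
        for tap in taps:
            await tap.put(value)  # suspends here while this tap is full

async def delivered_to_fast_tap(slow_buffer_size):
    """Return how many values reach the fast tap when the slow tap is never read.

    slow_buffer_size must be at least 1, because asyncio treats 0 as unbounded.
    """
    slow = asyncio.Queue(maxsize=slow_buffer_size)  # bounded, nobody consumes it
    fast = asyncio.Queue()                          # unbounded, puts never block
    task = asyncio.create_task(duplicate(range(5), [slow, fast]))
    await asyncio.sleep(0.05)  # let the duplicator run until it finishes or stalls
    delivered = fast.qsize()
    task.cancel()  # clean up the duplicator if it is still stuck on a put
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return delivered

print(asyncio.run(delivered_to_fast_tap(1)))  # small buffer: stalls early
print(asyncio.run(delivered_to_fast_tap(5)))  # buffer fits the whole input
```

With a buffer of one, only the first value reaches the fast tap before the duplicator stalls. With a buffer of five, all five values get through.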

A Dup also has the method untap, which can be used to untap an existing tapping channel. For example:

In [7]:
async def worker(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    dup = ac.from_range(5).dup()
    inputs = [dup.tap() for i in range(3)]
    dup.untap(inputs[1])

    for idx, c in enumerate(inputs):
        ac.go(worker(c, 'worker%s' % idx))
    await asyncio.sleep(0.1)
    dup.close()

ac.run(main())
worker0 received 0
worker2 received 0
worker2 received 1
worker0 received 1
worker0 received 2
worker0 received 3
worker2 received 2
worker2 received 3
worker2 received 4
worker0 received 4

We see that worker1, which has been untapped, does not receive anything. In more complicated programs, tapping and untapping can be done dynamically, at arbitrary times.

Another very common idiom is pub-sub, and this is easy to do as well:

In [9]:
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub(lambda x: x % 3)
    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)
    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))
    source.add(0,1,2,3,4,5,6,7,8,9).close()
    await asyncio.sleep(0.1)
    pub.close()

ac.run(main())
p0 received 0
px received 0
px received 3
p0 received 3
p1 received 1
p2 received 2
p0 received 6
px received 6
p1 received 4
p2 received 5
p1 received 7
p2 received 8
p0 received 9
px received 9

In this case, the topic is defined by the lambda, which gives the remainder when the item is divided by three. Processors subscribe to the topics they are interested in, and we see that p0 and px received all numbers with remainder 0, p1 all numbers with remainder 1, and p2 all numbers with remainder 2.

A Pub also has a method unsub, which can be used to unsubscribe a currently subscribing channel. For example:

In [11]:
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub(lambda x: x % 3)
    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)
    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))
    pub.unsub(0, px)
    pub.sub(1, px)
    pub.sub(2, px)
    source.add(0,1,2,3,4,5,6,7,8,9).close()
    await asyncio.sleep(0.1)
    pub.close()

ac.run(main())
p0 received 0
p0 received 3
p1 received 1
px received 1
px received 2
p2 received 2
p1 received 4
px received 4
px received 5
p2 received 5
p2 received 8
px received 8
px received 7
p0 received 6
p1 received 7
p0 received 9

In this example, we initially subscribed px to the topic 0, but then changed our mind and subscribed it to 1 and 2 instead (yes, a channel can subscribe to multiple topics).

There is also unsub_all, which can unsubscribe a whole topic in one go:

In [13]:
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub(lambda x: x % 3)
    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)
    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))
    pub.unsub_all(0)
    source.add(0,1,2,3,4,5,6,7,8,9)
    await asyncio.sleep(0.1)
    pub.close()

ac.run(main())
p1 received 1
p1 received 4
p2 received 2
p2 received 5
p2 received 8
p1 received 7

Now the 0 topic does not have any subscribers after the call to unsub_all. If this method is called without an argument, all subscribers for all topics are unsubscribed.

In the above examples, we have passed in a lambda for the argument to pub. If we don’t pass in anything, then the Pub assumes that values are tuples, and the first element of the tuple is the topic:

In [15]:
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub()
    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)
    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))
    source.add((0, 0),
               (1, 1),
               (2, 2),
               (0, 3),
               (1, 4),
               (2, 5),
               (0, 6),
               (1, 7),
               (2, 8),
               (3, 9))
    await asyncio.sleep(0.1)
    pub.close()

ac.run(main())
p0 received (0, 0)
px received (0, 0)
px received (0, 3)
p0 received (0, 3)
p1 received (1, 1)
p2 received (2, 2)
p0 received (0, 6)
px received (0, 6)
p1 received (1, 4)
p2 received (2, 5)
p1 received (1, 7)
p2 received (2, 8)

As before, the sub methods all take out and close arguments that have their usual meaning.
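
The routing rule used in the pub-sub examples above can be summarised in a small synchronous sketch. It is not aiochan's Pub: the TinyPub class, its publish method and the use of plain lists as subscriber inboxes are all assumptions made for illustration. A topic function maps each value to a topic, every subscriber of that topic gets the value, and values whose topic has no subscribers go nowhere.

```python
from collections import defaultdict
from operator import itemgetter

class TinyPub:
    """Hypothetical publisher that routes values to list inboxes by topic."""

    def __init__(self, topic_fn=itemgetter(0)):
        # Default mirrors the tutorial: the first element of a tuple is the topic.
        self.topic_fn = topic_fn
        self.subscribers = defaultdict(list)

    def sub(self, topic, inbox):
        """Subscribe an inbox to a topic and return the inbox."""
        self.subscribers[topic].append(inbox)
        return inbox

    def unsub(self, topic, inbox):
        """Remove this exact inbox (compared by identity) from the topic."""
        self.subscribers[topic] = [s for s in self.subscribers[topic]
                                   if s is not inbox]

    def publish(self, value):
        """Deliver the value to every inbox subscribed to its topic."""
        for inbox in self.subscribers.get(self.topic_fn(value), []):
            inbox.append(value)

pub = TinyPub(lambda x: x % 3)
p0 = pub.sub(0, [])
px = pub.sub(0, [])
p1 = pub.sub(1, [])
pub.unsub(0, px)   # px changes its mind ...
pub.sub(1, px)     # ... and follows topic 1 instead
for value in range(10):
    pub.publish(value)
print(p0, p1, px)  # values on topic 2 have no subscriber and are not delivered

tuple_pub = TinyPub()  # no topic function: use the first tuple element
t0 = tuple_pub.sub(0, [])
for item in [(0, 'a'), (1, 'b'), (0, 'c')]:
    tuple_pub.publish(item)
print(t0)
```

Inboxes are compared by identity in unsub because two empty lists compare equal, and removing by equality could unsubscribe the wrong one.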

To recap:

  • use distribute to distribute values to downstream channels
  • use dup to duplicate values
  • use pub for publisher-subscriber systems