Note: you can try this tutorial in Binder.

Methods and functions

Now that we know the basics of channels and the operations on them, let's look at some additional methods and functions that come in handy in various situations.

Putting and getting

As we have already seen, we can add values into a channel with add. Immediately closing the channel afterwards ensures that no further items can be put into it:

In [2]:
import aiochan as ac
import asyncio

async def main():
    c = ac.Chan().add(1, 2, 3).close()
    async for v in c:
        print(v)
    await c.put(4)
    r = await c.get()
    print('put/get after closing:', r)

ac.run(main())
1
2
3
put/get after closing: None

This method is mainly provided for convenience. You should NOT add too many items into a channel in this way: the call is non-blocking, so the puts accumulate as pending operations, and if too many of them pile up the pending-put queue will overflow. Adding fewer than 10 items during the initialization phase of a channel is considered ok, though.

In the last example we consumed values using the async for syntax. In cases where we want to deal with many values from the channel at once instead of one by one, we can use collect:

In [3]:
async def main():
    c = ac.Chan().add(1, 2, 3).close()
    r = await c.collect()
    print(r)

ac.run(main())
[1, 2, 3]

In this case, closing the channel before calling collect is essential: otherwise the await would block forever (and overflow would probably occur if values kept coming in).
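
If you are curious, you can observe the blocking behaviour by guarding collect with asyncio.wait_for. This is a minimal sketch; in real code the remedy is to close the channel or, as shown next, to bound the collection:

async def main():
    c = ac.Chan().add(1, 2, 3)  # note: the channel is never closed
    try:
        # collect() only completes once the channel closes, which never happens here
        await asyncio.wait_for(c.collect(), 0.5)
    except asyncio.TimeoutError:
        print('collect did not finish: the channel was never closed')

ac.run(main())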

collect also accepts an argument n, which specifies the maximum number of elements that will be collected. Using it, we can collect from channels that are not yet closed (though we still need to decide how many items we can handle):

In [4]:
async def main():
    c = ac.Chan().add(1, 2, 3) # no closing
    r = await c.collect(2)
    print(r)

ac.run(main())
[1, 2]

We said above that using add to put too many items into a channel is dangerous. If you have an existing sequence that you want to turn into a channel, it is much better to use from_iter:

In [5]:
async def main():
    c = ac.from_iter([1, 2, 3, 4, 5, 6])
    r = await c.collect()
    print(r)
    print(c.closed)

ac.run(main())
[1, 2, 3, 4, 5, 6]
True

Note that the channel is closed on construction (we can check whether a channel is closed using its .closed property).

Infinite iterables are ok:

In [6]:
def natural_numbers():
    i = 0
    while True:
        yield i
        i += 1

async def main():
    c = ac.from_iter(natural_numbers())
    r = await c.collect(10)
    print(r)
    print(c.closed)
    r = await c.collect(10)
    print(r)
    print(c.closed)

ac.run(main())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
True
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
True

Even when the channel is closed, values can still be obtained from it (and in this case, with an infinite iterable behind it, the values can never be exhausted). Closing only stops put operations immediately; values already in the channel remain gettable.
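
A small sketch illustrating this, assuming put returns True on success and False once the channel is closed (core.async-style semantics):

async def main():
    c = ac.Chan(2)  # a buffered channel that can hold 2 pending values
    await c.put(1)
    await c.put(2)
    c.close()
    print(await c.put(3))  # False: puts fail immediately once closed
    print(await c.get())   # 1: values already in the channel still come out
    print(await c.get())   # 2
    print(await c.get())   # None: the channel is now exhausted

ac.run(main())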

Making channels that produce numbers is so common that there is a function for it:

In [7]:
async def main():
    c1 = ac.from_range()
    r = await c1.collect(10)
    print(r) # natural numbers

    c2 = ac.from_range(5) # same as ac.from_iter(range(5))
    r = await c2.collect()
    print(r)

    c3 = ac.from_range(0, 10, 3) # same as ac.from_iter(range(0, 10, 3))
    r = await c3.collect()
    print(r)

ac.run(main())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 2, 3, 4]
[0, 3, 6, 9]

To recap:

  • .add can be used to add a few items to a channel on initialization (any other use is dangerous)
  • .collect can be used to bulk-get items from a channel
  • .closed tests whether a channel is already closed
  • from_iter creates a channel containing all elements of an iterable (even an infinite iterable is ok)
  • from_range is tailored for making channels that generate number series

Time-based operations

So far we have always used asyncio.sleep to make execution stop for a little while, pretending to do work. The timeout function does almost the same thing, but instead produces a channel that automatically closes after the given interval:

In [8]:
async def main():
    start = asyncio.get_event_loop().time()
    c = ac.timeout(1.0)
    await c.get()
    end = asyncio.get_event_loop().time()
    print(end - start)

ac.run(main())
1.001395168947056

This is useful even when we are not pretending to do work, for example, for timeout control:

In [9]:
async def main():
    tout = ac.timeout(1.0)
    while (await ac.select(tout, default=True))[0]:
        print('do work')
        await asyncio.sleep(0.2)
    print('done')

ac.run(main())
do work
do work
do work
do work
do work
done

The above example is written in a somewhat terse style; make sure you understand why it stops on time: as long as tout is open, nothing can be received from it, so select immediately falls back to the default value True, and the loop continues. Once tout closes, a get on it completes (with None), so select returns a falsy first element and the loop exits.
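
For reference, here is the same loop written out explicitly; this sketch assumes, consistent with the example above, that select returns a (value, channel) pair and yields the default immediately when no operation is ready:

async def main():
    tout = ac.timeout(1.0)
    while True:
        v, ch = await ac.select(tout, default=True)
        if ch is tout:
            # tout has closed, so a get on it completed (with None): stop
            break
        # nothing was ready, so select returned the default value at once
        print('do work')
        await asyncio.sleep(0.2)
    print('done')

ac.run(main())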

As timeout produces a channel, which can be passed around and selected on, it offers great flexibility for controlling time-based behaviour. However, using it for the ticks of a clock is harmful, as the following example shows:

In [10]:
async def main():
    start = asyncio.get_event_loop().time()
    for i in range(20):
        await ac.timeout(0.1).get()
        print(i, asyncio.get_event_loop().time() - start)

ac.run(main())
0 0.10043401701841503
1 0.20142484991811216
2 0.30242938199080527
3 0.4030482260277495
4 0.5035843959776685
5 0.6041081629227847
6 0.7046528200153261
7 0.8056348919635639
8 0.9063465989893302
9 1.0068686519516632
10 1.1073921599891037
11 1.2079381300136447
12 1.3089604979613796
13 1.4095268349628896
14 1.5100650689564645
15 1.6105891889892519
16 1.7114433919778094
17 1.81249319401104
18 1.9130375039530918
19 2.0135989299742505

The problem is that timeout guarantees it will close after the specified time has elapsed, and will try to close as soon as possible thereafter, but it can never close at the precise instant. Since each iteration creates a fresh timeout, each tick starts counting from whenever the previous one finished, so the errors accumulate. In the above example we have already accumulated more than 0.01 seconds of error in a mere 2 seconds.

If you want something that ticks, use the tick_tock function:

In [11]:
async def main():
    start = asyncio.get_event_loop().time()
    ticker = ac.tick_tock(0.1)
    for i in range(20):
        await ticker.get()
        print(i, asyncio.get_event_loop().time() - start)

ac.run(main())
0 0.1004815329797566
1 0.2012625669594854
2 0.3008053069934249
3 0.4013087539933622
4 0.5008452819893137
5 0.6013440380338579
6 0.7008649010676891
7 0.8013983579585329
8 0.900891529978253
9 1.001404833048582
10 1.100898704957217
11 1.2013944609789178
12 1.3008839710382745
13 1.4013996929861605
14 1.501174372038804
15 1.6006878040498123
16 1.701174663961865
17 1.8006792459636927
18 1.9011599159566686
19 2.000674612005241

Errors are still unavoidable, but they do not accumulate.
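
The two functions combine naturally: a ticker drives the recurrent work while a separate timeout channel decides when to stop. A sketch (the values produced by the ticker are ignored here; we only rely on select completing on whichever channel is ready first):

async def main():
    ticker = ac.tick_tock(0.1)  # a tick every 0.1 seconds
    deadline = ac.timeout(1.0)  # closes after 1 second
    while True:
        v, ch = await ac.select(deadline, ticker)
        if ch is deadline:
            break  # the deadline channel has closed: stop working
        print('tick')  # a tick arrived: do the recurrent work
    print('done')

ac.run(main())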

To recap:

  • Use timeout to control the timing of operations (maybe together with select)
  • If the timing control is recurrent, consider using tick_tock

Functional methods

If you have done any functional programming, you are certainly familiar with things like map, reduce (or foldl, foldr), filter and friends. Channels come with these functional methods, which are chainable: when called, they return new channels containing the expected elements.

Examples:

In [12]:
async def main():
    print('map', await ac.from_range(10).map(lambda x: x*2).collect())
    print('filter', await ac.from_range(10).filter(lambda x: x % 2 == 0).collect())
    print('take', await ac.from_range(10).take(5).collect())
    print('drop', await ac.from_range(10).drop(5).collect())
    print('take_while', await ac.from_range(10).take_while(lambda x: x < 5).collect())
    print('drop_while', await ac.from_range(10).drop_while(lambda x: x < 5).collect())

ac.run(main())
map [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
filter [0, 2, 4, 6, 8]
take [0, 1, 2, 3, 4]
drop [5, 6, 7, 8, 9]
take_while [0, 1, 2, 3, 4]
drop_while [5, 6, 7, 8, 9]

There is also distinct:

In [13]:
async def main():
    c = ac.from_iter([0,0,0,1,1,2,2,2,2,3,3,4,4,4,5,4,4,3,3,2,1,1,1,0])
    print(await c.distinct().collect())

ac.run(main())
[0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0]

Note that only consecutive values are tested for distinctness (which is why, for example, 4 and 3 appear more than once in the result).
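
If you need uniqueness across the whole channel instead, one option is a stateful predicate passed to filter; a sketch (any callable will do as the predicate):

async def main():
    seen = set()

    def first_time(x):
        # a stateful predicate: keep a value only on its first appearance
        if x in seen:
            return False
        seen.add(x)
        return True

    c = ac.from_iter([0, 0, 1, 2, 1, 3, 2, 0])
    print(await c.filter(first_time).collect())  # [0, 1, 2, 3]

ac.run(main())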

You probably know reduce, the so-called universal reducing function:

In [14]:
async def main():
    print(await ac.from_range(10).reduce(lambda a, b: a+b).collect())
    print(await ac.from_range(10).reduce(lambda acc, nxt: acc + [nxt], init=[]).collect())

ac.run(main())
[45]
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]

As we can see, you can optionally pass an initial value to reduce. Notice that reduce only produces a value when the source channel is closed: it turns a whole channel of values into a channel containing a single value. Most of the time you want the intermediate results as well, in which case scan is the right tool:

In [15]:
async def main():
    print(await ac.from_range(10).scan(lambda a, b: a+b).collect())
    print(await ac.from_range(10).scan(lambda acc, nxt: acc + [nxt], init=[]).collect())

ac.run(main())
[0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
[[], [0], [0, 1], [0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4], [0, 1, 2, 3, 4, 5], [0, 1, 2, 3, 4, 5, 6], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, 5, 6, 7, 8], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]

All of these “functional” methods accept two optional arguments: out and close. As we said previously, these methods operate by returning a new channel containing the processed values. If a channel is given as the out argument, that channel will receive the processed values instead. Also, when the source channel is closed, by default the out channel will be closed as well; you can prevent this by setting close to False. This is illustrated below:

In [16]:
async def main():
    out = ac.Chan(5) # we can use buffers as we please
    ac.from_range(10).map(lambda x: x*2, out=out, close=False)
    print(out.closed)
    print(await out.collect(10))

ac.run(main())
False
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
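
One reason to set close to False is fan-in: several sources can then feed the same out channel without any of them closing it prematurely. A sketch (the interleaving of the two sources is nondeterministic, hence the sorting):

async def main():
    out = ac.Chan()
    # neither call closes out, so both sources can keep feeding it
    ac.from_range(3).map(lambda x: x, out=out, close=False)
    ac.from_range(3).map(lambda x: x + 100, out=out, close=False)
    r = await out.collect(6)
    print(sorted(r))  # [0, 1, 2, 100, 101, 102]

ac.run(main())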

To recap:

  • map, reduce, filter, distinct, take, drop, take_while, drop_while, scan do what you expect them to do.
  • You can control the output channel, and whether it is closed when the input is exhausted, by specifying the out and close arguments.

Pipeline methods

There are times when your processing is too complicated to express with the above functional methods. For example, given the sequence [1,2,1,3,1], you may want to produce the sequence [1,2,2,1,3,3,3,1], repeating each value as many times as its face value. In such cases you can use the async_apply method:

In [17]:
async def duplicate_face_value(inp, out):
    async for v in inp:
        for _ in range(v):
            await out.put(v)
    out.close()

async def main():
    vals = [1,2,3,2,1]
    print(await ac.from_iter(vals).async_apply(duplicate_face_value).collect())

ac.run(main())
[1, 2, 2, 3, 3, 3, 2, 2, 1]

You may think that this is not too different from connecting the channels yourself and spawning a processing coroutine with go, and indeed it is not. But writing it using async_apply makes your intention clearer.
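
For comparison, a sketch of what the manual version might look like, with go spawning the processing coroutine ourselves:

async def main():
    inp = ac.from_iter([1, 2, 3, 2, 1])
    out = ac.Chan()
    ac.go(duplicate_face_value(inp, out))  # spawn the worker by hand
    print(await out.collect())  # same result as before

ac.run(main())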

Processing values from a channel and putting the results onto another channel is a very common theme. With async_apply, only a single coroutine works on the values. With async_pipe, you can have multiple worker coroutines, getting closer to parallelism:

In [18]:
async def worker(n):
    await asyncio.sleep(0.1)
    return n*2

async def main():
    start = asyncio.get_event_loop().time()
    print(await ac.from_range(20).async_pipe(10, worker).collect())
    print(asyncio.get_event_loop().time() - start)

ac.run(main())
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
0.20754481800395297

We see that processing 20 values takes only about 0.2 seconds, even though a single value takes 0.1 seconds to process: with 10 workers, the 20 items are handled in two rounds of 0.1 seconds each.

Notice that the output values are in the correct order. This holds even if later work items complete earlier: async_pipe preserves the order while keeping the waiting time as small as possible. When the order is not important, we can use async_pipe_unordered instead:

In [19]:
import random

async def worker(n):
    await asyncio.sleep(random.uniform(0, 0.2))
    return n*2

async def main():
    start = asyncio.get_event_loop().time()
    print(await ac.from_range(20).async_pipe(10, worker).collect())
    print('ordered time:', asyncio.get_event_loop().time() - start)

    start = asyncio.get_event_loop().time()
    print(await ac.from_range(20).async_pipe_unordered(10, worker).collect())
    print('unordered time:', asyncio.get_event_loop().time() - start)

ac.run(main())
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
ordered time: 0.33254893589764833
[14, 8, 6, 0, 20, 12, 24, 10, 22, 16, 36, 18, 2, 4, 26, 28, 38, 30, 32, 34]
unordered time: 0.2875210080528632

We see that when processing times are random, unordered processing has an efficiency advantage.

To recap:

  • use async_apply to give your custom processing pipeline a uniform look
  • use async_pipe for parallelism within asyncio
  • you can get more concurrency with async_pipe_unordered, but you give up the return order